In [1]:
# LOAD METHOD 1
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

In [69]:
# Configure a local Spark application that uses every available core.
conf = (
    SparkConf()
    .setAppName('analytics')
    .setMaster("local[*]")
)

# getOrCreate reuses an already-running SparkContext, so re-running this
# cell does not try to start a second context.
sc = SparkContext.getOrCreate(conf=conf)

In [70]:
## Load the CSV as a DataFrame through the legacy SQLContext entry point.
## DataFrameReader.csv (used below) requires pyspark >= 2.0.0.
## NOTE(original author): loading columns whose names contain dots also
## needed pyspark > 2.0.0.

# `sqlContext` is never defined elsewhere in this notebook (it only exists
# automatically in the pyspark shell), so build it from the SparkContext
# above; getOrCreate returns the existing instance when the cell is re-run.
sqlContext = SQLContext.getOrCreate(sc)
context_df = sqlContext.read.csv('data/test.csv', header = True)

## Alternative: load the raw lines as an RDD of strings.
# rd = sc.textFile('data/test.csv')

In [1]:
# LOAD METHOD 2
from pyspark.sql import SparkSession
import pandas as pd

In [2]:
# SparkSession is the unified Spark >= 2.0 entry point. "local[*]" runs Spark
# in-process on all local cores; getOrCreate() returns an existing session
# if one is already active.
spark = (
    SparkSession.builder
    .appName('analytics')
    .master("local[*]")
    .getOrCreate()
)

In [4]:
# Read the CSV: first row is the header, fields are comma-separated, and
# column types are inferred (which costs an extra pass over the file).
session_df = (
    spark.read
    .options(header="true", delimiter=",", inferSchema="true")
    .csv('data/test.csv')
)

In [16]:
# import redis
# db = redis.StrictRedis(host = 'localhost', port = 6379, db = 1)

In [5]:
# Import only what this notebook uses. `from walrus import *` hides where names
# come from and can silently shadow names defined in earlier cells.
import os

from walrus import Database

# Redis connection that backs the API caches built below. Defaults are the
# original hard-coded values; set REDIS_HOST / REDIS_PORT / REDIS_DB to point
# the notebook at a different server without editing code.
db = Database(
    host=os.environ.get('REDIS_HOST', 'localhost'),
    port=int(os.environ.get('REDIS_PORT', 6379)),
    db=int(os.environ.get('REDIS_DB', 1)),
)

In [6]:
# Redis keys are built as "<prefix><key_joiner><suffix>", e.g.
# "count:<day epoch>", "color:<color name>", "recent:<day epoch>".
key_joiner = ':'
recent_key_meta, count_key_meta, color_key_meta = 'recent', 'count', 'color'

In [163]:
# Project down to the columns the three API caches need, then drop any row
# that has a null in one of them (dropna default: how='any').
# NOTE: duplicate rows are NOT removed here.
required_columns = ['id', 'brand', 'colors', 'dateAdded', 'dateUpdated']

min_df = session_df.select(*required_columns)
clean_df = min_df.dropna()

In [164]:
# API II - /getBrandsCount
# Count records per (day, brand). `date_added` is the calendar day of
# `dateAdded` expressed as Unix epoch seconds. Rows are sorted newest day
# first, then by descending count.
from pyspark.sql.functions import count, unix_timestamp

day_epoch = unix_timestamp(clean_df.dateAdded.cast('date'))

count_df = (
    clean_df
    .withColumn('date_added', day_epoch)
    .groupBy('date_added', 'brand')
    .agg(count('brand'))
    .orderBy('date_added', 'count(brand)', ascending=False)
)

In [165]:
# Reshape the Spark aggregate into {day_epoch: {brand: count}} ...
count_dict = (
    count_df.toPandas()
    .groupby('date_added')
    .apply(lambda rows: dict(zip(rows['brand'], rows['count(brand)'])))
    .to_dict()
)

# ... and store each day as a Redis hash "count:<day_epoch>" (brand -> count).
# `.items()` works on Python 2 and 3; `dict.iteritems()` is Python-2 only and
# raises AttributeError on Python 3.
for epoch_date, brand_counts in count_dict.items():
    count_key = count_key_meta + key_joiner + str(epoch_date)
    # db.Hash(key) only wraps the key; the hash write creates it if missing
    # and merges into it if present, so no EXISTS/TYPE lookup is needed.
    db.Hash(count_key).update(brand_counts)

In [166]:
# API III - /getItemsbyColor
# For every distinct `colors` value keep the TOP_ITEMS_PER_COLOR rows with the
# latest dateAdded (ties broken by latest dateUpdated). `colors` is the raw,
# possibly comma-separated string, so e.g. "Black" and "Black,White" are
# ranked independently; the split into single colors happens later.
# NOTE(review): ordering compares dateAdded/dateUpdated as loaded; this is
# chronological only if they are ISO-8601 strings or timestamps — confirm.
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col

TOP_ITEMS_PER_COLOR = 10

# Parenthesized chain instead of a trailing "\" that continued onto a blank
# line (fragile: deleting that blank line turns it into a syntax error).
color_window = (
    Window.partitionBy(clean_df.colors)
    .orderBy(clean_df.dateAdded.desc(), clean_df.dateUpdated.desc())
)

color_df = (
    clean_df
    .select('*', row_number().over(color_window).alias('row_number'))
    .filter(col('row_number') <= TOP_ITEMS_PER_COLOR)
    .drop('row_number')
)

In [127]:
# Group the top rows by their raw `colors` string: Series of
# {colors value -> list of row dicts}.
color_dict = (
    color_df.toPandas()
    .groupby('colors')
    .apply(lambda rows: rows.to_dict('records'))
)

# Append each group to a Redis list "color:<single color>" (lower-cased) for
# every color named in the comma-separated `colors` value.
# NOTE(review): rpush appends, so re-running this cell duplicates entries.
# NOTE(review): row dicts are passed to Redis as-is; redis-py >= 3.0 rejects
# non-scalar values — consider json.dumps and verify against the consumer.
# `Series.items()` replaces `Series.iteritems()`, which pandas 2.0 removed.
for color, records in color_dict.items():
    for split_color in color.split(','):
        # Strip so "Black, White" maps to "color:white", not "color: white".
        split_color = split_color.strip()
        if not split_color:
            continue  # empty token from ",," or a trailing comma
        color_key = (color_key_meta + key_joiner + split_color).lower()
        # db.List(key) only wraps the key; RPUSH creates the list if missing.
        db.List(color_key).extend(records)

In [167]:
# API I - /getRecentItem
# Keep exactly one row per calendar day (`date_added`, day as Unix epoch
# seconds): the one with the latest dateAdded, ties broken by latest
# dateUpdated.
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number, unix_timestamp

# Partition by the derived *day* column, not by raw dateAdded. The raw value
# is a full timestamp (e.g. '2017-02-03T22:06:24Z'), so partitioning by it
# kept one row per distinct timestamp rather than one per day. The per-day
# Redis hash "recent:<day>" was then overwritten by whichever of those rows
# happened to be written last, not by the most recent one.
date_window = (
    Window.partitionBy(col('date_added'))
    .orderBy(clean_df.dateAdded.desc(), clean_df.dateUpdated.desc())
)

recent_df = (
    clean_df
    .withColumn('date_added', unix_timestamp(clean_df.dateAdded.cast('date')))
    .withColumn('row_number', row_number().over(date_window))
    .filter(col('row_number') == 1)
    .drop('row_number')
)

In [168]:
# One dict per row of recent_df (column name -> value).
recent_dict = recent_df.toPandas().to_dict('records')

# Store each row as a Redis hash "recent:<day_epoch>" holding all its columns.
for data in recent_dict:
    recent_key = recent_key_meta + key_joiner + str(data['date_added'])
    # db.Hash(key) only wraps the key; the hash write creates it if missing
    # and merges into it if present, so no EXISTS/TYPE lookup is needed.
    db.Hash(recent_key).update(data)

In [169]:
## NOTES

# session_df.count()
# len(session_df.columns)

# session_df.columns
# session_df.printSchema()
# session_df.first().dateAdded

# session_df.head(5)
# session_df.show(1)
# session_df.describe('prices_amountMin').show()
# session_df.select('brand').show()

# session_df.select('brand', 'colors').dropDuplicates().show()
# session_df.where(session_df.asins != '').count()
# session_df.limit(2).toPandas()

## Only spark >= 2.1.0 can load empty strings as null
# session_df.dropna(how='any', thresh=None, subset=None).show()
# session_df.dropna().show()
# session_df.na.drop().count()


# from datetime import datetime
# from pyspark.sql.functions import unix_timestamp

# to_date('2017-02-03T22:06:24Z')
# to_timestamp(min_df.dateAdded, 'yyyy-MM-dd HH:mm:ss')

# from pyspark.sql.types import DateType
# min_df.withColumn('date', min_df['dateAdded'].cast('timestamp')).show()

# from pyspark.sql.functions import to_date, to_timestamp
# min_df.select(to_timestamp(min_df.dateAdded), 'id').show()

# from pyspark.sql import Row
# from pyspark.sql.functions import count, col, rank, dense_rank, max
# min_df.orderBy(min_df.dateAdded.desc()).show()
# min_df.groupBy('dateAdded').count().show()
# session_df.agg(max('dateAdded')).show()

# from pyspark.sql import Window
# w = Window.partitionBy('dateAdded')
# session_df.withColumn('maxDate', max('dateAdded').over(w)) \
#     .where(col('dateAdded') == col('maxDate')) \
#     .drop('maxDate') \
#     .count()

# import datetime
# import numpy as np

# w = Window.partitionBy(clean_df['dateAdded']).orderBy(clean_df.dateAdded.desc(), clean_df.dateUpdated.desc())

# clean_df.select('*', dense_rank().over(w).alias('rank')) \
#   .filter(col('rank') <= 2) \
#   .show(5)

# from datetime import datetime
# datetime.fromtimestamp(float('1430677800'))