# INTRODUCTION
In this notebook we will be using the following packages:
  * [pyspark](https://spark.apache.org/docs/latest/api/python/pyspark.html)
  * [pandas](https://pandas.pydata.org/pandas-docs/stable/index.html)
  * [pyarrow](https://arrow.apache.org/docs/python/index.html)

We will be using the following data:
  * dressipi_recsys2022

The dataset is split into four CSV files:
   * candidate_items : contains the ids of the candidate items
   * item_features : contains the features for each item (items can have multiple features)
   * train_purchases : contains the purchased item for each session
   * train_sessions : contains the items viewed in each session



In [1]:
# Uncomment to install the required packages
# %pip install pyspark
# %pip install pyarrow


## Loading the dataset

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import numpy as np
import os

# Start the Spark session.
# The driver memory is passed through PYSPARK_SUBMIT_ARGS, set here before the session is built.
os.environ['PYSPARK_SUBMIT_ARGS'] ="--conf spark.driver.memory=2g  pyspark-shell"
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("AppName")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

# Placeholders for the four dataframes; they are filled in by the load_* functions.
candidate_items = item_features = train_purchases = train_sessions = None
datasets = [candidate_items, item_features, train_purchases, train_sessions]

#load all datasets
def load_candidate_items():
    """Read candidate_items.csv into the global `candidate_items`, with `item_id` cast to int."""
    global candidate_items
    frame = spark.read.csv("dressipi_recsys2022/candidate_items.csv", header=True)
    candidate_items = frame.withColumn("item_id", frame["item_id"].cast("int"))

def load_item_features():
    """Read item_features.csv into the global `item_features`.

    The three id columns (`item_id`, `feature_category_id`, `feature_value_id`) are cast to int.
    """
    global item_features
    frame = spark.read.csv("dressipi_recsys2022/item_features.csv", header=True)
    for column in ("item_id", "feature_category_id", "feature_value_id"):
        frame = frame.withColumn(column, frame[column].cast("int"))
    item_features = frame

def load_train_purchases():
    """Read train_purchases.csv into the global `train_purchases`.

    `session_id` and `item_id` are cast to int, `date` to timestamp.
    """
    global train_purchases
    frame = spark.read.csv("dressipi_recsys2022/train_purchases.csv", header=True)
    for column, dtype in (("session_id", "int"), ("item_id", "int"), ("date", "timestamp")):
        frame = frame.withColumn(column, frame[column].cast(dtype))
    train_purchases = frame

def load_train_sessions():
    """Read train_sessions.csv into the global `train_sessions`.

    `session_id` and `item_id` are cast to int, `date` to timestamp.
    """
    global train_sessions
    frame = spark.read.csv("dressipi_recsys2022/train_sessions.csv", header=True)
    for column, dtype in (("session_id", "int"), ("item_id", "int"), ("date", "timestamp")):
        frame = frame.withColumn(column, frame[column].cast(dtype))
    train_sessions = frame

def load_datasets():
    """Run the four loaders in order, then rebuild the global `datasets` list from their results."""
    global datasets
    loaders = (load_candidate_items, load_item_features, load_train_purchases, load_train_sessions)
    for loader in loaders:
        loader()
    datasets = [candidate_items, item_features, train_purchases, train_sessions]

load_datasets()


## Quick look at the data

### train_sessions

In [None]:
# Print the column types, then preview the first 5 rows.
train_sessions.printSchema()
train_sessions.show(5)

### train_purchases

In [None]:
# Preview the first 5 rows.
train_purchases.show(5)

### item_features

In [None]:
# Preview the first 5 rows.
item_features.show(5)

### candidate_items

In [None]:
# Preview the first 5 rows.
candidate_items.show(5)

# Part 1 : Pipeline

## Checking for null values

In [None]:
# Are there missing values (null) in any of those dataframes?
# Print, for each dataframe, the number of rows that have a null in at least one column.
# Every column is checked (not only item_id), so nulls in session_id, date or the
# feature id columns are reported as well.
def count_rows_with_null(df):
    """Return the number of rows of `df` that contain a null in at least one column."""
    has_null = F.lit(False)
    for column in df.columns:
        # OR the per-column conditions together: one null is enough to flag the row
        has_null = has_null | df[column].isNull()
    return df.filter(has_null).count()

print("candidate_items:", count_rows_with_null(candidate_items))
print("item_features:", count_rows_with_null(item_features))
print("train_purchases:", count_rows_with_null(train_purchases))
print("train_sessions:", count_rows_with_null(train_sessions))

## Feature engineering

1) Month

In [None]:
# Month (1-12) of each session; when a session has several rows the smallest value is kept.
month = (
    train_sessions.rdd
    .map(lambda row: (row["session_id"], row["date"].month))
    .reduceByKey(min)
)
print(month.take(5))

2) Day of month

In [None]:
# Day of the month (1-31) of each session; when a session has several rows the smallest value is kept.
day_of_month = (
    train_sessions.rdd
    .map(lambda row: (row["session_id"], row["date"].day))
    .reduceByKey(min)
)
print(day_of_month.take(5))

3) Weekday

In [None]:
# Weekday of each session with Sunday = 0 ... Saturday = 6; the smallest value per session is kept.
# isoweekday() numbers Monday = 1 ... Sunday = 7, so "% 7" maps Sunday to 0.
weekday = (
    train_sessions.rdd
    .map(lambda row: (row["session_id"], row["date"].isoweekday() % 7))
    .reduceByKey(min)
)
print(weekday.take(5))

4) Hour period

In [None]:
# Hour of the day (0-23) of each session; when a session has several rows the smallest value is kept.
hour_period = (
    train_sessions.rdd
    .map(lambda row: (row["session_id"], row["date"].hour))
    .reduceByKey(min)
)
print(hour_period.take(5))

5) Season (Meteorological)

In [None]:
def get_season(month):
    """Map a month number to a season index.

    Returns 0 for December, January and February (and any value <= 2),
    1 for March-May, 2 for June-August, 3 for September-November,
    and None for any other value.
    """
    if month == 12 or month <= 2:
        return 0
    if month <= 5:
        return 1
    if month <= 8:
        return 2
    if month <= 11:
        return 3
    return None

# Season index of each session (see get_season); the smallest value per session is kept.
season = (
    train_sessions.rdd
    .map(lambda row: (row["session_id"], get_season(row["date"].month)))
    .reduceByKey(min)
)
print(season.take(5))

6) Average time between consecutive item views

In [None]:
def get_average_time(dates):
    """Return the mean gap, in seconds, between consecutive dates once sorted.

    `dates` is an iterable of datetime objects. Returns 0 when there are
    fewer than two dates (no gap to average).
    """
    import datetime

    ordered = sorted(dates)
    gaps = [later - earlier for earlier, later in zip(ordered, ordered[1:])]
    if not gaps:
        return 0
    total = sum(gaps, datetime.timedelta())
    return (total / len(gaps)).total_seconds()

# Gather the dates of each session, then reduce them to the average gap in seconds.
average_time = (
    train_sessions.rdd
    .map(lambda row: (row["session_id"], row["date"]))
    .groupByKey()
    .mapValues(get_average_time)
)

average_time.take(5)

7) Number of distinct items

In [None]:
# Number of different item ids seen in each session.
distinct_nb = (
    train_sessions.rdd
    .map(lambda row: (row["session_id"], row["item_id"]))
    .groupByKey()
    .mapValues(lambda item_ids: len(set(item_ids)))
)
distinct_nb.take(5)

8) Number of repeated items (rows of a session beyond the first occurrence of each item)

In [None]:
# Rows of the session minus its distinct item ids, i.e. how many rows repeat an already seen item.
repetitive_nb = (
    train_sessions.rdd
    .map(lambda row: (row["session_id"], row["item_id"]))
    .groupByKey()
    .mapValues(lambda item_ids: len(list(item_ids)) - len(set(item_ids)))
)
repetitive_nb.take(5)

9) Same category (number of (category, value) features that appear more than once in the session)

In [None]:
# One entry per item: item_id -> list of its (feature_category_id, feature_value_id) pairs.
item_features_rdd = (
    item_features.rdd
    .map(lambda row: (row["item_id"], (row["feature_category_id"], row["feature_value_id"])))
    .groupByKey()
    .mapValues(list)
)
item_features_rdd.take(5)

In [None]:
def get_same_category(x):
    """Count the distinct features that occur more than once across `x`.

    `x` is an iterable of feature collections (one per item); every element
    of those collections must be hashable. Returns an int.
    """
    occurrences = {}
    for item_feats in x:
        for feat in item_feats:
            occurrences[feat] = occurrences.get(feat, 0) + 1

    return sum(1 for count in occurrences.values() if count > 1)

# Attach the feature list of every viewed item to its session, then count the repeated features.
same_category = (
    item_features_rdd
    .join(train_sessions.rdd.map(lambda row: (row["item_id"], row["session_id"])))
    # joined record: (item_id, (feature_list, session_id)) -> (session_id, feature_list)
    .map(lambda joined: (joined[1][1], joined[1][0]))
    .groupByKey()
    .mapValues(get_same_category)
)
same_category.take(5)

10) Different category (number of (category, value) features that appear exactly once in the session)

In [None]:
def get_different_category(x):
    """Count the distinct features that occur exactly once across `x`.

    `x` is an iterable of feature collections (one per item); every element
    of those collections must be hashable. Returns an int.
    """
    occurrences = {}
    for item_feats in x:
        for feat in item_feats:
            occurrences[feat] = occurrences.get(feat, 0) + 1

    return sum(1 for count in occurrences.values() if count == 1)

# Attach the feature list of every viewed item to its session, then count the features seen only once.
diff_category = (
    item_features_rdd
    .join(train_sessions.rdd.map(lambda row: (row["item_id"], row["session_id"])))
    # joined record: (item_id, (feature_list, session_id)) -> (session_id, feature_list)
    .map(lambda joined: (joined[1][1], joined[1][0]))
    .groupByKey()
    .mapValues(get_different_category)
)
diff_category.take(5)

11) Last item

In [None]:
def get_last_item(items):
    """Return the first element of the entry of `items` whose second element is greatest.

    `items` is an iterable of (item_id, date) pairs; on ties the earliest
    entry in iteration order wins.
    """
    candidates = list(items)
    latest = max(candidates, key=lambda pair: pair[1])
    return latest[0]

# Item with the most recent date in each session.
last_item = (
    train_sessions.rdd
    .map(lambda row: (row["session_id"], (row["item_id"], row["date"])))
    .groupByKey()
    .mapValues(get_last_item)
)
last_item.take(5)

BIG RDD: the purchased item of each session joined with the session features listed in `features`

In [None]:
# (session_id, purchased item_id) pairs: the base onto which the features are joined.
session_item_id = train_purchases.rdd.map(lambda row: (row["session_id"], row["item_id"]))

# Feature name -> per-session RDD. Both lists below are derived from this single mapping,
# so position i of `features_name` always names position i of `features`.
feature_by_name = {
    "month": month,
    "day_of_month": day_of_month,
    "weekday": weekday,
    "hour_period": hour_period,
    "season": season,
    "average_time": average_time,
    "distinct_nb": distinct_nb,
    "repetitive_nb": repetitive_nb,
}
features = list(feature_by_name.values())
features_name = list(feature_by_name.keys())

def get_BIG_RDD():
    """Join every RDD of the global `features` onto the global `session_item_id`.

    Returns an RDD keyed by session id. After each join the value is flattened,
    so it ends up as one tuple (item_id, feature_1, ..., feature_n) in the
    order of `features`.
    """
    def flatten(pair):
        # pair is (accumulated, new_value) as produced by join()
        accumulated, new_value = pair
        if isinstance(accumulated, tuple):
            return tuple(list(accumulated) + [new_value])
        # first join: accumulated is the bare item_id, so the pair is already flat
        return pair

    joined = session_item_id
    for feature_rdd in features:
        joined = joined.join(feature_rdd).mapValues(flatten)
    return joined

# Build the joined RDD and mark it for caching.
BIG_RDD = get_BIG_RDD().cache()

In [None]:
# Look at one record of the joined RDD.
BIG_RDD.take(1)

# Part 2 : Feature selection


## Ranking algorithm

In [None]:
def pearson_reduce(a, b):
    """Add two accumulators element by element.

    `a` and `b` are indexable sequences whose first six entries are
    (n, sum_x, sum_y, sum_x2, sum_y2, sum_xy). Returns the 6-tuple of sums.
    """
    return tuple(a[i] + b[i] for i in range(6))

def calculate_pearson(a):
    """Compute the Pearson correlation coefficient from accumulated sums.

    Parameters
    ----------
    a : sequence
        (n, sum_x, sum_y, sum_x2, sum_y2, sum_xy).

    Returns
    -------
    float
        The correlation coefficient. 0.0 is returned when the coefficient is
        undefined, i.e. when x or y has zero variance (constant values or a
        single observation), instead of raising ZeroDivisionError.
    """
    import math

    n, x, y, x2, y2, xy = a[0], a[1], a[2], a[3], a[4], a[5]

    spread_x = n*x2 - x**2
    spread_y = n*y2 - y**2
    # "<= 0" rather than "== 0": float rounding can push a zero spread slightly negative,
    # which would otherwise make math.sqrt fail
    if spread_x <= 0 or spread_y <= 0:
        return 0.0

    return (n*xy - x * y) / math.sqrt(spread_x * spread_y)

def compute_pearson(rdd):
    """Correlate every feature of `rdd` with the first element of its value tuple.

    Each record of `rdd` is (key, values), where values[0] is the reference
    variable and values[1:] are the features. Returns a list of
    (feature_index, pearson_coefficient) with feature_index starting at 0.
    """
    def to_pairs(record):
        values = record[1]
        reference = values[0]
        return [(index, (values[index + 1], reference)) for index in range(len(values) - 1)]

    def to_sums(pair):
        feature, reference = pair
        return (1, feature, reference, feature**2, reference**2, feature*reference)

    sums_by_feature = rdd.flatMap(to_pairs).mapValues(to_sums).reduceByKey(pearson_reduce)
    return sums_by_feature.mapValues(calculate_pearson).collect()

In [None]:
# Rank the features by the absolute value of their correlation, strongest first.
features_score = sorted(compute_pearson(BIG_RDD), key=lambda entry: abs(entry[1]), reverse=True)

print("Features ranking:")
for index, score in features_score:
    print(f"\t- {features_name[index]}: {score}")

# Part 3 : Model

In [24]:
# Small Spark check: keep the even numbers of a 5-element RDD.
# NOTE(review): presumably a placeholder until the model is written; confirm.
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x % 2 == 0).collect()

[2, 4]