# Test dataset pipeline

In this notebook, a pipeline similar to Step 1 will be coded for the test dataset.

Each operation is the same as in Step 1, except those that require the purchase date: since the test dataset does not provide it, the date of the last item visit in the session is used instead.

The clusters used for item-feature dimensionality reduction are imported from the ones computed in the first notebook.

In [1]:
import os 
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
import numpy as np

# Point PySpark at this kernel's interpreter (helps with py4j issues on
# Windows; your PATH may also need updating) and size the driver JVM.
# These variables must be set before the SparkSession below starts the JVM.
import sys
os.environ.update({
    'PYSPARK_DRIVER_PYTHON_OPTS': "notebook",
    'PYSPARK_DRIVER_PYTHON': sys.executable,
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_SUBMIT_ARGS': "--conf spark.driver.memory=4g  pyspark-shell",
})

from pyspark.sql import SparkSession

# Local Spark session using every available core
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("load_explore")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/28 08:22:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/28 08:22:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/05/28 08:22:19 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/05/28 08:22:19 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
22/05/28 08:22:19 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


In [2]:
# Load the raw CSV files into Spark DataFrames

def _read_csv(path):
    '''Reads a CSV file with a header row, letting Spark infer the column types.'''
    return spark.read.load(path,
                           format='com.databricks.spark.csv',
                           header='true',
                           inferSchema='true')

test_sessions = _read_csv('../Data/test_leaderboard_sessions.csv')
item_features = _read_csv('../Data/item_features.csv')

                                                                                

In [3]:
import datetime

def parse_datetime(timestamp):
    '''Parses a 'YYYY-MM-DD HH:MM:SS[.ffffff]' string into a datetime.

    The fractional-seconds format is tried first. If it does not match, the
    whole-seconds format is tried, and its ValueError (if any) propagates.
    '''
    *fallible_formats, final_format = ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S')
    for fmt in fallible_formats:
        try:
            return datetime.datetime.strptime(timestamp, fmt)
        except ValueError:
            continue
    return datetime.datetime.strptime(timestamp, final_format)

# (session_id, parsed view date) for every row. Cached because both the max
# and the min reductions below reuse it.
date_parsed_sessions = (
    test_sessions.rdd
    .map(lambda row: (row.session_id, parse_datetime(row.date)))
    .cache()
)

# Latest view date of each session
max_date_sessions = date_parsed_sessions.reduceByKey(max)

# Earliest view date of each session
min_date_sessions = date_parsed_sessions.reduceByKey(min)

# Session duration in whole seconds.
# total_seconds() instead of .seconds: timedelta.seconds only holds the
# sub-day remainder (0..86399), so a session lasting 24h or more used to wrap.
# NOTE(review): Step 1 (train pipeline) should compute durations the same way,
# otherwise train and test features diverge for multi-day sessions - confirm.
time_sessions_in_seconds = (
    max_date_sessions
    .join(min_date_sessions)  # (session_id, (max_date, min_date)), one pair per session
    .mapValues(lambda dates: int((dates[0] - dates[1]).total_seconds()))
)

def get_season(date_time):
    '''Converts the date_time into a (northern-hemisphere) season

    Each season starts on a fixed day: spring on March 20, summer on June 21,
    autumn on September 23 and winter on December 21.

    :returns: an integer
        0 -> Winter
        1 -> Spring
        2 -> Summer
        3 -> Autumn
    '''
    # Day of the month from which the next season begins, for the four
    # months in which a season change happens
    season_change_day = {3: 20, 6: 21, 9: 23, 12: 21}

    # Season in effect at the start of the month (Jan-Mar -> 0, ..., Oct-Dec -> 3)
    season = (date_time.month - 1) // 3
    if date_time.day >= season_change_day.get(date_time.month, 32):
        season += 1
    # December 21 and later roll over from 4 back to 0 (winter)
    return season % 4

def get_day_period(date_time):
    '''Converts the date_time into a period of the day (6-hour buckets).

    :returns: an integer (date_time.hour // 6)
        0 -> Night (00:00 to 05:59)
        1 -> Morning (06:00 to 11:59)
        2 -> Afternoon (12:00 to 17:59)
        3 -> Evening (18:00 to 23:59)
    '''
    return date_time.hour // 6

# Calendar features of each session, all derived from its first view date
season_per_session = min_date_sessions.mapValues(get_season)          # 0..3, see get_season
day_period_per_session = min_date_sessions.mapValues(get_day_period)  # 0..3, see get_day_period
month_per_session = min_date_sessions.mapValues(lambda start: start.month - 1)   # 0 = January
year_per_session = min_date_sessions.mapValues(lambda start: start.year - 2020)  # offset from 2020

In [4]:
# The test dataset has no purchase date, so the date of the last item view of
# each session (max_date_sessions) stands in for it.
# Consequence: in compute_time_per_item the last view of every session always
# gets 0 seconds, because its end date equals its own view date.
session_purchase_date = max_date_sessions

def compute_time_per_item(session_info):
    '''Computes the time spent on each item (in whole seconds)

    A view lasts until the next view of the session starts; the last view
    lasts until the purchase date.

    :param session_info: The information of the session ([(item_id, date)...], purchase_date),
        with the (item_id, date) pairs already sorted by date (the caller sorts them)
    :returns: Time per item information [(item_id, visit_time)...], in visit order
        (an empty list when the session has no views)
    '''
    visited_items, purchase_date = session_info
    # End of each view: the start of the following view, or the purchase date for the last one
    end_dates = [visit_date for _, visit_date in visited_items[1:]] + [purchase_date]
    # total_seconds() instead of .seconds: timedelta.seconds drops the days
    # component, so any gap of 24h or more used to wrap around.
    # NOTE(review): Step 1 should use the same formula so train/test features match.
    return [
        (item_id, int((end_date - start_date).total_seconds()))
        for (item_id, start_date), end_date in zip(visited_items, end_dates)
    ]


def _second_field(pair):
    '''Sort key: the second element of a pair such as (item_id, date) or (item_id, seconds).'''
    return pair[1]

def _mean_std_visit_time(item_times):
    '''Mean and standard deviation (float32) of the seconds in [(item_id, seconds)...].'''
    seconds = np.array(item_times, dtype=np.float32)[:, 1]
    return seconds.mean(), seconds.std()

# Per session: [(item_id, seconds_spent)...] in visit order.
# Each row becomes a one-element list so reduceByKey can concatenate a session's views.
items_time_per_session = (
    test_sessions.rdd
    .map(lambda row: (row.session_id, [(row.item_id, parse_datetime(row.date))]))
    .reduceByKey(lambda left, right: left + right)
    .join(session_purchase_date)  # -> (session_id, ([(item_id, date)...], purchase_date))
    .mapValues(lambda views_end: (sorted(views_end[0], key=_second_field), views_end[1]))
    .mapValues(compute_time_per_item)
)

# Per session: the (item_id, seconds) pair with the longest viewing time.
# The sort is stable, so among ties the earliest view wins.
most_spent_time = items_time_per_session.mapValues(
    lambda item_times: sorted(item_times, key=_second_field, reverse=True)[0]
)

# Per session: (mean, std) of the seconds spent per view
mean_std_time_session = items_time_per_session.mapValues(_mean_std_visit_time)

# Per session: a 2 x n array, row 0 = distinct item_ids, row 1 = their view counts
item_visit_counts = (
    test_sessions.rdd
    .map(lambda row: (row.session_id, [row.item_id]))
    .reduceByKey(lambda left, right: left + right)
    .mapValues(lambda item_ids: np.vstack(np.unique(item_ids, return_counts=True)))
)

def get_revisited_items(count_array):
    '''Counts the distinct items that were viewed more than once.

    :param count_array: array whose second row holds per-item view counts
        (as built by item_visit_counts)
    :returns: number of items with a view count greater than 1
    '''
    view_counts = count_array[1, :]
    return int((view_counts > 1).sum())

def get_item_revisits_info(count_array):
    '''Summarizes the item revisits of a session.

    :param count_array: 2 x n array, row 0 = distinct item_ids, row 1 = how many
        times each one was viewed (as built by item_visit_counts)
    :returns: (item_id with the most views, its total view count, number of
        distinct items viewed more than once).
        The view count includes the first view, i.e. it equals revisits + 1.
        Ties go to the first item in row 0 (np.argmax).
        If no item was viewed more than once, returns (-1, -1, 0).
    '''
    number_of_revisits = get_revisited_items(count_array)
    if number_of_revisits == 0:
        return (-1, -1, number_of_revisits)

    view_counts = count_array[1, :]
    most_viewed_idx = np.argmax(view_counts)
    return (count_array[0, most_viewed_idx], view_counts[most_viewed_idx], number_of_revisits)
    

# Per session: (most viewed item_id, its total view count, number of distinct
# items viewed more than once), or (-1, -1, 0) when no item was viewed twice
item_revisit_info = item_visit_counts.mapValues(get_item_revisits_info)

In [5]:
# Importing all the computed clusters
import pickle

def _load_pickle(path):
    '''Unpickles and returns the object stored at ``path``.

    NOTE: pickle.load can execute arbitrary code; only use it on files
    produced by this project's own earlier notebooks.
    '''
    with open(path, 'rb') as handle:
        return pickle.load(handle)

# Cluster centres computed in the first notebook
feature_category_clusters = _load_pickle('../Data/clusters/feature_category_clusters.np')
feature_values_clusters = _load_pickle('../Data/clusters/feature_values_clusters.np')
# Lookup tables: item id -> cluster id, and per-cluster statistics
item_cluster_dict = _load_pickle('../Data/item_dict.pd')
most_visited_dict = _load_pickle('../Data/most_visited_dict.pd')
most_time_dict = _load_pickle('../Data/most_time_dict.pd')


In [6]:
# Features that hold an item ID are replaced by the ID of the item's cluster,
# which reduces their dimensionality.

def map_cluster_revisited(x):
    '''Replaces the item_id of an item_revisit_info tuple by its cluster id.

    :param x: (item_id, view_count, revisited_items_nb); item_id == -1 means
        that no item was revisited
    :returns: (cluster_id, view_count, revisited_items_nb), or ``x`` itself
        for the -1 sentinel
    '''
    item_id = x[0]
    if item_id == -1:
        return x
    return item_cluster_dict[item_id], x[1], x[2]

# (item cluster id, seconds spent on that item)
most_spent_time_clustered = most_spent_time.mapValues(
    lambda item_and_seconds: (item_cluster_dict[item_and_seconds[0]], item_and_seconds[1])
)
# (item cluster id, view count, revisited items); the (-1, -1, 0) sentinel is kept as-is
revisited_clustered = item_revisit_info.mapValues(map_cluster_revisited)

In [7]:
from pyspark.context import SparkContext
# Handle on the running SparkContext (getOrCreate returns the active one, i.e.
# the context behind `spark`), used below to broadcast the cluster centres.
sc = SparkContext.getOrCreate()

# Mapstep, to assign clusters
def mapstep(x, br_clusters):
    """Returns the index of the cluster centre closest to the row ``x`` (L1 / Manhattan distance).

    :param x: current row, a 1-D feature vector
    :param br_clusters: broadcast variable whose ``.value`` holds the cluster
        centres, one centre per row
    :returns: index of the nearest centre (a numpy integer); ties resolve to
        the lowest index
    """
    centres = np.asarray(br_clusters.value)
    # Distances to all centres at once instead of a Python loop over centres.
    # Accumulate in float64, like the original per-centre float64 buffer did.
    distances = np.abs(np.subtract(x, centres)).sum(axis=1, dtype=np.float64)
    return np.argmin(distances)

# Clustering of sessions by how often each item feature category occurs in them.

# Number of distinct feature_category_id values. Used as the length of the
# per-session category count vectors below.
unique_categories_nb = len(
    item_features.rdd
    .map(lambda row: {row.feature_category_id})
    .reduce(lambda left, right: left | right)
)

def initialize_vector(category_nb, max_categories=unique_categories_nb):
    '''One-hot encodes a feature category id.

    :param category_nb: category id. Position ``category_nb - 1`` is set, so ids
        are expected to start at 1.
        NOTE(review): an id of 0 would silently set the LAST position through
        negative indexing, and non-contiguous ids would not fit a vector sized
        by the number of distinct categories - confirm ids are 1..N.
    :param max_categories: vector length (defaults to the number of distinct categories)
    :returns: float32 vector of zeros with a single 1.0
    '''
    one_hot = np.zeros(shape=max_categories, dtype=np.float32)
    hot_position = category_nb - 1
    one_hot[hot_position] = 1.0
    return one_hot

def normalize_vector(category_vector):
    '''Normalizes a count vector so that its elements sum to 1.0.

    :param category_vector: vector of occurrence counts
    :returns: the vector divided by its total (no guard against a zero total)
    '''
    return np.divide(category_vector, np.sum(category_vector))


# Per session: share of each feature category among the features of the items
# viewed in the session (each view counts; the vector sums to 1.0)
normalized_categories_vector_per_session = (
    test_sessions.rdd
    .map(lambda row: (row.item_id, row.session_id))
    .join(item_features.rdd.map(lambda row: (row.item_id, row.feature_category_id)))
    .values()                                       # -> (session_id, feature_category_id)
    .mapValues(initialize_vector)                   # one-hot per (view, feature) pair
    .reduceByKey(lambda left, right: left + right)  # category counts per session
    .mapValues(normalize_vector)
)

# Broadcast the category cluster centres so they are shipped to the workers
# once instead of with every task
bc_feature_category_clusters = sc.broadcast(feature_category_clusters)
# Per SESSION (not per item): index of the nearest category-profile cluster
clustered_categories = normalized_categories_vector_per_session.mapValues(
    lambda category_vector: mapstep(category_vector, bc_feature_category_clusters)
)

                                                                                

In [8]:
# Per-session counter of clustered item feature values (25 value clusters).

# Distinct feature_value_id values present in item_features
unique_feature_values = (
    item_features.rdd
    .map(lambda row: {row.feature_value_id})
    .reduce(lambda left, right: left | right)
)

# Largest feature_value_id, used as the length of the one-hot value vectors
highest_feature_value = max(unique_feature_values)


def initialize_value_vector(value_nb, max_values=highest_feature_value):
    '''One-hot encodes an id over ``max_values`` positions.

    :param value_nb: id to encode. Position ``value_nb - 1`` is set, so ids are
        expected to start at 1; an id of 0 wraps to the last position.
    :param max_values: vector length (defaults to the largest feature_value_id)
    :returns: float32 vector of zeros with a single 1.0
    '''
    encoded = np.zeros(shape=max_values, dtype=np.float32)
    position = value_nb - 1
    encoded[position] = 1.0
    return encoded

# Per item: how many times each feature_value_id appears among its features
encoded_item_values = (
    item_features.rdd
    .map(lambda row: (row.item_id, row.feature_value_id))
    .mapValues(initialize_value_vector)             # one-hot per feature value
    .reduceByKey(lambda left, right: left + right)  # summed per item
)

# Broadcast the value cluster centres and assign each item to its nearest one
bc_feature_values_clusters = sc.broadcast(feature_values_clusters)
clustered_item_values = encoded_item_values.mapValues(
    lambda value_vector: mapstep(value_vector, bc_feature_values_clusters)
)

# One-hot encode each item's value-cluster id over 25 positions.
# This rebinds encoded_item_values (until here: the raw per-item value counts).
# NOTE: initialize_value_vector writes at index (id - 1), and mapstep can return
# cluster 0, so cluster 0 lands on the last position through negative indexing.
# Assuming 25 clusters, every cluster still gets its own slot.
encoded_item_values = clustered_item_values.mapValues(
    lambda cluster_id: initialize_value_vector(cluster_id, max_values=25)
)

# Computes the vector for each session
item_features_session = (
    test_sessions.rdd
    .map(lambda x: (x.item_id, x.session_id))
    .join(encoded_item_values)
    .map(lambda x: (x[1][0], x[1][1]))
    .reduceByKey(lambda x, y: x + y)
    .mapValues(normalize_vector)
    .mapValues(tuple)
)

                                                                                

In [9]:
# First and last viewed item of each session

def _first_and_last_item(views):
    '''(item_id of the earliest view, item_id of the latest view) from [(item_id, date)...].

    The sort is stable, so among views with equal dates the list order decides.
    '''
    chronological = sorted(views, key=lambda view: view[1])
    return chronological[0][0], chronological[-1][0]

first_last_item = (
    test_sessions.rdd
    .map(lambda row: (row.session_id, [(row.item_id, parse_datetime(row.date))]))
    .reduceByKey(lambda left, right: left + right)  # all views of the session
    .mapValues(_first_and_last_item)
    .cache()
)

# Same pair, with item ids replaced by their cluster ids
first_last_item_clustered = first_last_item.mapValues(
    lambda pair: tuple(item_cluster_dict[item_id] for item_id in pair)
)

In [10]:
# Append the precomputed per-cluster statistics (most_visited_dict /
# most_time_dict) to the revisit and time-spent features.
# Both RDDs are rebuilt from their un-clustered sources so this cell is
# idempotent: the previous `x = x.map(...)` form appended the dict values
# again every time the cell was re-run.
# NOTE(review): .get() returns None for a cluster id missing from the dicts
# (e.g. the -1 "no revisit" sentinel, if it is not a key); the tuple
# concatenation would then fail when the RDD is evaluated - confirm coverage.
revisited_clustered = (
    item_revisit_info
    .mapValues(map_cluster_revisited)
    .mapValues(lambda info: info + most_visited_dict.get(info[0]))
)
most_spent_time_clustered = (
    most_spent_time
    .mapValues(lambda item_time: (item_cluster_dict[item_time[0]], item_time[1]))
    .mapValues(lambda info: info + most_time_dict.get(info[0]))
)

In [None]:
def unpack_tuples(row):
    '''Flattens arbitrarily nested plain tuples into a single list, depth-first.

    Only objects whose exact type is ``tuple`` are expanded; lists, arrays,
    strings and tuple subclasses are kept as single elements.
    '''
    flat = []
    pending = [iter(row)]  # stack of iterators, innermost tuple on top
    while pending:
        for elem in pending[-1]:
            if type(elem) is tuple:
                pending.append(iter(elem))
                break
            flat.append(elem)
        else:
            # Current tuple exhausted: resume its parent
            pending.pop()
    return flat

# Assemble one row per session: inner-join every per-session feature RDD on
# session_id (RDD.join is an inner join, so a session missing from any of them
# is dropped), then flatten the nested join tuples into one flat list.
# The join order fixes the column order: keep it in sync with `columns` below.
final_engineered_dataset = (
    time_sessions_in_seconds            # session_time
    .join(season_per_session)           # season
    .join(day_period_per_session)       # day_period
    .join(month_per_session)            # month
    .join(year_per_session)             # year
    .join(most_spent_time_clustered)    # 4 columns: item cluster, seconds + most_time_dict value
    .join(mean_std_time_session)        # mean_time, std_time
    .join(revisited_clustered)          # 4 columns: item cluster, views, revisited items + most_visited_dict value
    .join(first_last_item_clustered)    # first_item_visited, last_item_visited
    .join(clustered_categories)         # category cluster id (single value)
    .join(item_features_session)        # 25 value-cluster shares
    .mapValues(unpack_tuples)
)

# (session_id, feature_1, feature_2, ...) flat rows
tuple_final_engineered_dataset = (
    final_engineered_dataset.map(lambda x: ((x[0],) + tuple(x[1])))
)

# 19 named columns + 25 value-cluster columns = 44 values per row.
# NOTE: 'normalized_features_vector' actually holds the single category
# cluster id coming from clustered_categories, not a vector.
columns = ['session_id', 'session_time', 'season', 'day_period', 'month', 'year', 'item_most_time_spent', 'most_time_spent_on_item', 'most_frequently_bought_for_time_spent', 
          'least_frequently_bought_for_time_spent', 'mean_time', 'std_time', 'item_most_visited', 'number_o_visit', 'number_o_revisited_items', 'most_frequently_bought_for_most_revisited',
          'first_item_visited', 'last_item_visited', 'normalized_features_vector']

columns += [str(i+1) for i in range(25)]

# Write the features as CSV.
# NOTE(review): this is the TEST pipeline, but the output path says
# "TEMPtrain_sessions.csv" - looks copied from Step 1; confirm the intended name.
# NOTE(review): no save mode is set, so Spark's default (error if the path
# exists) makes a re-run fail until the output directory is removed.
(
tuple_final_engineered_dataset
    .map(lambda x : tuple([float(xi) for xi in x]))  # every value, session_id included, cast to float
    .coalesce(1)  # single partition -> a single part file inside the output directory
    .toDF(columns)
    .write.option("header",True)
    .csv('../Data/TEMPtrain_sessions.csv')
)

22/05/28 08:33:07 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                