# ETL preprocessing - Creazione del dataset session-based

In questa fase si costruisce una pipeline di preprocessing e feature engineering, utilizzando RAPIDS cuDF e Merlin NVTabular, per preparare il dataset al training del modello di session-based recommendation.

Per farlo è necessario creare delle feature sequenziali seguendo questi passi:


*   Categorify() sulle features di tipo categorico
*   Raggruppamento delle feature per singola sessione, ordinando le interazioni in base al tempo, con l'operazione Groupby()
*   Esportazione del dataset preprocessato in diversi file .parquet partizionati in base al giorno, creando train, valid e test set


In [None]:
!pip install cudf-cu11==22.12 dask-cudf-cu11==22.12 --extra-index-url=https://pypi.nvidia.com/.
!rm -rf /usr/local/lib/python3.8/dist-packages/cupy*
!pip install cuml-cu11 --extra-index-url=https://pypi.nvidia.com
!pip install cugraph-cu11 --extra-index-url=https://pypi.nvidia.com

In [None]:
!pip install transformers4rec[pytorch,nvtabular]

In [None]:
import os
import numpy as np 
import gc
import shutil
import glob

import dask_cudf
import cudf
import nvtabular as nvt
from merlin.dag import ColumnSelector
from merlin.schema import Schema, Tags
import pandas as pd 
from nvtabular.ops import Operator

In [None]:
# Load the October 2019 REES46 event log (parquet) into a GPU cuDF DataFrame and preview it.
df = cudf.read_parquet("/content/drive/MyDrive/dataset_rees46/2019-Oct.parquet")  
df.head(10)

Unnamed: 0,user_session,event_type,product_id,category_id,category_code,brand,price,user_id,event_time_ts,prod_first_event_time_ts
0,152,view,1801539,2053013554415534427,electronics.video.tv,lg,419.24,514971132,1570986064,1569898645
1,152,view,1801826,2053013554415534427,electronics.video.tv,toshiba,359.6,514971132,1570986081,1569898007
2,152,view,1801539,2053013554415534427,electronics.video.tv,lg,419.24,514971132,1570986090,1569898645
3,152,view,1801881,2053013554415534427,electronics.video.tv,samsung,496.4,514971132,1570986093,1569894423
4,152,view,1801539,2053013554415534427,electronics.video.tv,lg,419.24,514971132,1570986100,1569898645
5,152,view,1801400,2053013554415534427,electronics.video.tv,haier,437.33,514971132,1570986102,1569898200
6,152,view,1801539,2053013554415534427,electronics.video.tv,lg,419.24,514971132,1570986104,1569898645
7,152,view,1801693,2053013554415534427,electronics.video.tv,hisense,452.78,514971132,1570986110,1569902404
8,152,view,1801539,2053013554415534427,electronics.video.tv,lg,419.24,514971132,1570986115,1569898645
9,152,view,1801693,2053013554415534427,electronics.video.tv,hisense,452.78,514971132,1570986115,1569902404


In [None]:
# (rows, columns) of the raw event table.
df.shape

(30733301, 10)

In [None]:
# Which columns contain at least one missing value (in this data: category_code and brand).
df.isnull().any()

user_session                False
event_type                  False
product_id                  False
category_id                 False
category_code                True
brand                        True
price                       False
user_id                     False
event_time_ts               False
prod_first_event_time_ts    False
dtype: bool

### Categorify() sulle features di tipo categorico

In [None]:
# Encode every categorical column as integer ids with NVTabular Categorify.
# start_index=1 shifts the encoding; presumably this keeps id 0 free (e.g. for padding) —
# confirm against the Categorify documentation.
cat_feats = [
    'user_session',
    'category_code',
    'brand',
    'user_id',
    'product_id',
    'category_id',
    'event_type',
] >> nvt.ops.Categorify(start_index=1)

### Estrazione delle feature temporali
Si estraggono le feature temporali a partire dalla feature event_time_ts (timestamp Unix in secondi), convertita in formato datetime. 
Inoltre vengono create delle feature relative al giorno della settimana: nello specifico, il giorno della settimana viene rappresentato come feature ciclica tramite seno e coseno, così da poterlo esprimere nello spazio continuo (ore, giorni, settimane o mesi hanno caratteristiche intrinsecamente cicliche). In questo modo giorni adiacenti, come domenica e lunedì, risultano vicini anche nella rappresentazione numerica. 

https://ianlondon.github.io/blog/encoding-cyclical-features-24hour-time/



In [None]:
# Time features derived from the Unix timestamp column (seconds since epoch).
session_ts = ['event_time_ts']

# event_time_ts (seconds) -> datetime column named 'event_time_dt'.
session_time = (
    session_ts
    >> nvt.ops.LambdaOp(lambda col: cudf.to_datetime(col, unit='s'))
    >> nvt.ops.Rename(name='event_time_dt')
)

# Day of the week of each event, named 'et_dayofweek'
# (presumably 0 = Monday ... 6 = Sunday, as in pandas/cuDF).
sessiontime_weekday = (
    session_time
    >> nvt.ops.LambdaOp(lambda col: col.dt.weekday)
    >> nvt.ops.Rename(name='et_dayofweek')
)

In [None]:
def _cycle_angle(col, max_value):
    """Return the angle (radians) of *col* on a cycle of period *max_value*.

    A small offset of 0.000001 is added to *col* before it is scaled.
    """
    return 2 * np.pi * ((col + 0.000001) / max_value)


def get_cycled_feature_value_sin(col, max_value):
    """Sine component of the cyclical encoding of *col* with period *max_value*."""
    return np.sin(_cycle_angle(col, max_value))


def get_cycled_feature_value_cos(col, max_value):
    """Cosine component of the cyclical encoding of *col* with period *max_value*."""
    return np.cos(_cycle_angle(col, max_value))

In [None]:
# Sine/cosine encoding of the weekday. The +1 moves the weekday value up by one
# before it is scaled by a period of 7.
weekday_sin = (
    sessiontime_weekday
    >> (lambda col: get_cycled_feature_value_sin(col + 1, 7))
    >> nvt.ops.Rename(name='et_dayofweek_sin')
)
weekday_cos = (
    sessiontime_weekday
    >> (lambda col: get_cycled_feature_value_cos(col + 1, 7))
    >> nvt.ops.Rename(name='et_dayofweek_cos')
)

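Aggiunta della feature Product Recency: tramite un operatore custom (ItemRecency) si calcola l'età del prodotto in giorni, cioè il tempo trascorso dalla prima interazione registrata con il prodotto (prod_first_event_time_ts).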

In [None]:
# Item recency: a custom NVTabular operator.
class ItemRecency(nvt.ops.Operator):
    """Add a product-age-in-days column for every selected timestamp column.

    For an input column ``c`` the operator writes ``c + "_age_days"`` equal to
    ``(c - prod_first_event_time_ts) / 86400``, with negative ages zeroed out.
    NOTE(review): the 86400 divisor assumes both columns are Unix seconds.
    """

    def transform(self, columns, gdf):
        """Compute the ``_age_days`` columns in place on *gdf* and return it."""
        seconds_per_day = 60 * 60 * 24
        for name in columns.names:
            age_days = (gdf[name] - gdf['prod_first_event_time_ts']) / seconds_per_day
            # An event logged before the product's first-seen time gives a negative
            # age; multiplying by the boolean mask zeroes those values.
            gdf[name + "_age_days"] = age_days * (age_days >= 0)
        return gdf

    def compute_selector(
        self,
        input_schema: Schema,
        selector: ColumnSelector,
        parents_selector: ColumnSelector,
        dependencies_selector: ColumnSelector,
    ) -> ColumnSelector:
        """Validate the parent columns against the input schema and use them as input."""
        self._validate_matching_cols(input_schema, parents_selector, "computing input selector")
        return parents_selector

    def column_mapping(self, col_selector):
        """Map each output column ``<name>_age_days`` to its source column ``<name>``."""
        return {name + "_age_days": [name] for name in col_selector.names}

    @property
    def dependencies(self):
        """Extra input column needed by ``transform``."""
        return ["prod_first_event_time_ts"]

    @property
    def output_dtype(self):
        """Declared dtype of the produced columns."""
        return np.float64

In [None]:
# Product age in days, then log-transformed and standardized into float32.
recency_features = ['event_time_ts'] >> ItemRecency()
recency_features_norm = (
    recency_features
    >> nvt.ops.LogOp()
    >> nvt.ops.Normalize(out_dtype=np.float32)
    >> nvt.ops.Rename(name='product_recency_days_log_norm')
)

In [None]:
# Combine every time-derived feature node into a single selection for the Groupby.
time_features = (
    session_time +
    sessiontime_weekday +
    weekday_sin +
    weekday_cos +
    recency_features_norm
)

### Normalizzazione delle feature continue


In [None]:
# Smoothing price long-tailed distribution and applying standardization
price_log = ['price'] >> nvt.ops.LogOp() >> nvt.ops.Normalize(out_dtype=np.float32) >> nvt.ops.Rename(name='price_log_norm')

Si calcolano il prezzo medio di ogni categoria e il prezzo di ciascun item relativo al prezzo medio della sua categoria, tramite la formula (prezzo - media) / media presente in relative_price_to_avg_categ().
Ad esempio, se un item ha prezzo 50 e la sua categoria ha prezzo medio 115, si avrà un risultato di (50 - 115) / 115 ≈ -0.5652. Ciò significa che il prezzo del prodotto è inferiore di circa il 57% rispetto al prezzo medio della sua categoria.



In [None]:
# Relative price to the average price for the category_id
def relative_price_to_avg_categ(col, gdf):
    epsilon = 1e-5
    col = ((gdf['price'] - col) / (col + epsilon)) * (col > 0).astype(int)
    return col
    
# Mean price of each category_id, computed by JoinGroupby (presumably joined back onto each row).
avg_category_id_pr = (
    ['category_id']
    >> nvt.ops.JoinGroupby(cont_cols=['price'], stats=["mean"])
    >> nvt.ops.Rename(name='avg_category_id_price')
)

# Relative price of each product with respect to its category average.
relative_price_to_avg_category = (
    avg_category_id_pr
    >> nvt.ops.LambdaOp(relative_price_to_avg_categ, dependency=['price'])
    >> nvt.ops.Rename(name="relative_price_to_avg_categ_id")
)


## Grouping interactions into sessions: aggregate by session id and create the sequential features

In [None]:
# Input columns for the session Groupby.
# NOTE(review): 'user_session' is listed both as a raw column name and inside cat_feats
# (where it is categorified) — confirm which version the Groupby actually receives.
groupby_feats = ['event_time_ts', 'user_session'] + cat_feats + time_features + price_log + relative_price_to_avg_category

In [None]:
from merlin.schema.tags import Tags
from nvtabular.ops import *


# Maximum number of interactions kept per session (sequences are sliced to this length).
SESSIONS_MAX_LENGTH = 20
# Sessions with fewer interactions than this are filtered out later.
MINIMUM_SESSION_LENGTH = 2

# One row per session: interactions sorted by event_time_ts, then aggregated.
# Output columns are named "<column>-<aggregation>" (name_sep="-").
groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=["user_session"],
    sort_cols=["event_time_ts"],
    aggs={
        'user_id': ['first'],
        'product_id': ["list", "count"],
        'category_code': ["list"],
        'brand': ["list"],
        'category_id': ["list"],
        'event_time_ts': ["first"],
        'event_time_dt': ["first"],
        'et_dayofweek_sin': ["list"],
        'et_dayofweek_cos': ["list"],
        'price_log_norm': ["list"],
        'relative_price_to_avg_categ_id': ["list"],
        'product_recency_days_log_norm': ["list"],
    },
    name_sep="-",
)

# Every sequence feature below keeps only its first SESSIONS_MAX_LENGTH (20) elements.
sequence_features_truncated = (
    groupby_features['category_id-list']
    >> nvt.ops.ListSlice(0, SESSIONS_MAX_LENGTH)
)

# Item-id sequence, additionally tagged as the item id.
sequence_features_truncated_item = (
    groupby_features['product_id-list']
    >> nvt.ops.ListSlice(0, SESSIONS_MAX_LENGTH)
    >> TagAsItemID()
)

# Categorical sequence features, tagged CATEGORICAL.
sequence_features_truncated_cat = (
    groupby_features['brand-list', 'category_code-list']
    >> nvt.ops.ListSlice(0, SESSIONS_MAX_LENGTH)
    >> nvt.ops.AddMetadata(tags=[Tags.CATEGORICAL])
)

# Continuous sequence features, tagged CONTINUOUS.
sequence_features_truncated_cont = (
    groupby_features[
        'et_dayofweek_sin-list',
        'et_dayofweek_cos-list',
        'price_log_norm-list',
        'relative_price_to_avg_categ_id-list',
        'product_recency_days_log_norm-list',
    ]
    >> nvt.ops.ListSlice(0, SESSIONS_MAX_LENGTH)
    >> nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
)

In [None]:
# calculate session day index based on 'timestamp-first' column
day_index = ((groupby_features['event_time_dt-first'])  >> 
    nvt.ops.LambdaOp(lambda col: (col - col.min()).dt.days +1) >> 
    nvt.ops.Rename(f = lambda col: "day_index")
)

In [None]:
#Select certain columns to be saved
selected_features = (
    groupby_features['product_id-count', 'user_session'] + 
    sequence_features_truncated_item +
    sequence_features_truncated + 
    sequence_features_truncated_cat +
    sequence_features_truncated_cont +
    day_index
) 

In [None]:
#Filter out the session that have less than 2 interactions.
filtered_sessions = selected_features >> nvt.ops.Filter(f=lambda df: df["product_id-count"] >= MINIMUM_SESSION_LENGTH)

#value_count min and max as property
seq_feats_list = filtered_sessions['product_id-list', 'category_id-list'] >> nvt.ops.ValueCount()

# avoid numba warnings
from numba import config
config.CUDA_LOW_OCCUPANCY_WARNINGS = 0

In [None]:
# Wrap the in-memory cuDF events in an NVTabular Dataset.
dataset = nvt.Dataset(df)

# Fit the statistics of every op in the graph (Categorify, Normalize, JoinGroupby, ...),
# then apply them to get one row per session.
workflow = nvt.Workflow(filtered_sessions)
workflow.fit(dataset)
# to_ddf() returns a Dask DataFrame (despite the "_gdf" name).
sessions_gdf = workflow.transform(dataset).to_ddf()



In [None]:
# Preview the first 10 session rows.
sessions_gdf.head(10)



Unnamed: 0,product_id-count,user_session,product_id-list,category_id-list,brand-list,category_code-list,et_dayofweek_sin-list,et_dayofweek_cos-list,price_log_norm-list,relative_price_to_avg_categ_id-list,product_recency_days_log_norm-list,day_index
0,1008,2,"[621, 2934, 621, 2763, 11453, 35, 266, 762, 11...","[13, 13, 13, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, ...","[1, 43, 1, 15, 15, 15, 15, 15, 15, 15, 15, 15,...","[8, 8, 8, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, ...","[0.9749277124471076, 0.9749277124471076, 0.974...","[-0.2225218090494724, -0.2225218090494724, -0....","[0.08458537, -0.24322492, 0.08458537, -0.14456...","[-0.643282327368288, -0.7632182638622205, -0.6...","[0.74019545, 0.7369576, 0.74029607, 0.738543, ...",22
1,779,3,"[26848, 111412, 31916, 10389, 28422, 671, 898,...","[3, 3, 3, 3, 3, 20, 20, 20, 20, 20, 20, 20, 20...","[190, 111, 275, 290, 536, 31, 10, 31, 416, 185...","[1, 1, 1, 1, 1, 16, 16, 16, 16, 16, 16, 16, 16...","[0.9749277124471076, 0.9749277124471076, 0.974...","[-0.2225218090494724, -0.2225218090494724, -0....","[-0.5968732, -0.5826312, -0.57691675, -0.94772...","[0.044438490038896716, 0.06332365917531949, 0....","[-3.1486514, -3.1486514, -3.148419, -3.1486514...",1
2,365,4,"[7317, 8858, 551, 2184, 410, 14690, 644, 1745,...","[11, 11, 11, 203, 32, 83, 57, 35, 414, 121, 11...","[18, 25, 2, 1, 1, 28, 1, 1, 463, 1, 18, 18, 1,...","[10, 10, 10, 1, 21, 39, 1, 1, 1, 1, 10, 10, 1,...","[8.975979006501142e-07, 8.975979006501142e-07,...","[0.9999999999995972, 0.9999999999995972, 0.999...","[-0.93344843, -0.6120571, -0.641767, -1.155792...","[-0.7351362412156563, -0.6025327133923205, -0....","[1.015743, 1.0161717, 1.0157927, 1.0131326, 1....",27
3,353,5,"[1491, 2015, 1491, 307, 100, 317, 1866, 544, 3...","[4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, ...","[24, 11, 24, 11, 11, 24, 16, 16, 24, 1, 24, 24...","[4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, ...","[-0.43388454782514785, -0.43388454782514785, -...","[-0.9009684784489228, -0.9009684784489228, -0....","[0.9421038, 0.9506214, 0.9421038, 1.1711547, 0...","[-0.34403762182916375, -0.3370530946772217, -0...","[0.8557798, 0.85758567, 0.85609555, 0.8631584,...",24
4,332,6,"[1190, 3744, 6861, 625, 1190, 205, 195, 208, 3...","[316, 26, 26, 23, 316, 35, 26, 32, 26, 26, 26,...","[1, 1, 1, 1, 1, 42, 1, 1, 29, 1, 1, 29, 42, 1,...","[1, 15, 15, 14, 1, 1, 15, 21, 15, 15, 15, 15, ...","[0.43388293040961884, 0.43388293040961884, 0.4...","[-0.9009692573551896, -0.9009692573551896, -0....","[0.7888444, 0.4279169, 0.2778753, 0.029547354,...","[0.2947556335625865, -0.05620540373901745, -0....","[-2.125648, -2.183616, -2.233572, -2.182449, -...",2
5,329,7,"[2224, 5024, 4235, 1440, 7520, 6876, 8202, 588...","[22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 22, 2...","[13, 37, 13, 13, 1, 1, 37, 37, 37, 37, 1, 470,...","[7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, ...","[0.43388293040961884, 0.43388293040961884, 0.4...","[-0.9009692573551896, -0.9009692573551896, -0....","[-0.17139448, -0.22419876, -0.17139448, -0.090...","[0.18221077494253135, 0.1066088315548633, 0.18...","[-0.32494533, -0.3529406, -0.32888177, -0.3281...",9
6,315,8,"[28, 10, 4, 60, 166, 172, 388, 146, 189, 668, ...","[2, 5, 2, 10, 10, 10, 2, 5, 2, 2, 2, 2, 2, 2, ...","[3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, ...","[2, 5, 2, 3, 3, 3, 2, 5, 2, 2, 2, 2, 2, 2, 4, ...","[-0.7818309228245777, -0.7818309228245777, -0....","[0.6234905036287797, 0.6234905036287797, 0.623...","[1.2530195, 0.052988723, 1.5090804, 0.6058624,...","[0.4381808097286804, 0.6185932022862595, 0.976...","[0.9911432, 0.99096197, 0.9912046, 0.9910791, ...",26
7,300,9,"[526, 442, 1019, 1665, 2863, 4076, 9708, 4250,...","[6, 80, 80, 80, 80, 80, 80, 80, 80, 80, 80, 80...","[2, 8, 8, 8, 10, 54, 8, 54, 10, 10, 10, 54, 54...","[6, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 45, 45...","[-0.7818309228245777, -0.7818309228245777, -0....","[0.6234905036287797, 0.6234905036287797, 0.623...","[1.6082963, -0.12502013, 0.0052444604, 0.62931...","[1.4248649151718304, -0.5245121508853557, -0.4...","[0.027798139, 0.026367055, 0.02467384, 0.03706...",12
8,294,10,"[171, 61, 4394, 857, 2546, 321, 33574, 2546, 8...","[6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, ...","[2, 2, 221, 2, 36, 8, 1, 36, 2, 2, 10, 2, 8, 2...","[6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, ...","[-0.43388454782514785, -0.43388454782514785, -...","[-0.9009684784489228, -0.9009684784489228, -0....","[0.8609703, 0.9505884, 0.72817767, 0.9416715, ...","[-0.042055452414229116, 0.07088232100611737, -...","[1.1877617, 1.1889449, 1.1831262, 1.1877685, 1...",31
9,286,11,"[838, 4742, 7754, 10876, 14169, 17649, 11077, ...","[18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 18, 1...","[161, 32, 1124, 155, 161, 58, 22, 75, 948, 22,...","[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ...","[-0.7818309228245777, -0.7818309228245777, -0....","[0.6234905036287797, 0.6234905036287797, 0.623...","[-0.7193222, -1.5103035, -1.6672652, -0.937035...","[0.13770067180673623, -0.5853870095808679, -0....","[0.0418801, 0.037589844, 0.045151792, 0.026399...",12


In [None]:
# Inspect the output schema (tags, dtypes, categorical domains) produced by the workflow.
workflow.output_schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged,properties.num_buckets,properties.freq_threshold,properties.max_size,properties.start_index,properties.cat_path,properties.domain.min,properties.domain.max,properties.domain.name,properties.embedding_sizes.cardinality,properties.embedding_sizes.dimension
0,product_id-count,(Tags.CATEGORICAL),int32,False,False,,0.0,0.0,1.0,.//categories/unique.product_id.parquet,0.0,166795.0,product_id,166796.0,512.0
1,user_session,(Tags.CATEGORICAL),int64,False,False,,0.0,0.0,1.0,.//categories/unique.user_session.parquet,0.0,9244422.0,user_session,9244423.0,512.0
2,product_id-list,"(Tags.LIST, Tags.ITEM_ID, Tags.ID, Tags.CATEGO...",int64,True,True,,0.0,0.0,1.0,.//categories/unique.product_id.parquet,0.0,166795.0,product_id,166796.0,512.0
3,category_id-list,"(Tags.CATEGORICAL, Tags.LIST)",int64,True,True,,0.0,0.0,1.0,.//categories/unique.category_id.parquet,0.0,625.0,category_id,626.0,59.0
4,brand-list,"(Tags.CATEGORICAL, Tags.LIST)",int64,True,True,,0.0,0.0,1.0,.//categories/unique.brand.parquet,0.0,3445.0,brand,3446.0,153.0
5,category_code-list,"(Tags.CATEGORICAL, Tags.LIST)",int64,True,True,,0.0,0.0,1.0,.//categories/unique.category_code.parquet,0.0,127.0,category_code,128.0,24.0
6,et_dayofweek_sin-list,"(Tags.LIST, Tags.CONTINUOUS)",float64,True,True,,,,,,,,,,
7,et_dayofweek_cos-list,"(Tags.LIST, Tags.CONTINUOUS)",float64,True,True,,,,,,,,,,
8,price_log_norm-list,"(Tags.LIST, Tags.CONTINUOUS)",float32,True,True,,,,,,,,,,
9,relative_price_to_avg_categ_id-list,"(Tags.LIST, Tags.CONTINUOUS)",float64,True,True,,,,,,,,,,


In [None]:
# define output_folder to store the partitioned parquet files
OUTPUT_FOLDER = os.environ.get("OUTPUT_FOLDER", "/content/drive/MyDrive/dataset_rees46/" + "sessions_by_day")
#os.mkdir(OUTPUT_FOLDER) if does not exist


In [None]:
from transformers4rec.data.preprocessing import save_time_based_splits

# Write the sessions into per-day_index folders, each split into train/valid/test parquet files.
# NOTE(review): timestamp_col is given 'user_session', the Categorify-encoded session id
# (see cat_feats), not a time value. In the preview, ids grow as session length shrinks,
# so any ordering save_time_based_splits does on this column follows the encoding, not event
# time. Consider keeping 'event_time_ts-first' in selected_features and passing it here —
# confirm against the save_time_based_splits documentation.
save_time_based_splits(data=nvt.Dataset(sessions_gdf),
                       output_dir= OUTPUT_FOLDER,
                       partition_col='day_index',
                       timestamp_col='user_session', 
                      )

Creating time-based splits: 100%|██████████| 31/31 [00:18<00:00,  1.66it/s]


In [None]:
# Save the full processed session dataset as parquet. The workflow was already fitted on
# this dataset, so transform() reuses those statistics instead of re-fitting every op.
workflow.transform(dataset).to_parquet(os.path.join("/content/drive/MyDrive/dataset_rees46/", "processed_nvt"))



In [None]:
# Save the fitted workflow under dataset_rees46/workflow_etl so the same preprocessing
# can be reused later.
workflow_path = os.path.join("/content/drive/MyDrive/dataset_rees46/", 'workflow_etl')
workflow.save(workflow_path)