# Prelude

In [None]:
import sys
from operator import add
import json
from collections import Counter
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, IDF, CountVectorizer, ChiSqSelector, StringIndexer, Normalizer
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
import numpy as np
import re
import math
from pathlib import Path
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType, BooleanType, ArrayType
from pyspark.sql.functions import rand, split, posexplode, explode, col, collect_list, struct
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import pandas as pd

In [None]:
# Global constants
# Directory prefix of the input files; file names are appended to it directly.
data_dir = 'recsys19/'

In [None]:
# Create (or reuse) the Spark session that all calculations below run on
spark = (
    SparkSession.builder
    .appName('recsys19')
    .getOrCreate()
)

# Data loading


In [None]:
# Load the metadata file.
# The CSV is read twice: first without a schema, only to learn the column
# names, then again with an explicit schema so the datatypes are set correctly.
meta_path = data_dir + 'df_metadata.csv'
meta_column_names = spark.read.csv(meta_path, header=True).columns

# `pandas_id` and `item_id` lead the schema; every remaining column
# (anything that is not `_c0` or `item_id`) is read as an integer.
schema_meta = StructType(
    [
        StructField('pandas_id', IntegerType()),
        StructField('item_id', StringType()),
    ]
    + [
        StructField(column_name, IntegerType())
        for column_name in meta_column_names
        if column_name not in ('_c0', 'item_id')
    ]
)

# Read again, this time with the schema
df_meta = spark.read.csv(meta_path, header=True, schema=schema_meta)

# `pandas_id` was added inadvertently during preprocessing, drop it
df_meta = df_meta.drop('pandas_id')

df_meta.first()

In [None]:
# Load the sessions file.
# The schema is spelled out as (column name, datatype) pairs, in file order,
# so the datatypes are set correctly.
session_columns = [
    ("pandas_id", IntegerType()),
    ("user_id", StringType()),
    ("session_id", StringType()),
    ("timestamp", StringType()),
    ("step", IntegerType()),
    ("action_type", StringType()),
    ("reference", StringType()),
    ("platform", StringType()),
    ("city", StringType()),
    ("device", StringType()),
    ("current_filters", StringType()),
    ("impressions", StringType()),
    ("prices", StringType()),
    ("is_validation", BooleanType()),
    ("is_train", BooleanType()),
]
schema_sessions = StructType(
    [StructField(column_name, column_type) for column_name, column_type in session_columns]
)

# Alternative input file: data_dir + 'df_sessions_small.csv'
sessions_path = data_dir + 'df_sessions_full.csv'

# `pandas_id` was added inadvertently during preprocessing, drop it right away
df_sessions = spark.read.csv(
    sessions_path,
    header=True,
    schema=schema_sessions,
).drop('pandas_id')

df_sessions.first()


# Preprocessing

## Calculate the facets

In [None]:
# Extract as much information as possible from each session.
# The data derived per action type is called a "facet".

def add_metadata(df):
    """Return the item metadata for every (session, referenced item) row of `df`.

    `df` must provide the columns `session_id` and `reference`. The reference
    is treated as an item id and joined against the module-level `df_meta`
    (default join type); the `item_id` join key is dropped from the result.
    """
    session_items = df.selectExpr('session_id', 'reference AS item_id')
    return session_items.join(df_meta, 'item_id').drop('item_id')


# Every facet needs a function named `preprocess_<facet-name>`.
# It receives the rows with the matching action type and returns the facet
# data. That data has to contain a `session_id` column so it can be joined
# with the other data. All current facets share the same implementation.
preprocess_interact_picture = preprocess_clickout = add_metadata
preprocess_interact_rating = preprocess_interact_info = add_metadata
preprocess_interact_deals = add_metadata

    
# Metadata about sessions:
#   action_name=(name_as_in_dataset, short_name)
#
# Action types of the dataset that currently have no facet:
#   'search for destination', 'search for item', 'search for poi',
#   'change of sort order', 'filter selection'
facets = dict(
    interact_picture=('interaction item image', 'img'),
    clickout=('clickout item', 'cout'),
    interact_rating=('interaction item rating', 'rat'),
    interact_info=('interaction item info', 'info'),
    interact_deals=('interaction item deals', 'deal'),
)


# Extract the facets: one DataFrame per facet, stored under the facet's short name
facet_data = {}
for name, (action_name, prefix) in facets.items():
    print(f'Computing facet {name} ("{prefix}")')

    # Only the session rows with this facet's action type are relevant
    matching_rows = df_sessions.where(f"action_type == '{action_name}'")

    # The preprocessing function is looked up by name among the module globals
    preproc_function = globals()[f'preprocess_{name}']
    facet_data[prefix] = preproc_function(matching_rows)


## Merge the facets & multiplex the impressions

In [None]:
# With all facets extracted, merge everything into one combined dataset.
# Only the clickout rows are used here; they are cached because two
# different projections are derived from them.
all_cols = df_sessions.filter('action_type = "clickout item"').cache()
distinct_cols = all_cols.select('session_id', 'impressions').distinct()
data_cols = all_cols.select(
    'session_id',
    'platform',
    'city',
    'device',
    'is_validation',
    'is_train',
    'reference',
)

# Re-attach the raw input columns. They were left out above so that the
# 'distinct' operation only looks at session id and impressions.
df_joined = distinct_cols.join(data_cols, 'session_id')

# Attach the columns supplied by each facet
for prefix, subdata in facet_data.items():
    print(f'Adding {prefix} facet')

    # Every facet column except the join key gets a `fac_<prefix>_` prefix
    renamed_columns = [
        c if c == 'session_id' else f'fac_{prefix}_{c}'
        for c in subdata.columns
    ]
    subdata = subdata.toDF(*renamed_columns)

    # Left outer join: rows of df_joined without facet data are kept
    df_joined = df_joined.join(subdata, on='session_id', how='left_outer')


# Per-impression processing

# Split the pipe-separated impressions string and emit one row per
# impression, together with its position inside the list.
df_joined = (
    df_joined
    .withColumn('impressions', split(col('impressions'), '\\|'))
    .selectExpr(
        '*',
        'posexplode(impressions) as (impression_index, impression_id)',
    )
    .drop('impressions')
)

# Add the ground truth: the comparison "impression equals reference", cast
# to float. The reference column itself is not needed afterwards.
is_reference = col('impression_id') == col('reference')
df_joined = df_joined.withColumn('gt', is_reference.cast('float')).drop('reference')


# Add the per-impression metadata.
# Every metadata column except the join key `item_id` is renamed with a `gt_`
# prefix before the join.
prefixed_df_meta = df_meta.toDF(
    *((f'gt_{c}' if c != 'item_id' else c) for c in df_meta.columns)
)

# Left outer join keeps impressions that have no metadata entry; the join key
# of the metadata side is dropped afterwards.
# (Fix: the comma after the `on=` argument was missing, which made the whole
# cell a SyntaxError.)
df_joined = df_joined.join(
    prefixed_df_meta,
    on=df_joined.impression_id == prefixed_df_meta.item_id,
    how='left_outer'
).drop('item_id')


# Columns used for training the ML model: everything carrying a `gt_` or `fac_` prefix
feature_col_names = [cn for cn in df_joined.columns if cn.startswith(('gt_', 'fac_'))]
print('Feature columns: ', feature_col_names)

# Merge the ML features into a single vector column (`ml_features`).
# Invalid rows are handled with the 'skip' strategy.
assembler = VectorAssembler(inputCols=feature_col_names, outputCol="ml_features", handleInvalid='skip')

# The assembled dataset is reused by several later steps, so cache it
df_joined = assembler.transform(df_joined).cache()

# Uncomment to inspect the resulting schema:
# df_joined.printSchema()


In [None]:
# Print some statistics about the assembled data
session_count = distinct_cols.count()
print(f'sessions: {session_count}')

row_count = df_joined.count()
column_count = len(df_joined.columns)
print(f'df_joined: {row_count} rows, {column_count} columns')

In [None]:
# Train the estimator on the rows that have a ground-truth value
train_rows = df_joined.where(col('gt').isNotNull())

estimator = LinearRegression(
    featuresCol='ml_features',
    labelCol='gt',
    maxIter=10,
)
estimator_model = estimator.fit(train_rows)

# Show the intercept and the first five coefficients of the fitted model
print(estimator_model.intercept, estimator_model.coefficients[:5], '...')

In [None]:
# Generate the predictions DF
# Predictions are made for the rows that have no ground-truth value
df_preds = estimator_model.transform(df_joined.where(col('gt').isNull()))

# Gather the impression rows: one list of (impression_id, prediction)
# pairs per session
df_preds = (
    df_preds
    .withColumn('pairs', struct(['impression_id', 'prediction']))
    .groupBy('session_id')
    .agg(collect_list('pairs').alias('collected'))
)

# Postprocess with pandas
# Bring the dataframe into the format needed by the ensemble: two
# space-separated strings per session, item ids and their predictions,
# both in the order of the collected list.
df_preds = df_preds.toPandas()

df_preds['item_recommendations'] = df_preds['collected'].apply(
    lambda pairs: ' '.join(str(item_id) for item_id, _ in pairs)
)
df_preds['item_probs'] = df_preds['collected'].apply(
    lambda pairs: ' '.join(str(score) for _, score in pairs)
)

df_preds = df_preds.drop(columns=['collected'])

df_preds.head()


In [None]:
# Add all columns required by the output format.
# The result rows are the distinct clickout actions without a reference.
df_results = (
    df_sessions
    .filter(df_sessions.reference.isNull())
    .filter('action_type = "clickout item"')
    .select('user_id', 'session_id', 'timestamp', 'step')
    .distinct()
    .toPandas()
)

print(len(df_results))

# Attach the per-session predictions (default merge type, keyed on session_id)
df_results = pd.merge(df_results, df_preds, on='session_id')

# Parse the timestamp strings and convert them to whole seconds
# (nanosecond integer representation divided by 10^9)
parsed_timestamps = pd.to_datetime(df_results['timestamp'])
df_results['timestamp'] = parsed_timestamps.astype(np.int64) // 10 ** 9

df_results.head()

In [None]:
# Dump the results as CSV; the DataFrame index is written as well (the default)
df_results.to_csv('session_based_full.csv', index=True)