# Install packages

In [11]:
#pip install lightfm

In [10]:
#pip install distributed

In [12]:
#pip install pyspark

In [13]:
#pip install dask

In [16]:
#pip install dask_jobqueue

In [3]:
#pip install pyarrow

# Package and Environment Setup

In [2]:
import os
import sqlite3
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
import datetime
from time import time
from pyspark.sql import SparkSession

In [3]:
import dask
import dask.bag as db
import dask.dataframe as dd
from distributed import Client
from dask_jobqueue import SLURMCluster

In [4]:
import lightfm
from lightfm.data import Dataset
from lightfm import LightFM
from lightfm.evaluation import precision_at_k
from itertools import islice
from collections import Counter
from scipy.sparse import csr_matrix

In [5]:
# LOCAL = True  -> single-machine execution while developing
# LOCAL = False -> cluster execution through SLURM
LOCAL = True

if not LOCAL:
    # Cluster mode: create a SLURM-backed dask cluster and attach a client.
    # Job logs are written to /scratch/<SLURM_JOB_USER>/slurm-<jobid>.out
    job_user = os.environ['SLURM_JOB_USER']

    cluster = SLURMCluster(
        memory='4GB',
        cores=2,
        python='/scratch/work/public/dask/bin/python',
        local_directory='/tmp/{}/'.format(job_user),
        job_extra=['--output=/scratch/{}/slurm-%j.out'.format(job_user)],
    )

    cluster.submit_command = 'slurm'
    cluster.scale(100)

    display(cluster)
    client = Client(cluster)
else:
    # Development mode: a single-machine dask client.
    client = Client()

display(client)

0,1
Client  Scheduler: tcp://127.0.0.1:36153  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 1  Cores: 1  Memory: 8.00 GiB


In [6]:
# Build (or reuse) the local Spark session used throughout the notebook.
spark = (
    SparkSession.builder
    .master("local")
    .appName("parquet_example")
    .config('spark.sql.execution.arrow.pyspark.enabled', True)
    .config('spark.sql.session.timeZone', 'UTC')
    .config('spark.driver.memory', '32G')
    .config('spark.ui.showConsoleProgress', True)
    .config('spark.sql.repl.eagerEval.enabled', True)
    .getOrCreate()
)

# Enable Arrow-based columnar data on the live session as well
# (same key that is already passed to the builder above).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Import dataset

In [7]:
# Read data from hdfs
# Loads the validation, test and training parquet files through the Spark
# session into df_val, df_test and df_train respectively.
# NOTE(review): the paths carry no hdfs:// scheme and point under /scratch;
# confirm which filesystem they actually resolve to.
df_val = spark.read.parquet('/scratch/work/courses/DSGA1004-2021/MSD/cf_validation.parquet')
df_test = spark.read.parquet('/scratch/work/courses/DSGA1004-2021/MSD/cf_test.parquet')
df_train = spark.read.parquet('/scratch/work/courses/DSGA1004-2021/MSD/cf_train_new.parquet')

# Data preprocessing

## Transform to interactions matrix

In [8]:
def preprocessing(df_train, df_test):
    
    '''
        Build LightFM interaction and weight matrices for a train/test pair.

        The user and item id mappings are fitted on the union of both frames,
        so the train and test matrices are built against the same mappings.

        Params:
            df_train: pandas dataframe which contains 'user_id','track_id','count' columns
            df_test: pandas dataframe which contains 'user_id','track_id','count' columns
            
        Return: 
            interactions_train, weights_train: the pair returned by
                Dataset.build_interactions for the rows of df_train
            interactions_test, weights_test: the pair returned by
                Dataset.build_interactions for the rows of df_test
    '''
    def _triples(df):
        # Iterate the columns positionally. Looking rows up with df[col][i]
        # is label-based: it breaks on any index that is not exactly
        # 0..n-1 and does one Series lookup per cell.
        return list(zip(df['user_id'], df['track_id'], df['count']))

    total_item_user = pd.concat([df_train, df_test])
    
    data_obj = Dataset()
    data_obj.fit(users=total_item_user['user_id'].unique(), items=total_item_user['track_id'].unique())
    
    interactions_train, weights_train = data_obj.build_interactions(_triples(df_train))
    interactions_test, weights_test = data_obj.build_interactions(_triples(df_test))
        
    return interactions_train, weights_train, interactions_test, weights_test

# Train LightFM

In [9]:
def lightfm_train(train, rank, regParam, maxIter, model_type, weights):
    
    '''
        Fit a LightFM model on the training interactions.

        Params:
            train: training interactions matrix, passed to fit as `interactions`
            rank: dimensionality of the latent embeddings (LightFM `no_components`)
            regParam: L2 penalty on user features (LightFM `user_alpha`)
            maxIter: number of epochs to run
            model_type: 'bpr' selects the 'bpr' loss; any other value selects 'warp'
            weights: passed to fit as `sample_weight`
        Return: 
            model: the value returned by LightFM.fit
    '''
    # Only 'bpr' is special-cased; every other model_type falls back to 'warp'.
    loss_name = 'bpr' if model_type == 'bpr' else 'warp'

    estimator = LightFM(loss=loss_name, no_components=rank, user_alpha=regParam)

    return estimator.fit(interactions=train, sample_weight=weights, epochs=maxIter, verbose=False)

In [10]:
def train_and_test(train, test, train_weights, rank, regParam, maxIter, top, model_type):
    
    '''
        Train a LightFM model via lightfm_train and score it with precision@k.

        Params:
            train: training interactions matrix
            test: interactions matrix to evaluate on
            train_weights: weights forwarded to lightfm_train
            rank: dimensionality of the feature latent embeddings
            regParam: L2 penalty on user features
            maxIter: number of epochs to run
            top: k used for precision_at_k
            model_type: 'warp' - Weighted Approximate-Rank Pairwise Loss 
                        'bpr' - Bayesian Personalised Ranking
            
        Return: 
            p_at_k: mean of precision_at_k over the evaluated users
            time: seconds spent in lightfm_train only (evaluation is not
                  included), rounded to 5 decimals
    '''
    # This banner is printed before training starts.
    print('Model with maxIter = {}, reg = {}, rank = {} complete'.format(maxIter,regParam,rank))

    started = time()
    fitted = lightfm_train(train, rank, regParam, maxIter, model_type, train_weights)
    elapsed = round(time() - started, 5)
    print('Time used:', elapsed)

    precision = precision_at_k(fitted, test, k=top).mean()
    print('Precision at K:', precision)

    return precision, elapsed

## Use test file

In [11]:
# Fractions of df_train to sample; the loop in the next cell runs once per entry.
percent_train = [0.001, 0.01]

In [13]:
# The evaluation frame does not depend on pct, so convert the Spark test set
# to pandas once instead of repeating the conversion on every iteration.
sample_test = df_test.toPandas().drop('__index_level_0__', axis = 1)

for pct in percent_train:
    print('Model with {}% of training file size'.format(100*pct))
    # sample() is called without a seed, so the sampled rows (and therefore
    # the reported numbers) differ from run to run.
    sample_train = df_train.sample(False, pct).toPandas()
    sample_train = sample_train.drop('__index_level_0__', axis = 1)
    
    if sample_train.isnull().values.any():
        print('Check null values')
    
    interactions_train, weights_train, interactions_test, weights_test = preprocessing(sample_train, sample_test)
    p_at_k, t = train_and_test(interactions_train, interactions_test, weights_train, rank=30, regParam=0.05, maxIter=5,\
                           top=500, model_type='warp')

Model with 0.1% of training file size
Model with maxIter = 5, reg = 0.05, rank = 30 complete
Precision at K: 0.0048457
Time used: 783.18844
Model with 1.0% of training file size
Model with maxIter = 5, reg = 0.05, rank = 30 complete
Precision at K: 0.0050727203
Time used: 921.37655


In [None]:
percent_train = [0.05]
for pct in percent_train:
    print('Model with {}% of training file size'.format(100*pct))
    # Sample a pct fraction of the training rows (withReplacement=False),
    # bring it into pandas and drop the '__index_level_0__' column.
    sample_train = (
        df_train.sample(False, pct)
        .toPandas()
        .drop(columns=['__index_level_0__'])
    )

    if sample_train.isna().values.any():
        print('Check null values')

    # Evaluate against the full test set.
    sample_test = df_test.toPandas().drop(columns=['__index_level_0__'])
    interactions_train, weights_train, interactions_test, weights_test = preprocessing(sample_train, sample_test)
    p_at_k, t = train_and_test(
        interactions_train, interactions_test, weights_train,
        rank=30, regParam=0.05, maxIter=5, top=500, model_type='warp',
    )

Model with 5.0% of training file size
Model with maxIter = 5, reg = 0.05, rank = 30 complete
Precision at K: 0.0051004402
Time used: 1313.00878
Model with 10.0% of training file size


In [11]:
percent_train = [0.1]
for pct in percent_train:
    print('Model with {}% of training file size'.format(100*pct))
    # Sample a pct fraction of the training rows (withReplacement=False),
    # bring it into pandas and drop the '__index_level_0__' column.
    sample_train = (
        df_train.sample(False, pct)
        .toPandas()
        .drop(columns=['__index_level_0__'])
    )

    if sample_train.isna().values.any():
        print('Check null values')

    # Evaluate against the full test set.
    sample_test = df_test.toPandas().drop(columns=['__index_level_0__'])
    interactions_train, weights_train, interactions_test, weights_test = preprocessing(sample_train, sample_test)
    p_at_k, t = train_and_test(
        interactions_train, interactions_test, weights_train,
        rank=30, regParam=0.05, maxIter=5, top=500, model_type='warp',
    )

Model with 10.0% of training file size
Model with maxIter = 5, reg = 0.05, rank = 30 complete
Precision at K: 0.00508876
Time used: 1454.60265


## Use validation set, with weights, maxIter=1

In [12]:
# Fractions of df_train to sample; the loop in the next cell runs once per entry.
percent_train = [0.001, 0.005, 0.01, 0.05, 0.1]

In [None]:
# The evaluation frame does not depend on pct, so convert the Spark validation
# set to pandas once instead of repeating the conversion on every iteration.
sample_test = df_val.toPandas().drop('__index_level_0__', axis = 1)

for pct in percent_train:
    print('Model with {}% of training file size'.format(100*pct))
    # sample() is called without a seed, so the sampled rows (and therefore
    # the reported numbers) differ from run to run.
    sample_train = df_train.sample(False, pct).toPandas()
    sample_train = sample_train.drop('__index_level_0__', axis = 1)
    
    if sample_train.isnull().values.any():
        print('Check null values')
    
    interactions_train, weights_train, interactions_test, weights_test = preprocessing(sample_train, sample_test)
    p_at_k, t = train_and_test(interactions_train, interactions_test, weights_train, rank=150, regParam=0.05, maxIter=1,\
                           top=500, model_type='warp')

Model with 0.1% of training file size
Model with maxIter = 1, reg = 0.05, rank = 150 complete
Time used: 0.60603
Precision at K: 0.0046532
Model with 0.5% of training file size
Model with maxIter = 1, reg = 0.05, rank = 150 complete
Time used: 3.04343
Precision at K: 0.0048607998
Model with 1.0% of training file size
Model with maxIter = 1, reg = 0.05, rank = 150 complete
Time used: 6.72171
Precision at K: 0.0049933996
Model with 5.0% of training file size
Model with maxIter = 1, reg = 0.05, rank = 150 complete
Time used: 40.43237
Precision at K: 0.0050335997
Model with 10.0% of training file size
Model with maxIter = 1, reg = 0.05, rank = 150 complete
Time used: 78.69662
Precision at K: 0.00505
Model with 20.0% of training file size
Model with maxIter = 1, reg = 0.05, rank = 150 complete


In [11]:
percent_train = [0.15]
for pct in percent_train:
    print('Model with {}% of training file size'.format(100*pct))
    # Sample a pct fraction of the training rows (withReplacement=False),
    # bring it into pandas and drop the '__index_level_0__' column.
    sample_train = (
        df_train.sample(False, pct)
        .toPandas()
        .drop(columns=['__index_level_0__'])
    )

    if sample_train.isna().values.any():
        print('Check null values')

    # Evaluate against the full validation set.
    sample_test = df_val.toPandas().drop(columns=['__index_level_0__'])
    interactions_train, weights_train, interactions_test, weights_test = preprocessing(sample_train, sample_test)
    p_at_k, t = train_and_test(
        interactions_train, interactions_test, weights_train,
        rank=150, regParam=0.05, maxIter=1, top=500, model_type='warp',
    )

Model with 15.0% of training file size
Model with maxIter = 1, reg = 0.05, rank = 150 complete
Time used: 114.18631
Precision at K: 0.0050488


## Use validation set, with weights, maxIter=10

In [None]:
# Fractions of df_train to sample; the loop in the next cell runs once per entry.
percent_train = [0.001, 0.01, 0.05, 0.1]

In [None]:
# The evaluation frame does not depend on pct, so convert the Spark validation
# set to pandas once instead of repeating the conversion on every iteration.
sample_test = df_val.toPandas().drop('__index_level_0__', axis = 1)

for pct in percent_train:
    print('Model with {}% of training file size'.format(100*pct))
    # sample() is called without a seed, so the sampled rows (and therefore
    # the reported numbers) differ from run to run.
    sample_train = df_train.sample(False, pct).toPandas()
    sample_train = sample_train.drop('__index_level_0__', axis = 1)
    
    if sample_train.isnull().values.any():
        print('Check null values')
    
    interactions_train, weights_train, interactions_test, weights_test = preprocessing(sample_train, sample_test)
    p_at_k, t = train_and_test(interactions_train, interactions_test, weights_train, rank=150, regParam=0.05, maxIter=10,\
                           top=500, model_type='warp')

Model with 0.1% of training file size
Model with maxIter = 10, reg = 0.05, rank = 150 complete
Precision at K: 0.0047962
Time used: 66.97906
Model with 1.0% of training file size
Model with maxIter = 10, reg = 0.05, rank = 150 complete
Precision at K: 0.0050112004
Time used: 174.51249
Model with 5.0% of training file size
Model with maxIter = 10, reg = 0.05, rank = 150 complete
Precision at K: 0.005011
Time used: 459.85961
Model with 10.0% of training file size


In [12]:
# Fraction of df_train to sample for this run (a scalar here, not a list).
percent_train = 0.2
# Derive the printed percentage from percent_train instead of hard-coding it,
# so the label cannot drift from the fraction actually sampled.
print('Model with {}% of training file size'.format(100*percent_train))
# sample() is called without a seed, so the sampled rows differ between runs.
sample_train = df_train.sample(False, percent_train).toPandas()
sample_train = sample_train.drop('__index_level_0__', axis = 1)

Model with 20% of training file size


In [16]:
# Shape of the array of distinct user_id values currently in sample_train
# (a 1-tuple holding the number of distinct users).
sample_train['user_id'].unique().shape

(1095030,)

In [17]:
# Flag any missing values in the sampled training frame.
if sample_train.isna().values.any():
    print('Check null values')

# Build the interaction/weight matrices against the full validation set.
sample_test = df_val.toPandas().drop(columns=['__index_level_0__'])
interactions_train, weights_train, interactions_test, weights_test = preprocessing(sample_train, sample_test)
# Training/evaluation call is left disabled in this cell:
#p_at_k, t = train_and_test(interactions_train, interactions_test, weights_train, rank=150, regParam=0.05, maxIter=10,\
#                           top=500, model_type='warp')