# Imports

In [1]:
import pandas as pd
import pickle
import time
import numpy as np
from lightfm.data import Dataset
from lightfm import LightFM
from lightfm.evaluation import precision_at_k
from scipy import sparse
from tqdm import tqdm
from pyspark.sql import SparkSession
from pyspark import SparkContext
# SparkContext.setSystemProperty('spark.executor.memory', '2g')
# sc = SparkContext("local", "App Name")
from pyspark.sql.functions import count, udf, collect_set
from pyspark.mllib.evaluation import RankingMetrics

import warnings
warnings.filterwarnings('ignore')

# Building Datasets

In [9]:
trainData = pd.read_parquet('/scratch/hk3820/train_data_all.parquet')
valData = pd.read_parquet('/scratch/hk3820/val_data_all.parquet')

In [16]:
val_cnts = valData[['user_id', 'recording_mbid', 'timestamp']].groupby(['user_id','recording_mbid'])['timestamp'].count()

In [19]:
val_cnts.reset_index().to_parquet('BigDataProject/Datasets/valData_final.parquet', engine='pyarrow')

In [None]:
trainData = trainData[['user_id', 'recording_mbid']]
trainData.columns = ['user_id', 'item_id']

valData = valData[['user_id', 'recording_mbid']]
valData.columns = ['user_id', 'item_id']

In [6]:
td = Dataset()
td.fit(users=trainData['user_id'], items=trainData['item_id'])

In [14]:
maps = td.mapping()
pickle.dump(maps[0], open('user_id_map.pkl', 'wb'))
pickle.dump(maps[2], open('item_id_map.pkl', 'wb'))

In [15]:
num_users, num_items = td.interactions_shape()
print('Num users: {}, num_items {}.'.format(num_users, num_items))

Num users: 7852, num_items 21069152.


In [19]:
train_sparse_dataset = td.build_interactions(data=list(zip(trainData.user_id, trainData.item_id)))

In [20]:
valData = valData[valData['user_id'].isin(trainData['user_id']) & valData['item_id'].isin(trainData['item_id'])]
val_sparse_dataset = td.build_interactions(data=list(zip(valData.user_id, valData.item_id)))

In [22]:
sparse.save_npz("validation.npz", val_sparse_dataset[0])
sparse.save_npz("train.npz", train_sparse_dataset[0])

# Training the Model

In [2]:
def train_model(train_sparse_dataset, 
                item_alpha=0.0, 
                user_alpha=0.0, 
                no_components=10, 
                loss='warp', 
                random_state=42, 
                num_epochs=10, 
                num_threads=8,
                verbose=True):
    """
    Trains a LightFM model given the model parameters
    """
    model = LightFM(loss=loss, 
                    random_state=random_state, 
                    no_components=no_components, 
                    item_alpha=item_alpha, 
                    user_alpha=user_alpha)
    model.fit(train_sparse_dataset, 
              epochs=num_epochs, 
              verbose=verbose, 
              num_threads=num_threads)
    return model

def save_model(model, modelfilename):
    """
    Saves a LightFM model
    """
    pickle.dump(model, open(modelfilename, 'wb'))

# Computing Representations

In [3]:
def generate_recommendations(model_fn, save_fn, user_map_fn='BigDataProject/Maps/user_id_map.pkl', item_map_fn='BigDataProject/Maps/item_id_map.pkl'):
    """
    Generate Recommendations for a given model
    """
    recommendations = pd.DataFrame(columns=['user_id', 'recommendations'])
    
    #Loading files
    print('Loading Model and Map Files...')
    model = pickle.load(open(model_fn, 'rb'))
    user_map = pickle.load(open(user_map_fn, 'rb'))
    item_map = pickle.load(open(item_map_fn, 'rb'))
    
    user_map = {v: k for k, v in user_map.items()}
    item_map = {v: k for k, v in item_map.items()}
    
    #Getting user and item representations
    print('Getting User and Item Representations...')
    item_repr = model.get_item_representations()
    user_repr = model.get_user_representations()
    
    item_bias = item_repr[0]
    item_weights = item_repr[1]

    user_bias = user_repr[0]
    user_weights = user_repr[1]
    
    print("Generating Representations...")
    for i in tqdm(range(user_weights.shape[0])):
        user_embedding = user_weights[i]
        scores = user_embedding@item_weights.T
        scores += user_bias[i]
        scores = np.add(scores, item_bias)
        sorted_indices = np.argpartition(scores, -100)[-100:][::-1]
        user_id = user_map[i]
        item_mbids = [item_map[j] for j in sorted_indices]
        recommendations = recommendations.append({'user_id': user_id, 'recommendations': item_mbids}, ignore_index=True)
        
    print("Saving Recommendations...")
    recommendations.to_parquet(save_fn, engine='pyarrow')

# Computing Metrics

In [13]:
def compute_metrics(validation_fn, recommendation_fn, k=100):
    spark = SparkSession.builder.config('spark.executor.memory', '70g').config('spark.driver.memory', '50g').config('spark.storage.memoryFraction', '0.05').appName("recsys").getOrCreate()
    val_df = val_df = spark.read.parquet(validation_fn, schema="recording_mbid INT, user_id INT, timestamp INT")
    reco_df = spark.read.parquet(recommendation_fn, schema="user_id INT, recommendations ARRAY")
    val_df = val_df.groupby("user_id").agg(collect_set("recording_mbid").alias('true_items'))
    reco_and_labels_df = reco_df.join(val_df, "user_id", "inner")
    reco_and_labels_rdd = reco_and_labels_df.rdd.map(lambda row: (row['recommendations'], row['true_items']))
    metrics = RankingMetrics(reco_and_labels_rdd)
    return metrics.precisionAt(k), metrics.meanAveragePrecisionAt(k), metrics.ndcgAt(k)

# Full Run

In [4]:
def run(train_fn,
        validation_fn,
        item_alpha=0.0, 
        user_alpha=0.0, 
        no_components=10, 
        loss='warp', 
        random_state=42, 
        num_epochs=10, 
        user_map = 'BigDataProject/Maps/user_id_map.pkl',
        item_map = 'BigDataProject/Maps/item_id_map.pkl',
        num_threads=8, 
        verbose=True):
    """
    Full Run
    """
    print('__________________________________________________\n')
    print(f'Item Alpha: {item_alpha}')
    print(f'User Alpha: {user_alpha}')
    print(f'Rank: {no_components}')
    print('Loading Train Data...')
    train_sparse_dataset = sparse.load_npz(train_fn)
    modelfilename = f'BigDataProject/Models/model_item_alpha_{item_alpha}_user_alpha_{user_alpha}_rank_{no_components}.pkl'
    recommendation_fn = f'BigDataProject/Recommendations/recommendation_item_alpha_{item_alpha}_user_alpha_{user_alpha}_rank_{no_components}.parquet'
    print('Beginning Training...')
    train_start = time.time()
    model = train_model(train_sparse_dataset, 
                item_alpha=item_alpha, 
                user_alpha=user_alpha, 
                no_components=no_components, 
                loss=loss, 
                random_state=random_state, 
                num_epochs=num_epochs, 
                num_threads=num_threads,
                verbose=verbose)
    end = time.time()
    time_taken_train = time.time() - train_start
    print(f'Time Taken For Training: {int(time_taken_train//60)} minutes and {int(time_taken_train - (time_taken_train//60)*60)} seconds')
    print('Saving Model...')
    save_model(model, modelfilename)
    print('Generating Recommendations...')
    recommendation_start = time.time()
    generate_recommendations(model_fn=modelfilename, save_fn=recommendation_fn)
    time_taken_recommendation = time.time() - recommendation_start
    print(f'Time Taken For Generating Recommendations: {int(time_taken_recommendation//60)} minutes and {int(time_taken_recommendation - (time_taken_recommendation//60)*60)} seconds')
    print('Computing Metrics...')
    precision, mapk, ndcg = compute_metrics(validation_fn, recommendation_fn)
    print(f'Precision@100: {precision}')
    print(f'MAP@100: {mapk}')
    print(f'ndcg@100: {ndcg}')
    return time_taken_train, time_taken_recommendation, precision, mapk, ndcg, modelfilename, recommendation_fn

# Prediction - Test Dataset

In [8]:
test_fn = 'test_data.parquet'
recommendations_fn = 'BigDataProject/Recommendations/recommendation_item_alpha_1e-06_user_alpha_1e-06_rank_25.parquet'
compute_metrics(validation_fn=test_fn, recommendation_fn=recommendations_fn)

                                                                                

(0.1383703568827385, 0.054416945349210835, 0.14042539874020735)

# Small Dataset - 20%

In [14]:
trainData = pd.read_parquet('data_20.parquet', engine='pyarrow')
trainData = trainData[['user_id', 'interactions_count', 'recording_mbid']]
trainData.columns = ['user_id', 'interactions_count', 'item_id']

In [15]:
td = Dataset()
td.fit(users=trainData['user_id'], items=trainData['item_id'])

In [16]:
maps = td.mapping()
pickle.dump(maps[0], open('user_id_map_20.pkl', 'wb'))
pickle.dump(maps[2], open('item_id_map_20.pkl', 'wb'))

In [17]:
num_users, num_items = td.interactions_shape()
print('Num users: {}, num_items {}.'.format(num_users, num_items))

Num users: 7761, num_items 8557723.


In [18]:
train_sparse_dataset = td.build_interactions(data=list(zip(trainData.user_id, trainData.item_id, trainData.interactions_count)))

In [24]:
sparse.save_npz('train_20.npz', train_sparse_dataset[0])

In [27]:
user_alpha_list = [1e-6]
item_alpha_list = [1e-6]

result_df = pd.DataFrame(columns=['user_alpha', 'item_alpha', 'rank', 'train_time', 'modelfilename','recommendation_time', 'recommendationfilename', 'precision@100', 'map@100', 'ndcg@100'])
train_fn = 'train_20.npz'
val_fn = 'BigDataProject/Datasets/valData_final.parquet'

for ua in user_alpha_list:
    for ia in item_alpha_list:
        time_train, time_rec, p, m, n, model_fn, recommendation_fn = run(train_fn=train_fn, validation_fn=val_fn, user_alpha=ua, item_alpha=ia, no_components=25, user_map='user_id_map_20.pkl', item_map='item_id_map_20.pkl')
        result_df = result_df.append({
            'user_alpha': ua,
            'item_alpha': ia,
            'rank': 25,
            'train_time': time_train,
            'modelfilename': model_fn,
            'recommendation_time': time_rec,
            'recommendationfilename': recommendation_fn,
            'precision@100': p,
            'map@100': m,
            'ndcg@100': n
        }, ignore_index=True)

result_df.to_csv('BigDataProject/results_alpha_tuning.csv', index=False)

__________________________________________________

Item Alpha: 1e-06
User Alpha: 1e-06
Rank: 25
Loading Train Data...
Beginning Training...


Epoch: 100%|██████████| 10/10 [01:58<00:00, 11.85s/it]


Time Taken For Training: 2 minutes and 1 seconds
Saving Model...
Generating Recommendations...
Loading Model and Map Files...
Getting User and Item Representations...
Generating Representations...


100%|██████████| 7761/7761 [16:19<00:00,  7.92it/s]


Saving Recommendations...
Time Taken For Generating Recommendations: 16 minutes and 33 seconds
Computing Metrics...


                                                                                

Precision@100: 0.00018940858136838045
MAP@100: 9.549585647220643e-06
ndcg@100: 0.00018718254225636745


In [28]:
test_fn = 'test_data.parquet'
recommendations_fn = 'BigDataProject/Recommendations/recommendation_item_alpha_1e-06_user_alpha_1e-06_rank_25.parquet'
compute_metrics(validation_fn=test_fn, recommendation_fn=recommendations_fn)

                                                                                

(0.00018774157923799007, 8.313237630545671e-06, 0.0001818402385415233)

# Small Dataset - 50%

In [5]:
trainData = pd.read_parquet('data_50.parquet', engine='pyarrow')
trainData = trainData[['user_id', 'interactions_count', 'recording_mbid']]
trainData.columns = ['user_id', 'interactions_count', 'item_id']

In [6]:
td = Dataset()
td.fit(users=trainData['user_id'], items=trainData['item_id'])

In [7]:
maps = td.mapping()
pickle.dump(maps[0], open('user_id_map_50.pkl', 'wb'))
pickle.dump(maps[2], open('item_id_map_50.pkl', 'wb'))

In [8]:
num_users, num_items = td.interactions_shape()
print('Num users: {}, num_items {}.'.format(num_users, num_items))

Num users: 7852, num_items 16154649.


In [9]:
train_sparse_dataset = td.build_interactions(data=list(zip(trainData.user_id, trainData.item_id, trainData.interactions_count)))

In [10]:
sparse.save_npz('train_50.npz', train_sparse_dataset[0])

In [None]:
user_alpha_list = [1e-6]
item_alpha_list = [1e-6]

result_df = pd.DataFrame(columns=['user_alpha', 'item_alpha', 'rank', 'train_time', 'modelfilename','recommendation_time', 'recommendationfilename', 'precision@100', 'map@100', 'ndcg@100'])
train_fn = 'train_50.npz'
val_fn = 'BigDataProject/Datasets/valData_final.parquet'

for ua in user_alpha_list:
    for ia in item_alpha_list:
        time_train, time_rec, p, m, n, model_fn, recommendation_fn = run(train_fn=train_fn, validation_fn=val_fn, user_alpha=ua, item_alpha=ia, no_components=25, user_map='user_id_map_50.pkl', item_map='item_id_map_50.pkl')
        result_df = result_df.append({
            'user_alpha': ua,
            'item_alpha': ia,
            'rank': 25,
            'train_time': time_train,
            'modelfilename': model_fn,
            'recommendation_time': time_rec,
            'recommendationfilename': recommendation_fn,
            'precision@100': p,
            'map@100': m,
            'ndcg@100': n
        }, ignore_index=True)

result_df.to_csv('BigDataProject/results_alpha_tuning.csv', index=False)

In [14]:
test_fn = 'BigDataProject/Datasets/valData_final.parquet'
recommendations_fn = 'BigDataProject/Recommendations/recommendation_item_alpha_1e-06_user_alpha_1e-06_rank_25.parquet'
compute_metrics(validation_fn=test_fn, recommendation_fn=recommendations_fn)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/16 02:45:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

(0.00012480896586856852, 5.984202228062866e-06, 0.0001221091303719588)

In [15]:
test_fn = 'test_data.parquet'
recommendations_fn = 'BigDataProject/Recommendations/recommendation_item_alpha_1e-06_user_alpha_1e-06_rank_25.parquet'
compute_metrics(validation_fn=test_fn, recommendation_fn=recommendations_fn)

                                                                                

(0.0001584122359796067, 8.441638951592124e-06, 0.00015914219756867998)

In [17]:
test_fn = 'BigDataProject/Datasets/valData_final.parquet'
recommendations_fn = 'BigDataProject/Recommendations/recommendation_item_alpha_1e-08_user_alpha_1e-08_rank_25.parquet'
compute_metrics(validation_fn=test_fn, recommendation_fn=recommendations_fn)

                                                                                

(0.13750636780438097, 0.047864021711044216, 0.13973971457784126)