---

**<center>Installations and Imports**

---



In [1]:
!pip install pyspark py4j
!pip install lenskit
!pip install lightfm

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 KB[0m [31m22.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m24.7 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=a1fa1cbb16c8ef0e9bb3d0ffb78d64cb6bef8f04f0921e9fa9411735b1d9aa4a
 

In [26]:
import os
from google.colab import drive

from pyspark.sql import SparkSession
from pyspark.sql.functions import array, avg, coalesce, col, collect_list, count, flatten, lit, monotonically_increasing_id
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.ml.evaluation import RankingEvaluator
from pyspark.ml.recommendation import ALS

import pandas as pd 

#from lenskit import batch, topn, util, topn
#from lenskit import crossfold as xf
#from lenskit.algorithms import Recommender, als as als_slow, item_knn as knn

from timeit import default_timer as timer
from scipy import sparse
from numpy import array as np_arr
import numpy as np
from lightfm import LightFM
from lightfm.evaluation import precision_at_k

import time

--- 

**<center>Manually Set User-Specific File Paths <center>** 

--- 







In [3]:
#download and save both dataset folders('ml_latest_small' and 'ml_latest') to a Google Drive.  
#set MOUNT POINT/ROOT to point to the folder where both files are saved.
MOUNT_POINT = '/content/drive'
ROOT = f'{MOUNT_POINT}/MyDrive/Big Data Final Project/'

#to work with the small file, set SMALL to True. To work with the large file, set small to False.
SMALL = True

In [4]:
#sets ML_PATH to the the path of either the small or large file
ML_DIR = 'ml_latest' if not SMALL else 'ml_latest_small'
ML_PATH = f'{ROOT}/{ML_DIR}'

--- 

**<center> Finalize Set Up <center>** 

--- 

In [5]:
#mount drive
drive.mount(MOUNT_POINT)

#start spark session
spark = SparkSession.builder.appName('Recommender').getOrCreate()

#take first spark checkpoint
spark.sparkContext.setCheckpointDir(ROOT)

Mounted at /content/drive


In [6]:
#download ratings file
RATINGS_SCHEMA = 'user INT, product INT, rating FLOAT, timestamp INT'
ratings = spark.read.csv(f'{ML_PATH}/ratings.csv', schema=RATINGS_SCHEMA).dropna()

--- 

**<center> Split The Ratings File <center>** 

--- 

In [7]:
#function that takes in a ratings dataframe, groups ratings by user ID, then takes a random sample corresponding to a set fraction of each users ratings
#the function then takes a checkpoint to prevent future unnecessary recalculations
def sample(df, frac):
  return df.groupBy('user').applyInPandas(lambda group: group.sample(frac=frac), RATINGS_SCHEMA).checkpoint()

#function to trim the ratings that were randomly sampled from the original ratings dataframe
def exclude(df1, df2):
  return df1.subtract(df2)

#function that checks to see if a 70-15-15 train-val-test split of the ratings file already exists in the cached folder. If not, it executes a split.
#set overwrite to True if you want a fresh split no matter what
cache_path = f'{ROOT}/cached'
def split(ratings, overwrite=False):
  size = '-small' if SMALL else '-large'
  trainingPath = f'{cache_path}/training{size}.parquet'
  validationPath = f'{cache_path}/validation{size}.parquet'
  testPath = f'{cache_path}/test{size}.parquet'

  if (overwrite or not os.path.exists(trainingPath) or not os.path.exists(validationPath) or not os.path.exists(testPath)):
    nontraining = sample(ratings, 0.3)

    training = exclude(ratings, nontraining)
    test = sample(nontraining, 0.5)
    validation = exclude(nontraining, test)

    training.write.parquet(trainingPath, 'overwrite')
    validation.write.parquet(validationPath, 'overwrite')
    test.write.parquet(testPath, 'overwrite')
  else:
    training = spark.read.parquet(trainingPath)
    validation = spark.read.parquet(validationPath)
    test = spark.read.parquet(testPath)

  return training, validation, test


In [8]:
#execute a split on the ratings file
training, validation, test = split(ratings)

In [9]:
train_size = training.count()
val_size = validation.count()
test_size = validation.count()

In [10]:
print("Number of rows in the training dataset:" , train_size)
print("Number of rows in the validation dataset:", val_size)
print("Number of rows in the testing dataset:", test_size)

Number of rows in the training dataset: 70595
Number of rows in the validation dataset: 15116
Number of rows in the testing dataset: 15116


In [11]:
orig_total_rows = ratings.count()
print(round(train_size/orig_total_rows*100,2), "% of the original dataset is in training")
print(round(val_size/orig_total_rows*100,2), "% of the original dataset is in validation")
print(round(test_size/orig_total_rows*100,2), "% of the original dataset is in testing")

70.01 % of the original dataset is in training
14.99 % of the original dataset is in validation
14.99 % of the original dataset is in testing


--- 

**<center> Create a Model Class<center>** 

--- 




In [12]:
class Model():
  model = None #stores the trained model
  params = {} #stores a model's hyperparamaters
  n_recs = 100 #set the default number of recommendations for each model to 100
  
  def __init__(self, train, val, test):
    self.train = train
    self.val = val
    self.test = test
  
  #build specifies the functionality behind how a model should make recommendations 
  def build(self):
    pass

  #recommend applies the model's functionality to a set of users that we want to make recommendations for 
  def recommend(self, dataset):
    pass

  #scores the model using ranking metrics - precisionAtK, meanAveragePrecisionAtK, ndcgAtK, meanAveragePrecision
  def score(self, test=False):
    dataset = self.test if test else self.val
    rec_rel = self.recommend(dataset).select(col('recommended').cast('array<double>'), col('relevant').cast('array<double>'))
    
    evaluator = RankingEvaluator(predictionCol='recommended', labelCol='relevant')
    
    metrics = (
        evaluator.evaluate(rec_rel, { evaluator.metricName: 'precisionAtK', evaluator.k: self.n_recs}),
        evaluator.evaluate(rec_rel, { evaluator.metricName: 'meanAveragePrecisionAtK', evaluator.k: self.n_recs}),
        evaluator.evaluate(rec_rel, { evaluator.metricName: 'ndcgAtK', evaluator.k: self.n_recs}),
        evaluator.evaluate(rec_rel, { evaluator.metricName: 'meanAveragePrecision'}),
    )
    return metrics

--- 

**<center> Create a Tuning Function** 

--- 

In [13]:
def tuner(model, param_name, values_to_try):

  #initialize lists that will store the score values as we try each new parameter value
  p_at_ks = []
  map_at_ks = [] 
  ndcg_at_ks = []
  maps = []
  
  for value in values_to_try:
    model.params = { **model.params, param_name: value }
    model.build()
    scores = model.score()

    p_at_ks.append(scores[0])
    map_at_ks.append(scores[1])
    ndcg_at_ks.append(scores[2])
    maps.append(scores[3])

  norm_p_at_ks = list(map(lambda i: i/max(p_at_ks), p_at_ks))
  norm_map_at_ks = list(map(lambda i: i/max(map_at_ks), map_at_ks))
  norm_ndcg_at_ks = list(map(lambda i: i/max(ndcg_at_ks), ndcg_at_ks))
  norm_maps = list(map(lambda i: i/max(maps), maps))

  norm_scores = list(map(lambda i: round((norm_p_at_ks[i] + norm_map_at_ks[i] + norm_ndcg_at_ks[i] + norm_maps[i]) / 4,4), range(len(values_to_try))))

  i = norm_scores.index(max(norm_scores))

  best_value = values_to_try[i]
  scores_with_best_value = p_at_ks[i], map_at_ks[i], ndcg_at_ks[i],maps[i]

  return best_value, scores_with_best_value, norm_scores

--- 

**<center> Create a Popularity Model Class** 

--- 

In [14]:
class PopularityModel(Model):
  params = { 'min_rating': 0 }

  def build(self):
    "Creates a list containing the 100 most popular movies. Popular movies here are those with the most views that maintained an average minimum ratings above 'min_rating'."
    popular_movies = self.train.groupBy('product').agg({'rating': 'mean', 'product': 'count'}).filter(col('avg(rating)') >= self.params['min_rating']).sort('count(product)', ascending=False).limit(self.n_recs).select('product').rdd.flatMap(lambda x: x)  
    self.model = popular_movies

  def recommend(self, dataset):
    "Returns an RRD with two lists for each user. One list contains the movie ID's that were recommended to that user. The other contains the movie ID's that the user viewed and rated above a 3.0."
    recommended = self.model.collect()
    relevant = dataset.filter(col('rating') > 3.0).groupBy('user').agg(collect_list('product').alias('relevant'))
    rec_rel = relevant.withColumn('recommended', array(list(map(lit, recommended)))).select('recommended', 'relevant')
    return rec_rel

--- 

**<center> Find the Optimal Cutoff for our Popularity Model** 

--- 

In [15]:
#create a popularity model
pop_model = PopularityModel(training, validation, test)

#use tuner to try different values for min_rating
cutoffs_to_try = list(map(lambda r: r/100, range(300, 400, 1)))
best_cutoff, scores_with_cutoff, pop_norm_scores = tuner(pop_model,'min_rating', cutoffs_to_try)
pop_scores_df = pd.DataFrame([pop_norm_scores], columns=cutoffs_to_try, index=["Adjusted Scores"])
pop_scores_df.index.name = 'Min Avg Rating'

print('Performance Scores for Tuning Minimum Average Movie Rating :')
print(pop_scores_df)
print()
print('Best Performing Average Movie Rating Cutoff:', best_cutoff,'\n')


Performance Scores for Tuning Minimum Average Movie Rating :
                   3.00    3.01    3.02    3.03    3.04    3.05    3.06  \
Min Avg Rating                                                            
Adjusted Scores  0.9869  0.9869  0.9869  0.9868  0.9868  0.9898  0.9882   

                   3.07    3.08    3.09  ...    3.90    3.91    3.92    3.93  \
Min Avg Rating                           ...                                   
Adjusted Scores  0.9879  0.9892  0.9872  ...  0.9495  0.9472  0.9172  0.9099   

                   3.94    3.95    3.96    3.97    3.98    3.99  
Min Avg Rating                                                   
Adjusted Scores  0.9097  0.9072  0.8822  0.8796  0.8655  0.8522  

[1 rows x 100 columns]

Best Performing Average Movie Rating Cutoff: 3.46 



--- 

**<center> Final Results of the Poplarity Model on The Validation Set** 

--- 

In [16]:
def pretty_print_metrics(metrics):
  print('Precision at K', round(metrics[0], 4))
  print('MAP at K', round(metrics[1], 4))
  print('NDCG at K', round(metrics[2], 4))
  print('MAP', round(metrics[3], 4))

In [17]:
print('Results on Validation Set')
pretty_print_metrics(scores_with_cutoff)

Results on Validation Set
Precision at K 0.0304
MAP at K 0.0398
NDCG at K 0.1464
MAP 0.0398


--- 

**<center> Final Results of the Poplarity Model on The Test Set** 

--- 

In [18]:
pop_model.params = {'min_rating': best_cutoff}
pop_model.build()
test_scores = pop_model.score(True)

print('Results on Test Set')
pretty_print_metrics(test_scores)

Results on Test Set
Precision at K 0.0308
MAP at K 0.036
NDCG at K 0.1423
MAP 0.0359


--- 

**<center> Create a Latent Factor Model Class** 

--- 

In [20]:
class LFModel(Model):
  params = {
      'Num Iterations': 3,
      'Rank': 3,
      'Reg Param': 0.1,
      'Min Count': 50
  }

  def build(self):
    counts = self.train.groupBy('product').agg({'product': 'count'})
    train = self.train.join(counts, on='product', how='left').where(col('count(product)') >= self.params['Min Count']).drop('timestamp').drop('count(product)').select('user', 'product', 'rating').withColumnRenamed('product', 'item')

    als = ALS(rank=self.params['Rank'], maxIter=self.params['Num Iterations'], regParam=self.params['Reg Param'])
    start = timer()
    model = als.fit(train)
    end = timer()
    self.last_build_time = end - start
    self.model = model
  
  def recommend(self, dataset):
    recommended = self.model.recommendForAllUsers(Model.n_recs)
    relevant = dataset.where(col('rating') > 3.0).groupBy('user').agg(collect_list('product').alias('relevant'))
    rec_rel = recommended.join(relevant, on='user', how='left').select(col('recommendations.item').alias('recommended'), 'relevant').withColumn('relevant', coalesce(col('relevant'), array()))

    return rec_rel

--- 

**<center> Find the Optimal Parameters for our Latent Factor Model** 

--- 

To find the optimal parameters, default values are randomly assigned and the tuning function is used to tune each parameter one at a time. Once the best performing value for a parameter has been found, it will be set as the new default value, and the next parameter will be tuned. The process of tuning each of the four parameters in sequence, will be referred to in totality as a complete tuning iteration. 

This process is implemented to avoid having to run an unreasonable number of parameter variations on an already large model. Because not every possible parameter variation can be run, several techniques are implemented to reinforce that the parameters located are likely optimal. 

First, the model will continually tune parameter by parameter until two complete iterations are in total agreement about what the optimal parameters are. 

Second, to account for the fact that the previous and current iterations might be more inclined to agree (as the results of the first iterations were used as the starting parameter settings for the second iteration), the entire tuning process will be run again with different default parameter values and with a different parameter tuning order. If both rounds are in agreement about the optimal parameter values, the tuning process is complete and final metrics on the validation and test sets will be collected. 


Round One

In [21]:
#create a latent factor model 
latent = LFModel(training, validation, test)

#use tuner to try different values for each parameter
tries = {
    'Rank': [5, 10, 50, 100, 200],
    'Num Iterations': [1, 5, 10, 15, 20],
    'Min Count': [60, 70, 80, 90, 100],
    'Reg Param': [0.15, 0.20, 0.25, 0.30, 0.35],
}

latent.params['Num Iterations'] = 5
latent.params['Rank'] = 1
latent.params['Min Count'] = 60
latent.params['Reg Param'] = 0.15

param_names = [name for name in tries]

previous_results = []

for param_name in tries:
  previous_results.append(LFModel.params[param_name])

while True:
  results = []

  for param_name in tries:
    best, scores, norm_scores = tuner(latent, param_name, tries[param_name])

    scores_df = pd.DataFrame([norm_scores], columns=tries[param_name], index=["Adjusted Scores"])
    scores_df.index.name = param_name

    print('\n---------------------------------------------------------\n') 
    print('Tuning Parameter:', param_name)
    for name in param_names:
      if name == param_name:
        continue
      print('Current %s: %.2f' % (name, latent.params[name]))
    print('\nPerformance Scores for Tuning',param_name,':')
    print(scores_df)
    print(f'\nBest Performing {param_name}: {best}\n')
    print('Results on Validation Set:')
    print('Precision at K', round(scores[0],4))
    print('MAP at K', round(scores[1],4))
    print('NDCG at K', round(scores[2],4))
    print('MAP', round(scores[3],4))
    latent.params[param_name] = best
    results.append(best)
  
  print('\n---------------------------------------------------------\n')
  print('Iteration Complete')
  print('\nPrevious Iteration Optimal Parameters')
  print('Rank:', previous_results[0],' Num Iterations:', previous_results[1],' Min Count:', previous_results[2],' Reg Param:', previous_results[3])
  print('Current Iteration Optimal Parameters')
  print('Rank:', results[0],' Num Iterations:', results[1],' Min Count:', results[2],' Reg Param:', results[3])

  done = True
  for i in range(len(results)):
    if results[i] != previous_results[i]:
      done = False

  if done:
    break

  previous_results = results



---------------------------------------------------------

Tuning Parameter: Rank
Current Num Iterations: 5.00
Current Min Count: 60.00
Current Reg Param: 0.15

Performance Scores for Tuning Rank :
                    5    10      50      100     200
Rank                                                
Adjusted Scores  0.9721  1.0  0.9575  0.9351  0.9259

Best Performing Rank: 10

Results on Validation Set:
Precision at K 0.0325
MAP at K 0.041
NDCG at K 0.1539
MAP 0.041

---------------------------------------------------------

Tuning Parameter: Num Iterations
Current Rank: 10.00
Current Min Count: 60.00
Current Reg Param: 0.15

Performance Scores for Tuning Num Iterations :
                     1    5       10      15      20
Num Iterations                                      
Adjusted Scores  0.6177  1.0  0.9196  0.9181  0.9207

Best Performing Num Iterations: 5

Results on Validation Set:
Precision at K 0.0325
MAP at K 0.041
NDCG at K 0.1539
MAP 0.041

---------------------------

Round Two

In [22]:
#create a latent factor model 
latent = LFModel(training, validation, test)

#use tuner to try different values for each parameter
tries = {
    'Reg Param': [0.15, 0.20, 0.25, 0.30, 0.35],
    'Min Count': [60, 70, 80, 90, 100],
    'Num Iterations': [1, 5, 10, 15, 20],
    'Rank': [5, 10, 50, 100, 200],
}

latent.params['Num Iterations'] = 20
latent.params['Rank'] = 200
latent.params['Min Count'] = 100
latent.params['Reg Param'] = 0.35

param_names = [name for name in tries]

previous_results = []

for param_name in tries:
  previous_results.append(latent.params[param_name])

while True:
  results = []

  for param_name in tries:
    best, scores, norm_scores = tuner(latent, param_name, tries[param_name])

    scores_df = pd.DataFrame([norm_scores], columns=tries[param_name], index=["Adjusted Scores"])
    scores_df.index.name = param_name

    print('\n---------------------------------------------------------\n') 
    print('Tuning Parameter:', param_name)
    for name in param_names:
      if name == param_name:
        continue
      print('Current %s: %.2f' % (name, LFModel.params[name]))
    print('\nPerformance Scores for Tuning',param_name,':')
    print(scores_df)
    print(f'\nBest Performing {param_name}: {best}\n')
    print('Results on Validation Set:')
    pretty_print_metrics(scores)
    latent.params[param_name] = best
    results.append(best)
  
  print('\n---------------------------------------------------------\n')
  print('Iteration Complete')
  print('\nPrevious Iteration Optimal Parameters')
  print('Rank:', previous_results[3],' Num Iterations:', previous_results[2],' Min Count:', previous_results[1],' Reg Param:', previous_results[0])
  print('Current Iteration Optimal Parameters')
  print('Rank:', results[3],' Num Iterations:', results[2],' Min Count:', results[1],' Reg Param:', results[0])

  done = True
  for i in range(len(results)):
    if results[i] != previous_results[i]:
      done = False

  if done:
    break

  previous_results = results


---------------------------------------------------------

Tuning Parameter: Reg Param
Current Min Count: 100.00
Current Num Iterations: 20.00
Current Rank: 200.00

Performance Scores for Tuning Reg Param :
                   0.15    0.20    0.25  0.30    0.35
Reg Param                                            
Adjusted Scores  0.8985  0.9467  0.9834   1.0  0.9959

Best Performing Reg Param: 0.3

Results on Validation Set:
Precision at K 0.0194
MAP at K 0.0378
NDCG at K 0.1193
MAP 0.0378

---------------------------------------------------------

Tuning Parameter: Min Count
Current Reg Param: 0.35
Current Num Iterations: 20.00
Current Rank: 200.00

Performance Scores for Tuning Min Count :
                    60      70      80      90      100
Min Count                                              
Adjusted Scores  0.9208  0.9698  0.9966  0.9392  0.8613

Best Performing Min Count: 80

Results on Validation Set:
Precision at K 0.03
MAP at K 0.0382
NDCG at K 0.1432
MAP 0.0382

------

--- 

**<center> Final Results of the Latent Factor Model on The Validation Set** 

--- 

In [23]:
print('Results on Validation Set')
pretty_print_metrics(latent.score())

Results on Validation Set
Precision at K 0.0315
MAP at K 0.0396
NDCG at K 0.1469
MAP 0.0396


--- 

**<center> Final Results of the Latent Factor Model on The Test Set** 

--- 

In [24]:
print('Results on Test Set')
pretty_print_metrics(latent.score(True))

Results on Test Set
Precision at K 0.0315
MAP at K 0.0339
NDCG at K 0.1394
MAP 0.0338


--- 

**<center> Extension: LightFM Implementation <center>** 

--- 

In [62]:
lightfm_model = LightFM()

def build_sparse_matrix(input_dataset):
  dataset_with_id = input_dataset.withColumn("id", monotonically_increasing_id())
  increment = 10**6
  start = 0

  MAT_SIZE = (283229, 193887) if not SMALL else (611, 193610)
  result = sparse.coo_matrix(MAT_SIZE, dtype=np.float16)

  while True:
    chunk = dataset_with_id.where(col('id').between(start, start + increment - 1))
    if chunk.count() == 0:
      break

    I = np_arr(chunk.select('user').toPandas()['user'])
    J = np_arr(chunk.select('product').toPandas()['product'])
    V = np_arr(chunk.select('rating').toPandas()['rating'])

    sparse_chunk = sparse.coo_matrix((V, (I, J)), shape=MAT_SIZE)
    result += sparse_chunk

    start += increment

  return result

sparse_train = build_sparse_matrix(training)
sparse_val = build_sparse_matrix(validation)
sparse_test = build_sparse_matrix(test)

--- 

**<center> LightFM: Hyper-parameter Tuning <center>** 

--- 

In [69]:
def tune_lightfm(default_params, key, tries):
  prec = []
  times = []

  for val in tries:
    default_params[key] = val
    lightfm_model = LightFM(**params)

    start = timer()
    lightfm_model.fit(sparse_train)
    end = timer()

    test_precision = precision_at_k(lightfm_model, sparse_val, k=100).mean()

    prec.append(test_precision)
    times.append(end - start)

    print('LightFM Latent Factor Model')
    print('Parameters', params)
    print('Precision on Test Set: ', test_precision)
    print('Time to Fit: ', end - start)
    print()

  best = tries[prec.index(max(prec))]
  return best

params = {}

best_lr = tune_lightfm(params, 'learning_rate', [1, 0.1, 0.01, 0.001, 0.0001])
params['learning_rate'] = best_lr
print('Best Learning Rate:', best_lr)

best_loss = tune_lightfm(params, 'loss', ['logistic','bpr','warp','warp-kos'])
params['loss'] = best_loss
print('Best Loss:', best_loss)

best_no_comps = tune_lightfm(params, 'no_components', [5,10,50,100,200])
params['no_components'] = best_no_comps
print('Best Number of Components:', best_no_comps)

best_user_alpha = tune_lightfm(params, 'user_alpha', [.01,.05,0.1,0.2,0.3])
params['user_alpha'] = best_user_alpha
print('Best User Alpha:', best_user_alpha)

LightFM Latent Factor Model
Parameters {'learning_rate': 1}
Precision on Test Set:  0.038508195
Time to Fit:  0.055701674998999806

LightFM Latent Factor Model
Parameters {'learning_rate': 0.1}
Precision on Test Set:  0.03880328
Time to Fit:  0.058250105001206975

LightFM Latent Factor Model
Parameters {'learning_rate': 0.01}
Precision on Test Set:  0.03868853
Time to Fit:  0.05534893100048066

LightFM Latent Factor Model
Parameters {'learning_rate': 0.001}
Precision on Test Set:  0.03632787
Time to Fit:  0.05592680199697497

LightFM Latent Factor Model
Parameters {'learning_rate': 0.0001}
Precision on Test Set:  0.00095081964
Time to Fit:  0.05719663700074307

Best Learning Rate: 0.1
LightFM Latent Factor Model
Parameters {'learning_rate': 0.1, 'loss': 'logistic'}
Precision on Test Set:  0.038967215
Time to Fit:  0.055015172001731116

LightFM Latent Factor Model
Parameters {'learning_rate': 0.1, 'loss': 'bpr'}
Precision on Test Set:  0.029852457
Time to Fit:  0.07508795099784038

Ligh

--- 

**<center> PySpark ALS: Fitting Time and Precision on Test Set  <center>** 

--- 

In [65]:
latent = LFModel(training, validation, test)
# latent.params = {'Num Iterations':5,
#                  'Rank':100,
#                  'Reg Param' :0.1,
#                  'Min Count':100}
latent.params = {'Num Iterations':5,
                 'Rank':100,
                 'Reg Param' :0.2,
                 'Min Count':70}
latent.build()
test_metrics = latent.score(True)

print('PySpark Latent Factor Model')
print('Precision on Test Set: ',round(test_metrics[0],4))
print('Time to Fit: ', latent.last_build_time)

PySpark Latent Factor Model
Precision on Test Set:  0.0322
Time to Fit:  3.810257482999077
