In [33]:
import pandas as pd
import numpy as np
import datetime
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [8]:
# Attach to the already-running Spark session if there is one; otherwise start a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [9]:
# Load the raw ratings table from the local data directory into pandas.
ratings = pd.read_csv(filepath_or_buffer='./data/ratings.csv')

In [10]:
# Convert epoch seconds to timestamps in one vectorised call.
# - pd.to_datetime(unit='s') yields naive UTC timestamps, so the result no longer
#   depends on the timezone of the machine running the notebook (unlike
#   datetime.fromtimestamp, which converts to local time).
# - The dtype guard makes the cell safe to re-run: once the column is already
#   datetime it is left untouched instead of raising.
if not pd.api.types.is_datetime64_any_dtype(ratings['timestamp']):
    ratings['timestamp'] = pd.to_datetime(ratings['timestamp'], unit='s')

# Keep only ratings made on or after 2013-03-31. Comparing against midnight of that
# day is equivalent to comparing calendar dates, without a per-row Python call.
sample = ratings[ratings['timestamp'] >= pd.Timestamp(2013, 3, 31)]

In [30]:
# Hand the filtered pandas sample over to Spark.
df = spark.createDataFrame(sample)

# 80/20 train/test split. The seed pins the split so that a fresh
# "Restart & Run All" reproduces the same partitions.
(train_samp, test_samp) = df.randomSplit([0.8, 0.2], seed=42)

# Baseline ALS model with default hyper-parameters; the seed pins the random
# initialisation of the factor matrices for the same reproducibility reason.
model = ALS(userCol='userId',
            itemCol='movieId',
            ratingCol='rating',
            seed=42).fit(train_samp)

In [31]:
# Score the held-out ratings with the fitted model.
predictions = model.transform(test_samp)

# Preview a handful of rows. limit(5) is applied on the Spark side so only five
# rows are collected to the driver, instead of converting the whole prediction
# set to pandas just to display its head.
predictions.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating,timestamp,prediction
0,29100,463,1.0,2015-03-14 11:36:32,2.271202
1,66277,471,3.0,2013-09-01 12:00:35,4.527064
2,62003,471,4.0,2015-02-21 10:15:30,3.237234
3,5936,471,4.5,2013-04-05 13:48:07,4.241895
4,12055,471,5.0,2013-11-12 02:27:46,3.324728


In [32]:
# RMSE between the true 'rating' column and the model's 'prediction' column.
evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='rating',
    metricName='rmse',
)
# Rows containing missing values are removed before scoring (presumably the NaN
# predictions ALS emits for users/movies unseen in training - TODO confirm).
evaluator.evaluate(predictions.dropna())

0.7998222216539371

In [27]:
def multiALSrun(df,
                k=3,
                userCol='userId',
                itemCol='movieId',
                ratingCol='rating',
                metricName='rmse',
                seed=None):
    """Fit and score ALS on ``k`` independent random splits and average the metric.

    Each round draws a fresh random split of ``df`` with weights
    ``(k-1)/k`` (train) and ``1/k`` (test), fits an ALS model with default
    hyper-parameters on the train part and evaluates it on the test part.
    Note that the splits are drawn independently, so the test parts of
    different rounds may overlap (this is repeated hold-out, not a partition
    into k disjoint folds).

    Parameters
    ----------
    df : Spark DataFrame
        Must expose ``randomSplit`` and contain the user, item and rating columns.
    k : int
        Number of rounds; also determines the split weights. Must be >= 2.
    userCol, itemCol, ratingCol : str
        Column names passed to ALS. ``ratingCol`` is also used as the
        evaluator's label column.
    metricName : str
        Metric name passed to ``RegressionEvaluator``.
    seed : int or None
        Optional seed. When given, ALS is seeded with it and round ``i`` uses
        ``seed + i`` for its split, so rounds differ but the whole run is
        repeatable. ``None`` keeps the original unseeded behaviour.

    Returns
    -------
    numpy.float64
        Mean of the per-round metric values.

    Raises
    ------
    ValueError
        If ``k`` is smaller than 2 (the training weight would be zero or the
        mean would be taken over no rounds).
    """
    if k < 2:
        raise ValueError(f'k must be at least 2, got {k}')

    # Estimator and evaluator do not change between rounds, so build them once.
    als = ALS(userCol=userCol,
              itemCol=itemCol,
              ratingCol=ratingCol)
    if seed is not None:
        als.setSeed(seed)
    # The label column must follow ratingCol; hard-coding 'rating' here would
    # break (or silently mis-score) any frame whose rating column is named differently.
    evaluator = RegressionEvaluator(metricName=metricName,
                                    labelCol=ratingCol,
                                    predictionCol='prediction')
    weights = [(k - 1.0) / k, 1.0 / k]

    evaluations = []
    for i in range(k):
        # Offset the seed per round so a seeded run does not repeat one split k times.
        split_seed = None if seed is None else seed + i
        (train, test) = df.randomSplit(weights, seed=split_seed)
        model = als.fit(train)
        pred = model.transform(test)
        # Rows with missing values are dropped before scoring so they cannot
        # turn the metric into NaN.
        evaluation = evaluator.evaluate(pred.na.drop())
        print(f'k={i} {metricName}={evaluation}')
        evaluations.append(evaluation)

    return np.mean(evaluations)

In [29]:
# Average the metric over five independent random train/test splits of the full sample.
multiALSrun(df=df, k=5)

k=0 rmse=0.8017872301712802
k=1 rmse=0.8021445030888144
k=2 rmse=0.8023496259180257
k=3 rmse=0.8005408759112953
k=4 rmse=0.7999771837296881


0.8013598837638207

In [None]:
# Hold out 20% of the training sample for a final check of the tuned model.
# The seed makes the split repeatable across kernel restarts.
(train_set, validation_set) = train_samp.randomSplit([0.8, 0.2], seed=42)

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

# coldStartStrategy='drop' is required for cross-validation: with random folds,
# users/movies absent from a training fold get NaN predictions under the default
# ('nan') strategy, which makes the fold's RMSE NaN and breaks model selection.
als = ALS(userCol='userId',
          itemCol='movieId',
          ratingCol='rating',
          coldStartStrategy='drop',
          seed=42)

# Grid: 4 ranks x 2 iteration counts x 3 regularisation strengths = 24 candidates.
params = (
    ParamGridBuilder()
    .addGrid(als.rank, [1, 3, 5, 10])
    .addGrid(als.maxIter, [10, 20])
    .addGrid(als.regParam, [0.01, 0.1, 0.5])
    .build()
)

# 10-fold CV over the grid (24 candidates x 10 folds fits - expensive to run).
crossvalidator = CrossValidator(estimator=als,
                                estimatorParamMaps=params,
                                evaluator=evaluator,
                                numFolds=10,
                                seed=42)

modelCV = crossvalidator.fit(train_set)
# Score the untouched validation split with the best model found by CV.
pred_set = modelCV.transform(validation_set)