In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark
!wget "https://files.grouplens.org/datasets/movielens/ml-20m.zip"
!unzip ./ml-20m.zip -d .
!rm -rf ml-20m.zip

In [None]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('MovieRatings').getOrCreate()
spark.conf.set('spark.sql.pivotMaxValues',u'500000')

In [None]:
ratings=spark.read.option("inferSchema",True).option("header",True).csv('./ml-20m/ratings.csv').drop('timestamp')
#movies=spark.read.option("inferSchema",True).option("header",True).csv('./ml-20m/movies.csv')
#tags=spark.read.option("inferSchema",True).option("header",True).csv('./ml-20m/tags.csv')
#links=spark.read.option("inferSchema",True).option("header",True).csv('./ml-20m/links.csv')
#gtags=spark.read.option("inferSchema",True).option("header",True).csv('./ml-20m/genome-tags.csv')
#gscores=spark.read.option("inferSchema",True).option("header",True).csv('./ml-20m/genome-scores.csv')

In [None]:
# Movies content based filtering using genre (Use cosine similarity further)
# from pyspark.sql.functions import col,split,explode,concat_ws
# movies=movies.withColumn('genres',split(col('genres'),'\|'))
# movies_gen=movies.withColumn('genres',explode(col('genres'))).groupby('movieId','title').pivot('genres').count().na.fill(0).withColumnRenamed('(no genres listed)','unlisted')
# genres=movies_gen.columns[2:]
# movies_vecgen=movies_gen.withColumn("genres",concat_ws(",",*[col(x) for x in genres])).drop(*genres)

In [None]:
#remove after testing
ratings=ratings.limit(10000)

In [None]:
train,test=ratings.randomSplit([0.8,0.2])

ALS

In [None]:
from pyspark.ml.recommendation import ALS
model=ALS(coldStartStrategy="NaN",userCol="userId",itemCol="movieId",ratingCol="rating")

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator=RegressionEvaluator(metricName="rmse",predictionCol="prediction",labelCol="rating")

In [None]:
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
paramGrid=ParamGridBuilder().addGrid(model.rank,[2,4,6,8]).addGrid(model.regParam,[0.1,0.05,0.01]).build()
cv=CrossValidator(estimator=model,estimatorParamMaps=paramGrid,evaluator=evaluator,numFolds=10)

In [None]:
cvmodel=cv.fit(train)
best=cvmodel.bestModel
predictions=best.transform(test)
rmse=evaluator.evaluate(predictions.dropna())
mse=RegressionEvaluator(metricName="mse",predictionCol="prediction",labelCol="rating").evaluate(predictions.dropna())

In [None]:
from pyspark.sql.functions import abs,mean,col
mape=predictions.dropna().withColumn('mean',abs((col('rating')-col('prediction'))/col('rating'))).select(mean('mean')).collect()[0][0]*100

In [None]:
print("ALS\nRMSE =",rmse,"\nMSE =",mse,"\nMAPE =",round(mape,2),"%")

ALS
RMSE = 1.0760670935960883 
MSE = 1.1579203899203325 
MAPE = 30.1 %


Collaborative filtering

In [None]:
from pyspark.sql.functions import first,col #,array
#ratings.groupby('userId').pivot('movieId').agg(first('rating')).show()
matrix=ratings.groupby('movieId').pivot('userId').agg(first('rating')/5).orderBy(['movieId'],ascending=[True]).fillna(0)
userlist=matrix.columns[1:]
#movielist=matrix.withColumn("user_ratings",array(*[col(x) for x in userlist])).drop(*userlist)

In [None]:
from pyspark.ml.feature import VectorAssembler
vecAssembler=VectorAssembler(inputCols=userlist,outputCol="features")
movielist=vecAssembler.transform(matrix).drop(*userlist)

In [None]:
from pyspark.ml.feature import Normalizer
normalizer=Normalizer(inputCol="features",outputCol="ratings")
movielist=normalizer.transform(movielist)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
dot_udf=udf(lambda x,y: float(x.dot(y)),DoubleType())
pairs=movielist.alias("i").join(movielist.alias("j"),col("i.movieID")!=col("j.movieID")).select(col("i.movieID").alias("i"),col("j.movieID").alias("j"),dot_udf("i.ratings","j.ratings").alias("score")).sort("i", "j")
#similarity=pairs.groupby('i').pivot('j').agg(first('dot')).orderBy(['i'],ascending=[True]).fillna(0)
#movieindex=similarity.columns[1:]

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number,collect_list
window=Window.partitionBy(pairs['i']).orderBy(pairs['score'].desc())
final_pairs=pairs.select('*',row_number().over(window).alias('top10')).filter(col('top10')<=10).drop('score','top10').groupby("i").agg(collect_list("j").alias('matches')).sort('i')

In [None]:
from pyspark.sql.functions import explode
movielookup=test.join(final_pairs,test.movieId==final_pairs.i,"left").drop('rating','i').withColumn('related',explode('matches'))
movie_calc=movielookup.join(train.withColumnRenamed("movieID","related"),["userId","related"],"left").drop('matches','related').sort('userId','movieId')
cf_predictions=movie_calc.groupby("userId","movieId").agg(mean("rating").alias("cf_prediction"))

In [None]:
predictions=predictions.join(cf_predictions,["userId","movieId"],"left")

In [None]:
rmse=RegressionEvaluator(metricName="rmse",predictionCol="cf_prediction",labelCol="rating").evaluate(predictions.dropna())
mse=RegressionEvaluator(metricName="mse",predictionCol="cf_prediction",labelCol="rating").evaluate(predictions.dropna())
mape=predictions.withColumn('mean',abs((col('rating')-col('cf_prediction'))/col('rating'))).select(mean('mean')).collect()[0][0]*100
print("Collaborative Filtering\nRMSE =",rmse,"\nMSE =",mse,"\nMAPE =",round(mape,2),"%")

Collaborative Filtering
RMSE = 0.8986929968032633 
MSE = 0.8076491025032302 
MAPE = 27.26 %


Hybrid Approach

In [21]:
predictions=predictions.withColumn("hybrid",(0.4*col("prediction"))+(0.6*col("cf_prediction")))
rmse=RegressionEvaluator(metricName="rmse",predictionCol="hybrid",labelCol="rating").evaluate(predictions.dropna())
mse=RegressionEvaluator(metricName="mse",predictionCol="hybrid",labelCol="rating").evaluate(predictions.dropna())
mape=predictions.dropna().withColumn('mean',abs((col('rating')-col('hybrid'))/col('rating'))).select(mean('mean')).collect()[0][0]*100
print("Hybrid Result\nRMSE =",rmse,"\nMSE =",mse,"\nMAPE =",round(mape,2),"%")

Hybrid Result
RMSE = 0.8613713223607666 
MSE = 0.7419605549855357 
MAPE = 23.04 %
