In [None]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,col,collect_list
from pyspark.sql.types import StringType, ArrayType, DoubleType,IntegerType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RankingEvaluator
from HR import HitRate

from pyspark.sql.types import *
from pyspark.sql import functions as F
import pandas as pd

# Make the Spark driver and its Python workers run on this kernel's interpreter.
# (Originally noted as needed for Windows users.)
import os
import sys
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

# Create spark session

In [None]:
# Local Spark session using all cores; the console progress bar is disabled
# so it does not clutter notebook output.
spark = (
    SparkSession.builder
    .appName('MovieRecomender')
    .master("local[*]")
    .config('spark.ui.showConsoleProgress', 'false')
    .getOrCreate()
)

In [None]:
# Column layout of ratings.dat. Every field is nullable, so unparsable values
# can be read as nulls instead of failing the load.
schema = StructType([
    StructField(name, dtype, True)
    for name, dtype in (
        ('UserID', LongType()),
        ('MovieID', LongType()),
        ('Rating', IntegerType()),
        ('Timestamp', LongType()),
    )
])

In [None]:
# Load MovieLens-1M ratings: one `UserID::MovieID::Rating::Timestamp` record per line.
# The explicit schema already names the columns, so no rename is needed.
df = spark.read.option("sep", "::").schema(schema).csv("data/ml-1m/ratings.dat")
# Drop incomplete rows *before* registering the SQL view, so that queries on
# `dataset` and operations on `df` see exactly the same rows.
df = df.dropna()
df.createOrReplaceTempView("dataset")
df.cache().count()  # Force the cache to materialise now; later actions reuse it.

# Model config

In [None]:
# Explicit-feedback ALS over (UserID, MovieID, Rating).
# coldStartStrategy='drop' discards prediction rows for users/items unseen at
# fit time, which would otherwise get NaN predictions and poison RMSE.
als = ALS(
    rank=10,
    maxIter=15,
    userCol="UserID",
    itemCol="MovieID",
    ratingCol="Rating",
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,  # pin factor initialisation so results reproduce across kernel restarts
)

# Hit Rate

In [None]:
# Hit rate computed by the project-local `HR.HitRate` helper. It receives the
# ALS *estimator* and the full ratings frame (presumably it fits/splits
# internally — confirm in HR.py).
hr_evaluator = HitRate(
    userCol='UserID',
    itemCol="MovieID",
    labelCol='Rating',
    predictionCol='prediction',
)
value = hr_evaluator.eval(als, df)
print(f"Hit rate is {value}")

# RMSE and NDCG@10

In [None]:
# 80/20 train/test split. The fixed seed keeps the split (and every metric
# derived from it) identical on Restart & Run All.
(train, test) = df.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Rating-prediction error: compares the model's `prediction` with the true `Rating`.
rmse = RegressionEvaluator(
    labelCol="Rating",
    predictionCol="prediction",
    metricName="rmse",
)
# Ranking quality of the first 10 recommended items against the per-user
# ground-truth list; both columns are expected to be array<double>.
ndcg = RankingEvaluator(
    predictionCol="recommendations",
    labelCol="RealRank",
    metricName="ndcgAtK",
    k=10,
)

In [None]:
# Spark's ALS only checkpoints (every `checkpointInterval` iterations) when a
# checkpoint directory is set, so configure one before fitting.
spark.sparkContext.setCheckpointDir("checkpoint/")
model = als.fit(dataset=train)

In [None]:
# Catalogue size: number of distinct movies that received at least one rating.
n_items = df.select("MovieID").distinct().count()
print(f'Total # items is {n_items}')

In [None]:
def getRank(a):
    """Return each recommendation's MovieID as a float, preserving order.

    ``a`` is an iterable of Row-like objects exposing a ``MovieID`` attribute
    (e.g. the structs produced by ``recommendForAllUsers``).
    """
    return [float(rec.MovieID) for rec in a]
# Spark UDF: recommendation structs -> array<double> of MovieIDs (via getRank,
# looked up at call time through the lambda).
convertUDF = udf(
    lambda recs: getRank(recs),
    returnType=ArrayType(DoubleType()),
)

def toDouble(a):
    """Return a new list holding every element of ``a`` converted to float."""
    return list(map(float, a))
# Spark UDF: numeric array -> array<double> (via toDouble, looked up at call time).
toDoubleUDF = udf(
    lambda values: toDouble(values),
    returnType=ArrayType(DoubleType()),
)

# Ground truth per user: MovieIDs ordered by rating (highest first), as
# array<double> for RankingEvaluator.
# Sorting the frame *before* groupBy does not survive the shuffle, so
# collect_list's order was not guaranteed. Instead, collect (Rating, MovieID)
# pairs, sort them inside each group, then keep only the MovieID field.
# NOTE(review): built from the full `df`, so it includes the rows in `train`
# that `model` was fitted on — this NDCG is not a held-out metric.
tempt = (
    df.groupBy("UserID")
      .agg(
          F.sort_array(
              F.collect_list(F.struct("Rating", "MovieID")), asc=False
          ).alias("RealRank")
      )
      .withColumn("RealRank", col("RealRank.MovieID").cast(ArrayType(DoubleType())))
)

# Rank every item for every user, then reduce each recommendation struct list
# to its ordered MovieIDs (native column ops instead of a row-by-row Python UDF).
rec = model.recommendForAllUsers(n_items).join(tempt, "UserID", "inner")
rec = rec.withColumn(
    "recommendations",
    col("recommendations.MovieID").cast(ArrayType(DoubleType())),
)
rec.persist().count()
rec.show()

In [None]:
# Score the held-out split. Rows containing nulls are removed before RMSE;
# NDCG is computed on the `rec` frame built earlier.
predictions = model.transform(test).dropna()
print(rmse.evaluate(predictions), ndcg.evaluate(rec))