# Scaling recommendation systems with Spark on the MovieLens 20M dataset
This notebook uses the MovieLens 20M dataset.<br>
It uses Alternating Least Squares (ALS) matrix factorization to make recommendations.

In [None]:
import csv
import multiprocessing

from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [None]:
# Create (or reuse) the Spark session for this notebook.
# The executor core count is fixed at 8 through the "spark.executor.cores"
# setting; adjust the value to match the machine/cluster this runs on.
spark = (
    SparkSession.builder
    .appName("ALSExample")
    .config("spark.executor.cores", '8')
    .getOrCreate()
)

In [None]:
# Read the ratings CSV (with header row); every column arrives as a string.
rawRatings = spark.read.option("header", "true").csv("../ml-20m/ratings.csv")
# Cast and rename the first four columns inside Spark rather than mapping a
# Python lambda over the RDD: for ~20M rows this avoids serializing every row
# to the Python workers and back.
# Columns are selected by position, exactly as before
# (assumed order: user id, movie id, rating, timestamp).
# NOTE: a value that cannot be parsed becomes NULL here instead of raising.
ratings = rawRatings.select(
    rawRatings[0].cast("bigint").alias("userID"),
    rawRatings[1].cast("bigint").alias("movieID"),
    rawRatings[2].cast("double").alias("rating"),
    rawRatings[3].cast("bigint").alias("timestamp"),
)

In [None]:
# Split the ratings 80/20 into a training and a test set.
# A fixed seed makes the split (and therefore the reported RMSE) repeatable
# across runs instead of changing every time the cell is executed.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Hyper-parameters and column bindings for the ALS estimator.
alsParams = dict(
    userCol="userID",
    itemCol="movieID",
    ratingCol="rating",
    maxIter=5,
    regParam=0.01,
    # "drop" removes rows whose prediction is NaN (users/items unseen in
    # training) so they do not poison the evaluation metric
    coldStartStrategy="drop",
)
als = ALS(**alsParams)
# Train the factorization model on the training split.
model = als.fit(training)

In [None]:
# Score the held-out test split with the trained model.
preds = model.transform(test)
# Compare the "prediction" column against the true "rating" column using RMSE.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(preds)
print(f"Root-mean-square-error = {rmse}")

In [None]:
# Top-10 recommendations for every user. This only builds the DataFrame;
# it is kept so later cells can still use `userRecs` (presumably nothing is
# computed until an action is run on it — confirm for your Spark version).
userRecs = model.recommendForAllUsers(10)
# Get the recommendations for user 85 only. Asking the model for just this
# user avoids scoring every user against every movie and then discarding all
# rows but one, which is what filtering `userRecs` would do.
targetUsers = spark.createDataFrame([(85,)], ["userID"])
user85Recs = model.recommendForUserSubset(targetUsers, 10).collect()

In [None]:
# Read the movie titles from the same MovieLens 20M release the ratings were
# loaded from, so that every recommended movieID can be resolved to a name
# (the smaller "ml-latest-small" catalogue does not contain all of them).
moviesPath = "../ml-20m/movies.csv"
# Lookup tables in both directions: movie ID <-> movie title.
movieID_to_name = {}
name_to_movieID = {}
# MovieLens CSV files are UTF-8 encoded; decoding them as ISO-8859-1 garbles
# accented characters in titles.
with open(moviesPath, newline='', encoding='utf-8') as csvfile:
    movieReader = csv.reader(csvfile)
    # skip the header row (tolerating a completely empty file)
    next(movieReader, None)
    for row in movieReader:
        # first column is the numeric movie ID, second column is the title
        movieID = int(row[0])
        movieName = row[1]
        movieID_to_name[movieID] = movieName
        name_to_movieID[movieName] = movieID

In [None]:
# function to get a movie name based on its movie ID
def getMovieName(movieID):
    """Return the title stored for movieID in movieID_to_name, or "" if there is none."""
    return movieID_to_name.get(movieID, "")

In [None]:
# Print the title of every movie recommended to user 85, in the order returned.
recommendedIDs = (
    rec.movieID
    for recRow in user85Recs
    for rec in recRow.recommendations
)
for recommendedID in recommendedIDs:
    print(getMovieName(recommendedID))