### Movie Recommendation: Using Spark MLlib & Python DataFrames on Databricks Community Edition

The training dataset is MovieLens 1M, taken from http://files.grouplens.org/datasets/movielens/ml-1m.zip. It contains 1,000,209 ratings (userId::movieId::rating::timestamp), 3,883 movies (movieId::title::genre1|genre2|genre3), and 6,040 users (UserID::Gender::Age::Occupation::Zip-code).
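
To make the record layout concrete, here is a minimal sketch that parses one line of each file in plain Python. The sample lines are illustrative values in the layout described above, and the helper names `parse_rating`, `parse_movie`, and `parse_user` are hypothetical; they are not part of the notebook.

```python
def parse_rating(line):
    """Split a ratings.dat line: userId::movieId::rating::timestamp."""
    user_id, movie_id, rating, timestamp = line.strip().split("::")
    return int(user_id), int(movie_id), float(rating), int(timestamp)


def parse_movie(line):
    """Split a movies.dat line: movieId::title::genre1|genre2|genre3."""
    movie_id, title, genres = line.strip().split("::")
    return int(movie_id), title, genres.split("|")


def parse_user(line):
    """Split a users.dat line: UserID::Gender::Age::Occupation::Zip-code."""
    user_id, gender, age, occupation, zip_code = line.strip().split("::")
    # The zip code stays a string so that leading zeros are preserved.
    return int(user_id), gender, int(age), int(occupation), zip_code


# Illustrative sample lines (assumed values, written in the documented layout).
print(parse_rating("1::1193::5::978300760"))
print(parse_movie("1::Toy Story (1995)::Animation|Children's|Comedy"))
print(parse_user("1::F::1::10::48067"))
```

The Spark cells further down apply the same `split("::")` idea to every line of an RDD.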

This solution is based on Databricks training material originally written in Scala. It applies matrix-factorization-based collaborative filtering, fitted with the alternating least squares (ALS) optimization algorithm, using the ALS estimator in Spark MLlib. A grid search over the estimator's parameters is used to identify the best-fitting model.

In [2]:
%sh
rm -rf "/tmp/ml-1m"
rm -f "/tmp/ml-1m.zip"
curl -o /tmp/ml-1m.zip http://files.grouplens.org/datasets/movielens/ml-1m.zip
unzip /tmp/ml-1m.zip -d /tmp

In [3]:
%sh 
ls -lah /tmp

In [4]:
%fs cp -r file:/tmp/ml-1m dbfs:/tmp/ml-1m

In [5]:
%fs ls /tmp/ml-1m/

path,name,size
dbfs:/tmp/ml-1m/README,README,5577
dbfs:/tmp/ml-1m/movies.dat,movies.dat,171308
dbfs:/tmp/ml-1m/ratings.dat,ratings.dat,24594131
dbfs:/tmp/ml-1m/users.dat,users.dat,134368


In [6]:
ratingsDF = sc.textFile("/tmp/ml-1m/ratings.dat") \
  .map(lambda l: l.split("::")) \
  .map(lambda l: (int(l[0]), int(l[1]), float(l[2]))) \
  .toDF(["user", "item", "label"])

ratingsDF.show(5, False)

In [7]:
moviesDF = sc.textFile("/tmp/ml-1m/movies.dat") \
  .map(lambda l: l.split("::")) \
  .map(lambda l: (int(l[0]), str(l[1]), l[2].split("|"))) \
  .toDF(["movie", "title", "genre"])

moviesDF.show(5, False)

In [8]:
(ratingsDF.schema, moviesDF.schema)

### Collaborative filtering
Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix; in our case, the user-movie rating matrix. MLlib currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. In particular, we use MLlib's alternating least squares (ALS) algorithm to learn these latent factors.
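
The alternating idea can be sketched in plain Python for the simplest case of a single latent factor per user and per movie. This is a toy illustration under stated assumptions, not Spark's implementation: the function name `als_rank1`, the tiny rating dictionary, and the plain ridge penalty `reg` are all hypothetical choices made for the example.

```python
def als_rank1(ratings, n_users, n_items, reg=0.01, iters=30):
    """Rank-1 ALS on observed ratings given as {(user, item): rating}."""
    u = [1.0] * n_users  # one latent factor per user
    v = [1.0] * n_items  # one latent factor per item
    for _ in range(iters):
        # Hold the item factors fixed: each user factor then has a
        # closed-form one-variable ridge regression solution.
        for usr in range(n_users):
            obs = [(j, r) for (a, j), r in ratings.items() if a == usr]
            u[usr] = sum(r * v[j] for j, r in obs) / (reg + sum(v[j] ** 2 for j, _r in obs))
        # Hold the user factors fixed and solve for each item factor the same way.
        for itm in range(n_items):
            obs = [(i, r) for (i, b), r in ratings.items() if b == itm]
            v[itm] = sum(r * u[i] for i, r in obs) / (reg + sum(u[i] ** 2 for i, _r in obs))
    return u, v


# Observed entries of a rank-1 matrix (user 1's ratings are twice user 0's).
# The entry (1, 2) is missing; the underlying value would be 6.
ratings = {(0, 0): 1.0, (0, 1): 2.0, (0, 2): 3.0, (1, 0): 2.0, (1, 1): 4.0}
u, v = als_rank1(ratings, n_users=2, n_items=3)
predicted_missing = u[1] * v[2]
print(round(predicted_missing, 2))
```

With more than one factor, each closed-form step becomes a small least-squares solve per user or per item, but the alternation between the two sides stays the same.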

In [10]:
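from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Define the estimator first: the parameter grid below refers to its params.
als = ALS(userCol="user", itemCol="item", ratingCol="label", coldStartStrategy="drop")

# Note: alpha only affects ALS when implicitPrefs=True. With explicit ratings
# it does not change the fit, so this entry merely doubles the number of runs.
paramGrid = ParamGridBuilder() \
                    .addGrid(als.rank, [1, 5]) \
                    .addGrid(als.maxIter, [5, 10]) \
                    .addGrid(als.regParam, [0.3, 0.1, 0.01]) \
                    .addGrid(als.alpha, [2.0, 3.0]) \
                    .build()

# Hold out 20% of the ratings for validation. RegressionEvaluator() defaults to
# RMSE on the "label" and "prediction" columns.
tvs = TrainValidationSplit(estimator=als, evaluator=RegressionEvaluator(), estimatorParamMaps=paramGrid, trainRatio=0.8)
model = tvs.fit(ratingsDF)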
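
To get a feel for the cost of this grid search, the following plain-Python sketch enumerates the same value lists with `itertools.product`. The dictionary `grid` simply mirrors the values passed to `ParamGridBuilder`; it is an illustration and does not call Spark.

```python
from itertools import product

# Same value lists as in the ParamGridBuilder call.
grid = {
    "rank": [1, 5],
    "maxIter": [5, 10],
    "regParam": [0.3, 0.1, 0.01],
    "alpha": [2.0, 3.0],
}

# One dictionary per parameter combination, i.e. one model fit each.
combos = [dict(zip(grid, values)) for values in product(*grid.values())]
print(len(combos))  # 2 * 2 * 3 * 2 = 24

# alpha is only used by ALS when implicitPrefs=True, so for explicit ratings
# the grid could be reduced to the remaining three parameters.
explicit_grid = {name: values for name, values in grid.items() if name != "alpha"}
explicit_combos = [dict(zip(explicit_grid, values)) for values in product(*explicit_grid.values())]
print(len(explicit_combos))  # 2 * 2 * 3 = 12
```

Each combination is fitted once on the training portion of the split, so halving the grid roughly halves the tuning time.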

In [11]:
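# The model fitted with the best-scoring parameter combination
best_model = model.bestModel
best_rank = best_model.rank
# ALSModel does not expose maxIter, so read it from the parent estimator on the
# JVM side (this relies on the private _java_obj attribute).
best_maxiter = best_model._java_obj.parent().getMaxIter()

print("Best model's rank & maxiter:", best_rank, best_maxiter)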

In [12]:
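# Caveat: this scores the full ratings set, which includes the rows the model
# was trained on, so the figure is optimistic compared with a held-out test set.
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(model.transform(ratingsDF))
print("Root Mean Square Error (RMSE):", rmse)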
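
For reference, RMSE is the square root of the mean squared difference between the actual and the predicted ratings. A minimal plain-Python sketch follows; the function name `root_mean_squared_error` and the sample values are hypothetical and only illustrate the formula.

```python
import math


def root_mean_squared_error(labels, predictions):
    """Square root of the mean squared difference between labels and predictions."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and of equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up ratings and predictions, for illustration only.
print(root_mean_squared_error([4.0, 3.0, 5.0], [3.5, 3.0, 4.0]))
```

Because the ratings are on a 1 to 5 scale, the RMSE can be read directly as a typical prediction error in stars.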

In [13]:
# Top 10 recommended movies for each user
userRecs = best_model.recommendForAllUsers(10)

# Top 10 recommended users for each movie
movieRecs = best_model.recommendForAllItems(10)
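
Conceptually, a matrix-factorization model scores a user-movie pair as the dot product of the two latent factor vectors, and a top-N recommendation keeps the highest-scoring items. The sketch below shows that idea in plain Python; the helper `top_n` and the factor values are hypothetical, and Spark's own implementation is distributed and more elaborate.

```python
import heapq


def top_n(user_vec, item_factors, n):
    """Return the n (item, score) pairs with the highest dot-product score."""
    scored = (
        (sum(a * b for a, b in zip(user_vec, vec)), item)
        for item, vec in item_factors.items()
    )
    return [(item, score) for score, item in heapq.nlargest(n, scored)]


# Made-up rank-2 factors for three movie ids and one user.
item_factors = {10: [1.0, 0.0], 20: [0.0, 1.0], 30: [1.0, 1.0]}
user_vec = [2.0, 1.0]

recommendations = top_n(user_vec, item_factors, 2)
print(recommendations)  # [(30, 3.0), (10, 2.0)]
```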

In [14]:
# Show the first 5 users along with their top 10 recommended movies
userRecs.show(5, truncate = False)

In [15]:
# Show the first 5 movies along with the top 10 users each is recommended to
movieRecs.show(5, truncate = False)