## **Spark Movie Recommendation**
In this notebook, I will use an Alternating Least Squares (ALS) algorithm with Spark APIs to predict the ratings for the movies in [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).
<br><br>

### **Install Spark and import packages**
<br>

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!ls
!tar xf spark-2.4.7-bin-hadoop2.7.tgz

In [None]:
!pip install -q findspark
!pip install py4j

# a shell `export` inside `!` runs in a throwaway subshell, so set the variables through os.environ
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"
import findspark
findspark.init(os.environ["SPARK_HOME"])

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
!ls

sample_data  spark-2.4.7-bin-hadoop2.7	spark-2.4.7-bin-hadoop2.7.tgz


In [None]:
spark.version

'2.4.7'

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib
%matplotlib inline

In [None]:
import os
os.environ['PYSPARK_PYTHON'] = 'python3'

In [None]:
!pip install -U -q PyDrive

from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [None]:
from pyspark.sql import SparkSession
# getOrCreate() returns the session created above, so builder options such as appName may not take effect
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .getOrCreate()


##<br>**1. Data ETL and Data Exploration**
I'll load the four CSV files into Spark DataFrames and explore the data.

<br>

In [None]:
from google.colab import files

uploaded = files.upload()

Saving links.csv to links.csv
Saving movies.csv to movies.csv
Saving ratings.csv to ratings.csv
Saving tags.csv to tags.csv


In [None]:
os.listdir()

['.config',
 'links.csv',
 'ratings.csv',
 'spark-2.4.7-bin-hadoop2.7',
 'tags.csv',
 'movies.csv',
 'spark-2.4.7-bin-hadoop2.7.tgz',
 'adc.json',
 'sample_data']

In [None]:
movies_df = spark.read.load("./movies.csv", format='csv', header = True)
ratings_df = spark.read.load("./ratings.csv", format='csv', header = True)
links_df = spark.read.load("./links.csv", format='csv', header = True)
tags_df = spark.read.load("./tags.csv", format='csv', header = True)

In [None]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [None]:
links_df.show(5)

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows



In [None]:
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [None]:
ratings_df.toPandas().isnull().sum()

userId       0
movieId      0
rating       0
timestamp    0
dtype: int64

<br>There are no missing values in the ratings data, which I'll use to build the recommendation system later.
<br><br>

In [None]:
tmp1 = ratings_df.groupBy("userId").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [None]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies have only one rating'.format(tmp1, tmp2))

3446 out of 9724 movies have only one rating


<br> Every user has rated at least 20 movies, but the movie side is much sparser: every movie in ratings has at least one rating, yet 3446 of the 9724 rated movies (about 35%) have only one rating.
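To put the sparsity in numbers: the training and testing counts reported in Section 3.1 add up to 100836 ratings, so the 610 x 9724 user-movie matrix (5,931,640 cells) is only about 1.7% filled. A minimal pure-Python sketch of both statistics, using a hypothetical helper `sparsity_stats` on toy data (in this notebook the real numbers come from the Spark aggregations above):

```python
from collections import Counter

def sparsity_stats(ratings):
    """ratings: iterable of (userId, movieId, rating) tuples.

    Returns (density, share of rated movies that have exactly one rating).
    """
    ratings = list(ratings)
    users = {u for u, _, _ in ratings}
    per_movie = Counter(m for _, m, _ in ratings)
    # fraction of the user x movie matrix that actually holds a rating
    density = len(ratings) / (len(users) * len(per_movie))
    single = sum(1 for c in per_movie.values() if c == 1) / len(per_movie)
    return density, single

# toy data: 2 users, 3 movies, 4 ratings
toy = [(1, 10, 4.0), (1, 20, 3.5), (2, 10, 5.0), (2, 30, 2.0)]
print(sparsity_stats(toy))  # density 4/6, and 2 of the 3 movies are rated once

# the notebook's counts: 100836 ratings over 610 users and 9724 rated movies
print(100836 / (610 * 9724))  # about 0.017
```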

##**<br>2. Spark SQL and OLAP**
I'll use Spark SQL to analyze the datasets and extract some information of interest.
<br><br>

In [None]:
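# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")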

###<br>**2.1 The number of users that rated movies**
<br>

In [None]:
spark.sql('\
            select count(distinct userid) as Number_of_users \
            from ratings \
          ').show()

+---------------+
|Number_of_users|
+---------------+
|            610|
+---------------+



###<br>**2.2 The total number of movies**
<br>

In [None]:
spark.sql('\
            select count(*) as Number_of_movies \
            from movies \
          ').show()

+----------------+
|Number_of_movies|
+----------------+
|            9742|
+----------------+



###<br>**2.3 The number of movies rated by users and movies not rated**
<br>

In [None]:
spark.sql('\
            select count(distinct movieid) as number_of_movies_rated \
            from ratings \
          ').show()

+----------------------+
|number_of_movies_rated|
+----------------------+
|                  9724|
+----------------------+



In [None]:
# movies not rated by any user

spark.sql('\
            select title, genres \
            from movies \
            where movieid not in (select distinct movieid \
                                  from ratings)\
          ').show(truncate=False)

+--------------------------------------------+------------------------+
|title                                       |genres                  |
+--------------------------------------------+------------------------+
|Innocents, The (1961)                       |Drama|Horror|Thriller   |
|Niagara (1953)                              |Drama|Thriller          |
|For All Mankind (1989)                      |Documentary             |
|Color of Paradise, The (Rang-e khoda) (1999)|Drama                   |
|I Know Where I'm Going! (1945)              |Drama|Romance|War       |
|Chosen, The (1981)                          |Drama                   |
|Road Home, The (Wo de fu qin mu qin) (1999) |Drama|Romance           |
|Scrooge (1970)                              |Drama|Fantasy|Musical   |
|Proof (1991)                                |Comedy|Drama|Romance    |
|Parallax View, The (1974)                   |Thriller                |
|This Gun for Hire (1942)                    |Crime|Film-Noir|Th

### <br>**2.4 Movie genres and their frequency**
<br>

In [None]:
spark.sql('\
            select genres as genre, \
                   count(*) as freq \
            from movies \
            group by genres \
          ').show()

+--------------------+----+
|               genre|freq|
+--------------------+----+
|Comedy|Horror|Thr...|  17|
|Adventure|Sci-Fi|...|   4|
|Action|Adventure|...|   6|
| Action|Drama|Horror|   1|
|Action|Animation|...|   2|
|Animation|Childre...|   2|
|Action|Adventure|...|  27|
|    Adventure|Sci-Fi|  15|
|Documentary|Music...|   1|
|Adventure|Childre...|   1|
| Adventure|Animation|   2|
| Musical|Romance|War|   1|
|Action|Adventure|...|   1|
|Adventure|Childre...|   2|
|Comedy|Crime|Horr...|   1|
|Crime|Drama|Fanta...|   1|
|Comedy|Mystery|Th...|   3|
|   Adventure|Fantasy|  13|
|Horror|Romance|Sc...|   1|
|Drama|Film-Noir|R...|   2|
+--------------------+----+
only showing top 20 rows



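<br> Each movie's genres are stored as one pipe-separated string (for example `Comedy|Romance`), so the query above counts genre combinations rather than individual genres. To count the basic genres, I'll first split each string on `|` and explode it into one row per genre.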
<br><br>
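The split-and-explode step can be sketched in plain Python with `collections.Counter` (hypothetical helper `genre_frequencies`; the genre strings below are illustrative). Note that `(no genres listed)` contains no `|`, so it is counted as a genre of its own, just as in the Spark output below.

```python
from collections import Counter

def genre_frequencies(genres_column):
    """Count individual genres from pipe-separated strings such as 'Adventure|Comedy'.

    Mirrors explode(split(genres, '[|]')): each movie contributes one count per genre.
    """
    counts = Counter()
    for genres in genres_column:
        counts.update(genres.split('|'))
    return dict(sorted(counts.items()))

sample = ['Adventure|Animation|Children', 'Adventure|Children|Fantasy',
          'Comedy|Romance', '(no genres listed)']
print(genre_frequencies(sample))
```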

In [None]:
spark.sql("\
            select genre, \
                   count(*) as freq \
            from (select explode(split(genres, '[\|]')) as genre \
                  from movies) \
            group by genre \
            order by genre \
          ").show()

+------------------+----+
|             genre|freq|
+------------------+----+
|(no genres listed)|  34|
|            Action|1828|
|         Adventure|1263|
|         Animation| 611|
|          Children| 664|
|            Comedy|3756|
|             Crime|1199|
|       Documentary| 440|
|             Drama|4361|
|           Fantasy| 779|
|         Film-Noir|  87|
|            Horror| 978|
|              IMAX| 158|
|           Musical| 334|
|           Mystery| 573|
|           Romance|1596|
|            Sci-Fi| 980|
|          Thriller|1894|
|               War| 382|
|           Western| 167|
+------------------+----+



###<br>**2.5 Movies for each genre**
<br>

In [None]:
spark.sql("\
            select genre, collect_set(title) as list_of_movies \
            from \
                (select explode(split(genres, '[\|]')) as genre, title \
                 from movies) \
            group by genre \
          ").show(truncate=50)

+------------------+--------------------------------------------------+
|             genre|                                    list_of_movies|
+------------------+--------------------------------------------------+
|             Crime|[Stealing Rembrandt (Rembrandt) (2003), Innocen...|
|           Romance|[Vampire in Brooklyn (1995), Hysteria (2011), F...|
|          Thriller|[Element of Crime, The (Forbrydelsens Element) ...|
|         Adventure|[Ice Age: Collision Course (2016), Masters of t...|
|             Drama|[Airport '77 (1977), Meet John Doe (1941), Elem...|
|               War|[General, The (1926), Joyeux Noël (Merry Christ...|
|       Documentary|[The Barkley Marathons: The Race That Eats Its ...|
|           Fantasy|[Masters of the Universe (1987), Odd Life of Ti...|
|           Mystery|[Before and After (1996), Primal Fear (1996), O...|
|           Musical|[U2: Rattle and Hum (1988), Sword in the Stone,...|
|         Animation|[Ice Age: Collision Course (2016), Planes (2

##<br>**3. Spark ALS-based approach for model training**
In order to train an ALS model, I'll first preprocess the ratings data by dropping the unused timestamp column, converting the data types, and splitting it into training and testing datasets. For the modeling part, I'll use 5-fold cross-validation to tune the hyperparameters and select the best model.
<br><br>
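To make the alternating idea concrete, here is a small pure-Python sketch of explicit-feedback ALS (hypothetical helpers `solve`, `update`, `als`, `train_rmse`; this is not Spark's implementation). It alternates between solving a ridge regression for every user with the item factors held fixed, and for every item with the user factors held fixed. Following the ALS-WR scheme that the Spark ML documentation describes, each user's or item's regularization term is scaled by its number of ratings.

```python
import random

def solve(A, b):
    """Solve A x = b for a small dense matrix A using Gaussian elimination with partial pivoting."""
    n = len(b)
    M = [list(row) + [b[i]] for i, row in enumerate(A)]  # augmented matrix [A | b]
    for c in range(n):
        p = max(range(c, n), key=lambda r: abs(M[r][c]))
        M[c], M[p] = M[p], M[c]
        for r in range(c + 1, n):
            f = M[r][c] / M[c][c]
            for k in range(c, n + 1):
                M[r][k] -= f * M[c][k]
    x = [0.0] * n
    for r in range(n - 1, -1, -1):
        x[r] = (M[r][n] - sum(M[r][k] * x[k] for k in range(r + 1, n))) / M[r][r]
    return x

def update(fixed, ratings_by_id, rank, reg):
    """Half-step: for every id solve (F^T F + reg * n * I) x = F^T r, with the other side's factors F fixed."""
    new = {}
    for key, pairs in ratings_by_id.items():
        n = len(pairs)
        A = [[reg * n if i == j else 0.0 for j in range(rank)] for i in range(rank)]
        b = [0.0] * rank
        for other, r in pairs:
            v = fixed[other]
            for i in range(rank):
                b[i] += r * v[i]
                for j in range(rank):
                    A[i][j] += v[i] * v[j]
        new[key] = solve(A, b)
    return new

def als(ratings, rank=2, reg=0.01, iters=10, seed=0):
    """ratings: list of (user, item, rating). Returns (user_factors, item_factors) dicts."""
    rng = random.Random(seed)
    by_user, by_item = {}, {}
    for u, i, r in ratings:
        by_user.setdefault(u, []).append((i, r))
        by_item.setdefault(i, []).append((u, r))
    U = {u: [rng.random() for _ in range(rank)] for u in by_user}
    V = {i: [rng.random() for _ in range(rank)] for i in by_item}
    for _ in range(iters):
        U = update(V, by_user, rank, reg)  # item factors fixed, solve for users
        V = update(U, by_item, rank, reg)  # user factors fixed, solve for items
    return U, V

def train_rmse(U, V, ratings):
    """RMSE of the dot-product predictions on the given ratings."""
    se = sum((sum(x * y for x, y in zip(U[u], V[i])) - r) ** 2 for u, i, r in ratings)
    return (se / len(ratings)) ** 0.5

# toy 3x3 ratings matrix built as an outer product, so a low-rank model can fit it almost exactly
user_w, item_w = [1.0, 1.5, 2.0], [1.0, 2.0, 2.5]
toy = [(u, i, user_w[u] * item_w[i]) for u in range(3) for i in range(3)]
U, V = als(toy, rank=2, reg=0.01, iters=20)
print(round(train_rmse(U, V, toy), 3))
```

Each half-step is an exact least-squares solve, so the regularized objective never increases; this is the property that lets Spark distribute the per-user and per-item solves independently.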

###**3.1 Data preprocessing**<br><br>

In [None]:
ratings_df.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [None]:
movie_ratings = ratings_df.drop('timestamp')

In [None]:
# convert data type

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn('userId', col('userId').cast(IntegerType())) \
                             .withColumn('movieId', col('movieId').cast(IntegerType())) \
                             .withColumn('rating', col('rating').cast(FloatType()))
movie_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



In [None]:
# create test and train set

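# pass seed=<int> to randomSplit to make the split (and the RMSE below) reproducible across runs
(training, test) = movie_ratings.randomSplit([0.8, 0.2])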

In [None]:
training.count()

80509

In [None]:
test.count()

20327
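The two counts add up to 100836 ratings, with 80509 / 100836 ≈ 79.8% in training: `randomSplit` assigns rows at random rather than cutting at exact sizes. A minimal pure-Python sketch of weighted random splitting (hypothetical helper `random_split`; conceptually similar to, but not the same as, Spark's per-partition implementation):

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to one split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    cumulative, acc = [], 0.0
    for w in weights:
        acc += w / total
        cumulative.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        # first split whose cumulative boundary exceeds x; fall back to the last one for rounding
        idx = next((k for k, c in enumerate(cumulative) if x < c), len(weights) - 1)
        splits[idx].append(row)
    return splits

train, test = random_split(range(10000), [0.8, 0.2], seed=1)
print(len(train), len(test))  # close to, but usually not exactly, 8000 and 2000
```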

###**<br>3.2 Model tuning**
I'll tune the ALS hyperparameters with a grid search over `maxIter`, `rank`, and `regParam`, scoring each combination by its average RMSE over 5 cross-validation folds.<br><br>
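The grid below has 3 x 3 x 5 = 45 combinations, and 5-fold cross-validation fits each one 5 times, so tuning trains 225 ALS models before `CrossValidator` refits the winning parameters on the whole training set. A plain-Python sketch of that bookkeeping (hypothetical helpers `kfold_indices` and `pick_best`; Spark assigns rows to folds randomly, whereas this sketch interleaves them for simplicity):

```python
from itertools import product
from statistics import mean

# the same values as the ParamGridBuilder cell
grid = [dict(maxIter=m, rank=r, regParam=p)
        for m, r, p in product([5, 10, 20], [5, 10, 20], [0.01, 0.05, 0.1, 0.5, 1])]

def kfold_indices(n, k):
    """Split indices 0..n-1 into k interleaved folds."""
    return [list(range(f, n, k)) for f in range(k)]

def pick_best(fold_scores):
    """fold_scores: one list of per-fold RMSEs per grid point.

    Averages each grid point's fold metrics and returns the index with the lowest mean RMSE.
    """
    averages = [mean(s) for s in fold_scores]
    return min(range(len(averages)), key=averages.__getitem__)

print(len(grid), 'parameter combinations x 5 folds =', len(grid) * 5, 'ALS fits')
# 45 parameter combinations x 5 folds = 225 ALS fits
```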

In [None]:
# import package

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [None]:
# create ALS model

als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

In [None]:
# tune model using ParamGridBuilder

paramGrid = (ParamGridBuilder()
             .addGrid(als.maxIter, [5, 10, 20])
             .addGrid(als.rank, [5, 10, 20])
             .addGrid(als.regParam, [0.01, 0.05, 0.1, 0.5, 1])
             .build())

In [None]:
# define evaluator as RMSE

evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating')

In [None]:
# build cross validation

cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, parallelism=3)

In [None]:
# fit ALS model to training data

model = cv.fit(training)

In [None]:
# extract the best model derived from cross-validation

best_model = model.bestModel

### <br> **3.3 Model evaluation and testing**
I'll evaluate the best model on the testing data with RMSE, print its hyperparameters, and look at some of its predictions.
<br><br>
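RMSE is the square root of the mean squared difference between actual and predicted ratings, so it is measured in rating points. A minimal sketch (hypothetical helper `rmse`), which skips NaN predictions the way `coldStartStrategy="drop"` removes them before evaluation:

```python
import math

def rmse(pairs):
    """Root mean squared error over (rating, prediction) pairs.

    Pairs whose prediction is NaN are skipped, mimicking coldStartStrategy="drop".
    """
    errors = [(r - p) ** 2 for r, p in pairs if not math.isnan(p)]
    return math.sqrt(sum(errors) / len(errors))

# first three rows of the predictions table printed later in this section
sample = [(1.0, 2.5700579), (3.0, 3.6426597), (5.0, 3.7199087)]
print(rmse(sample))
```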

In [None]:
# generate predictions and evaluate using RMSE

predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [None]:
# print evaluation metrics and model parameters

print('RMSE = ' + str(rmse))
print('**Best Model**')
print(' Rank: ' + str(best_model.rank))
print(' MaxIter: ' + str(best_model._java_obj.parent().getMaxIter()))
print(' RegParam: ' + str(best_model._java_obj.parent().getRegParam()))

RMSE = 0.8734780948785988
**Best Model**
 Rank: 5
 MaxIter: 20
 RegParam: 0.1


In [None]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|    91|    471|   1.0| 2.5700579|
|   555|    471|   3.0| 3.6426597|
|   520|    471|   5.0| 3.7199087|
|   216|    471|   3.0| 3.3477087|
|   411|    471|   4.0| 2.8862646|
|   426|    471|   5.0| 3.5451188|
|   373|    471|   5.0|  3.786501|
|   609|    833|   3.0| 1.6236358|
|    64|   1088|   4.0|  3.213802|
|   563|   1088|   4.0| 3.3304694|
|   387|   1088|   1.5| 2.5037713|
|    84|   1088|   3.0| 3.3815079|
|    51|   1088|   4.0|  3.394082|
|   414|   1088|   3.0|   3.12684|
|   525|   1088|   4.5| 3.2516308|
|    42|   1088|   3.0| 3.2737103|
|   268|   1238|   5.0|  4.059777|
|   462|   1238|   3.5| 3.7874742|
|   223|   1342|   1.0|   2.61581|
|   608|   1342|   2.0|  3.108882|
+------+-------+------+----------+
only showing top 20 rows



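### <br> **3.4 Alternative model testing using the full MovieLens dataset**
I will use the full MovieLens ratings to further test the best model. Note that the user IDs of the small and full MovieLens releases appear to be assigned independently (in the tables shown, userId 1 rated entirely different movies in the two datasets), so a given userId here probably does not refer to the same person the model was trained on; this likely contributes to the higher RMSE below.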
<br><br>
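With `coldStartStrategy="drop"`, `best_model.transform(full_ratings)` only scores rows whose `userId` and `movieId` both appeared in the training split; all other rows are removed before the RMSE is computed. Comparing `full_predictions.count()` with `full_ratings.count()` shows how much of the full dataset the reported RMSE actually covers. The filtering rule, sketched in plain Python with a hypothetical helper `scorable_rows` and toy IDs:

```python
def scorable_rows(rows, known_users, known_movies):
    """Keep only (userId, movieId, rating) rows that the trained model can score.

    ALS has no factor vector for IDs unseen in training, so with coldStartStrategy="drop"
    these rows disappear from the evaluation.
    """
    return [row for row in rows if row[0] in known_users and row[1] in known_movies]

known_users, known_movies = {1, 2}, {307, 481}
full = [(1, 307, 3.5), (1, 999, 2.0), (700, 481, 4.0), (2, 481, 3.0)]
kept = scorable_rows(full, known_users, known_movies)
print(len(kept), 'of', len(full), 'rows scored')  # 2 of 4 rows scored
```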

In [None]:
# get the dataset

file_id = "1XCoscHsVvNYZeW3bb3E1HFq5oWuZrcAq"
drive_file = drive.CreateFile({'id': file_id})
drive_file.GetContentFile('fullratings.csv')

In [None]:
full_ratings = spark.read.load("fullratings.csv", format='csv', header = True)
full_ratings = full_ratings.drop('timestamp') \
                           .withColumn('userId', col('userId').cast(IntegerType())) \
                           .withColumn('movieId', col('movieId').cast(IntegerType())) \
                           .withColumn('rating', col('rating').cast(FloatType()))
full_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    307|   3.5|
|     1|    481|   3.5|
|     1|   1091|   1.5|
|     1|   1257|   4.5|
|     1|   1449|   4.5|
|     1|   1590|   2.5|
|     1|   1591|   1.5|
|     1|   2134|   4.5|
|     1|   2478|   4.0|
|     1|   2840|   3.0|
|     1|   2986|   2.5|
|     1|   3020|   4.0|
|     1|   3424|   4.5|
|     1|   3698|   3.5|
|     1|   3826|   2.0|
|     1|   3893|   3.5|
|     2|    170|   3.5|
|     2|    849|   3.5|
|     2|   1186|   3.5|
|     2|   1235|   3.0|
+------+-------+------+
only showing top 20 rows



In [None]:
full_predictions = best_model.transform(full_ratings)
rmse = evaluator.evaluate(full_predictions)
print('RMSE = ' + str(rmse))

RMSE = 1.2015137601941959


In [None]:
full_predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   165|    148|   3.0| 2.6483757|
|   251|    471|   3.0|  4.276767|
|   593|    471|   3.0| 3.2378964|
|    81|    471|   3.5|   2.77449|
|   332|    471|   2.0|  3.272566|
|   336|    471|   4.0| 3.4886627|
|   417|    471|   3.0| 3.4968605|
|   360|    471|   5.0| 3.3354547|
|   185|    471|   4.0| 3.4695992|
|   429|    471|   4.0| 3.9506187|
|   295|    471|   4.0|  3.758897|
|   432|    471|   3.0|  3.006175|
|   235|    471|   4.0| 3.9211006|
|   609|    471|   3.0| 2.8487773|
|   423|    471|   3.0|  3.889041|
|   485|    471|   4.0| 3.6557608|
|     4|    471|   4.5| 3.5808876|
|   549|    471|   3.5| 1.7442908|
|   277|    471|   3.5| 2.6861544|
|   428|    471|   5.0|  2.301082|
+------+-------+------+----------+
only showing top 20 rows



## <br>**4. Apply the model and see the performance**
First, I'll apply the best model to all ratings and compute the predictions and RMSE. Because about 80% of these ratings were used for training, this RMSE is an in-sample figure and is expected to be lower than the test RMSE. Next, I'll use the model for three further applications: recommending movies to a specified user, finding movies similar to a specified movie, and finding users similar to a specified user.
<br><br>

### **4.1 Make predictions on all ratings**
<br>

In [None]:
alldata = best_model.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print('RMSE = ' + str(rmse))

RMSE = 0.6879682968314835


In [None]:
alldata.createOrReplaceTempView('alldata')

In [None]:
spark.sql('select * from alldata').show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   191|    148|   5.0| 4.9088535|
|   133|    471|   4.0|  3.160316|
|   597|    471|   2.0|  4.316874|
|   385|    471|   4.0| 3.1888158|
|   436|    471|   3.0| 3.4699144|
|   602|    471|   4.0| 3.3708653|
|    91|    471|   1.0| 2.5700579|
|   409|    471|   3.0| 3.5420384|
|   372|    471|   3.0| 3.3641524|
|   599|    471|   2.5| 2.7523317|
|   603|    471|   4.0|  3.286894|
|   182|    471|   4.5| 3.7909832|
|   218|    471|   4.0| 3.1973581|
|   474|    471|   3.0| 3.3265297|
|   500|    471|   1.0| 2.4843462|
|    57|    471|   3.0| 3.6280024|
|   462|    471|   2.5| 2.5884402|
|   387|    471|   3.0| 3.1550527|
|   610|    471|   4.0| 3.6646678|
|   217|    471|   2.0| 2.6178916|
+------+-------+------+----------+
only showing top 20 rows



### <br> **4.2 Make movie recommendations to a specified user**
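I'll use the built-in recommendation methods of `ALSModel` to make personalized movie recommendations to a given user. ALS predictions are not clipped to the 0.5 to 5 rating scale, so predicted ratings above 5 can appear.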
<br><br>
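Under the hood, a recommendation is just a ranking of items by the dot product of the user's and each item's latent factors. Because the model stores only factor vectors, Spark's recommendation methods rank every item, including movies the user has already rated; excluding those has to be done separately. A minimal sketch with a hypothetical helper `recommend` and toy factors:

```python
import heapq

def recommend(user_vec, item_factors, num, exclude=()):
    """Rank items by predicted rating = dot(user factors, item factors).

    item_factors: dict movieId -> factor list; exclude: movieIds to skip (e.g. already rated).
    """
    scores = ((sum(u * v for u, v in zip(user_vec, vec)), movie_id)
              for movie_id, vec in item_factors.items() if movie_id not in exclude)
    return [(movie_id, score) for score, movie_id in heapq.nlargest(num, scores)]

items = {10: [1.0, 0.0], 20: [0.0, 1.0], 30: [2.0, 2.0]}
print(recommend([1.0, 2.0], items, 2))               # [(30, 6.0), (20, 2.0)]
print(recommend([1.0, 2.0], items, 2, exclude={30}))  # [(20, 2.0), (10, 1.0)]
```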

In [None]:
from pyspark.sql.functions import col, explode

def recommend_movies_to_user(userId, num):
  '''
  userId: int
  num: int, the number of recommended movies
  '''
  # score only the requested user rather than computing recommendations for every user
  user_df = spark.createDataFrame([(userId,)], ['userId'])
  recommended_movies = best_model.recommendForUserSubset(user_df, num) \
                                 .withColumn('recommendations', explode(col('recommendations'))) \
                                 .withColumn('movieId', col('recommendations')['movieId']) \
                                 .withColumn('prediction', col('recommendations')['rating']) \
                                 .select('movieId', 'prediction')
  # attach titles and genres; a join does not preserve order, so sort explicitly
  return recommended_movies.join(movies_df, 'movieId') \
                           .select('movieId', 'title', 'genres', 'prediction') \
                           .orderBy(col('prediction').desc())

In [None]:
# recommend 10 movies to user with id 575

result = recommend_movies_to_user(575, 10)
result.show(truncate=False)

+-------+----------------------------------+---------------------+----------+
|movieId|title                             |genres               |prediction|
+-------+----------------------------------+---------------------+----------+
|26171  |Play Time (a.k.a. Playtime) (1967)|Comedy               |5.760414  |
|70946  |Troll 2 (1990)                    |Fantasy|Horror       |5.5026865 |
|7842   |Dune (2000)                       |Drama|Fantasy|Sci-Fi |5.4999223 |
|60943  |Frozen River (2008)               |Drama                |5.3544703 |
|7841   |Children of Dune (2003)           |Fantasy|Sci-Fi       |5.3544703 |
|2261   |One Crazy Summer (1986)           |Comedy               |5.2885776 |
|170355 |Mulholland Dr. (1999)             |Drama|Mystery|Romance|5.2243824 |
|3379   |On the Beach (1959)               |Drama                |5.2243824 |
|82     |Antonia's Line (Antonia) (1995)   |Comedy|Drama         |5.1518235 |
|86377  |Louis C.K.: Shameless (2007)      |Comedy              

In [None]:
# recommend 10 movies to user with id 232

result = recommend_movies_to_user(232, 10)
result.show(truncate=False)

+-------+---------------------------------------------------------------------------+--------------------+----------+
|movieId|title                                                                      |genres              |prediction|
+-------+---------------------------------------------------------------------------+--------------------+----------+
|84847  |Emma (2009)                                                                |Comedy|Drama|Romance|4.8353314 |
|4256   |Center of the World, The (2001)                                            |Drama               |4.730609  |
|134796 |Bitter Lake (2015)                                                         |Documentary         |4.730609  |
|171495 |Cosmos                                                                     |(no genres listed)  |4.730609  |
|74226  |Dream of Light (a.k.a. Quince Tree Sun, The) (Sol del membrillo, El) (1992)|Documentary|Drama   |4.730609  |
|179135 |Blue Planet II (2017)                          

### <br> **4.3 Find similar movies to a specified movie**
Since the ALS model learns a latent feature vector for every movie and every user (available as `itemFactors` and `userFactors`), I can compute the cosine similarity between these vectors to find the movies or users most similar to a specified movie or user.
<br><br>
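Cosine similarity is cos(a, b) = a · b / (|a| |b|): 1 means the two factor vectors point in the same direction, 0 means they are orthogonal. It ignores vector length, and with only rank-5 factors many vectors point in similar directions, which may be why the similarities below are all above 0.98. A plain-Python sketch (hypothetical helpers `cosine_similarity` and `most_similar`, toy factors):

```python
import math

def cosine_similarity(a, b):
    """cos(a, b) = a . b / (|a| |b|)."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

def most_similar(target_id, factors, num):
    """Return the num (id, similarity) pairs closest to target_id's vector, excluding target_id itself."""
    target = factors[target_id]
    scored = [(other, cosine_similarity(target, vec))
              for other, vec in factors.items() if other != target_id]
    return sorted(scored, key=lambda t: t[1], reverse=True)[:num]

factors = {1: [1.0, 0.0], 2: [0.9, 0.1], 3: [0.0, 1.0], 4: [-1.0, 0.0]}
print(most_similar(1, factors, 2))  # id 2 first, then id 3
```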

In [None]:
movies_df = movies_df.withColumn('movieId', col('movieId').cast(IntegerType()))
df_movies = movies_df.toPandas()
item_factors = best_model.itemFactors.toPandas()

In [None]:
def similar_movies(movieId, num):
  '''
  movieId: int
  num: int, the number of similar movies
  '''
  match = item_factors[item_factors.id == movieId]
  if match.empty:
    return 'There is no movie with id ' + str(movieId)
  target = np.array(match.iloc[0]['features'])

  # cosine similarity between the target vector and every item vector, computed in one step
  all_features = np.array(item_factors['features'].tolist())
  cos_similarity = all_features @ target / (np.linalg.norm(all_features, axis=1) * np.linalg.norm(target))

  df_similarities = pd.DataFrame({'movieId': item_factors['id'].astype(int),
                                  'similarity': cos_similarity})
  # exclude the query movie itself, then keep the `num` most similar movies
  df_similarities = df_similarities[df_similarities.movieId != movieId] \
                      .sort_values(by='similarity', ascending=False) \
                      .head(num)
  return df_similarities.merge(df_movies, on='movieId')

In [None]:
# 10 movies most similar to the movie with id 165

similar_movies(165, 10)

Unnamed: 0,movieId,similarity,title,genres
0,83613,0.997315,Cowboys & Aliens (2011),Action|Sci-Fi|Thriller|Western|IMAX
1,3827,0.996754,Space Cowboys (2000),Action|Adventure|Comedy|Sci-Fi
2,3071,0.99603,Stand and Deliver (1988),Comedy|Drama
3,6298,0.9952,Malibu's Most Wanted (2003),Comedy|Crime
4,445,0.995151,Fatal Instinct (1993),Comedy
5,37739,0.994441,"Greatest Game Ever Played, The (2005)",Drama
6,1485,0.993698,Liar Liar (1997),Comedy
7,94985,0.993447,Get the Gringo (2012),Action|Crime|Drama|Thriller
8,832,0.993299,Ransom (1996),Crime|Thriller
9,251,0.992782,"Hunted, The (1995)",Action


In [None]:
df_movies[df_movies.movieId == 165]

Unnamed: 0,movieId,title,genres
138,165,Die Hard: With a Vengeance (1995),Action|Crime|Thriller


In [None]:
# 10 movies most similar to the movie with id 471

similar_movies(471, 10)

Unnamed: 0,movieId,similarity,title,genres
0,2937,0.9969,"Palm Beach Story, The (1942)",Comedy
1,3083,0.996399,All About My Mother (Todo sobre mi madre) (1999),Drama
2,313,0.995804,"Swan Princess, The (1994)",Animation|Children
3,27731,0.993522,"Cat Returns, The (Neko no ongaeshi) (2002)",Adventure|Animation|Children|Fantasy
4,2100,0.992877,Splash (1984),Comedy|Fantasy|Romance
5,3983,0.992632,You Can Count on Me (2000),Drama|Romance
6,2060,0.991633,BASEketball (1998),Comedy
7,1696,0.99102,Bent (1997),Drama|War
8,6238,0.987916,Green Card (1990),Comedy|Drama|Romance
9,3055,0.986899,Felicia's Journey (1999),Thriller


In [None]:
df_movies[df_movies.movieId == 471]

Unnamed: 0,movieId,title,genres
409,471,"Hudsucker Proxy, The (1994)",Comedy


### <br> **4.4 Find similar users to a specified user**
<br>

In [None]:
user_factors = best_model.userFactors.toPandas()

def similar_users(userId, num):
  '''
  userId: int
  num: int, the number of similar users
  '''
  match = user_factors[user_factors.id == userId]
  if match.empty:
    return 'There is no user with id ' + str(userId)
  target = np.array(match.iloc[0]['features'])

  # cosine similarity between the target vector and every user vector, computed in one step
  all_features = np.array(user_factors['features'].tolist())
  cos_similarity = all_features @ target / (np.linalg.norm(all_features, axis=1) * np.linalg.norm(target))

  df_similarities = pd.DataFrame({'userId': user_factors['id'].astype(int),
                                  'similarity': cos_similarity})
  # exclude the query user itself, then keep the `num` most similar users
  return df_similarities[df_similarities.userId != userId] \
           .sort_values(by='similarity', ascending=False) \
           .head(num)

In [None]:
# 10 users most similar to the user with id 134

similar_users(134, 10)

Unnamed: 0,userId,similarity
348,435,0.99961
95,341,0.998717
56,570,0.99784
572,239,0.994971
390,246,0.994737
204,213,0.99471
609,609,0.994647
138,162,0.994154
62,11,0.993705
178,562,0.993597


In [None]:
# 10 users most similar to the user with id 256

similar_users(256, 10)

Unnamed: 0,userId,similarity
377,116,0.993387
110,491,0.992079
221,383,0.991963
224,413,0.988579
123,12,0.985983
13,140,0.983033
93,321,0.980677
412,466,0.979963
502,148,0.97654
494,68,0.97653


##<br>**Final Report**

**Motivation**:<br><br>

In the era of big data, recommendation systems have become a core technology for internet businesses, providing tremendous value by recommending personalized items to users. Among the various approaches to building recommendation systems, collaborative filtering with the ALS model is widely used and has advantages such as a low memory footprint (only low-rank user and item factor matrices are stored) and the ability to predict the missing entries of the ratings matrix. Meanwhile, since training a matrix factorization model tends to be time-consuming, Apache Spark ML can be used to take advantage of distributed computing. This motivated me to build a recommendation system with an ALS model on Spark and to test its performance on real data.<br><br>

**Steps:**<br><br>


1.   The first step is to set up a SparkSession in Google Colab, import packages, and load the data into Spark DataFrames. After that, I explored the data to understand missing values and sparseness in the ratings, which could be important for model training. This exploration shows that there are no missing values in the ratings, and that about 35% of the rated movies (3446 of 9724) have only one rating.<br><br>

2.   I used Spark SQL to run OLAP queries and further explore the data. There are 610 users and 9724 movies in the ratings data and 9742 movies in the movies data, so 18 movies have no ratings. After splitting the genre combinations for each movie, the movies data contains 19 genres plus a "(no genres listed)" placeholder (34 movies); the number of movies per genre ranges from 87 (Film-Noir) to 4361 (Drama).<br><br>

3.    The third step covers data transformation, model selection, and training and tuning with cross-validation. The columns of the ratings data were converted from strings to integer and float types, and training and testing datasets were created. Then I built an ALS model, set up a parameter grid over `maxIter`, `rank`, and `regParam`, and selected the best hyperparameters with 5-fold cross-validation. Setting `parallelism=3` in `CrossValidator` let three models be trained in parallel to speed up tuning.<br><br>

4.    The trained model was evaluated on the testing dataset using the root mean square error (RMSE) metric. Additionally, I used the full MovieLens dataset to test the model on a larger dataset.<br><br>

5.    Finally, I applied the trained model to make personalized movie recommendations to specific users and to find movies or users similar to a specified movie or user.<br><br>


**Conclusion:**<br><br>

In this notebook I trained an ALS model for movie recommendation on MovieLens data and tuned its hyperparameters, reaching an RMSE of 0.87 on the testing data and 1.20 on the full MovieLens data. I applied the model to recommend personalized movies to users and to find similar movies and users for a specified movie or user. Since the RMSE is still large, especially on the full MovieLens data, future work could focus on reducing the RMSE and increasing the model's robustness.