# Spark ALS Movie Recommendation System
[MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/)

In [0]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline



In [0]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

## Part 1 : Data ETL and Data Exploration

In [0]:
movies_df = spark.read.load("/FileStore/tables/movies-1.csv", format='csv', header = True)
ratings_df = spark.read.load("/FileStore/tables/ratings-1.csv", format='csv', header = True)
links_df = spark.read.load("/FileStore/tables/links-1.csv", format='csv', header = True)
tags_df = spark.read.load("/FileStore/tables/tags-1.csv", format='csv', header = True)

In [0]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [0]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [0]:
links_df.show(5)

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows



In [0]:
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [0]:
tmp1 = ratings_df.groupBy("userId").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [0]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

3446 out of 9724 movies are rated by only one user


## Part 2 : Spark SQL and Online Analytical Processing

In [0]:
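# registerTempTable is deprecated; createOrReplaceTempView is the current equivalent
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")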



### 2.1 : The number of Users

In [0]:
%sql 
select count(distinct userId) as number_of_users
from ratings

AnalysisException (traceback truncated in the source)

In [0]:
%sql 
select count(*) as number_of_rows
from ratings

number_of_rows
100836


### 2.2: The number of Movies

In [0]:
%sql 
select count(distinct movieId) as number_of_movies
from ratings

number_of_movies
9724


### 2.3:  How many movies are rated by users? List movies not rated before

In [0]:
%sql 
select count(distinct m.movieId) as number_of_movies_rated
from movies as m
left outer join ratings as r
on m.movieId=r.movieId
where r.userId is not NULL

number_of_movies_rated
9724


In [0]:
%sql 
select count(distinct m.movieId) as number_of_movies_not_rated
from movies as m
left outer join ratings as r
on m.movieId=r.movieId
where r.userId is NULL

number_of_movies_not_rated
18


### 2.4: List Movie Genres

In [0]:
%sql
select count(distinct genres) as number_of_genres
from movies

number_of_genres
951


### 2.5 : Movies for Each Category

In [0]:
%sql
select genres, count(*)
from movies
group by 1
order by 2 desc

genres,count(1)
Drama,1053
Comedy,946
Comedy|Drama,435
Comedy|Romance,363
Drama|Romance,349
Documentary,339
Comedy|Drama|Romance,276
Drama|Thriller,168
Horror,167
Horror|Thriller,135
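
The `genres` column holds pipe-delimited combinations, so the 951 distinct values counted in 2.4 are combinations rather than individual genres. Below is a minimal sketch of counting individual genres, using only the ten rows shown above, so the totals are partial. In Spark the same idea could be expressed with `split` and `explode` from `pyspark.sql.functions`. The helper name `count_individual_genres` is hypothetical and not part of the original notebook.

```python
from collections import Counter

# Top ten genre combinations and their movie counts, copied from the output above.
combo_counts = {
    "Drama": 1053,
    "Comedy": 946,
    "Comedy|Drama": 435,
    "Comedy|Romance": 363,
    "Drama|Romance": 349,
    "Documentary": 339,
    "Comedy|Drama|Romance": 276,
    "Drama|Thriller": 168,
    "Horror": 167,
    "Horror|Thriller": 135,
}

def count_individual_genres(combos):
    """Split each pipe-delimited combination and credit its count to every genre in it."""
    totals = Counter()
    for combo, n_movies in combos.items():
        for genre in combo.split("|"):
            totals[genre] += n_movies
    return totals

genre_totals = count_individual_genres(combo_counts)
for genre, n_movies in genre_totals.most_common():
    print(genre, n_movies)
```

A movie tagged `Comedy|Drama` is counted once under each genre, so these totals sum to more than the number of movies.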


## Part 3 : Spark ALS Training and Model Selection

We will use Spark ML's ALS to predict the ratings. Starting from the `ratings_df` loaded in Part 1, we drop the `timestamp` column and cast the remaining columns to numeric types so that each row has the form (user, item, rating).

In [0]:
ratings_df.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [0]:
movie_ratings=ratings_df.drop('timestamp')

In [0]:
# Data type convert
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))

movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))

movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

In [0]:
movie_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



In [0]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [0]:
#Create test and train set
(training,test)=movie_ratings.randomSplit([0.8,0.2])
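
`randomSplit` is called without a `seed` here, so the train/test partition changes between runs, and the metrics reported later change with it. Passing a seed as the second argument makes the split repeatable. The sketch below mimics the idea in plain Python: each row is assigned independently, so the 80/20 proportion is only approximate. `random_split` is a hypothetical helper for illustration, not Spark's implementation.

```python
import random

def random_split(rows, train_weight=0.8, seed=None):
    """Assign each row to train with probability train_weight, otherwise to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(1000))
train_a, test_a = random_split(rows, seed=42)
train_b, test_b = random_split(rows, seed=42)

print(len(train_a), len(test_a))  # roughly 800 and 200, not exactly
print(train_a == train_b)         # the same seed reproduces the same split
```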

In [0]:
#Create ALS model
als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)
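
ALS factorizes the rating matrix into one latent vector per user and one per movie, each of length `rank`. A predicted rating is the dot product of the two vectors. Here is a minimal sketch with made-up factor values; the vectors are hypothetical and not taken from the fitted model.

```python
def predict_rating(user_factors, item_factors):
    """Predicted rating = dot product of the user and item latent vectors."""
    if len(user_factors) != len(item_factors):
        raise ValueError("user and item vectors must have the same rank")
    return sum(u * v for u, v in zip(user_factors, item_factors))

# Hypothetical rank-3 factors for one user and one movie.
user_vec = [1.2, 0.4, 0.9]
movie_vec = [1.5, 0.5, 2.0]

predicted = predict_rating(user_vec, movie_vec)
print(round(predicted, 2))
```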

In [0]:
#Tune model using ParamGridBuilder
paramGrid = (ParamGridBuilder()
             .addGrid(als.regParam, [0.05, 0.1, 0.3, 0.5])
             .addGrid(als.rank, [5, 10, 15])
             .addGrid(als.maxIter, [1, 5, 10])
             .build())

In [0]:
# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

In [0]:
# Build Cross validation 
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
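
Cross-validating this grid is expensive, because every parameter combination is trained once per fold. The sketch below counts the model fits implied by the grid and `numFolds=5`, using plain Python rather than Spark.

```python
from itertools import product

# Values copied from the ParamGridBuilder call above.
reg_params = [0.05, 0.1, 0.3, 0.5]
ranks = [5, 10, 15]
max_iters = [1, 5, 10]
num_folds = 5

grid = list(product(reg_params, ranks, max_iters))
fits_during_search = len(grid) * num_folds

print(len(grid), "parameter combinations")
print(fits_during_search, "ALS fits during the search")
```

After the search, `CrossValidator` refits the best combination on the full training set, which adds one more fit.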

In [0]:
#Fit ALS model to training data
cvModel = cv.fit(training)

In [0]:
#Extract best model from the tuning exercise using ParamGridBuilder
bestModel=cvModel.bestModel

## Part 4 : Model Evaluation
And finally, make a prediction and check the testing error.

In [0]:
#Generate predictions and evaluate using RMSE
predictions=bestModel.transform(test)
rmse = evaluator.evaluate(predictions)

In [0]:
#Print evaluation metrics and model parameters
print("RMSE = "+str(rmse))
print("**Best Model**")
# The tuned hyperparameters are read from the parent estimator of the best model
print(" Rank: ", str(bestModel._java_obj.parent().getRank()))
print(" MaxIter: ", str(bestModel._java_obj.parent().getMaxIter()))
print(" RegParam: ", str(bestModel._java_obj.parent().getRegParam()))

RMSE = 0.8950618256579692
**Best Model**
 Rank:  5
 MaxIter:  10
 RegParam:  0.1


In [0]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   148|   4308|   4.0| 3.7712424|
|   148|   4896|   4.0| 3.7123435|
|   148|   5952|   3.0|  3.174405|
|   148|   8368|   4.0|  3.875325|
|   148|  44191|   4.0| 3.5638723|
|   148|  50872|   3.0| 3.8877084|
|   148|  69757|   3.5|  4.075873|
|   148|  72998|   4.0|  3.013301|
|   148|  79091|   3.5|  3.929764|
|   148| 112852|   3.5| 3.7387235|
|   148| 115617|   3.5|  3.717218|
|   148| 157296|   3.0| 3.4594297|
|   463|    520|   4.0|  3.455255|
|   463|    552|   3.5| 3.4553185|
|   463|   2019|   4.0| 3.9546757|
|   463|   5378|   4.0|  3.261105|
|   463|   6377|   3.5| 3.7775528|
|   471|      1|   5.0| 3.4891508|
|   471|   2571|   3.5| 3.7363117|
|   471|   2959|   4.0| 4.1894026|
+------+-------+------+----------+
only showing top 20 rows
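
To make the metric concrete, here is a minimal sketch of the RMSE calculation in plain Python, applied to the first five (rating, prediction) pairs shown above. With only five rows the value will not match the test-set RMSE reported earlier.

```python
import math

def rmse(pairs):
    """Root mean squared error over (actual, predicted) pairs."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# First five rows of predictions.show(): (rating, prediction)
sample = [
    (4.0, 3.7712424),
    (4.0, 3.7123435),
    (3.0, 3.174405),
    (4.0, 3.875325),
    (4.0, 3.5638723),
]

sample_rmse = rmse(sample)
print(sample_rmse)
```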



## Part 5 : Model Application - Recommend movie to users

In [0]:
alldata=bestModel.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

RMSE = 0.6946112562051512


In [0]:
alldata.createOrReplaceTempView("alldata")

In [0]:
%sql select * from alldata

userId,movieId,rating,prediction
148,356,4.0,3.592932
148,1197,3.0,3.5758662
148,4308,4.0,3.7712424
148,4886,3.0,3.4737272
148,4896,4.0,3.7123435
148,4993,3.0,3.2538111
148,5618,3.0,3.4483886
148,5816,4.0,3.8740797
148,5952,3.0,3.174405
148,6377,3.0,3.6790614


In [0]:
%sql select * from movies join alldata on movies.movieId=alldata.movieId

movieId,title,genres,userId,movieId.1,rating,prediction
356,Forrest Gump (1994),Comedy|Drama|Romance|War,148,356,4.0,3.592932
1197,"Princess Bride, The (1987)",Action|Adventure|Comedy|Fantasy|Romance,148,1197,3.0,3.5758662
4308,Moulin Rouge (2001),Drama|Musical|Romance,148,4308,4.0,3.7712424
4886,"Monsters, Inc. (2001)",Adventure|Animation|Children|Comedy|Fantasy,148,4886,3.0,3.4737272
4896,Harry Potter and the Sorcerer's Stone (a.k.a. Harry Potter and the Philosopher's Stone) (2001),Adventure|Children|Fantasy,148,4896,4.0,3.7123435
4993,"Lord of the Rings: The Fellowship of the Ring, The (2001)",Adventure|Fantasy,148,4993,3.0,3.2538111
5618,Spirited Away (Sen to Chihiro no kamikakushi) (2001),Adventure|Animation|Fantasy,148,5618,3.0,3.4483886
5816,Harry Potter and the Chamber of Secrets (2002),Adventure|Fantasy,148,5816,4.0,3.8740797
5952,"Lord of the Rings: The Two Towers, The (2002)",Adventure|Fantasy,148,5952,3.0,3.174405
6377,Finding Nemo (2003),Adventure|Animation|Children|Comedy,148,6377,3.0,3.6790614


### 5.1 Recommend movies to users with id: 575, 232. 
You can choose other users to recommend movies to.

In [0]:
!pip install koalas
import databricks.koalas as ks



In [0]:
userRecs = bestModel.recommendForAllUsers(10) # use the model to generate the top 10 movie recommendations for each user
userRecs_ks=userRecs.to_koalas()
movies_ks=movies_df.to_koalas()
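
Conceptually, `recommendForAllUsers(10)` scores every movie for a user and keeps the ten with the highest predicted rating. Below is a minimal sketch of that ranking step in plain Python. The factor values and the helper `top_k_movies` are hypothetical, and Spark's distributed implementation is more involved.

```python
import heapq

def top_k_movies(user_vec, movie_factors, k=10):
    """Return the ids of the k movies with the highest dot-product score."""
    scored = (
        (sum(u * v for u, v in zip(user_vec, vec)), movie_id)
        for movie_id, vec in movie_factors.items()
    )
    return [movie_id for _, movie_id in heapq.nlargest(k, scored)]

# Hypothetical rank-2 factors for one user and four movies.
user_vec = [1.0, 0.5]
movie_factors = {
    101: [3.0, 1.0],
    102: [1.0, 1.0],
    103: [2.0, 4.0],
    104: [0.5, 0.5],
}

top_two = top_k_movies(user_vec, movie_factors, k=2)
print(top_two)
```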



In [0]:
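def movieRecommendation(inputId):
  # Look the user up by userId; the default Koalas index is a row position, not a user id
  rows=userRecs.filter(userRecs.userId==int(inputId)).select('recommendations').collect()
  if not rows:
    return 'There is no user with id ' + str(inputId)
  # movieId is a string in movies_ks (the CSV was read without a schema), so cast to str
  recs_list=[str(rec['movieId']) for rec in rows[0]['recommendations']]
  return (movies_ks[movies_ks['movieId'].isin(recs_list)])
print("Recommended movies for user with id '575' are as follows.")
movieRecommendation(575)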

Recommended movies for user with id '575' are as follows.
  Unable to convert the field recommendations. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: Unsupported type in conversion to Arrow: ArrayType(StructType(List(StructField(movieId,IntegerType,true),StructField(rating,FloatType,true))),true)
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


Unnamed: 0,movieId,title,genres
2209,2936,Sullivan's Travels (1941),Adventure|Comedy|Romance
2283,3030,Yojimbo (1961),Action|Adventure
2410,3200,"Last Detail, The (1973)",Comedy|Drama
3272,4429,Moby Dick (1956),Drama
3320,4495,Crossing Delancey (1988),Comedy|Romance
3908,5490,The Big Bus (1976),Action|Comedy
4539,6732,"Hello, Dolly! (1969)",Comedy|Musical|Romance
4782,7121,Adam's Rib (1949),Comedy|Romance
5136,8235,Safety Last! (1923),Action|Comedy|Romance
8839,132333,Seve (2014),Documentary|Drama


In [0]:
print("Recommended movies for user with id '232' are as follows.")
movieRecommendation(232)

Recommended movies for user with id '232' are as follows.


Unnamed: 0,movieId,title,genres
735,955,Bringing Up Baby (1938),Comedy|Romance
890,1187,Passion Fish (1992),Drama
2021,2693,Trekkies (1997),Documentary
2410,3200,"Last Detail, The (1973)",Comedy|Drama
2543,3404,Titanic (1953),Action|Drama
2665,3567,Bossa Nova (2000),Comedy|Drama|Romance
2734,3672,Benji (1974),Adventure|Children
3892,5466,My Wife is an Actress (Ma Femme est une Actric...,Comedy|Drama|Romance
6110,42730,Glory Road (2006),Drama
9297,158872,Sausage Party (2016),Animation|Comedy


## Part 6 : Model Application - Find the Similar Movies

### 6.1 Find similar movies for the movies with id: 463, 471
You can find similar movies based on the ALS results

In [0]:
itemFactors=bestModel.itemFactors.to_koalas() # DataFrame with two columns, id and features; the features column is used to find similar movies



In [0]:
def similarMovies(inputId, matrix='cosine_similarity'):
  # Look up the latent factor vector of the input movie (id is an integer column)
  try:
    movieFeature=itemFactors.loc[itemFactors.id==int(inputId),'features'].to_numpy()[0]
  except Exception:
    return 'There is no movie with id ' + str(inputId)
  
  factors=itemFactors.to_numpy() # collect the (id, features) pairs once
  rows=[] # build a list of dicts; DataFrame.append was removed in pandas 2.0
  if matrix=='cosine_similarity': # cosine similarity
    for movie_id,feature in factors:
      cs=np.dot(movieFeature,feature)/(np.linalg.norm(movieFeature) * np.linalg.norm(feature))
      rows.append({'movieId':str(movie_id), 'cosine_similarity':cs})
    similarMovie=pd.DataFrame(rows, columns=['movieId','cosine_similarity'])
    # Highest similarity first; row 0 is the input movie itself, so keep rows 1 to 10
    similarMovie_cs=similarMovie.sort_values(by=['cosine_similarity'],ascending = False)[1:11]
    joint=similarMovie_cs.merge(movies_ks.to_pandas(), on='movieId', how = 'inner')
  elif matrix=='euclidean_distance': # Euclidean distance
    for movie_id,feature in factors:
      ed=np.linalg.norm(np.array(movieFeature)-np.array(feature))
      rows.append({'movieId':str(movie_id), 'euclidean_distance':ed})
    similarMovie=pd.DataFrame(rows, columns=['movieId','euclidean_distance'])
    # Smallest distance first; row 0 is the input movie itself, so keep rows 1 to 10
    similarMovie_ed=similarMovie.sort_values(by=['euclidean_distance'])[1:11]
    joint=similarMovie_ed.merge(movies_ks.to_pandas(), on='movieId', how = 'inner')
  else:
    raise ValueError('Unknown metric: ' + str(matrix))
  return joint[['movieId','title','genres']]

In [0]:
similarMovies(463)

Out[35]: 'There is no movie with id 463'

In [0]:
print('Similar movies based on cosine similarity matrix are as follows.')
similarMovies(471, 'cosine_similarity')

Similar movies based on cosine similarity matrix are as follows.


Unnamed: 0,movieId,title,genres
0,1416,Evita (1996),Drama|Musical
1,106100,Dallas Buyers Club (2013),Drama
2,3480,Lucas (1986),Drama|Romance
3,950,"Thin Man, The (1934)",Comedy|Crime
4,5446,Rabbit-Proof Fence (2002),Adventure|Drama
5,8254,Arizona Dream (1993),Comedy|Drama|Fantasy|Romance
6,177593,"Three Billboards Outside Ebbing, Missouri (2017)",Crime|Drama
7,3545,Cabaret (1972),Drama|Musical
8,3968,Bedazzled (2000),Comedy
9,906,Gaslight (1944),Drama|Thriller


In [0]:
print('Similar movies based on euclidean distance matrix are as follows.')
similarMovies(471, 'euclidean_distance')

Similar movies based on euclidean distance matrix are as follows.


Unnamed: 0,movieId,title,genres
0,106100,Dallas Buyers Club (2013),Drama
1,3480,Lucas (1986),Drama|Romance
2,5446,Rabbit-Proof Fence (2002),Adventure|Drama
3,8254,Arizona Dream (1993),Comedy|Drama|Fantasy|Romance
4,906,Gaslight (1944),Drama|Thriller
5,25886,Random Harvest (1942),Drama|Romance
6,950,"Thin Man, The (1934)",Comedy|Crime
7,48322,Jackass Number Two (2006),Comedy|Documentary
8,72226,Fantastic Mr. Fox (2009),Adventure|Animation|Children|Comedy|Crime
9,142456,The Brand New Testament (2015),(no genres listed)
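
The two result lists overlap but are not identical, because the two measures answer different questions. Cosine similarity compares only the direction of the factor vectors, while Euclidean distance also depends on their magnitude. A minimal sketch with made-up two-dimensional vectors (not taken from the model) shows how the rankings can disagree:

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between a and b; 1.0 means the same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def euclidean_distance(a, b):
    """Straight-line distance between a and b; 0.0 means identical vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

query = [1.0, 1.0]
scaled = [10.0, 10.0]  # same direction as query, much larger magnitude
nearby = [1.5, 0.5]    # close to query in space, different direction

# Cosine similarity ranks `scaled` first; Euclidean distance ranks `nearby` first.
print(cosine_similarity(query, scaled), cosine_similarity(query, nearby))
print(euclidean_distance(query, scaled), euclidean_distance(query, nearby))
```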
