### Spark Setup 

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.mirrors.hoobly.com/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install py4j


In [0]:
!ls

sample_data  spark-2.4.5-bin-hadoop2.7	spark-2.4.5-bin-hadoop2.7.tgz


In [0]:
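# `!export` runs in a throwaway subshell, so it cannot set a variable for later cells.
# Use the %env magic here (os.environ is also set further down).
%env JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

env: JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64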


In [0]:
! echo $JAVA_HOME

/usr/lib/jvm/java-8-openjdk-amd64


In [0]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline


In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init(os.environ["SPARK_HOME"])  # point findspark at the unpacked Spark distribution

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()


In [0]:
spark.version

'2.4.5'

In [0]:
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Part 1: Data ETL and Data Exploration

### Read data from your machine

In [0]:
from google.colab import files

uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))

Saving links.csv to links.csv
Saving movies.csv to movies.csv
Saving ratings.csv to ratings.csv
Saving README.txt to README.txt
Saving tags.csv to tags.csv
User uploaded file "links.csv" with length 197979 bytes
User uploaded file "movies.csv" with length 494431 bytes
User uploaded file "ratings.csv" with length 2483723 bytes
User uploaded file "README.txt" with length 8342 bytes
User uploaded file "tags.csv" with length 118660 bytes


In [0]:
!ls

links.csv    README.txt			spark-2.4.5-bin-hadoop2.7.tgz
movies.csv   sample_data		tags.csv
ratings.csv  spark-2.4.5-bin-hadoop2.7


In [0]:
import os 
os.listdir('./')

['.config',
 'spark-2.4.5-bin-hadoop2.7.tgz',
 'tags.csv',
 'spark-2.4.5-bin-hadoop2.7',
 'links.csv',
 'ratings.csv',
 'movies.csv',
 'README.txt',
 'sample_data']

In [0]:
movies_df = spark.read.load("movies.csv", format='csv', header = True)
ratings_df = spark.read.load("ratings.csv", format='csv', header = True)
links_df = spark.read.load("links.csv", format='csv', header = True)
tags_df = spark.read.load("tags.csv", format='csv', header = True)

In [0]:
movies_df.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [0]:
ratings_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [0]:
links_df.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- imdbId: string (nullable = true)
 |-- tmdbId: string (nullable = true)



In [0]:
tags_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [0]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [0]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [0]:
tmp1 = ratings_df.groupBy("userId").count().select('count').rdd.min()[0]
tmp2 = ratings_df.groupBy("movieId").count().select('count').rdd.min()[0]
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [0]:
tmp1 = ratings_df.groupBy("movieId").count().filter('count = 1').count()
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

3446 out of 9724 movies are rated by only one user


Part 1: Spark SQL and OLAP

In [0]:
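# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")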

Q1: The number of Users

In [0]:
output = spark.sql("select count(distinct userId) as Number_of_users from ratings")
output.show()

+---------------+
|Number_of_users|
+---------------+
|            610|
+---------------+



Q2: The number of Movies

In [0]:
output = spark.sql("select count(distinct movieId) as Number_of_movies from movies")
output.show()

+----------------+
|Number_of_movies|
+----------------+
|            9742|
+----------------+



Q3: How many movies are rated by users? List movies not rated before

In [0]:
# How many movies have been rated by at least one user?
output=spark.sql("select count(distinct movieId) as rated_movies from ratings")
output.show()

+------------+
|rated_movies|
+------------+
|        9724|
+------------+



In [0]:
output=spark.sql("select title, genres from movies where movieId not in (select movieId from ratings)")
output.show()

+--------------------+--------------------+
|               title|              genres|
+--------------------+--------------------+
|Innocents, The (1...|Drama|Horror|Thri...|
|      Niagara (1953)|      Drama|Thriller|
|For All Mankind (...|         Documentary|
|Color of Paradise...|               Drama|
|I Know Where I'm ...|   Drama|Romance|War|
|  Chosen, The (1981)|               Drama|
|Road Home, The (W...|       Drama|Romance|
|      Scrooge (1970)|Drama|Fantasy|Mus...|
|        Proof (1991)|Comedy|Drama|Romance|
|Parallax View, Th...|            Thriller|
|This Gun for Hire...|Crime|Film-Noir|T...|
|Roaring Twenties,...|Crime|Drama|Thriller|
|Mutiny on the Bou...|Adventure|Drama|R...|
|In the Realms of ...|Animation|Documen...|
|Twentieth Century...|              Comedy|
|Call Northside 77...|Crime|Drama|Film-...|
|Browning Version,...|               Drama|
|  Chalet Girl (2011)|      Comedy|Romance|
+--------------------+--------------------+
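
The query above is an anti-join: it keeps the movies whose id never appears in the ratings table. The counts agree with the earlier results, since 9742 movies minus 9724 rated movies leaves the 18 titles listed. Below is a minimal pure-Python sketch of the same idea using set difference. The ids are toy values I made up, not ids from the MovieLens files. One caveat worth remembering: in SQL, `NOT IN` returns no rows at all if the subquery yields a NULL, so a `LEFT ANTI JOIN` is often the safer spelling.

```python
# Toy ids only (hypothetical); they are not taken from the MovieLens files.
all_movie_ids = {"1", "2", "3", "4", "5"}
rated_movie_ids = {"1", "3", "5"}


def unrated(all_ids, rated_ids):
    """Return the ids present in all_ids but absent from rated_ids (an anti-join)."""
    return sorted(set(all_ids) - set(rated_ids))


unrated_ids = unrated(all_movie_ids, rated_movie_ids)
print(unrated_ids)
```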



Q4: List movie genres

In [0]:
output=spark.sql("select distinct genres from movies")
output.show()

+--------------------+
|              genres|
+--------------------+
|Comedy|Horror|Thr...|
|Adventure|Sci-Fi|...|
|Action|Adventure|...|
| Action|Drama|Horror|
|Action|Animation|...|
|Animation|Childre...|
|Action|Adventure|...|
|    Adventure|Sci-Fi|
|Documentary|Music...|
|Adventure|Childre...|
| Adventure|Animation|
| Musical|Romance|War|
|Action|Adventure|...|
|Adventure|Childre...|
|Comedy|Crime|Horr...|
|Crime|Drama|Fanta...|
|Comedy|Mystery|Th...|
|   Adventure|Fantasy|
|Horror|Romance|Sc...|
|Drama|Film-Noir|R...|
+--------------------+
only showing top 20 rows



In [0]:
output=spark.sql("Select distinct Category from movies \
lateral view explode(split(genres,'[|]')) as Category order by Category")
output.show()

+------------------+
|          Category|
+------------------+
|(no genres listed)|
|            Action|
|         Adventure|
|         Animation|
|          Children|
|            Comedy|
|             Crime|
|       Documentary|
|             Drama|
|           Fantasy|
|         Film-Noir|
|            Horror|
|              IMAX|
|           Musical|
|           Mystery|
|           Romance|
|            Sci-Fi|
|          Thriller|
|               War|
|           Western|
+------------------+



Q5: Movies in each category

In [0]:
output=spark.sql("Select Category, count(*) as number from movies \
lateral view explode(split(genres,'[|]')) as Category group by Category order by number desc")
output.show()

+------------------+------+
|          Category|number|
+------------------+------+
|             Drama|  4361|
|            Comedy|  3756|
|          Thriller|  1894|
|            Action|  1828|
|           Romance|  1596|
|         Adventure|  1263|
|             Crime|  1199|
|            Sci-Fi|   980|
|            Horror|   978|
|           Fantasy|   779|
|          Children|   664|
|         Animation|   611|
|           Mystery|   573|
|       Documentary|   440|
|               War|   382|
|           Musical|   334|
|           Western|   167|
|              IMAX|   158|
|         Film-Noir|    87|
|(no genres listed)|    34|
+------------------+------+
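
The `lateral view explode(split(genres,'[|]'))` pattern turns one row per movie into one row per (movie, genre) pair, which is why a movie with three genres is counted three times above. Spark's `split` takes a regular expression, so the pipe is wrapped in `[|]` to match it literally. Here is a rough pure-Python sketch of the same reshaping, using the three rows from the `movies_df.show(5)` output whose genres are not truncated. The helper name `explode_genres` is my own.

```python
from collections import Counter

# Three rows from the movies_df.show(5) output above: (title, genres).
rows = [
    ("Grumpier Old Men ...", "Comedy|Romance"),
    ("Waiting to Exhale...", "Comedy|Drama|Romance"),
    ("Father of the Bri...", "Comedy"),
]


def explode_genres(rows):
    """Yield one (title, genre) pair per genre, like explode(split(genres, '[|]'))."""
    for title, genres in rows:
        # str.split takes a literal separator, so no regex escaping is needed here.
        for genre in genres.split("|"):
            yield title, genre


pairs = list(explode_genres(rows))
genre_counts = Counter(genre for _, genre in pairs)
print(genre_counts.most_common())
```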



In [0]:
output=spark.sql("Select Category, concat_ws(',', collect_set(title)) as list_of_movies from \
(Select Category, title from movies lateral view explode(split(genres,'[|]')) as Category \
group by Category, title) t group by Category")
output.show()

+------------------+--------------------+
|          Category|      list_of_movies|
+------------------+--------------------+
|             Crime|Stealing Rembrand...|
|           Romance|Vampire in Brookl...|
|          Thriller|Element of Crime,...|
|         Adventure|Ice Age: Collisio...|
|             Drama|Airport '77 (1977...|
|               War|General, The (192...|
|       Documentary|Jim & Andy: The G...|
|           Fantasy|Masters of the Un...|
|           Mystery|Before and After ...|
|           Musical|U2: Rattle and Hu...|
|         Animation|Ice Age: Collisio...|
|         Film-Noir|Rififi (Du rififi...|
|(no genres listed)|T2 3-D: Battle Ac...|
|              IMAX|Harry Potter and ...|
|            Horror|Tormented (1960),...|
|           Western|Man Who Shot Libe...|
|            Comedy|Hysteria (2011),H...|
|          Children|Ice Age: Collisio...|
|            Action|Stealing Rembrand...|
|            Sci-Fi|Push (2009),SORI:...|
+------------------+--------------------+

Part 2: Spark ALS based approach for training model
We will use Spark ML to predict the ratings. ALS expects numeric user, item, and rating columns, so we drop the timestamp column from ratings_df and cast the remaining columns from string to numeric types.

In [0]:
movie_ratings=ratings_df.drop('timestamp')

In [0]:
# Cast the string columns to the numeric types ALS expects
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

**ALS Model Selection and Evaluation**

With the ALS model, we can use a grid search to find the optimal hyperparameters.

In [0]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [0]:
#Create test and train set
(training,test)=movie_ratings.randomSplit([0.8,0.2], seed = 12)
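
As far as I understand, `randomSplit` assigns each row to a split by an independent random draw, so the result is approximately, not exactly, 80/20, and a fixed seed makes it repeatable. A minimal sketch of that idea in plain Python follows. The function `random_split` and the stand-in rows are hypothetical, and this is not Spark's actual implementation.

```python
import random


def random_split(rows, train_weight=0.8, seed=12):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test


rows = list(range(1000))  # stand-in for rating rows (hypothetical data)
train, test = random_split(rows)
print(len(train), len(test))
```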

In [0]:
#Create ALS model
als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

In [0]:
#Tune model using ParamGridBuilder
paramGrid = ParamGridBuilder()\
            .addGrid(als.regParam, [0.1, 0.01, 0.001])\
            .addGrid(als.maxIter, [3, 5, 10])\
            .addGrid(als.rank, [5, 10, 15])\
            .build()
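
To get a feel for the cost of this search, it helps to expand the grid by hand. The sketch below enumerates the same candidate values with `itertools.product` and counts the model fits, assuming the 5 folds set in the CrossValidator cell and one final refit of the winning combination on the full training set. The dictionary layout is only for illustration; it is not how Spark stores its param maps.

```python
from itertools import product

# Same candidate values as the ParamGridBuilder cell above.
reg_params = [0.1, 0.01, 0.001]
max_iters = [3, 5, 10]
ranks = [5, 10, 15]
num_folds = 5  # matches numFolds in the CrossValidator cell

grid = [
    {"regParam": r, "maxIter": m, "rank": k}
    for r, m, k in product(reg_params, max_iters, ranks)
]

# Each combination is fitted once per fold, plus one final refit of the winner.
total_fits = len(grid) * num_folds + 1
print(len(grid), total_fits)
```

With 27 combinations this comes to 136 ALS fits, which explains why the cross-validation cell is the slowest step in the notebook.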

In [0]:
# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
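
RMSE is the square root of the mean squared difference between the actual and the predicted rating, so it is expressed in rating units (stars). A small sketch of the computation on toy values follows. The numbers are made up for illustration and are not taken from the notebook's predictions.

```python
import math


def rmse(actual, predicted):
    """Root-mean-square error between two equal-length sequences."""
    if len(actual) != len(predicted):
        raise ValueError("actual and predicted must have the same length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Toy values (hypothetical), not taken from the notebook's predictions.
actual = [4.0, 3.0, 5.0]
predicted = [3.0, 3.0, 3.0]
score = rmse(actual, predicted)
print(score)
```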

In [0]:
# Build Cross validation 
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

In [0]:
#Fit a baseline ALS model (the untuned parameters set above) to the training data
model = als.fit(training)

In [0]:
#Run the cross-validated grid search, then compute the RMSE of the best model on the training data
cvModel = crossval.fit(training)
predictions = cvModel.transform(training)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.6396344464539596


**Model testing**

And finally, make a prediction and check the testing error.

In [0]:
#Generate predictions and evaluate using RMSE
best_model = cvModel.bestModel
predictions=best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [0]:
#Print evaluation metrics and model parameters
print("RMSE = "+str(rmse))
print("**Best Model**")
print(" Rank:"+str(best_model._java_obj.parent().getRank()))
print(" MaxIter:"+str(best_model._java_obj.parent().getMaxIter()))
print(" RegParam:"+str(best_model._java_obj.parent().getRegParam()))

RMSE = 0.8791781344582082
**Best Model**
 Rank:5
 MaxIter:10
 RegParam:0.1


In [0]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   133|    471|   4.0| 2.8902345|
|   182|    471|   4.5| 3.9165268|
|   218|    471|   4.0| 3.3796198|
|   474|    471|   3.0| 3.4625764|
|   387|    471|   3.0|  3.005991|
|   555|    471|   3.0| 4.3878803|
|   520|    471|   5.0|   3.64513|
|   273|    471|   5.0| 4.2263184|
|   104|    471|   4.5|  3.618175|
|   463|   1088|   3.5| 3.4042325|
|   159|   1088|   4.0| 2.8817494|
|    20|   1088|   4.5| 3.5094023|
|   583|   1088|   3.5| 3.3393486|
|   555|   1088|   4.0| 3.8758426|
|    84|   1088|   3.0| 3.1596396|
|   268|   1238|   5.0| 3.9441013|
|    19|   1238|   3.0| 3.3423872|
|   425|   1342|   3.5| 1.9365878|
|   593|   1580|   1.5|  2.843863|
|   115|   1580|   4.0|  3.627467|
+------+-------+------+----------+
only showing top 20 rows



**Apply the model to all ratings and check its performance**

In [0]:
alldata=best_model.transform(movie_ratings)
print(alldata)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

DataFrame[userId: int, movieId: int, rating: float, prediction: float]
RMSE = 0.6928290986326348


In [0]:
alldata.createOrReplaceTempView("alldata")

In [0]:
output=spark.sql("Select * from alldata")
output.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   191|    148|   5.0| 4.9314938|
|   133|    471|   4.0| 2.8902345|
|   597|    471|   2.0|  4.010602|
|   385|    471|   4.0| 3.0239646|
|   436|    471|   3.0| 3.2848468|
|   602|    471|   4.0| 3.7501707|
|    91|    471|   1.0| 2.3306181|
|   409|    471|   3.0| 3.7002354|
|   372|    471|   3.0| 3.2912526|
|   599|    471|   2.5| 2.5064275|
|   603|    471|   4.0|  2.807358|
|   182|    471|   4.5| 3.9165268|
|   218|    471|   4.0| 3.3796198|
|   474|    471|   3.0| 3.4625764|
|   500|    471|   1.0| 2.2667398|
|    57|    471|   3.0|  3.620501|
|   462|    471|   2.5| 2.3731492|
|   387|    471|   3.0|  3.005991|
|   610|    471|   4.0| 3.3667607|
|   217|    471|   2.0| 2.4807806|
+------+-------+------+----------+
only showing top 20 rows



In [0]:
output=spark.sql("Select * from movies join alldata on movies.movieId=alldata.movieId")
output.show()

+-------+--------------------+------+------+-------+------+----------+
|movieId|               title|genres|userId|movieId|rating|prediction|
+-------+--------------------+------+------+-------+------+----------+
|    148|Awfully Big Adven...| Drama|   191|    148|   5.0| 4.9314938|
|    471|Hudsucker Proxy, ...|Comedy|   133|    471|   4.0| 2.8902345|
|    471|Hudsucker Proxy, ...|Comedy|   597|    471|   2.0|  4.010602|
|    471|Hudsucker Proxy, ...|Comedy|   385|    471|   4.0| 3.0239646|
|    471|Hudsucker Proxy, ...|Comedy|   436|    471|   3.0| 3.2848468|
|    471|Hudsucker Proxy, ...|Comedy|   602|    471|   4.0| 3.7501707|
|    471|Hudsucker Proxy, ...|Comedy|    91|    471|   1.0| 2.3306181|
|    471|Hudsucker Proxy, ...|Comedy|   409|    471|   3.0| 3.7002354|
|    471|Hudsucker Proxy, ...|Comedy|   372|    471|   3.0| 3.2912526|
|    471|Hudsucker Proxy, ...|Comedy|   599|    471|   2.5| 2.5064275|
|    471|Hudsucker Proxy, ...|Comedy|   603|    471|   4.0|  2.807358|
|    4

**Recommend movies to users with id: 575, 232**

You can pick any users and recommend movies to them.

In [0]:
#Generate top 10 movie recommendations for each user
userRecs = best_model.recommendForAllUsers(10)

In [0]:
userRecs.filter(userRecs.userId==575).show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   575|[[5485, 5.9325547...|
+------+--------------------+



In [0]:
userRecs.head()

Row(userId=471, recommendations=[Row(movieId=26171, rating=4.887014389038086), Row(movieId=7096, rating=4.855432510375977), Row(movieId=51931, rating=4.728470325469971), Row(movieId=158966, rating=4.671980381011963), Row(movieId=33649, rating=4.644101142883301), Row(movieId=89904, rating=4.6268086433410645), Row(movieId=4495, rating=4.620540618896484), Row(movieId=92494, rating=4.603967666625977), Row(movieId=26326, rating=4.572039604187012), Row(movieId=1411, rating=4.561243057250977)])
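
Conceptually, each recommendation score is the dot product of a user's factor vector with a movie's factor vector, and the top 10 are the movies with the highest scores. The sketch below shows that idea with made-up rank-2 factors. The names `dot` and `recommend` are my own, and Spark's real implementation is distributed and far more efficient.

```python
import heapq


def dot(u, v):
    """Dot product of two equal-length vectors."""
    return sum(a * b for a, b in zip(u, v))


def recommend(user_vector, item_factors, k):
    """Return the k (movie_id, score) pairs with the highest predicted score."""
    scored = ((dot(user_vector, vec), movie_id) for movie_id, vec in item_factors.items())
    top = heapq.nlargest(k, scored)
    return [(movie_id, score) for score, movie_id in top]


# Hypothetical rank-2 factors; the tuned model in this notebook uses rank 5.
item_factors = {101: [1.0, 0.0], 102: [0.0, 1.0], 103: [1.0, 1.0]}
user_vector = [2.0, 1.0]
top2 = recommend(user_vector, item_factors, k=2)
print(top2)
```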

In [0]:
user_rec=userRecs.toPandas()

In [0]:
user_rec.head(5)

   userId  recommendations
0     471  [(26171, 4.887014389038086), (7096, 4.85543251...
1     463  [(33649, 5.205497741699219), (3379, 5.02415418...
2     496  [(51931, 5.303631782531738), (89904, 5.1255559...
3     148  [(26171, 5.268674373626709), (32892, 4.9461045...
4     540  [(33649, 5.663629531860352), (3379, 5.61428785...


In [0]:
movies_pandas=movies_df.toPandas()

In [0]:
rec_movieId=[]
# .iloc[0] takes the single matching row without hard-coding its pandas index label
for item in user_rec.loc[user_rec['userId']==575, 'recommendations'].iloc[0]:
  rec_movieId.append(str(item[0]))
print(rec_movieId)
movies_pandas[movies_pandas['movieId'].isin(rec_movieId)]

['5485', '3266', '306', '183897', '40491', '59018', '60943', '5992', '72171', '170705']


      movieId  title                                              genres
 266      306  Three Colors: Red (Trois couleurs: Rouge) (1994)   Drama
2453     3266  Man Bites Dog (C'est arrivé près de chez vous)...  Comedy|Crime|Drama|Thriller
3905     5485  Tadpole (2002)                                     Comedy|Drama|Romance
4162     5992  Hours, The (2002)                                  Drama|Romance
6051    40491  Match Factory Girl, The (Tulitikkutehtaan tytt...  Comedy|Drama
6728    59018  Visitor, The (2007)                                Drama|Romance
6813    60943  Frozen River (2008)                                Drama
7177    72171  Black Dynamite (2009)                              Action|Comedy
9497   170705  Band of Brothers (2001)                            Action|Drama|War
9683   183897  Isle of Dogs (2018)                                Animation|Comedy


In [0]:
rec_movieId=[]
# .iloc[0] takes the single matching row without hard-coding its pandas index label
for item in user_rec.loc[user_rec['userId']==232, 'recommendations'].iloc[0]:
  rec_movieId.append(str(item[0]))
print(rec_movieId)
movies_pandas[movies_pandas['movieId'].isin(rec_movieId)]

['33649', '179135', '74226', '138966', '134796', '7071', '26073', '117531', '184245', '59018']


      movieId  title                                              genres
4747     7071  Woman Under the Influence, A (1974)                Drama
5448    26073  Human Condition III, The (Ningen no joken III)...  Drama|War
5906    33649  Saving Face (2004)                                 Comedy|Drama|Romance
6728    59018  Visitor, The (2007)                                Drama|Romance
7249    74226  Dream of Light (a.k.a. Quince Tree Sun, The) (...  Documentary|Drama
8591   117531  Watermark (2014)                                   Documentary
8896   134796  Bitter Lake (2015)                                 Documentary
8986   138966  Nasu: Summer in Andalusia (2003)                   Animation
9634   179135  Blue Planet II (2017)                              Documentary
9688   184245  De platte jungle (1978)                            Documentary



**Find the similar movies for movie with id: 463, 471**

You can find the similar movies based on the ALS results
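
The idea is that ALS learns one factor vector per movie, and two movies whose vectors point in nearly the same direction tend to be rated in a similar way. Cosine similarity measures that angle: sim(A, B) = A·B / (||A|| * ||B||). Here is a minimal plain-Python sketch with made-up 2-dimensional factors. The helper names and the toy vectors are assumptions for illustration only.

```python
import math


def cosine_similarity(a, b):
    """cos(theta) between vectors a and b: a.b / (||a|| * ||b||)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def most_similar(target_id, factors, k):
    """Return the k (id, similarity) pairs closest to target_id, excluding itself."""
    target = factors[target_id]
    scored = [
        (cosine_similarity(target, vec), other_id)
        for other_id, vec in factors.items()
        if other_id != target_id
    ]
    scored.sort(reverse=True)
    return [(other_id, sim) for sim, other_id in scored[:k]]


# Hypothetical factor vectors keyed by movie id.
factors = {1: [1.0, 0.0], 2: [2.0, 0.0], 3: [0.0, 1.0], 4: [1.0, 1.0]}
neighbours = most_similar(1, factors, k=2)
print(neighbours)
```

Note that movie 2 scores a similarity of 1 even though its vector is twice as long: cosine similarity ignores magnitude and compares direction only.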

In [0]:
#Generate top 10 user recommendations for each movie
movieRecs=best_model.recommendForAllItems(10)  # use the tuned model, not the untuned baseline `model`

In [0]:
item_factors = best_model.itemFactors

In [0]:
item_factors.show(3)

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[1.11374, 0.34687...|
| 20|[0.7336472, -0.77...|
| 30|[1.1324253, -1.73...|
+---+--------------------+
only showing top 3 rows



In [0]:
!pip install koalas


In [0]:
import databricks.koalas as ks

In [0]:
movie_factors = item_factors.to_koalas()

In [0]:
movie_factors.features[0]

[1.1137399673461914,
 0.3468739688396454,
 1.1952674388885498,
 0.2775809168815613,
 1.1339366436004639]

In [0]:
def similar_movies(features, movieId):
  # `features` is unused: the function reads the global movie_factors and
  # movies_koalas frames, so both must exist before it is called.
  try: 
    target_id_feature = movie_factors.loc[movie_factors.id == movieId].features.to_numpy()[0]
  except IndexError:
    # No factor vector was learned for this id.
    return 'There is no movie with id ' + str(movieId)

  similarities = []
  for feature in movie_factors['features'].to_numpy():
    # cosine similarity: sim(A, B) = cos(theta) = A.B / (||A|| * ||B||)
    similarity = np.dot(target_id_feature,feature)/(np.linalg.norm(target_id_feature) * np.linalg.norm(feature))
    similarities.append(similarity)
  ks_similarity = ks.DataFrame({'similarity' : similarities}, index = movie_factors.id.to_numpy())
  # The top 11 rows include the target movie itself (similarity = 1), which is dropped below.
  top_11 = ks_similarity.sort_values(by = ['similarity'], ascending = False).head(11)
  joint = top_11.merge(movies_koalas, left_index=True, right_on = 'movieId', how = 'inner')
  joint.sort_values(by = ['similarity'], ascending = False,inplace = True)
  joint.reset_index(inplace = True)
  # Skip row 0 (the target itself) and return the 10 most similar movies.
  return joint.loc[1:,['movieId','title','genres']]


In [0]:
similar_movies(features = movie_factors['features'], movieId = 463)

'There is no movie with id 463'

In [0]:
movies_koalas = movies_df.to_koalas()

In [0]:
similar_movies(features = movie_factors['features'], movieId = 471)

    movieId  title                                              genres
 1     3841  Air America (1990)                                 Action|Comedy
 2     6408  Animals are Beautiful People (1974)                Comedy|Documentary
 3     8738  Woman Is a Woman, A (femme est une femme, Une)...  Comedy|Drama|Musical|Romance
 4   102123  This Is the End (2013)                             Action|Comedy
 5    43460  Tristram Shandy: A Cock and Bull Story (2005)      Comedy|Drama
 6     6331  Spellbound (2002)                                  Documentary
 7    72733  Invictus (2009)                                    Drama
 8    49280  Bobby (2006)                                       Drama
 9      412  Age of Innocence, The (1993)                       Drama
10     6996  Highlander II: The Quickening (1991)               Action|Sci-Fi


In [0]:
similar_movies(features = movie_factors['features'], movieId = 500)

    movieId  title                      genres
 1     1022  Cinderella (1950)          Animation|Children|Fantasy|Musical|Romance
 2    60397  Mamma Mia! (2008)          Comedy|Musical|Romance
 3     1805  Wild Things (1998)         Crime|Drama|Mystery|Thriller
 4    27478  Ali G Indahouse (2002)     Comedy
 5      837  Matilda (1996)             Children|Comedy|Fantasy
 6      210  Wild Bill (1995)           Western
 7       74  Bed of Roses (1996)        Drama|Romance
 8      346  Backbeat (1993)            Drama|Musical
 9     2436  Tea with Mussolini (1999)  Comedy|Drama|War
10    45517  Cars (2006)                Animation|Children|Comedy


# **Write the report**

**Motivation:**
In this notebook, I use the Alternating Least Squares (ALS) algorithm from Spark ML to predict movie ratings on the MovieLens small dataset.

**Step 1: Data ETL and Data Exploration**

* Preprocessed the data, for example by splitting the genres column into individual categories
* Explored the data, for example by counting the number of movies in each category

**Step 2: Trained the recommendation model based on Spark ALS**

The model is a matrix factorization fitted with alternating least squares (ALS).

**Step 3: Used a grid search to find the optimal hyperparameters**

**Step 4: Model testing**

Evaluated the best model by its RMSE on the held-out test set.

**Step 5: Chose some users and recommended movies to them**

**Step 6: Found similar movies for selected movies based on the ALS results**

**Conclusion:**
The best model has RMSE = 0.64 on the training set and RMSE = 0.88 on the held-out test set (RMSE = 0.69 on the full dataset).

The best model has rank 5, so each movie is described by a 5-dimensional factor vector. The cosine similarity between these vectors measures how similar two movies are.








