### Spark HW2 Movie Recommendation
In this notebook, we use the Alternating Least Squares (ALS) algorithm from the Spark APIs to predict movie ratings on the [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline

In [None]:
! wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
! unzip ml-latest-small.zip
!ls

--2020-10-30 20:34:31--  http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 978202 (955K) [application/zip]
Saving to: ‘ml-latest-small.zip’


2020-10-30 20:34:32 (3.74 MB/s) - ‘ml-latest-small.zip’ saved [978202/978202]

Archive:  ml-latest-small.zip
   creating: ml-latest-small/
  inflating: ml-latest-small/links.csv  
  inflating: ml-latest-small/tags.csv  
  inflating: ml-latest-small/ratings.csv  
  inflating: ml-latest-small/README.txt  
  inflating: ml-latest-small/movies.csv  
ml-latest-small  ml-latest-small.zip  sample_data


In [None]:
!ls ml-latest-small/

links.csv  movies.csv  ratings.csv  README.txt	tags.csv


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar xf spark-3.0.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Test the spark
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3, False)



+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



In [None]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

## Part1: Data ETL and Data Exploration

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [None]:
# If running on Google Colab, use this root
root = "ml-latest-small/"
# If running on Databricks, use this root instead
databrick_root = "/FileStore/tables/movielen_small/"


movies_df = spark.read.load(root+"movies.csv", format='csv', header = True)
ratings_df = spark.read.load(root+ "ratings.csv", format='csv', header = True)
links_df = spark.read.load(root+"links.csv", format='csv', header = True)
tags_df = spark.read.load(root+"tags.csv", format='csv', header = True)

In [None]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [None]:
links_df.show(5)

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows



In [None]:
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [None]:
tmp1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [None]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

3446 out of 9724 movies are rated by only one user


## Part 1: Spark SQL and OLAP

---



In [None]:
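# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is the replacement
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")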

### Q1: The number of Users

In [None]:
# Using SQL + Spark method
user_count = spark.sql("select count(distinct userId) as user_count from ratings")
# Using pyspark
# user_count = ratings_df.select(["userId"]).distinct().count()
user_count.show()

+----------+
|user_count|
+----------+
|       610|
+----------+



### Q2: The number of Movies

In [None]:
# %sql 
# Using SQL
movie_count = spark.sql("select count(distinct movieId) as movie_count from movies")
movie_count.show()

+-----------+
|movie_count|
+-----------+
|       9742|
+-----------+



### Q3: How many movies are rated by users? List the movies that have not been rated

#### 3.1 Show the number of rated movies

In [None]:
from pyspark.sql.functions import col
# %sql 
# Show number of movies rated
movie_rated = spark.sql("select count(distinct movieId) from movies where movieId in (select distinct movieId from ratings)")
movie_rated.show()


+-----------------------+
|count(DISTINCT movieId)|
+-----------------------+
|                   9724|
+-----------------------+



#### 3.2 List the movies that have not been rated

In [None]:
# %sql 
# Show movies that are not rated
movie_not_rated = spark.sql("select distinct movieId, title from movies where movieId not in (select distinct movieId from ratings)")
movie_not_rated.show()

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|   3338|For All Mankind (...|
|  34482|Browning Version,...|
|   2939|      Niagara (1953)|
|   3456|Color of Paradise...|
|  30892|In the Realms of ...|
|   7792|Parallax View, Th...|
|  32160|Twentieth Century...|
|  26085|Mutiny on the Bou...|
|   1076|Innocents, The (1...|
|   4194|I Know Where I'm ...|
|  32371|Call Northside 77...|
|  25855|Roaring Twenties,...|
|   5721|  Chosen, The (1981)|
|   8765|This Gun for Hire...|
|   6668|Road Home, The (W...|
|   6849|      Scrooge (1970)|
|   7020|        Proof (1991)|
|  85565|  Chalet Girl (2011)|
+-------+--------------------+



#### The number of movies that have not been rated

In [None]:
count_not_rated = movie_not_rated.count()
count_not_rated

18

#### Check whether the ratings table has missing values as well

In [None]:
# In SQL, `= NULL` never matches any row; `IS NULL` is the correct test for missing values
rating_missing_value = spark.sql("select * from ratings where rating is null or timestamp is null or userId is null or movieId is null")
rating_missing_value.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
+------+-------+------+---------+



#### It seems there are no null values in the tables we are interested in

In [None]:
# movie_missing_value = spark.sql("select movieId from movies where title = NULL or genres = NULL")
movie_missing_value= movies_df.where(col('title').isNull() | col('genres').isNull())
movie_missing_value.show()

+-------+-----+------+
|movieId|title|genres|
+-------+-----+------+
+-------+-----+------+



### Q4: List Movie Genres

#### Directly list movie genres

In [None]:
# %sql
#Using SQL + PySpark method
# directly list movie genres
movie_genres_df = spark.sql("select distinct genres as genres_count from movies")
movie_genres_df.show()

+--------------------+
|        genres_count|
+--------------------+
|Comedy|Horror|Thr...|
|Adventure|Sci-Fi|...|
|Action|Adventure|...|
| Action|Drama|Horror|
|Action|Animation|...|
|Animation|Childre...|
|Action|Adventure|...|
|    Adventure|Sci-Fi|
|Documentary|Music...|
|Adventure|Childre...|
| Adventure|Animation|
| Musical|Romance|War|
|Action|Adventure|...|
|Adventure|Childre...|
|Comedy|Crime|Horr...|
|Crime|Drama|Fanta...|
|Comedy|Mystery|Th...|
|   Adventure|Fantasy|
|Horror|Romance|Sc...|
|Drama|Film-Noir|R...|
+--------------------+
only showing top 20 rows



#### Split genres and then list movie genres

In [None]:
# split movie genres and then list the distinct values
from pyspark.sql.functions import col, explode, split
movie_genres = movies_df.withColumn("splited_genres", explode(split(col("genres"),"[|]")))
splited_genres_df = movie_genres.select(["splited_genres"]).distinct().orderBy("splited_genres",ascending=True)
splited_genres_df.show()


+------------------+
|    splited_genres|
+------------------+
|(no genres listed)|
|            Action|
|         Adventure|
|         Animation|
|          Children|
|            Comedy|
|             Crime|
|       Documentary|
|             Drama|
|           Fantasy|
|         Film-Noir|
|            Horror|
|              IMAX|
|           Musical|
|           Mystery|
|           Romance|
|            Sci-Fi|
|          Thriller|
|               War|
|           Western|
+------------------+
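The `split` plus `explode` pattern turns one pipe-delimited string per movie into one row per (movie, genre) pair. Because Spark's `split` takes a regular expression, the pipe is written as `[|]` so it is treated literally. A plain-Python sketch of the same idea, using made-up titles and genre strings rather than rows from movies.csv:

```python
from collections import Counter

# Made-up rows in the same shape as movies.csv: (title, pipe-delimited genres).
movies = [
    ("Movie A", "Adventure|Comedy"),
    ("Movie B", "Comedy"),
    ("Movie C", "Adventure|Drama|Romance"),
]

# str.split treats "|" literally, so no regex escaping is needed here.
genre_counts = Counter(
    genre for _title, genres in movies for genre in genres.split("|")
)

# Distinct genres, sorted alphabetically like the orderBy above.
distinct_genres = sorted(genre_counts)
print(distinct_genres)
print(genre_counts.most_common())
```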



### Q5: Movie for Each Category

In [None]:
# %sql
# Using SQL + Pyspark method
category_movie_count = spark.sql(" with movie_category as (select distinct (explode(split(genres, '[|]'))) as category, * from movies) \
                  select category, count(category) as category_count from movie_category group by category order by category_count desc")
category_movie_count.show()
print("Number of categories: ",len(category_movie_count.collect()))

+------------------+--------------+
|          category|category_count|
+------------------+--------------+
|             Drama|          4361|
|            Comedy|          3756|
|          Thriller|          1894|
|            Action|          1828|
|           Romance|          1596|
|         Adventure|          1263|
|             Crime|          1199|
|            Sci-Fi|           980|
|            Horror|           978|
|           Fantasy|           779|
|          Children|           664|
|         Animation|           611|
|           Mystery|           573|
|       Documentary|           440|
|               War|           382|
|           Musical|           334|
|           Western|           167|
|              IMAX|           158|
|         Film-Noir|            87|
|(no genres listed)|            34|
+------------------+--------------+

Number of categories:  20


## Analysis:
The dataset appears to be incomplete even though there are no null values, since 34 movies are labeled **(no genres listed)**.

However, since we will use ALS matrix factorization for the movie recommendation system, which only uses userId, movieId, and rating and does not consider categories, **I don't drop the movies that are labeled as (no genres listed)**

In [None]:
#Using PySpark method
from pyspark.sql.functions import col, explode, split
movie_genres = movies_df.withColumn("splited_genres", explode(split(col("genres"),"[|]")))
splited_genres_df = movie_genres.groupBy(["splited_genres"]).count().orderBy("count",ascending=False)
splited_genres_df = splited_genres_df.withColumnRenamed('count','category_count')
splited_genres_df.show()

+------------------+--------------+
|    splited_genres|category_count|
+------------------+--------------+
|             Drama|          4361|
|            Comedy|          3756|
|          Thriller|          1894|
|            Action|          1828|
|           Romance|          1596|
|         Adventure|          1263|
|             Crime|          1199|
|            Sci-Fi|           980|
|            Horror|           978|
|           Fantasy|           779|
|          Children|           664|
|         Animation|           611|
|           Mystery|           573|
|       Documentary|           440|
|               War|           382|
|           Musical|           334|
|           Western|           167|
|              IMAX|           158|
|         Film-Noir|            87|
|(no genres listed)|            34|
+------------------+--------------+



Show the movies whose genres are labeled "(no genres listed)"

In [None]:
movie_genres.where(col("splited_genres").isin(["(no genres listed)"])).join(movies_df,"movieId","left").show()

+-------+--------------------+------------------+------------------+--------------------+------------------+
|movieId|               title|            genres|    splited_genres|               title|            genres|
+-------+--------------------+------------------+------------------+--------------------+------------------+
| 114335|   La cravate (1957)|(no genres listed)|(no genres listed)|   La cravate (1957)|(no genres listed)|
| 122888|      Ben-hur (2016)|(no genres listed)|(no genres listed)|      Ben-hur (2016)|(no genres listed)|
| 122896|Pirates of the Ca...|(no genres listed)|(no genres listed)|Pirates of the Ca...|(no genres listed)|
| 129250|   Superfast! (2015)|(no genres listed)|(no genres listed)|   Superfast! (2015)|(no genres listed)|
| 132084| Let It Be Me (1995)|(no genres listed)|(no genres listed)| Let It Be Me (1995)|(no genres listed)|
| 134861|Trevor Noah: Afri...|(no genres listed)|(no genres listed)|Trevor Noah: Afri...|(no genres listed)|
| 141131|    Guardi

## Part2: Spark ALS based approach for training model
We will use Spark ML to predict the ratings, so we reuse the `ratings_df` DataFrame loaded above, drop the timestamp column, and cast the remaining columns to numeric types to get (userId, movieId, rating) rows.

In [None]:
ratings_df.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [None]:
movie_ratings=ratings_df.drop('timestamp')

In [None]:
# Data type convert
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

In [None]:
movie_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



### ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.
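To give a sense of what a grid search costs, the sketch below enumerates a grid in plain Python. The candidate values and fold count match the ones used in the cells of this section (`regParam` in 0.1, 0.5, 0.8, `rank` in 5, 10, 15, and 5 folds); the variable names are only illustrative.

```python
from itertools import product

# Candidate values taken from the ParamGridBuilder and CrossValidator cells.
reg_params = [0.1, 0.5, 0.8]
ranks = [5, 10, 15]
num_folds = 5

# Every combination of the two hyperparameters is one candidate model.
grid = [{"regParam": reg, "rank": rank} for reg, rank in product(reg_params, ranks)]

# Each candidate is trained once per fold during cross-validation.
total_fits = len(grid) * num_folds
print(len(grid), total_fits)
```

Each added value multiplies the number of fits, so it is usually worth keeping the grid small. `CrossValidator` also refits the winning combination on the full training set once the search is done.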

In [None]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [None]:
#Create test and train set
(training,test)=movie_ratings.randomSplit([0.8,0.2])

In [None]:
# Count the distinct users and movies in the ratings data
len(movie_ratings.select(["userId"]).distinct().collect()),len(movie_ratings.select(["movieId"]).distinct().collect())

(610, 9724)

## Apply ALS (alternating least squares) algorithm for matrix factorization

In [None]:
als_model = ALS(rank=10, maxIter=10,coldStartStrategy='drop' )
als_model.setUserCol("userId")
als_model.setItemCol("movieId")
als_model.setRatingCol("rating")
als_model.setPredictionCol("Prediction")
als_model.getUserCol(),als_model.getItemCol(),als_model.getRatingCol(),als_model.getPredictionCol(),

('userId', 'movieId', 'rating', 'Prediction')

In [None]:
#Tune model using ParamGridBuilder
grid = ParamGridBuilder()\
  .baseOn({als_model.predictionCol:"Prediction"})\
  .addGrid(als_model.regParam,[0.1,0.5,0.8])\
  .addGrid(als_model.rank,[5,10,15])\
  .build()

In [None]:
# Define evaluator as RMSE
evaluator= RegressionEvaluator(predictionCol="Prediction",labelCol="rating",metricName="rmse")

In [None]:
# Build Cross validation 
cv = CrossValidator(estimator=als_model, 
            estimatorParamMaps=grid,
            evaluator=evaluator,
            numFolds=5,seed=2020,parallelism=2)

In [None]:
#Fit ALS model to training data
cvModel = cv.fit(training)

In [None]:
#Extract best model from the tuning exercise using ParamGridBuilder
best_model = cvModel.bestModel
best_model

ALSModel: uid=ALS_d2f750307fda, rank=5

### Model testing
And finally, make a prediction and check the testing error.
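The testing error here is the root mean square error (RMSE), the square root of the average squared difference between actual and predicted ratings. A minimal plain-Python sketch of that formula, using made-up numbers rather than the notebook's predictions (the `rmse` helper is illustrative and not part of Spark):

```python
import math

def rmse(actual, predicted):
    """Root mean square error between two equally long lists of numbers."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Made-up ratings and predictions.
actual = [1.0, 3.0, 4.0]
predicted = [2.0, 3.0, 2.0]
print(rmse(actual, predicted))
```

Because the errors are squared before averaging, a few large misses raise RMSE more than many small ones.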

In [None]:
#Generate predictions and evaluate using RMSE
predictions=best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [None]:
#Print evaluation metrics and model parameters
# (no trailing commas: they turn each line into a tuple and make the cell echo "(None,)")
print ("RMSE = "+str(rmse))
print ("**Best Model**")
print (" Rank:",best_model.rank)
print (" MaxIter:", best_model._java_obj.parent().getMaxIter())
print (" RegParam:",best_model._java_obj.parent().getRegParam())

RMSE = 0.8798379591077872
**Best Model**
 Rank: 5
 MaxIter: 10
 RegParam: 0.1

In [None]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|Prediction|
+------+-------+------+----------+
|    91|    471|   1.0| 2.5418828|
|   409|    471|   3.0| 3.5666108|
|   218|    471|   4.0|  3.139399|
|   387|    471|   3.0|  3.042109|
|   217|    471|   2.0|  2.754712|
|   520|    471|   5.0| 3.9758658|
|   312|    471|   4.0| 3.8095398|
|   608|    471|   1.5| 2.4980073|
|   599|   1088|   2.5| 2.4698544|
|   177|   1088|   3.5| 3.3751724|
|   132|   1088|   4.0|  2.847704|
|   387|   1088|   1.5| 2.3881683|
|   381|   1088|   3.5| 3.6309452|
|   307|   1088|   3.0| 2.3773305|
|    51|   1088|   4.0| 3.5448327|
|   525|   1088|   4.5|  3.256174|
|   385|   1238|   3.0|  3.264252|
|   587|   1238|   4.0| 3.1346846|
|   216|   1238|   5.0| 3.4526994|
|   156|   1238|   4.0| 3.8308578|
+------+-------+------+----------+
only showing top 20 rows



### Apply the model to all data and check the performance

In [None]:
alldata=best_model.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

RMSE = 0.6936810208034438


In [None]:
alldata.createOrReplaceTempView("alldata")

In [None]:
result = spark.sql("select * from alldata")
result.show()

+------+-------+------+----------+
|userId|movieId|rating|Prediction|
+------+-------+------+----------+
|   191|    148|   5.0|  4.936762|
|   133|    471|   4.0| 3.2076418|
|   597|    471|   2.0| 3.6909595|
|   385|    471|   4.0| 3.3912919|
|   436|    471|   3.0|  3.546002|
|   602|    471|   4.0|  3.164786|
|    91|    471|   1.0| 2.5418828|
|   409|    471|   3.0| 3.5666108|
|   372|    471|   3.0| 3.1000493|
|   599|    471|   2.5| 2.8947043|
|   603|    471|   4.0|  3.817274|
|   182|    471|   4.5|  3.962853|
|   218|    471|   4.0|  3.139399|
|   474|    471|   3.0| 3.8796182|
|   500|    471|   1.0| 2.4193392|
|    57|    471|   3.0| 3.7525496|
|   462|    471|   2.5| 2.8512473|
|   387|    471|   3.0|  3.042109|
|   610|    471|   4.0| 3.4810367|
|   217|    471|   2.0|  2.754712|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
result = spark.sql("select * from movies join alldata on movies.movieId=alldata.movieId")
result.show()

+-------+--------------------+------+------+-------+------+----------+
|movieId|               title|genres|userId|movieId|rating|Prediction|
+-------+--------------------+------+------+-------+------+----------+
|    148|Awfully Big Adven...| Drama|   191|    148|   5.0|  4.936762|
|    471|Hudsucker Proxy, ...|Comedy|   133|    471|   4.0| 3.2076418|
|    471|Hudsucker Proxy, ...|Comedy|   597|    471|   2.0| 3.6909595|
|    471|Hudsucker Proxy, ...|Comedy|   385|    471|   4.0| 3.3912919|
|    471|Hudsucker Proxy, ...|Comedy|   436|    471|   3.0|  3.546002|
|    471|Hudsucker Proxy, ...|Comedy|   602|    471|   4.0|  3.164786|
|    471|Hudsucker Proxy, ...|Comedy|    91|    471|   1.0| 2.5418828|
|    471|Hudsucker Proxy, ...|Comedy|   409|    471|   3.0| 3.5666108|
|    471|Hudsucker Proxy, ...|Comedy|   372|    471|   3.0| 3.1000493|
|    471|Hudsucker Proxy, ...|Comedy|   599|    471|   2.5| 2.8947043|
|    471|Hudsucker Proxy, ...|Comedy|   603|    471|   4.0|  3.817274|
|    4

In [None]:
best_model.save("/content/model")

## Recommend movies to users with id: 575, 232.
You can choose other users to recommend movies to.

In [None]:
from pyspark.sql.types import *
def get_recommendation(id_type="userId",id =None, numItems = 5):
  """
  id_type: type of id to input:  userId, movieId
  id: movie/ user id to query, either string or integer type
  numItems: number of Items to recommend in each query
  """
  if id_type =="userId" :
    # User-Id based 
    recommendation = best_model.recommendForAllUsers(numItems)
    recommended_movies_df = recommendation.where(col("userId")==int(id)).toPandas()
  elif id_type =="movieId" :
    # Movie-Id based
    recommendation = best_model.recommendForAllItems(numItems)
    recommended_movies_df = recommendation.where(col("movieId")==int(id)).toPandas()  
  else:
    print("id_type should be either 'userId' or 'movieId'")
    print("id should be an integer")
    return None

  #make sure there are movies recommended
  if len(recommended_movies_df) >0:
    movie_recommended = recommended_movies_df.iloc[0].loc["recommendations"]
    
    schema = StructType([
          StructField('movieId', IntegerType(), False),
          StructField('Prediction', FloatType(), False)
      ])
    movies = spark.createDataFrame(movie_recommended,schema)
    print("Movies recommended to User:%d"%int(id))
    movies = movies.join(movies_df,'movieId','left').toPandas()
  else:
    print("No movies for "+id_type+ ": %d"%int(id))
    movies = None
  return movies




In [None]:
#Query User Id: 575
get_recommendation(id_type="userId", id =575)

Movies recommended to User:575


Unnamed: 0,movieId,Prediction,title,genres
0,68945,5.431108,Neon Genesis Evangelion: Death & Rebirth (Shin...,Action|Animation|Mystery|Sci-Fi
1,3379,5.431108,On the Beach (1959),Drama
2,8477,5.417279,"Jetée, La (1962)",Romance|Sci-Fi
3,4256,5.29441,"Center of the World, The (2001)",Drama
4,33649,5.286087,Saving Face (2004),Comedy|Drama|Romance


In [None]:
#Query User Id: 232
get_recommendation(id_type="userId", id =232)

Movies recommended to User:232


Unnamed: 0,movieId,Prediction,title,genres
0,68945,4.871829,Neon Genesis Evangelion: Death & Rebirth (Shin...,Action|Animation|Mystery|Sci-Fi
1,3379,4.871829,On the Beach (1959),Drama
2,33649,4.734941,Saving Face (2004),Comedy|Drama|Romance
3,117531,4.689919,Watermark (2014),Documentary
4,84273,4.689919,Zeitgeist: Moving Forward (2011),Documentary


## Using interact widget to query recommendations
You can type the movieId or userId you want to query into the "id" box and search for recommended movies.

In [None]:
from ipywidgets import interact
interact(get_recommendation,id_type= ["userId","movieId"], id= '575',numItems= [1,5,10,15])

interactive(children=(Dropdown(description='id_type', options=('userId', 'movieId'), value='userId'), Text(val…

<function __main__.get_recommendation>

## Find the similar movies for movies with id: 463, 471
You can find similar movies based on the ALS results.

In [None]:
get_recommendation(id_type="movieId", id =463)

No movies for movieId: 463


In [None]:
get_recommendation(id_type="movieId", id =471)

Movies recommended to User:471


Unnamed: 0,movieId,Prediction,title,genres
0,461,5.806471,Go Fish (1994),Drama|Romance
1,99,5.278326,Heidi Fleiss: Hollywood Madam (1995),Documentary
2,258,5.172138,"Kid in King Arthur's Court, A (1995)",Adventure|Children|Comedy|Fantasy|Romance
3,224,5.09114,Don Juan DeMarco (1995),Comedy|Drama|Romance
4,108,4.8042,Catwalk (1996),Documentary


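### A result can show a movie with title and genres equal to "None". Note that `recommendForAllItems` returns the top users for each movie, not similar movies, so the IDs listed under `movieId` here are user IDs that the schema in `get_recommendation` relabels as movie IDs. When such an ID has no entry in movies.csv, the left join leaves title and genres empty.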
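For movie-to-movie similarity, one option (a sketch, not what the cells above do) is to compare the latent item vectors that ALS learns, which Spark exposes as `best_model.itemFactors` with `id` and `features` columns. The example below uses a small hand-written dictionary of hypothetical factors instead of Spark, and `cosine` and `most_similar_items` are illustrative helper names.

```python
import math

def cosine(u, v):
    """Cosine similarity between two equally long factor vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    if norm_u == 0.0 or norm_v == 0.0:
        return 0.0
    return dot / (norm_u * norm_v)

def most_similar_items(item_factors, movie_id, k=5):
    """Return up to k (movieId, similarity) pairs, most similar first."""
    if movie_id not in item_factors:
        return []  # the model has no factors for this movie
    query = item_factors[movie_id]
    scored = [
        (other_id, cosine(query, factors))
        for other_id, factors in item_factors.items()
        if other_id != movie_id
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]

# Hypothetical rank-2 factors keyed by movieId (not taken from the trained model).
item_factors = {1: [0.9, 0.1], 2: [0.8, 0.2], 3: [0.1, 0.9]}
similar = most_similar_items(item_factors, 1, k=2)
print(similar)
```

With real data, the dictionary could be built by collecting `itemFactors` to the driver, which is feasible for a dataset of this size.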

## Using interact widget to load recommendations

In [None]:
interact(get_recommendation,id_type= ["userId","movieId"], id= '471',numItems= [1,5,10,15])

interactive(children=(Dropdown(description='id_type', options=('userId', 'movieId'), value='userId'), Text(val…

<function __main__.get_recommendation>

In [None]:
# Import only what is needed; a wildcard import would shadow builtins such as sum and max
from pyspark.sql.functions import col

def cos_similarity(id1,id2,type="userId"):
  """Cosine similarity between two users or between two movies.

  Each id is treated as a binary vector over the other axis (the movies a
  user rated, or the users who rated a movie), so the result is
  |A intersect B| / sqrt(|A| * |B|). Rating values are ignored.
  """
  if type == "userId":
    # Compare two users by the sets of movies they rated
    key_col, value_col = "userId", "movieId"
  elif type == "movieId":
    # Compare two movies by the sets of users who rated them
    key_col, value_col = "movieId", "userId"
  else:
    # Unknown id type: no similarity can be computed
    return 0.0

  set1 = movie_ratings.where(col(key_col) == id1).select(value_col).distinct()
  set2 = movie_ratings.where(col(key_col) == id2).select(value_col).distinct()
  N1 = set1.count()
  N2 = set2.count()
  if N1 == 0 or N2 == 0:
    return 0.0
  intersection = set1.intersect(set2).count()
  return intersection/np.sqrt(N1*N2)

cos_similarity(471, 464, type="movieId"), cos_similarity(575, 232, type="userId")

(0.35, 0.028383445970423818)
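The function above computes cosine similarity over binary "rated or not" vectors, which reduces to the size of the overlap divided by the geometric mean of the two set sizes. A plain-Python sketch of the same formula on made-up sets of user IDs (`set_cosine` is an illustrative name):

```python
import math

def set_cosine(a, b):
    """Cosine similarity of two sets viewed as binary indicator vectors."""
    if not a or not b:
        return 0.0
    return len(a & b) / math.sqrt(len(a) * len(b))

# Made-up sets of users who rated two movies.
users_movie_x = {1, 2, 3, 4}
users_movie_y = {3, 4, 5, 6}
similarity = set_cosine(users_movie_x, users_movie_y)
print(similarity)
```

This measure only looks at who rated what, so two movies rated by the same users score high even if those users gave them opposite ratings.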

#### An updated version of the recommendation function, which also computes the cosine similarity between the queried movie and each result

In [None]:
def get_recommendation(id_type="userId",id =None, numItems = 5):
  """
  id_type: type of id to input:  userId, movieId
  id: movie/ user id to query, either string or integer type
  numItems: number of Items to recommend in each query
  """
  if id_type =="userId" :
    # User-Id based 
    recommendation = best_model.recommendForAllUsers(numItems)
    recommended_movies_df = recommendation.where(col("userId")==int(id)).toPandas()
    
  elif id_type =="movieId" :
    # Movie-Id based
    recommendation = best_model.recommendForAllItems(numItems)
    recommended_movies_df = recommendation.where(col("movieId")==int(id)).toPandas()  
  else:
    print("id_type should be either 'userId' or 'movieId'")
    print("id should be an integer")
    return None

  #make sure there are movies recommended
  if len(recommended_movies_df) >0:
    movie_recommended = recommended_movies_df.iloc[0].loc["recommendations"]
    
    schema = StructType([
          StructField('movieId', IntegerType(), False),
          StructField('Prediction', FloatType(), False)
      ])
    movies = spark.createDataFrame(movie_recommended,schema)
    print("Movies recommended to User:%d"%int(id))
    movies = movies.join(movies_df,'movieId','left').toPandas()

    if id_type =="movieId":
      # id can arrive as a string from the widget, so cast it before comparing
      movies["cos_similarity"] = [
          cos_similarity(movieId, int(id), type="movieId")
          for movieId in movies["movieId"].tolist()
      ]

  else:
    print("No movies for "+id_type+ ": %d"%int(id))
    movies = None
  return movies
interact(get_recommendation,id_type= ["userId","movieId"], id= '471',numItems= [1,5,10,15])

interactive(children=(Dropdown(description='id_type', options=('userId', 'movieId'), value='userId'), Text(val…

<function __main__.get_recommendation>

## Write the report 
#### **motivation**

Personally, watching movies and anime is one of my hobbies, and I want to discover movies that might appeal to me. A movie recommendation system is a useful tool for finding interesting movies to enjoy.

From a business perspective, as the number of movies, videos, and movie platforms grows and user tastes diversify, it becomes difficult for users to find the movies they prefer. A good movie recommendation system helps companies give users a better viewing experience and attract more users to their platform. Hence it is worthwhile to build a movie recommendation system.

#### **step1: Data Collection, Extraction and Data Analysis**
The dataset is the MovieLens small dataset from 2018. It contains four CSV files: movies.csv, ratings.csv, tags.csv, and links.csv. In this project, I only use movies.csv and ratings.csv, as they provide the information needed to build the recommendation system, such as movie id, user id, and ratings. Since all columns are read as strings, I convert the ones I need to numeric types in the preprocessing step.
In addition, I find that 18 of the 9742 movies in movies.csv have not been rated by any user.

#### **step2: Data Preprocessing**
In the data preprocessing step, I convert userId and movieId from string to integer and rating from string to float, so that they can be fed to the machine learning model.
During data analysis, I find that 34 movies have no genre listed. However, since the recommendation only uses userId, movieId, and rating, I keep those movies.

In addition, to train and test the model, I randomly split the whole dataset into 80% training data and 20% testing data.

#### **step3: Construct Movie Recommendation System**
In this step, I use the Alternating Least Squares (ALS) method for matrix factorization to build a movie recommendation system. In the training step, the inputs to the model are userId, movieId, and rating, and the ALS model factorizes the sparse user-movie rating matrix into two dense low-rank matrices: a user-by-latent-factor matrix and a latent-factor-by-movie matrix. The factors are not constrained to be non-negative here, because the model is created without `nonnegative=True`.
The ALS model outputs a predicted rating for a given user-movie pair.
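The alternating idea can be sketched in plain Python for the simplest case of one latent factor per user and per movie: with the movie factors held fixed, each user factor has a closed-form regularized least-squares solution, and vice versa. This is only an illustration on made-up ratings with plain L2 regularization; `als_rank1` is a hypothetical helper, and Spark's ALS differs in several ways (higher rank, distributed block solves, and regularization scaled by the number of ratings).

```python
def als_rank1(ratings, num_users, num_items, reg=0.1, iterations=20):
    """Rank-1 ALS on a dict {(user, item): rating} with plain L2 regularization."""
    user_f = [1.0] * num_users
    item_f = [1.0] * num_items
    for _ in range(iterations):
        # Hold item factors fixed; each user factor is a 1-D ridge regression.
        for u in range(num_users):
            rated = [(i, r) for (uu, i), r in ratings.items() if uu == u]
            num = sum(r * item_f[i] for i, r in rated)
            den = reg + sum(item_f[i] ** 2 for i, _ in rated)
            user_f[u] = num / den if den > 0 else 0.0
        # Hold user factors fixed; update each item factor the same way.
        for i in range(num_items):
            raters = [(u, r) for (u, ii), r in ratings.items() if ii == i]
            num = sum(r * user_f[u] for u, r in raters)
            den = reg + sum(user_f[u] ** 2 for u, _ in raters)
            item_f[i] = num / den if den > 0 else 0.0
    return user_f, item_f

# Made-up ratings; the pair (user 0, item 0) is deliberately left unobserved.
ratings = {
    (0, 1): 1.0, (0, 2): 1.0,
    (1, 0): 3.0, (1, 1): 1.5, (1, 2): 1.5,
    (2, 0): 4.0, (2, 1): 2.0, (2, 2): 2.0,
}
user_f, item_f = als_rank1(ratings, num_users=3, num_items=3, reg=1e-6, iterations=30)

# The predicted rating for a pair is the product of its two factors.
predicted = user_f[0] * item_f[0]
print(round(predicted, 3))
```

The made-up ratings were chosen to be exactly rank 1, so the unobserved entry can be recovered from the observed ones; real rating data is noisy, which is why a larger rank and a non-trivial `regParam` are tuned.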

To find a better model, I use grid search with 5-fold cross-validation to tune the rank of the factor matrices and the regularization parameter; the number of iterations is fixed at 10.

#### **step4: Model Evaluation**
In the evaluation and testing step, I use Root Mean Square Error (RMSE) between the actual ratings and the predicted ratings to measure the error of the ALS model. The smaller the RMSE, the better the system performs.
In this step, the RMSE is 0.880 on the test set and 0.694 on all data. Since the full dataset includes the training data, a lower RMSE there is expected; the gap suggests that the model is somewhat over-fitting to the training data.

Moreover, matrix factorization only works well for users and movies that the model has seen during training. For new users and items that have no ratings, it cannot learn factors to predict ratings. This is one drawback of the ALS method, and it is why no movie is recommended when we input movieId=463 into the system.
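This cold-start limitation is what `coldStartStrategy='drop'` handles in the ALS cell: pairs whose user or movie has no learned factors get a NaN prediction, and those rows are dropped before evaluation. A plain-Python sketch of which pairs fall into that category, on made-up (userId, movieId) pairs (`split_cold_start` is an illustrative helper, not a Spark function):

```python
def split_cold_start(train_pairs, test_pairs):
    """Separate test pairs the model can score from cold-start pairs."""
    known_users = {user for user, _ in train_pairs}
    known_movies = {movie for _, movie in train_pairs}
    scorable, cold = [], []
    for user, movie in test_pairs:
        # A prediction needs factors for both the user and the movie.
        if user in known_users and movie in known_movies:
            scorable.append((user, movie))
        else:
            cold.append((user, movie))
    return scorable, cold

# Made-up (userId, movieId) pairs.
train_pairs = [(1, 10), (1, 20), (2, 10)]
test_pairs = [(2, 20), (3, 10), (1, 30)]
scorable, cold = split_cold_start(train_pairs, test_pairs)
print(scorable, cold)
```

Dropping these rows keeps the RMSE well defined, but it also means the reported error says nothing about new users or new movies.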

#### **step5: Deploy Recommendation System**
After training and testing the recommendation system, I use an ipywidgets `interact` widget as a simple deployment that lets the user change the userId or movieId to find recommended movies.

#### **Output and conclusion**
In conclusion, I designed a movie recommendation system based on the MovieLens small dataset using matrix factorization, and tuned the model using grid search with cross-validation. I then used RMSE to evaluate the model and found that the ALS model over-fits slightly and cannot handle users or movies that it has not seen before. Moreover, this method does not use the whole dataset, as it relies only on the userId, movieId, and rating features. Finally, I deployed the model with an interact widget so that users can query the system easily.
