<a href="https://colab.research.google.com/github/wei-yuan/recommender-systems/blob/master/Movie_Recommendation_by_Collaborative_Filtering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Task: Collaborative Filtering


## Environment Set-Up
Set up Spark in the Colab environment.

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=378c4d88e0976238dd052b1956f28950e46721db33cf99885fb35eadc7ddfc08
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-m

## Prepare Dataset
Download the popular movie rating dataset [MovieLens](https://grouplens.org/datasets/movielens/).

In [2]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [3]:
# Use file_id rather than id to avoid shadowing Python's built-in id()
file_id = '1QtPy_HuIMSzhtYllT3-WeM3Sqg55wK_D'
downloaded = drive.CreateFile({'id': file_id})
downloaded.GetContentFile('MovieLens.training')

file_id = '1ePqnsQTJRRvQcBoF2EhoPU8CU1i5byHK'
downloaded = drive.CreateFile({'id': file_id})
downloaded.GetContentFile('MovieLens.test')

file_id = '1ncUBWdI5AIt3FDUJokbMqpHD2knd5ebp'
downloaded = drive.CreateFile({'id': file_id})
downloaded.GetContentFile('MovieLens.item')

Import some of the common libraries needed for our task.

In [4]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Initialize the Spark context.

In [5]:
# configure Spark (set the UI port)
conf = SparkConf().set("spark.ui.port", "4050")

# create the Spark context, then the session on top of it
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

## Data loading
Load the ratings data, which comes pre-split into an 80% training set and a 20% test set. The items dataframe maps each item identifier to its movie title.

In [6]:
schema_ratings = StructType([
    StructField("user_id", IntegerType(), False),
    StructField("item_id", IntegerType(), False),
    StructField("rating", IntegerType(), False),
    StructField("timestamp", IntegerType(), False)])

schema_items = StructType([
    StructField("item_id", IntegerType(), False),
    StructField("movie", StringType(), False)])

training = spark.read.option("sep", "\t").csv("MovieLens.training", header=False, schema=schema_ratings)
test = spark.read.option("sep", "\t").csv("MovieLens.test", header=False, schema=schema_ratings)
items = spark.read.option("sep", "|").csv("MovieLens.item", header=False, schema=schema_items)
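As a rough illustration of the layout that `schema_ratings` describes, the sketch below parses a few tab-separated rating rows with Python's standard `csv` module instead of Spark. The sample rows are copied from the `training.show()` output in this notebook; the `Rating` type and the `parse_ratings` helper are hypothetical names used only for this example.

```python
import csv
import io
from typing import List, NamedTuple


class Rating(NamedTuple):
    """One row of the ratings file, mirroring schema_ratings (hypothetical helper type)."""
    user_id: int
    item_id: int
    rating: int
    timestamp: int


def parse_ratings(text: str) -> List[Rating]:
    """Parse tab-separated rating rows; every field is an integer."""
    reader = csv.reader(io.StringIO(text), delimiter="\t")
    return [Rating(*(int(field) for field in row)) for row in reader if row]


# Sample rows in the same layout as MovieLens.training (values taken from the notebook output)
sample = "1\t1\t5\t874965758\n1\t2\t3\t876893171\n1\t3\t4\t878542960\n"
ratings = parse_ratings(sample)
print(ratings[0])  # Rating(user_id=1, item_id=1, rating=5, timestamp=874965758)
```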

In [7]:
training.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)



In [46]:
training.show(n = 10)

+-------+-------+------+---------+
|user_id|item_id|rating|timestamp|
+-------+-------+------+---------+
|      1|      1|     5|874965758|
|      1|      2|     3|876893171|
|      1|      3|     4|878542960|
|      1|      4|     3|876893119|
|      1|      5|     3|889751712|
|      1|      7|     4|875071561|
|      1|      8|     1|875072484|
|      1|      9|     5|878543541|
|      1|     11|     2|875072262|
|      1|     13|     5|875071805|
+-------+-------+------+---------+
only showing top 10 rows



In [8]:
items.printSchema()

root
 |-- item_id: integer (nullable = true)
 |-- movie: string (nullable = true)



In [47]:
items.show(n = 10)

+-------+--------------------+
|item_id|               movie|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|    GoldenEye (1995)|
|      3|   Four Rooms (1995)|
|      4|   Get Shorty (1995)|
|      5|      Copycat (1995)|
|      6|Shanghai Triad (Y...|
|      7|Twelve Monkeys (1...|
|      8|         Babe (1995)|
|      9|Dead Man Walking ...|
|     10|  Richard III (1995)|
+-------+--------------------+
only showing top 10 rows



## Exploration
What is the number of ratings in the training and test datasets?

In [19]:
print(f"The number of ratings in training dataset: {training.count()}")
print(f"The number of ratings in testing dataset: {test.count()}")

The number of ratings in training dataset: 80000
The number of ratings in testing dataset: 20000


How many movies are in our dataset?

In [21]:
print(f"The number of movies: {items.count()}")

The number of movies: 1682


## Model Training
Using the training set, train a model with the Alternating Least Squares (ALS) method available in Spark MLlib: https://spark.apache.org/docs/latest/ml-collaborative-filtering.html

In [22]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [25]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="user_id", itemCol="item_id", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)
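To give some intuition for what `als.fit` does, here is a toy rank-1 version of alternating least squares in plain Python. It is a sketch under simplifying assumptions, not Spark's implementation: a single latent factor per user and per item, a plain L2 penalty `reg`, and made-up ratings. The function name `als_rank1` and its parameters are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Tuple


def als_rank1(ratings: Dict[Tuple[int, int], float], iterations: int = 5, reg: float = 0.01):
    """Toy rank-1 ALS: approximate each rating r_ui by user_f[u] * item_f[i]."""
    by_user = defaultdict(list)
    by_item = defaultdict(list)
    for (u, i), r in ratings.items():
        by_user[u].append((i, r))
        by_item[i].append((u, r))

    user_f = {u: 1.0 for u in by_user}
    item_f = {i: 1.0 for i in by_item}

    for _step in range(iterations):
        # Hold item factors fixed and solve the 1-D regularised least-squares problem per user
        for u, rated in by_user.items():
            num = sum(r * item_f[i] for i, r in rated)
            den = reg + sum(item_f[i] ** 2 for i, _r in rated)
            user_f[u] = num / den
        # Hold user factors fixed and solve the same problem per item
        for i, raters in by_item.items():
            num = sum(r * user_f[u] for u, r in raters)
            den = reg + sum(user_f[u] ** 2 for u, _r in raters)
            item_f[i] = num / den
    return user_f, item_f


# Made-up ratings that follow an exact rank-1 pattern; (user 2, item 3) is held out
ratings = {(1, 1): 1.0, (1, 2): 2.0, (1, 3): 2.5, (2, 1): 2.0, (2, 2): 4.0}
user_f, item_f = als_rank1(ratings, iterations=50)
prediction = user_f[2] * item_f[3]
print(f"predicted rating for user 2, item 3: {prediction:.2f}")
```

Because user 2 rates everything twice as high as user 1 in this toy data, the held-out prediction should land close to 5. Each half-step has a closed-form solution, which is what makes the alternating scheme attractive.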

Now compute the RMSE on the test dataset.


In [28]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 1.113002577070955
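To make the metric and the cold-start comment concrete, the following sketch computes RMSE by hand on made-up numbers and shows how a single NaN prediction turns the whole metric into NaN. The `rmse` helper and the sample pairs are assumptions for illustration only; filtering out the NaN rows is roughly what `coldStartStrategy="drop"` does before evaluation.

```python
import math
from typing import Iterable, Tuple


def rmse(pairs: Iterable[Tuple[float, float]]) -> float:
    """Root-mean-square error over (actual, predicted) pairs."""
    pairs = list(pairs)
    return math.sqrt(sum((actual - pred) ** 2 for actual, pred in pairs) / len(pairs))


# Made-up (actual, predicted) ratings; the last pair mimics a cold-start user with a NaN prediction
scored = [(5, 4.0), (3, 3.0), (4, 2.0), (1, float("nan"))]

with_nan = rmse(scored)  # NaN propagates through the sum
kept = [(a, p) for a, p in scored if not math.isnan(p)]  # drop rows without a usable prediction
without_nan = rmse(kept)  # sqrt((1 + 0 + 4) / 3)

print(with_nan, without_nan)
```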


## Model Inference
Use the trained model to produce the top-K recommendations for each user.

In [58]:
K = 10
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(K)
userRecs.show(n = 5)



+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      1|[{1434, 6.7727833...|
|      3|[{1512, 9.196265}...|
|      5|[{767, 8.126447},...|
|      6|[{1643, 6.1858172...|
|      9|[{793, 10.151863}...|
+-------+--------------------+
only showing top 5 rows
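Conceptually, producing top-K recommendations means scoring candidate items for a user and keeping the K highest. A minimal sketch of that selection step with `heapq.nlargest` follows; the scores are made up, `top_k` is a hypothetical helper, and nothing here reflects how Spark computes the result internally.

```python
import heapq
from typing import Dict, List, Tuple


def top_k(scores: Dict[int, float], k: int) -> List[Tuple[int, float]]:
    """Return the k (item_id, score) pairs with the highest predicted score."""
    return heapq.nlargest(k, scores.items(), key=lambda pair: pair[1])


# Made-up predicted scores for one user, keyed by item_id
scores = {10: 3.2, 11: 4.9, 12: 1.5, 13: 4.1, 14: 2.7}
print(top_k(scores, 3))  # [(11, 4.9), (13, 4.1), (10, 3.2)]
```

Note that the scores in the output above go well beyond 5 (for example 10.15 for user 9): ALS predictions are unbounded regression outputs, not values clipped to the 1-5 rating scale, so they are best read as a ranking signal.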



In [113]:
df = userRecs.toPandas()
df.head(5)

   user_id                                    recommendations
0        1  [(1434, 6.772783279418945), (1368, 5.781099796...
1        3  [(1512, 9.19626522064209), (536, 8.57999134063...
2        5  [(767, 8.126446723937988), (1434, 7.6733322143...
3        6  [(1643, 6.185817241668701), (1434, 5.793748855...
4        9  [(793, 10.151863098144531), (1475, 10.09850406...


In [114]:
# Build a fast 'item_id' -> 'movie' lookup table
itemsDF = items.toPandas()
# Zipping the two columns avoids slow row-by-row iteration;
# see "How to iterate over rows in a DataFrame in Pandas" (https://stackoverflow.com/questions/16476924/how-to-iterate-over-rows-in-a-dataframe-in-pandas)
mappingDict = dict(zip(itemsDF["item_id"], itemsDF["movie"]))

In [115]:
df["recommendations"] = [[mappingDict[pair[0]] for pair in row] for row in df["recommendations"]]
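The list comprehension above raises a `KeyError` if a recommended id is missing from the mapping. A slightly more defensive variant is sketched below in plain Python; the `to_titles` helper, the placeholder text, and the scores are hypothetical, while the titles come from the items table shown earlier.

```python
from typing import Dict, List, Tuple


def to_titles(recs: List[Tuple[int, float]], titles: Dict[int, str]) -> List[str]:
    """Map (item_id, score) pairs to titles, keeping a placeholder for unknown ids."""
    return [titles.get(item_id, f"<unknown item {item_id}>") for item_id, _score in recs]


# Titles taken from the items table; the scores and item 999 are made up
titles = {1: "Toy Story (1995)", 2: "GoldenEye (1995)", 8: "Babe (1995)"}
recs = [(8, 4.7), (1, 4.5), (999, 4.1)]
print(to_titles(recs, titles))  # ['Babe (1995)', 'Toy Story (1995)', '<unknown item 999>']
```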

In [117]:
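# hide_index() returns a Styler and leaves df unchanged, so head() below still shows the index
df.style.hide_index()  # deprecated since pandas 1.4; newer versions use df.style.hide(axis="index")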
df.head(5)

   user_id                                    recommendations
0        1  [Shooting Fish (1997), Mina Tannenbaum (1994),...
1        3  [World of Apu, The (Apur Sansar) (1959), Ponet...
2        5  [Addiction, The (1995), Shooting Fish (1997), ...
3        6  [Angel Baby (1995), Shooting Fish (1997), Bitt...
4        9  [Crooklyn (1994), Bhaji on the Beach (1993), A...
