In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline

In [3]:
!pip install findspark

[33mYou are using pip version 8.1.1, however version 9.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [21]:
import os
import sys
import findspark
# Run Spark's Python workers with the same interpreter as this kernel:
# PySpark refuses to run tasks when driver and worker Python versions differ.
# To use Python 3, run the notebook itself on a Python 3 kernel.
# NOTE: this must execute before the SparkSession cell creates the context.
os.environ["PYSPARK_PYTHON"] = sys.executable
# Locate the Spark installation: prefer SPARK_HOME when it is set, otherwise
# fall back to the original hard-coded location.
findspark.init(os.environ.get("SPARK_HOME", "/opt/spark-2.2.0"))

## Data ETL and Data Exploration

In [12]:
from pyspark.sql import SparkSession, Column, Row, functions as F 

In [13]:
# A single local Spark session using every available core. `sc` is its
# SparkContext, needed for the RDD-based MLlib recommendation API used later.
spark_builder = SparkSession.builder.master("local[*]").appName(
    "Spark Movie Recommendation Project")
spark = spark_builder.getOrCreate()
sc = spark.sparkContext

In [16]:
def _load_movielens_csv(table):
    """Read one ml-latest-small CSV file (first row is the header) into a DataFrame."""
    return spark.read.load("./ml-latest-small/%s.csv" % table, format='csv', header=True)

movies = _load_movielens_csv("movies")
ratings = _load_movielens_csv("ratings")
links = _load_movielens_csv("links")
tags = _load_movielens_csv("tags")

In [17]:
movies.show(n=5)  # peek at the first rows of the movie catalogue

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [18]:
ratings.show(n=5)  # peek at the first user ratings

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
+------+-------+------+----------+
only showing top 5 rows



In [19]:
links.show(n=5)  # peek at the movie -> IMDb/TMDb id mapping

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows



In [20]:
tags.show(n=5)  # peek at the first user-supplied tags

+------+-------+--------------------+----------+
|userId|movieId|                 tag| timestamp|
+------+-------+--------------------+----------+
|    15|    339|sandra 'boring' b...|1138537770|
|    15|   1955|             dentist|1193435061|
|    15|   7478|            Cambodia|1170560997|
|    15|  32892|             Russian|1170626366|
|    15|  34162|         forgettable|1141391765|
+------+-------+--------------------+----------+
only showing top 5 rows



### Q1: The number of Users

In [25]:
# A user may appear in ratings, tags, or both; count each userId once.
user_ids = ratings.select('userId').union(tags.select('userId'))
user_ids.dropDuplicates().count()

671

### Q2: The number of Movies

In [26]:
# movies.csv is the movie catalogue; the ratings and tags tables need not
# mention every catalogue entry, so include all three sources and count each
# movieId once.
(movies.select('movieId')
    .union(ratings.select('movieId'))
    .union(tags.select('movieId'))
    .distinct()
    .count())

9125

### Q3: How many movies have been rated by users? List the movies that have not been rated

In [27]:
# Distinct movies that received at least one rating.
rated_movie_ids = ratings.select('movieId')
rated_movie_ids.dropDuplicates().count()

9066

In [29]:
# Movies never rated: every known movieId (the movies.csv catalogue plus any
# id seen only in tags) minus the ids present in ratings.
# DataFrame.subtract is EXCEPT DISTINCT, so the result has no duplicates.
all_movies = movies.select('movieId').union(tags.select('movieId'))
rated = ratings.select('movieId')
not_rated = all_movies.subtract(rated)
not_rated.show()

+-------+
|movieId|
+-------+
| 144172|
|  94969|
| 132547|
|   7335|
| 110871|
|   5984|
| 131796|
| 132800|
| 128235|
|  39421|
|  82313|
| 111251|
|  42217|
| 132549|
|   8767|
| 161582|
| 155064|
| 111249|
|  48711|
| 132458|
+-------+
only showing top 20 rows



### Q4: List Movie Genres

In [40]:
# DataFrame has no "map" function, so drop down to its RDD of Rows and split
# each pipe-delimited genres string into individual genre names.
genre_names = movies.select('genres').rdd.flatMap(lambda row: row[0].split('|'))
genre_names.distinct().collect()

[u'Mystery',
 u'Romance',
 u'IMAX',
 u'Sci-Fi',
 u'Horror',
 u'Film-Noir',
 u'Crime',
 u'Drama',
 u'Fantasy',
 u'Animation',
 u'War',
 u'Western',
 u'Children',
 u'Action',
 u'(no genres listed)',
 u'Comedy',
 u'Documentary',
 u'Musical',
 u'Thriller',
 u'Adventure']

### Q5: Number of Movies in Each Genre

In [47]:
# Number of movies per genre, via the pair-RDD word-count pattern.
genre_rdd = movies.select('genres').rdd
# One (genre, 1) pair for every genre listed on every movie.
genre_pair = genre_rdd.flatMap(lambda row: [(genre, 1) for genre in row[0].split('|')])
genre_pair.reduceByKey(lambda left, right: left + right).collect()

[(u'Mystery', 543),
 (u'Romance', 1545),
 (u'IMAX', 153),
 (u'Sci-Fi', 792),
 (u'Horror', 877),
 (u'Film-Noir', 133),
 (u'Crime', 1100),
 (u'Drama', 4365),
 (u'Fantasy', 654),
 (u'Animation', 447),
 (u'War', 367),
 (u'Western', 168),
 (u'Children', 583),
 (u'Action', 1545),
 (u'(no genres listed)', 18),
 (u'Comedy', 3315),
 (u'Documentary', 495),
 (u'Musical', 394),
 (u'Thriller', 1729),
 (u'Adventure', 1117)]

In [52]:
# Same per-genre counts, collected to the driver as a dict by countByValue.
genre_rdd.flatMap(lambda row: row[0].split('|')).countByValue()

defaultdict(int,
            {u'(no genres listed)': 18,
             u'Action': 1545,
             u'Adventure': 1117,
             u'Animation': 447,
             u'Children': 583,
             u'Comedy': 3315,
             u'Crime': 1100,
             u'Documentary': 495,
             u'Drama': 4365,
             u'Fantasy': 654,
             u'Film-Noir': 133,
             u'Horror': 877,
             u'IMAX': 153,
             u'Musical': 394,
             u'Mystery': 543,
             u'Romance': 1545,
             u'Sci-Fi': 792,
             u'Thriller': 1729,
             u'War': 367,
             u'Western': 168})

# Prepare Data for Training

In [55]:
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel, Rating

In [56]:
# Raw CSV lines (header row included) for the RDD-based MLlib pipeline.
movie_rating = sc.textFile(name="./ml-latest-small/ratings.csv")

In [57]:
header = movie_rating.take(1)[0]

def _parse_rating_line(line):
    """Split a ratings.csv line and keep (userId, movieId, rating) as strings."""
    fields = line.split(",")
    return (fields[0], fields[1], fields[2])

# Drop the header row, parse the rest, and cache since it is reused below.
rating_data = (
    movie_rating
    .filter(lambda line: line != header)
    .map(_parse_rating_line)
    .cache()
)

In [61]:
rating_data.take(num=10)  # first parsed (userId, movieId, rating) tuples

[(u'1', u'31', u'2.5'),
 (u'1', u'1029', u'3.0'),
 (u'1', u'1061', u'3.0'),
 (u'1', u'1129', u'2.0'),
 (u'1', u'1172', u'4.0'),
 (u'1', u'1263', u'2.0'),
 (u'1', u'1287', u'2.0'),
 (u'1', u'1293', u'2.0'),
 (u'1', u'1339', u'3.5'),
 (u'1', u'1343', u'2.0')]

In [62]:
# Random 60/20/20 split into training, validation and test sets; the fixed
# seed makes the split repeatable.
train, validation, test = rating_data.randomSplit(weights=[6, 2, 2], seed=7856)

In [63]:
train.persist()  # default MEMORY_ONLY level, same as cache()

PythonRDD[207] at RDD at PythonRDD.scala:48

In [64]:
test.persist()  # default MEMORY_ONLY level, same as cache()

PythonRDD[208] at RDD at PythonRDD.scala:48

In [65]:
validation.persist()  # default MEMORY_ONLY level, same as cache()

PythonRDD[209] at RDD at PythonRDD.scala:48

In [72]:
# (userId, movieId) pairs of the test set: the inputs to predictAll.
test_RDD = test.map(lambda rating_tuple: rating_tuple[:2])

# Training Step

In [68]:
import math
# Validation RMSEs are appended here by train_ALS; the list is reused by the
# later search, so it accumulates results across calls.
all_errors = []
# First-pass grid search configuration.
num_iterations = 10                          # ALS iterations per model
ranks = [4, 6, 8, 10]                        # candidate numbers of latent factors
reg_params = [0.005, 0.01, 0.05, 0.1, 0.2]   # candidate lambda_ values
def train_ALS(train_data, validation_data, num_iters, reg_param, ranks, all_errors, seed=None):
    """Grid-search ALS rank and regularization, scoring each model by validation RMSE.

    One ``ALS`` model is trained per (rank, lambda) pair. Each pair's RMSE on
    ``validation_data`` is printed and appended to ``all_errors``. The best
    pair is printed at the end.

    Parameters
    ----------
    train_data : RDD of (userId, movieId, rating) tuples
        Passed unchanged to ``ALS.train``.
    validation_data : RDD of (userId, movieId, rating) tuples
        Fields may be strings; they are parsed with int/int/float here.
    num_iters : int
        ALS iterations per model.
    reg_param : iterable of float
        Candidate ``lambda_`` values (inner loop).
    ranks : iterable of int
        Candidate numbers of latent factors (outer loop).
    all_errors : list
        Mutated in place: one RMSE is appended per model in grid order.
        Passing the same list to several calls accumulates their results.
        An entry is NaN when no validation pair received a prediction.
    seed : int or None, optional
        Forwarded to ``ALS.train``. The default ``None`` keeps the original
        unseeded behaviour.

    Returns
    -------
    list
        ``all_errors`` (the same list object that was passed in). If no model
        produced a finite error, the printed best rank is -1.
    """
    # Ground truth keyed by (user, movie). It does not depend on rank or
    # lambda, so parse it once and cache it for the whole grid.
    truth = validation_data.map(
        lambda x: ((int(x[0]), int(x[1])), float(x[2]))).cache()
    user_movie_pairs = truth.keys()

    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    try:
        # grid search to find optimal parameters
        for rank in ranks:
            for reg in reg_param:
                model = ALS.train(train_data, rank, iterations=num_iters,
                                  lambda_=reg, seed=seed)
                predictions = model.predictAll(user_movie_pairs).map(
                    lambda r: ((r[0], r[1]), r[2]))
                # Inner join: only pairs present in both truth and predictions.
                rate_and_preds = truth.join(predictions)
                sq_err_stats = rate_and_preds.map(
                    lambda r: (r[1][0] - r[1][1]) ** 2).stats()
                # PySpark's StatCounter reports a mean of 0.0 when it saw no
                # values, which would make a model with no usable predictions
                # look perfect. Use NaN instead; NaN never compares < min_error.
                if sq_err_stats.count():
                    error = math.sqrt(sq_err_stats.mean())
                else:
                    error = float('nan')
                all_errors.append(error)
                print('The rank %s and regularization %s has error %s' % (rank, reg, error))
                if error < min_error:
                    min_error = error
                    best_rank = rank
                    best_regularization = reg
    finally:
        # Release the cached ground truth even if training fails midway.
        truth.unpersist()
    print('The best model is rank %s with regularization %s' % (best_rank, best_regularization))
    return all_errors

In [69]:
# First grid search over the configuration defined above.
train_ALS(train_data=train, validation_data=validation, num_iters=num_iterations,
          reg_param=reg_params, ranks=ranks, all_errors=all_errors)

The rank 4 and regularization 0.005 has error 1.14696415621
The rank 4 and regularization 0.01 has error 1.1018654833
The rank 4 and regularization 0.05 has error 1.00468772406
The rank 4 and regularization 0.1 has error 0.95226932484
The rank 4 and regularization 0.2 has error 0.936175448729
The rank 6 and regularization 0.005 has error 1.20137464357
The rank 6 and regularization 0.01 has error 1.17053615502
The rank 6 and regularization 0.05 has error 1.02114171713
The rank 6 and regularization 0.1 has error 0.957807818564
The rank 6 and regularization 0.2 has error 0.93675378278
The rank 8 and regularization 0.005 has error 1.314841173
The rank 8 and regularization 0.01 has error 1.20882921524
The rank 8 and regularization 0.05 has error 1.03694123643
The rank 8 and regularization 0.1 has error 0.964283827021
The rank 8 and regularization 0.2 has error 0.936078451527
The rank 10 and regularization 0.005 has error 1.31805777495
The rank 10 and regularization 0.01 has error 1.24665038

[1.1469641562072765,
 1.1018654832966273,
 1.0046877240641021,
 0.9522693248398854,
 0.9361754487290246,
 1.2013746435713124,
 1.1705361550207616,
 1.0211417171343715,
 0.9578078185639611,
 0.9367537827800781,
 1.3148411730008425,
 1.2088292152403097,
 1.036941236430903,
 0.9642838270205953,
 0.9360784515274764,
 1.3180577749473712,
 1.2466503806582134,
 1.0540927346939004,
 0.9632631974580673,
 0.9364552169162403]

In [70]:
# Second, narrower search around lambda = 0.2 (the best value in the first
# pass), with larger ranks and more iterations. all_errors keeps accumulating.
num_iterations = 15
reg_params = [0.1, 0.2, 0.3, 0.4]
ranks = [6, 8, 10, 12]
train_ALS(train, validation, num_iterations, reg_params, ranks, all_errors)

The rank 6 and regularization 0.1 has error 0.955943140101
The rank 6 and regularization 0.2 has error 0.935673147404
The rank 6 and regularization 0.3 has error 0.959392942985
The rank 6 and regularization 0.4 has error 0.998364574792
The rank 8 and regularization 0.1 has error 0.958949698257
The rank 8 and regularization 0.2 has error 0.936553515608
The rank 8 and regularization 0.3 has error 0.959412033037
The rank 8 and regularization 0.4 has error 0.998571137915
The rank 10 and regularization 0.1 has error 0.957418615109
The rank 10 and regularization 0.2 has error 0.935631378553
The rank 10 and regularization 0.3 has error 0.959545624773
The rank 10 and regularization 0.4 has error 0.998777517814
The rank 12 and regularization 0.1 has error 0.95966892676
The rank 12 and regularization 0.2 has error 0.936733968529
The rank 12 and regularization 0.3 has error 0.959297416943
The rank 12 and regularization 0.4 has error 0.998784786965
The best model is rank 10 with regularization 0.2

[1.1469641562072765,
 1.1018654832966273,
 1.0046877240641021,
 0.9522693248398854,
 0.9361754487290246,
 1.2013746435713124,
 1.1705361550207616,
 1.0211417171343715,
 0.9578078185639611,
 0.9367537827800781,
 1.3148411730008425,
 1.2088292152403097,
 1.036941236430903,
 0.9642838270205953,
 0.9360784515274764,
 1.3180577749473712,
 1.2466503806582134,
 1.0540927346939004,
 0.9632631974580673,
 0.9364552169162403,
 0.9559431401008522,
 0.9356731474036093,
 0.9593929429848145,
 0.9983645747915506,
 0.9589496982570764,
 0.9365535156083707,
 0.9594120330373135,
 0.9985711379152917,
 0.9574186151088256,
 0.935631378552555,
 0.9595456247734144,
 0.998777517813549,
 0.9596689267602637,
 0.9367339685287167,
 0.9592974169426627,
 0.9987847869650786]

## The Model Selection and Evaluation

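From the two grid searches above, lambda = 0.2 is clearly the best regularization value. At lambda = 0.2, ranks 6 to 12 all reach a validation RMSE of about 0.936. The search itself reports rank 10 as the minimum (0.93563 with 15 iterations), versus 0.93655 for rank 8. Because the gap between rank 8 and the best rank is below 0.001, the final model below uses 8 latent factors, lambda = 0.2 and 15 iterations.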

In [73]:
# Final hyper-parameters. NOTE: the second search printed rank 10 as best
# (validation RMSE 0.93563); rank 8 scored 0.93655, within 0.001 of it.
best_rank, iterations, reg = 8, 15, 0.2
final_model = ALS.train(train, rank=best_rank, iterations=iterations, lambda_=reg)

# Key predictions and ground truth by (userId, movieId) so they can be joined.
predictions = final_model.predictAll(test_RDD).map(lambda p: ((p[0], p[1]), p[2]))
test_truth = test.map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))
rates_and_preds = test_truth.join(predictions)

# Root-mean-squared error over the joined test pairs.
squared_errors = rates_and_preds.map(lambda kv: (kv[1][0] - kv[1][1]) ** 2)
error = math.sqrt(squared_errors.mean())
print('For testing data the error is %s' % (error))

For testing data the error is 0.9085149741
