In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline

In [2]:
#!pip install findspark



In [2]:
import os
import findspark
os.environ["PYSPARK_PYTHON"] = "python3"
findspark.init("../../spark-2.2.0-bin-hadoop2.7",)

## Data ETL and Data Exploration

In [3]:
from pyspark.sql import SparkSession, Column, Row, functions as F 

In [4]:
spark = (
    SparkSession.builder
        .master("local[*]")
        .appName("Spark Movie Recommendation Project")
        .getOrCreate()
)
sc = spark.sparkContext

In [33]:
movies = spark.read.load("../data/ml-latest-small/movies.csv", format='csv', header = True)
ratings = spark.read.load("../data/ml-latest-small/ratings.csv", format='csv', header = True)
links = spark.read.load("../data/ml-latest-small/links.csv",format='csv', header = True)
tags = spark.read.load("../data/ml-latest-small/tags.csv",format='csv', header = True)

In [34]:
movies.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [35]:
ratings.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
+------+-------+------+----------+
only showing top 5 rows



In [36]:
links.show(5)

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows



In [37]:
tags.show(5)

+------+-------+--------------------+----------+
|userId|movieId|                 tag| timestamp|
+------+-------+--------------------+----------+
|    15|    339|sandra 'boring' b...|1138537770|
|    15|   1955|             dentist|1193435061|
|    15|   7478|            Cambodia|1170560997|
|    15|  32892|             Russian|1170626366|
|    15|  34162|         forgettable|1141391765|
+------+-------+--------------------+----------+
only showing top 5 rows



### Q1: The number of Users

In [50]:
ratings.select('userId').union(tags.select('userId')).distinct().count()

671

### Q2: The number of Movies

In [49]:
ratings.select('movieId').union(tags.select('movieId')).distinct().count()

9125

### Q3:  How many movies are rated by users? List movies not rated before

In [51]:
num_movies_rated = ratings.select('movieId').distinct().count()
num_movies_rated

9066

In [55]:
all_movies = ratings.select('movieId').union(tags.select('movieId'))
rated = ratings.select('movieId')
not_rated = all_movies.subtract(rated)
not_rated.distinct().show()

+-------+
|movieId|
+-------+
| 144172|
|  94969|
| 132547|
|   7335|
| 110871|
|   5984|
| 131796|
| 132800|
| 128235|
|  39421|
|  82313|
| 111251|
|  42217|
| 132549|
|   8767|
| 161582|
| 155064|
| 111249|
|  48711|
| 132458|
+-------+
only showing top 20 rows



### Q4: List Movie Genres

In [61]:
#movies.select('genres').map(lambda x: x[0].split('|')) ????? does not work
movies.select('genres').distinct().show()

+--------------------+
|              genres|
+--------------------+
|Comedy|Horror|Thr...|
|Adventure|Sci-Fi|...|
|Action|Adventure|...|
| Action|Drama|Horror|
|Comedy|Drama|Horr...|
|Action|Animation|...|
|Animation|Childre...|
|Action|Adventure|...|
| Adventure|Animation|
|    Adventure|Sci-Fi|
|Documentary|Music...|
|Adventure|Childre...|
|  Documentary|Sci-Fi|
| Musical|Romance|War|
|Action|Adventure|...|
|Adventure|Childre...|
|Crime|Drama|Fanta...|
|Comedy|Mystery|Th...|
|   Adventure|Fantasy|
|Action|Animation|...|
+--------------------+
only showing top 20 rows



### Q5: Movie for Each Category

# Prepare Data for Training

In [11]:
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel, Rating

In [12]:
movie_rating = sc.textFile("../data/ml-latest-small/ratings.csv")

In [13]:
header = movie_rating.take(1)[0]
rating_data = movie_rating.filter(lambda line: line!=header).map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

In [14]:
rating_data.take(3)

[('1', '31', '2.5'), ('1', '1029', '3.0'), ('1', '1061', '3.0')]

In [15]:
train, validation, test = rating_data.randomSplit([6,2,2],seed = 7856)

In [16]:
train.cache()

PythonRDD[52] at RDD at PythonRDD.scala:48

In [17]:
test.cache()

PythonRDD[53] at RDD at PythonRDD.scala:48

In [18]:
validation.cache()

PythonRDD[54] at RDD at PythonRDD.scala:48

In [27]:
test_RDD = test.map(lambda x: (x[0], x[1]))

# Training Step

In [19]:
num_iterations = 10
ranks = [4,6,8,10]
reg_params = [0.005,0.01, 0.05, 0.1, 0.2]
all_errors = []
def train_ALS(train_data, validation_data, 
              num_iters, reg_param, ranks, all_errors):
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    for rank in ranks:
        for reg in reg_param:
            model = ALS.train(train_data, rank, iterations = num_iters, lambda_ = reg)
            predictions = model.predictAll(validation_data.map(lambda x: (x[0], x[1])))
            predictions = predictions.map(lambda x: ((x[0], x[1]), x[2]))
            rate_and_preds = validation_data.map(lambda x: ((int(x[0]), int(x[1])), float(x[2]))).join(predictions)
            error = math.sqrt(rate_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
            all_errors.append(error)
            print ('The rank %s and regularization %s has error %s' % (rank, reg, error))
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization = reg
    print ('The best model is rank %s with regularization %s' % (best_rank, best_regularization))
    return all_errors

In [20]:
train_ALS(train, validation, num_iterations,reg_params, ranks, all_errors)

The rank 4 and regularization 0.005 has error 1.1515068374101347
The rank 4 and regularization 0.01 has error 1.101216032563203
The rank 4 and regularization 0.05 has error 0.9939169633425827
The rank 4 and regularization 0.1 has error 0.9389824403856298
The rank 4 and regularization 0.2 has error 0.928869759990465
The rank 6 and regularization 0.005 has error 1.1892308206213413
The rank 6 and regularization 0.01 has error 1.1530171720004954
The rank 6 and regularization 0.05 has error 1.0199963148665374
The rank 6 and regularization 0.1 has error 0.9452470506622955
The rank 6 and regularization 0.2 has error 0.9246506294296363
The rank 8 and regularization 0.005 has error 1.292682127181308
The rank 8 and regularization 0.01 has error 1.2328705177018204
The rank 8 and regularization 0.05 has error 1.0330165946100194
The rank 8 and regularization 0.1 has error 0.9534565256850409
The rank 8 and regularization 0.2 has error 0.9254819148717985
The rank 10 and regularization 0.005 has error

[1.1515068374101347,
 1.101216032563203,
 0.9939169633425827,
 0.9389824403856298,
 0.928869759990465,
 1.1892308206213413,
 1.1530171720004954,
 1.0199963148665374,
 0.9452470506622955,
 0.9246506294296363,
 1.292682127181308,
 1.2328705177018204,
 1.0330165946100194,
 0.9534565256850409,
 0.9254819148717985,
 1.3326478797170918,
 1.2550113622933003,
 1.0385154080658179,
 0.9522246802278894,
 0.9245870119771291]

In [21]:
ranks = [6, 8, 10, 12]
reg_params = [0.1, 0.2, 0.3, 0.4]
num_iterations = 15
train_ALS(train, validation, num_iterations,reg_params, ranks, all_errors)

The rank 6 and regularization 0.1 has error 0.9548362655290386
The rank 6 and regularization 0.2 has error 0.9242203211600902
The rank 6 and regularization 0.3 has error 0.9485718236156073
The rank 6 and regularization 0.4 has error 0.9884530799657907
The rank 8 and regularization 0.1 has error 0.948657559865619
The rank 8 and regularization 0.2 has error 0.9241639812185355
The rank 8 and regularization 0.3 has error 0.9487247530067183
The rank 8 and regularization 0.4 has error 0.9885909658545506
The rank 10 and regularization 0.1 has error 0.9573066938701698
The rank 10 and regularization 0.2 has error 0.9270865965905998
The rank 10 and regularization 0.3 has error 0.948070300396319
The rank 10 and regularization 0.4 has error 0.9881652388367489
The rank 12 and regularization 0.1 has error 0.9519413609712789
The rank 12 and regularization 0.2 has error 0.9260401598318719
The rank 12 and regularization 0.3 has error 0.9482798930538371
The rank 12 and regularization 0.4 has error 0.988

[1.1515068374101347,
 1.101216032563203,
 0.9939169633425827,
 0.9389824403856298,
 0.928869759990465,
 1.1892308206213413,
 1.1530171720004954,
 1.0199963148665374,
 0.9452470506622955,
 0.9246506294296363,
 1.292682127181308,
 1.2328705177018204,
 1.0330165946100194,
 0.9534565256850409,
 0.9254819148717985,
 1.3326478797170918,
 1.2550113622933003,
 1.0385154080658179,
 0.9522246802278894,
 0.9245870119771291,
 0.9548362655290386,
 0.9242203211600902,
 0.9485718236156073,
 0.9884530799657907,
 0.948657559865619,
 0.9241639812185355,
 0.9487247530067183,
 0.9885909658545506,
 0.9573066938701698,
 0.9270865965905998,
 0.948070300396319,
 0.9881652388367489,
 0.9519413609712789,
 0.9260401598318719,
 0.9482798930538371,
 0.9880315452108321]

## The Model Selection and Evaluation

From previous two training steps, we could basically find that the model with 8 latent factors and lambda = 0.2, by iterating over 15 times yields the best result.

In [28]:
best_rank = 8
iterations = 15
reg = 0.2
final_model = ALS.train(train, best_rank, iterations=iterations,
                      lambda_=reg)
predictions = final_model.predictAll(test_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
print ('For testing data the error is %s' % (error))

For testing data the error is 0.9180982046078183
