In [1]:
import math

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns


In [2]:
#!pip install findspark

In [3]:
import os
# PYSPARK_PYTHON is the environment variable Spark consults to choose the
# Python executable; it is set here before any Spark session is created below.
os.environ["PYSPARK_PYTHON"] = "python3"


## Data ETL and Data Exploration

In [5]:
from pyspark.sql import SparkSession, Column, Row, functions as F

In [6]:
# Create (or reuse) a Spark session running in local mode, then keep a handle
# on its SparkContext for the RDD-based work further down.
spark = SparkSession.builder.appName("Spark Movie Recommendation Project").master("local[*]").getOrCreate()
sc = spark.sparkContext

In [7]:
# Load the four CSV tables as DataFrames; the first line of each file supplies
# the column names. No schema is inferred, so every column is read as a string.
movies = spark.read.format("csv").option("header", True).load("FileStore/tables/movies.csv")
ratings = spark.read.format("csv").option("header", True).load("FileStore/tables/ratings.csv")
links = spark.read.format("csv").option("header", True).load("FileStore/tables/links.csv")
tags = spark.read.format("csv").option("header", True).load("FileStore/tables/tags.csv")

In [8]:
# Preview the first five rows of the movies table.
movies.show(5)

In [9]:
# Preview the first five rows of the ratings table.
ratings.show(5)

In [10]:
# Preview the first five rows of the links table.
links.show(5)

In [11]:
# Preview the first five rows of the tags table.
tags.show(5)

### Q1: The number of Users

In [13]:
# A user may show up only in ratings or only in tags, so pool the ids from
# both tables before de-duplicating and counting.
(
    ratings.select('userId')
    .union(tags.select('userId'))
    .distinct()
    .count()
)

### Q2: The number of Movies

In [15]:
# Count every distinct movie id that appears in either the ratings or the tags.
(
    ratings.select('movieId')
    .union(tags.select('movieId'))
    .distinct()
    .count()
)

### Q3:  How many movies are rated by users? List movies not rated before

In [17]:
# Number of distinct movies that received at least one rating.
num_movies_rated = (
    ratings
    .select('movieId')
    .distinct()
    .count()
)
num_movies_rated

In [18]:
# Start from the movie catalogue instead of ratings-plus-tags ids: the listing
# then shows the full movie rows and also covers movies that appear in neither
# ratings nor tags.
# NOTE(review): assumes movies.csv carries a `movieId` column matching the one
# in ratings.csv — confirm against the file header.
all_movies = movies
rated = ratings.select('movieId').distinct()
# A left-anti join keeps only the catalogue rows with no matching rated movieId.
not_rated = all_movies.join(rated, on='movieId', how='left_anti')
not_rated.show(truncate=False)

### Q4: List Movie Genres

In [20]:
# `genres` holds several genres joined by '|' in one string, so split each
# value and explode it into one row per genre before de-duplicating.
# The pattern passed to F.split is a regular expression, hence the escaped pipe.
genre_list = (
    movies
    .select(F.explode(F.split(F.col('genres'), r'\|')).alias('genre'))
    .distinct()
    .orderBy('genre')
)
genre_list.show(50, truncate=False)

### Q5: Number of Movies in Each Genre Category

In [22]:
# Count movies per individual genre. `genres` is a '|'-joined string, so it is
# split (regex pattern, escaped pipe) and exploded first; a movie with several
# genres is therefore counted once under each of them.
(
    movies
    .withColumn('genre', F.explode(F.split(F.col('genres'), r'\|')))
    .groupBy('genre')
    .count()
    .orderBy('count', ascending=False)
    .show(50, truncate=False)
)

# Prepare Data for Training

In [24]:
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel, Rating

In [25]:
# Re-read ratings.csv as an RDD of raw text lines (header line included) for
# the RDD-based MLlib recommendation API.
movie_rating = sc.textFile("FileStore/tables/ratings.csv")

In [26]:
# The first line of the text file is the CSV header; it is filtered out below.
header = movie_rating.first()


def _parse_rating(line):
    """Convert one CSV line into a typed ``(userId, movieId, rating)`` tuple.

    Only the first three comma-separated fields are kept. ``ALS.train`` turns
    each record into ``Rating(user, product, rating)``, which takes exactly
    three fields, so any further column must be dropped here.
    NOTE(review): presumably the dropped column is a timestamp — confirm
    against the ratings.csv header.
    """
    tokens = line.split(",")
    return (int(tokens[0]), int(tokens[1]), float(tokens[2]))


rating_data = (
    movie_rating
    .filter(lambda line: line != header)
    .map(_parse_rating)
    .cache()
)
rating_data.take(10)

In [27]:
# Peek at the first three parsed rating records.
rating_data.take(3)

In [28]:
# Randomly split the ratings into train / validation / test with relative
# weights 6:2:2; the fixed seed keeps the split repeatable across runs.
train, validation, test = rating_data.randomSplit([6,2,2],seed = 7856)

In [29]:
# Mark the training split for caching; it is reused by every ALS fit below.
train.cache()

In [30]:
# Mark the test split for caching.
test.cache()

In [31]:
# Mark the validation split for caching; the grid search reads it repeatedly.
validation.cache()

In [32]:
# Keep only the (user, movie) pair of each test record; this is the input
# shape used later when asking the model for predictions.
test_RDD = test.map(lambda record: (record[0], record[1]))
test_RDD.take(10)

# Training Step

In [34]:
# Hyper-parameter grid for the first ALS search.
num_iterations = 10
ranks = list(range(4, 11, 2))
reg_params = [0.005, 0.01, 0.05, 0.1, 0.2]
all_errors = list()  # receives one validation error per (rank, reg) combination
def train_ALS(train_data, validation_data,
              num_iters, reg_param, ranks, all_errors=None, seed=None):
    """Grid-search ALS over ``ranks`` x ``reg_param`` and report validation RMSE.

    Parameters
    ----------
    train_data : RDD
        Ratings handed unchanged to ``ALS.train``.
    validation_data : RDD
        Records indexable as ``x[0]`` (user), ``x[1]`` (movie) and ``x[2]``
        (rating); the ids must be ``int()``-convertible and the rating
        ``float()``-convertible.
    num_iters : int
        ``iterations`` argument used for every ``ALS.train`` call.
    reg_param : iterable of float
        Regularization values (``lambda_``) to try.
    ranks : iterable of int
        Numbers of latent factors to try.
    all_errors : list, optional
        List that receives one RMSE per evaluated combination, in loop order
        (rank outer, regularization inner). A supplied list is appended to in
        place; a fresh list is created when omitted.
    seed : int, optional
        Forwarded to ``ALS.train`` so a search can be made repeatable.

    Returns
    -------
    list
        ``all_errors`` with the RMSE values of this search appended. A
        combination whose predictions share no (user, movie) key with the
        validation data is recorded as ``nan`` and never selected as best.
    """
    if all_errors is None:
        all_errors = []

    # Neither of these depends on the model, so build them once instead of
    # once per grid combination.
    user_movie_pairs = validation_data.map(lambda x: (x[0], x[1]))
    actual_ratings = validation_data.map(
        lambda x: ((int(x[0]), int(x[1])), float(x[2])))

    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    for rank in ranks:
        for reg in reg_param:
            model = ALS.train(train_data, rank, iterations=num_iters,
                              lambda_=reg, seed=seed)
            predictions = model.predictAll(user_movie_pairs).map(
                lambda x: ((x[0], x[1]), x[2]))
            rate_and_preds = actual_ratings.join(predictions)
            squared_errors = rate_and_preds.map(
                lambda r: (r[1][0] - r[1][1]) ** 2).stats()
            if squared_errors.count() == 0:
                # The mean of an empty RDD is 0.0, which would masquerade as a
                # perfect score; record NaN so it cannot win the comparison.
                error = float('nan')
            else:
                error = math.sqrt(squared_errors.mean())
            all_errors.append(error)
            print ('The rank %s and regularization %s has error %s' % (rank, reg, error))
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization = reg
    print ('The best model is rank %s with regularization %s' % (best_rank, best_regularization))
    return all_errors

In [35]:
# First grid search. Note the argument order: regularization values come
# before ranks, matching train_ALS's signature. Errors accumulate in all_errors.
train_ALS(train, validation, num_iterations,reg_params, ranks, all_errors)

In [36]:
# Second grid search: more iterations, with the grid shifted toward larger
# ranks and regularization values. Results are appended to the same all_errors.
num_iterations = 15
ranks = [6, 8, 10, 12]
reg_params = [0.1, 0.2, 0.3, 0.4]
train_ALS(
    train,
    validation,
    num_iterations,
    reg_params,
    ranks,
    all_errors,
)

## The Model Selection and Evaluation

From the two training runs above, the model with 8 latent factors and lambda = 0.2, trained for 15 iterations, yields the lowest validation error.

In [38]:
# Hyper-parameters chosen from the grid searches.
best_rank, iterations, reg = 8, 15, 0.2

final_model = ALS.train(train, best_rank, iterations=iterations, lambda_=reg)

# Key both the predictions and the held-out ratings by (user, movie) so the
# two can be joined record by record.
predictions = (
    final_model
    .predictAll(test_RDD)
    .map(lambda p: ((p[0], p[1]), p[2]))
)
rates_and_preds = (
    test
    .map(lambda rec: ((int(rec[0]), int(rec[1])), float(rec[2])))
    .join(predictions)
)

# Root-mean-squared difference between actual and predicted test ratings.
error = math.sqrt(
    rates_and_preds
    .map(lambda pair: (pair[1][0] - pair[1][1]) ** 2)
    .mean()
)
print ('For testing data the error is %s' % (error))

In [39]:
# The RDD-based MatrixFactorizationModel has no DataFrame-style `transform`;
# score the training (user, movie) pairs with `predictAll` and join the result
# back onto the observed ratings.
train_pairs = train.map(lambda r: (int(r[0]), int(r[1])))
train_predictions = (
    final_model
    .predictAll(train_pairs)
    .map(lambda r: ((r[0], r[1]), r[2]))
)
pred_train = (
    train
    .map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))
    .join(train_predictions)
    # Flatten ((user, movie), (rating, prediction)) into one flat row.
    .map(lambda kv: (kv[0][0], kv[0][1], kv[1][0], kv[1][1]))
)
df = pd.DataFrame(pred_train.collect(),
                  columns=['userId', 'movieId', 'rating', 'prediction'])
x = np.arange(0.5, 5.1, 0.5) # To draw the red one-to-one line below

fig, ax = plt.subplots(figsize=(10, 8))
ax.tick_params(labelsize=15)
ax.scatter(df.rating, df.prediction, s=10, alpha=0.2)
ax.plot(x, x, c='r')
ax.set_xlabel('rating', fontsize=20)
ax.set_ylabel('prediction', fontsize=20)
ax.set_title('Training Set', fontsize=20)
plt.show()

In [40]:
# `rates_and_preds` (built when the final model was scored on the test split)
# already pairs each test rating with its prediction as
# ((user, movie), (rating, prediction)); flatten it into rows for plotting.
pred_test = rates_and_preds.map(
    lambda kv: (kv[0][0], kv[0][1], kv[1][0], kv[1][1]))
df = pd.DataFrame(pred_test.collect(),
                  columns=['userId', 'movieId', 'rating', 'prediction'])
x = np.arange(0.5, 5.1, 0.5)  # points for the red one-to-one reference line

fig, ax = plt.subplots(figsize=(10, 8))
ax.tick_params(labelsize=15)
ax.scatter(df.rating, df.prediction, s=10, alpha=0.2)
ax.plot(x, x, c='r')
ax.set_xlabel('rating', fontsize=20)
ax.set_ylabel('prediction', fontsize=20)
ax.set_title('Testing Set', fontsize=20)
plt.show()