# Build ML model using PySpark-MLlib

This notebook is based on https://github.com/jadianes/spark-py-notebooks

It builds a movie recommendation model (collaborative filtering) using the public MovieLens dataset.

## Getting data

GroupLens Research has collected and made available rating data sets from the MovieLens website. The data sets were collected over various periods of time, depending on the size of the set. 

In our case, we will use the latest datasets:
- Small dataset 
- Full dataset

In [1]:
# links of datasets
complete_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest.zip'
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

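# local paths where the datasets are stored
import os
datasets_path = os.path.join('C:/Users/xuand/FlaskAuthSpark', 'datasets')
os.makedirs(datasets_path, exist_ok=True)  # urlretrieve does not create missing directories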

complete_dataset_path = os.path.join(datasets_path, 'ml-latest.zip')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

# download datasets
import urllib.request
 
small_f = urllib.request.urlretrieve(small_dataset_url, small_dataset_path)
complete_f = urllib.request.urlretrieve(complete_dataset_url, complete_dataset_path)

# extract the downloaded archives; each one unpacks into its own subfolder of datasets_path
import zipfile

with zipfile.ZipFile(small_dataset_path, 'r') as z:
    z.extractall(datasets_path)
with zipfile.ZipFile(complete_dataset_path, 'r') as z:
    z.extractall(datasets_path)   

## Create RDD files

### Create a PySpark session

In [2]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('MLmodel-PySpark').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

### Create an RDD from the ratings file

In [3]:
# load the ratings file into an RDD
import os
datasets_path = os.path.join('C:/Users/xuand/FlaskAuthSpark', 'datasets')
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')

small_ratings_raw_data = sc.textFile(small_ratings_file)
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

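# drop the header, split each line, and keep (userId, movieId, rating); the timestamp is discarded
small_ratings_data = (
    small_ratings_raw_data
    .filter(lambda line: line != small_ratings_raw_data_header)
    .map(lambda line: line.split(','))
    .map(lambda tokens: (tokens[0], tokens[1], tokens[2]))
    .cache()
)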
small_ratings_data.take(3)

### Create an RDD from the movies file

In [4]:
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')

small_movies_raw_data = sc.textFile(small_movies_file)
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]

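import csv
# titles containing commas are quoted in movies.csv, so use csv.reader rather than split(',')
small_movies_data = small_movies_raw_data.filter(lambda line: line != small_movies_raw_data_header).map(lambda line: next(csv.reader([line]))).map(lambda tokens: (tokens[0], tokens[1])).cache()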
small_movies_data.take(3)
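
Some titles in movies.csv contain commas and are therefore quoted, which matters when a line is tokenized. The following is a minimal sketch, using an illustrative line in the file's movieId,title,genres layout, of how `str.split` and `csv.reader` differ on such a line:

```python
import csv

# Illustrative line in the movies.csv layout (movieId,title,genres).
# The title is quoted because it contains a comma.
line = '11,"American President, The (1995)",Comedy|Drama|Romance'

naive_tokens = line.split(',')          # splits inside the quoted title
csv_tokens = next(csv.reader([line]))   # honours the quotes

print(naive_tokens[1])  # truncated title with a stray leading quote
print(csv_tokens[1])    # full title
```

With a plain split, the second token holds only the first part of the title, so any later lookup of titles by movie ID would show truncated names.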

## Collaborative filtering (CF)
In CF, we make predictions (filtering) about the interests of a user by collecting preference or taste information from many users (collaborating). The underlying assumption is that if user A has the same opinion as user B on one issue, A is more likely to share B's opinion on a different issue x than the opinion on x of a randomly chosen user.

The image below shows an example of CF. First, people rate different items; then the system predicts a user's rating for an item that user has not rated yet. The new predictions are built from the existing ratings of other users whose ratings are similar to those of the active user. In the image, the system predicts that the user will not like the video.
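
The idea can be sketched in plain Python as a similarity-weighted average of other users' ratings. This is only an illustration of neighbourhood-style CF on hypothetical toy data; it is not the ALS method that MLlib uses, and the names `ratings`, `cosine_similarity`, and `predict` are made up for this example.

```python
import math

# Hypothetical toy ratings on a 1-5 scale: user -> {item: rating}.
ratings = {
    "alice": {"book": 5.0, "game": 4.0, "video": 1.0},
    "bob":   {"book": 5.0, "game": 5.0, "video": 1.0},
    "carol": {"book": 1.0, "game": 2.0, "video": 5.0},
    "dave":  {"book": 5.0, "game": 4.0},  # has not rated "video" yet
}

def cosine_similarity(a, b):
    """Cosine similarity computed over the items both users have rated."""
    common = set(a) & set(b)
    if not common:
        return 0.0
    dot = sum(a[i] * b[i] for i in common)
    norm_a = math.sqrt(sum(a[i] ** 2 for i in common))
    norm_b = math.sqrt(sum(b[i] ** 2 for i in common))
    return dot / (norm_a * norm_b)

def predict(user, item):
    """Similarity-weighted average of the other users' ratings for `item`."""
    weighted_sum, weight_total = 0.0, 0.0
    for other, other_ratings in ratings.items():
        if other == user or item not in other_ratings:
            continue
        sim = cosine_similarity(ratings[user], other_ratings)
        weighted_sum += sim * other_ratings[item]
        weight_total += sim
    return weighted_sum / weight_total if weight_total else None

predicted = predict("dave", "video")
print(predicted)
```

Because dave's ratings resemble those of alice and bob more than carol's, the prediction for the video is pulled toward their low ratings and lands below the midpoint of the scale.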

### CF in MLlib 
Spark's MLlib library provides a CF implementation based on Alternating Least Squares (ALS). The implementation in MLlib has the following parameters:

- numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure)
- rank is the number of latent factors in the model
- iterations is the number of iterations to run
- lambda specifies the regularization parameter in ALS (passed as lambda_ in PySpark, because lambda is a reserved word in Python)
- implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data
- alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.
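
To make the roles of iterations and lambda concrete, here is a minimal pure-Python sketch of the alternating idea on hypothetical toy data. It assumes rank 1, so each least-squares solve reduces to a scalar division, and it uses plain L2 regularization; MLlib's implementation handles higher ranks, runs in parallel blocks, and differs in details, so treat this as an illustration rather than a description of MLlib internals.

```python
# Hypothetical toy data: (user, item, rating) triples.
ratings = [(0, 0, 5.0), (0, 1, 3.0), (1, 0, 4.0),
           (1, 2, 1.0), (2, 1, 2.0), (2, 2, 1.0)]
n_users, n_items = 3, 3
iterations = 10  # number of alternating sweeps
lambda_ = 0.1    # regularization strength

user_f = [1.0] * n_users  # one latent factor per user (rank = 1)
item_f = [1.0] * n_items  # one latent factor per item

def solve(fixed, pairs):
    """Closed-form regularized least-squares solution for one scalar factor."""
    num = sum(fixed[j] * r for j, r in pairs)
    den = lambda_ + sum(fixed[j] ** 2 for j, _ in pairs)
    return num / den

def rmse():
    """Root mean squared error of the rank-1 reconstruction on the ratings."""
    se = sum((r - user_f[u] * item_f[i]) ** 2 for u, i, r in ratings)
    return (se / len(ratings)) ** 0.5

initial_rmse = rmse()
for _ in range(iterations):
    # Hold the item factors fixed and solve for every user factor.
    for u in range(n_users):
        user_f[u] = solve(item_f, [(i, r) for uu, i, r in ratings if uu == u])
    # Hold the user factors fixed and solve for every item factor.
    for i in range(n_items):
        item_f[i] = solve(user_f, [(u, r) for u, ii, r in ratings if ii == i])
final_rmse = rmse()

print(initial_rmse, final_rmse)
```

With a higher rank, each factor becomes a vector and the scalar division in `solve` becomes a small linear system, but the alternation between the two sides stays the same.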

In [5]:
from pyspark.mllib.recommendation import ALS
import math

### Using small dataset to select ALS parameters

In [6]:
# split dataset into train, validation, and test datasets
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6,2,2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank %s' %best_rank)

In [7]:
# test the selected model
model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,lambda_=regularization_parameter)
predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print('For testing data the RMSE is %s' % error)
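
The error computation in the cells above joins actual and predicted ratings on the (user, movie) key and then takes the root of the mean squared difference. A minimal plain-Python sketch of the same steps, using hypothetical values and dictionaries in place of RDDs:

```python
import math

# Hypothetical values keyed by (user_id, movie_id); not real MovieLens output.
actual = {(1, 10): 4.0, (1, 20): 3.0, (2, 10): 5.0, (2, 30): 2.0}
predicted = {(1, 10): 3.5, (1, 20): 3.0, (2, 10): 4.0}  # nothing for (2, 30)

# Inner join on the key: only pairs present on both sides are kept.
rates_and_preds = {k: (actual[k], predicted[k])
                   for k in actual.keys() & predicted.keys()}

# Root mean squared error over the joined pairs.
squared_errors = [(r - p) ** 2 for r, p in rates_and_preds.values()]
rmse = math.sqrt(sum(squared_errors) / len(squared_errors))

print(rmse)
```

Because the join is an inner join, any pair without a prediction is left out of the error, so the RMSE covers only the pairs for which a prediction exists.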

### Using the complete dataset to build the final model

In [8]:
# load the complete dataset file
complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

# parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line != complete_ratings_raw_data_header).map(lambda line: line.split(',')).map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))).cache()
print('There are %s ratings in the complete dataset' % (complete_ratings_data.count()))

# split dataset into train and test datasets
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed,iterations=iterations, lambda_=regularization_parameter)

# test the selected model
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print('For testing data the RMSE is %s' % (error))

In [9]:
from pyspark.mllib.recommendation import MatrixFactorizationModel

model_path = os.path.join('C:/Users/xuand/FlaskAuthSpark', 'models', 'movie_lens_als')
complete_model_path = os.path.join('C:/Users/xuand/FlaskAuthSpark', 'complete_models', 'complete_movie_lens_als')

# Save both models: the small-dataset model and the complete-dataset model
model.save(sc, model_path)
complete_model.save(sc, complete_model_path)