In [21]:
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
# ops 
from time import time
import time
import math
import os 
import pandas as pd 
import numpy as np 



In [2]:
# import sc  (SparkContext)
from pyspark import SparkContext
sc =SparkContext()

In [3]:
sc

In [3]:
datasets_path = '/Users/yennanliu/movie_recommendation/datasets/'
small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')
df_rating=pd.read_csv(small_ratings_file)
small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')
df_movie=pd.read_csv(small_movies_file)

In [4]:
df_rating.head(3)

Unnamed: 0,userId,movieId,rating,timestamp
0,1,31,2.5,1260759144
1,1,1029,3.0,1260759179
2,1,1061,3.0,1260759182


In [5]:
df_movie.head(3)

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance


In [6]:

def get_data_preview():
	datasets_path = '/Users/yennanliu/movie_recommendation/datasets/'
	small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')
	small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')
	print (" ----------------------- ")
	time.sleep(5)
	df_rating=pd.read_csv(small_ratings_file)
	df_movie=pd.read_csv(small_movies_file)
	print (df_rating.head(3))
	print (df_movie.head(3))
	print (" ----------------------- ")
	time.sleep(5)





def get_data(full_dataset=False):

	datasets_path = '/Users/yennanliu/movie_recommendation/datasets/'
	if full_dataset==False:
		#------  rating small dataset  ------#
		# userid, movieid, rating, timestamp 
		small_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')
		small_ratings_raw_data = sc.textFile(small_ratings_file)
		# get heater 
		small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]
		# filter out header 
		# only get 1st, 2rd, and 3rd columns
		small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
		.map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()
		#------ movie dataset ------#
		# movieid, name 
		small_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')
		small_movies_raw_data = sc.textFile(small_movies_file)
		small_movies_raw_data_header = small_movies_raw_data.take(1)[0]
		# filter out header 
		small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
		.map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1])).cache()
		return small_ratings_data, small_movies_data

	elif full_dataset==True:
		#------ rating completed dataset  ------# 
		complete_ratings_file = os.path.join(datasets_path, 'ml-latest', 'ratings.csv')
		complete_ratings_raw_data = sc.textFile(complete_ratings_file)
		complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]
		# filter out header 
		complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
		    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()
		#------ movie completed dataset  ------# 
		complete_movies_file = os.path.join(datasets_path, 'ml-latest', 'movies.csv')
		complete_movies_raw_data = sc.textFile(complete_movies_file)
		complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]
		# filter out header 
		complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
		    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()
		complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
		return complete_ratings_data, complete_movies_data

 


def train_test_split(dataset):
	# split data into train (60%), validate (20%), and test (20%)
	training_RDD, validation_RDD, test_RDD = dataset.randomSplit([6, 2, 2])
	validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
	test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
	return training_RDD, validation_RDD, test_RDD, validation_for_predict_RDD, test_for_predict_RDD

# ML 


def ALS_model(training_RDD,validation_RDD,validation_for_predict_RDD):
	# ------------- 
	# super parameters
	err=0
	min_error = float('inf')
	parameter = {}
	parameter['seed'] = 30
	parameter['iterations'] = 10
	parameter['regularization_parameter'] = 0.1
	parameter['ranks'] = [4, 8, 12]
	parameter['errors'] = [0, 0, 0]
	parameter['tolerance'] = 0.02
	# -------------
	# train the model over super parameters sets 
	for rank in parameter['ranks']:
		model = ALS.train(training_RDD, rank, seed=parameter['seed'], iterations=parameter['iterations'],lambda_=parameter['regularization_parameter'])
		predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
		# join real rating and predicted rating 
		rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
		# predicted rating error (mean square error)
		error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
		parameter['errors'][err] = error
		err += 1
		# --- fix here for python 3 --- #
		print ('For rank %s the RMSE is %s' % (rank, error))
		if error < min_error:
			min_error = error
			best_rank = rank
	# --- fix here for python 3 --- #
	print ('The best model was trained with rank %s' % best_rank)

	return model, predictions, rates_and_preds, min_error,best_rank, parameter


def ALS_model_predict(model,test_for_predict_RDD,test_RDD):
	#model = ALS.train(training_RDD, best_rank, seed=parameter['seed'], iterations=parameter['iterations'],lambda_=parameter['regularization_parameter'])
	predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
	rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
	error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
	print ('For testing data the RMSE is %s' % (error))



In [7]:
# get data 
small_ratings_data,small_movies_data = get_data()

In [8]:
# train, test split 
training_RDD, validation_RDD, test_RDD, validation_for_predict_RDD, test_for_predict_RDD = train_test_split(small_ratings_data)

In [9]:
# run ALS model 

model, predictions, rates_and_preds, errors, min_error,best_rank = ALS_model(training_RDD,validation_RDD,validation_for_predict_RDD)

For rank 4 the RMSE is 0.9401609617479811
For rank 8 the RMSE is 0.9449944748899938
For rank 12 the RMSE is 0.9506949318393243
The best model was trained with rank 4


In [10]:
# preview rating data 
small_ratings_data.take(3)

[('1', '31', '2.5'), ('1', '1029', '3.0'), ('1', '1061', '3.0')]

In [11]:
# preview movie data 
small_movies_data.take(3)

[('1', 'Toy Story (1995)'),
 ('2', 'Jumanji (1995)'),
 ('3', 'Grumpier Old Men (1995)')]

In [12]:
# preview predict 
# userid, movieid, rating
predictions.take(3)

[((564, 1084), 4.0765296160322055),
 ((436, 1084), 3.9781325083761034),
 ((533, 1084), 3.0107373107957556)]

In [13]:
###  preview ture rating and predictions ### 
# userid, movieid, user rating, predicted rating
rates_and_preds.take(3)

[((393, 2747), (2.0, 1.9116388074370707)),
 ((146, 736), (3.5, 3.0677951437837816)),
 ((212, 520), (4.0, 2.2724643989967257))]

In [14]:
# view errors 
errors

0.9401609617479811

In [15]:
# view min errors (best output)
# view best rank (super parameter set :  ranks = [4, 8, 12])
min_error ,best_rank

(4,
 {'errors': [0.9401609617479811, 0.9449944748899938, 0.9506949318393243],
  'iterations': 10,
  'ranks': [4, 8, 12],
  'regularization_parameter': 0.1,
  'seed': 30,
  'tolerance': 0.02})

In [17]:
parameter = {}
parameter['seed'] = 30
parameter['iterations'] = 10
parameter['regularization_parameter'] = 0.1
parameter['ranks'] = [4, 8, 12]
parameter['errors'] = [0, 0, 0]
parameter['tolerance'] = 0.02

In [19]:
########################## run the whole process  ##########################

In [22]:
get_data_preview()
# get data 
small_ratings_data,small_movies_data = get_data()
# train, test split 
training_RDD, validation_RDD, test_RDD, validation_for_predict_RDD, test_for_predict_RDD = train_test_split(small_ratings_data)
# ------------ Model Training  ------------ #
# train ALS model 
model, predictions, rates_and_preds, min_error,best_rank, parameter = ALS_model(training_RDD,validation_RDD,validation_for_predict_RDD)
# predict with trained ALS model 
print ('************')
ALS_model_predict(model,test_for_predict_RDD,test_RDD)
print ('************')

 ----------------------- 
   userId  movieId  rating   timestamp
0       1       31     2.5  1260759144
1       1     1029     3.0  1260759179
2       1     1061     3.0  1260759182
   movieId                    title  \
0        1         Toy Story (1995)   
1        2           Jumanji (1995)   
2        3  Grumpier Old Men (1995)   

                                        genres  
0  Adventure|Animation|Children|Comedy|Fantasy  
1                   Adventure|Children|Fantasy  
2                               Comedy|Romance  
 ----------------------- 
For rank 4 the RMSE is 0.9367225318261077
For rank 8 the RMSE is 0.947739543257468
For rank 12 the RMSE is 0.9465096746020654
The best model was trained with rank 4
************
For testing data the RMSE is 0.9495904193775816
************
