In [1]:
# Notebook setup: Spark imports plus the session/context handles used by the cells below.
import pyspark
from pyspark.sql import SparkSession
# Reuse the active SparkSession if one exists, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark.mllib.recommendation import ALS
# NOTE(review): NaiveBayes and LabeledPoint are not referenced in the cells visible here.
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.regression import LabeledPoint
from pyspark import SparkContext

# NOTE(review): `spark` was already obtained above, so getOrCreate() presumably returns
# the context that is already running and the "local[*]" master may have no effect -- confirm.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# SQLContext built on the same SparkContext.
sqlContext = SQLContext(sc)

In [2]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [3]:
# On Databricks the ratings CSV was uploaded to the Databricks SQL server as the
# table `ratings_csv`. That let the first row be used directly and made it much
# easier to get the data into RDD form for processing.
ratings_query = "select userId,movieId,rating from ratings_csv"

# Run the query and expose the result as an RDD of rows: (userId, movieId, rating).
ratings = spark.sql(ratings_query).rdd

In [4]:
# Peek at the first five rows to sanity-check what was loaded.
ratings.take(5)
# Debug aid: uncomment to inspect the Python type of `ratings`.
#type(ratings)

In [5]:
# Generate datasets
# Split the ratings into train (60%), validation (20%) and test (20%) sets.
# A fixed seed is passed so the split -- and therefore every RMSE reported
# further down -- does not change from one run of the notebook to the next.
SPLIT_SEED = 42
movie_train, movie_val, movie_test = ratings.randomSplit([0.6, 0.2, 0.2], seed=SPLIT_SEED)

# Mark the three splits for caching. cache() is lazy: the data are actually
# kept in memory the first time an action (e.g. take) touches each RDD.
movie_train.cache()
movie_val.cache()
movie_test.cache()

# Sample results
print('Train set')
movie_train.take(5)

In [6]:
# Build rating-free versions of the validation and test sets: the model is
# asked to predict for (user, movie) pairs only.
def _drop_rating(row):
    """Return the (user, movie) pair of a ratings row, discarding the rating."""
    return (row[0], row[1])

# cache() returns the RDD it was called on, so it can be chained onto the map.
movie_test_no_rate = movie_test.map(_drop_rating).cache()
movie_val_no_rate = movie_val.map(_drop_rating).cache()

In [7]:
# Training the model
# Model selection: train one ALS model per candidate rank and keep the rank
# that gives the lowest RMSE on the validation set.
import math

iterations = 5
als_seed = 42  # fixed seed so ALS starts from the same factors on every run

# Start from +infinity so the first candidate always becomes the current best;
# starting from 0 would make `error < min_error` impossible to satisfy.
min_error = float('inf')
best_rank = -1

# The actual validation ratings keyed by (user, movie) are the same for every
# rank, so build that RDD once instead of once per iteration.
val_actuals = movie_val.map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))

for rank in range(1, 8):
    # generate model
    model = ALS.train(movie_train, rank=rank, iterations=iterations, seed=als_seed)

    # generate predictions, keyed by (user, movie)
    predictions = model.predictAll(movie_val_no_rate).map(lambda r: ((r[0], r[1]), r[2]))

    # get actual vs predictions; join() keeps only pairs present on both sides
    rates_and_preds = val_actuals.join(predictions)

    # calculate error (root mean squared difference between actual and predicted)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

    # display output
    print('For rank %s the RMSE is %s' % (rank, error))

    # Track the running minimum: both values are updated only on improvement,
    # so each candidate is compared with the best so far, not the previous rank.
    if error < min_error:
        min_error = error
        best_rank = rank

print('The best model was trained with rank %s' % best_rank)

In [8]:
# Final evaluation: retrain with the chosen hyper-parameters and measure RMSE
# on the held-out test set.
# NOTE(review): rank is hard-coded; presumably taken from an earlier run of the
# rank search -- confirm it matches the rank that search reports.
rank = 4
iterations = 5
# Fixed seed so the reported test RMSE is the same on every run.
model = ALS.train(movie_train, rank=rank, iterations=iterations, seed=42)

# apply the model to the test data, keyed by (user, movie)
predictions = model.predictAll(movie_test_no_rate).map(lambda r: ((r[0], r[1]), r[2]))

# Join the predictions with the original test ratings. Keys are cast to int and
# ratings to float exactly as in the validation step, so the join keys have the
# same types as the prediction keys whatever column types the table returns.
ratesAndPreds = movie_test.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)

# calculating error (root mean squared difference between actual and predicted)
RMSE = math.sqrt(ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
print("Root Mean Squared Error = " + str(RMSE))