In [None]:
# Import required libraries

import pandas as pd
import numpy as np

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
from pyspark.sql.functions import col
from pyspark.sql.functions import min, avg
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

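Download the MovieLens 20M ratings file (`rating.csv`) from Kaggle and place it in the notebook's working directory, where the next cell loads it: https://www.kaggle.com/grouplens/movielens-20m-dataset/download/Mik23vvybMVT1WnMyUaB%2Fversions%2FSkDpcTj1HRLUw6J3MKQ8%2Ffiles%2Frating.csv?datasetVersionNumber=1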


In [None]:
# Initiate context and load the data.
# getOrCreate() reuses an already-running SparkContext, so re-running this cell on a live
# kernel does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Due to computational cost, load only a sample of the records.
# NOTE: limit() on an unordered read is not guaranteed to pick the same rows on every run.
N_RECORDS = 10000

# 'csv' is Spark's built-in reader ('com.databricks.spark.csv' is the legacy package name for it).
ratings = sqlContext.read.format('csv').options(header='true').load('rating.csv').limit(N_RECORDS)

In [None]:
# Look at the column names
print(ratings.columns)

# Look at the first few rows of data.
# DataFrame.show() prints the table itself and returns None, so it must not be wrapped in print().
ratings.show()

In [None]:
# Sparsity: the percentage of the (user x movie) matrix that has no rating.

# Total number of ratings (one row per rating)
num_ratings = ratings.count()

# Number of distinct users and distinct movies.
# Column names match the CSV header used by every other cell ("userId", "movieId").
num_users = ratings.select("userId").distinct().count()
num_movies = ratings.select("movieId").distinct().count()

# Number of cells in a full user x movie rating matrix
num_cells = num_users * num_movies

if num_cells == 0:
    # Empty input: no users or no movies, so the ratio is undefined.
    print("The ratings dataframe is empty; sparsity is undefined.")
else:
    # float() keeps the division fractional regardless of Python version
    sparsity = (1.0 - float(num_ratings) / num_cells) * 100
    print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

In [None]:
# Quick exploratory views of the ratings data (`col` comes from the imports cell).

# First rows of the dataset
ratings.show()

# Only rows with userId below 100.
# NOTE(review): userId is still a string column at this point (it is cast to integer in a
# later cell), so this presumably relies on Spark's implicit string-to-numeric coercion.
ratings.where(col("userId") < 100).show()

# Number of ratings per user
ratings.groupby("userId").count().show()

In [None]:
# Minimum and average number of ratings per movie and per user.
# Each grouping is aggregated once into both statistics instead of being recomputed
# per statistic. The output shows counts only, not which movie or user has them.
# NOTE: `min`/`avg` are pyspark.sql.functions (imported in the first cell; `min` shadows the builtin).
for key, label in [("movieId", "movie"), ("userId", "user")]:
    print("Min and avg num ratings per %s: " % label)
    ratings.groupBy(key).count().agg(min("count"), avg("count")).show()

In [None]:
# Inspect the column types as loaded.
# NOTE(review): the CSV was read without inferSchema, so the columns are presumably all strings here.
ratings.printSchema()

# Cast the three columns the model needs to numeric types. Only these columns are kept,
# in this order, under their original names.
target_types = [("userId", "integer"), ("movieId", "integer"), ("rating", "double")]
ratings = ratings.select(*[col(name).cast(dtype).alias(name) for name, dtype in target_types])

# Confirm the new column types
ratings.printSchema()

In [None]:

# Create training and test set (80/20 split). The seed makes the split reproducible.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

# Build a baseline ALS model with the library's default rank / maxIter / regParam.
# coldStartStrategy="drop" removes test rows whose user or movie never appears in training;
# their predictions would otherwise be NaN and make the RMSE NaN.
# The seed makes the ALS factor initialization reproducible.
als = ALS(userCol="userId",
          itemCol="movieId",
          ratingCol="rating",
          coldStartStrategy="drop",
          nonnegative=True,
          implicitPrefs=False,
          seed=42)

# Fit model to training data
model = als.fit(training)

In [None]:
# Generate predictions on the held-out test split
predictions = model.transform(test)

# Evaluate predictions with root-mean-squared error between "rating" and "prediction"
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Obtain and print RMSE. Both values go inside print() so the number is actually printed.
rmse = evaluator.evaluate(predictions)
print("RMSE:", rmse)

# CrossValidator and ParamGridBuilder

In [None]:
# Split data for hyperparameter tuning (seeded so the split is reproducible)
(training_data, test_data) = ratings.randomSplit([0.8, 0.2], seed=42)

# Create the ALS estimator to be tuned.
# coldStartStrategy="drop" matters for cross-validation: a validation fold can contain users or
# movies absent from its training folds. Their NaN predictions would make the fold RMSE NaN,
# which makes comparing the candidate models meaningless.
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True, implicitPrefs=False, seed=42)

# Confirm that a model called "als" was created
type(als)

In [None]:
# Add hyperparameters and their candidate values to param_grid.
# The grid is deliberately small: CrossValidator fits len(param_grid) * numFolds ALS models.
# Very large maxIter values can also overflow the stack from long lineage unless a Spark
# checkpoint directory is configured. Widen the grid when more compute is available.
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [5, 10, 20, 40]) \
            .addGrid(als.maxIter, [5, 10, 20]) \
            .addGrid(als.regParam, [.05, .1, 1.5]) \
            .build()

# Define the RMSE evaluator and print how many parameter combinations will be tested
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
print("Num models to be tested: ", len(param_grid))

In [None]:
# Build 3-fold cross-validation over the parameter grid, scored by the RMSE evaluator.
# The seed fixes the random fold assignment so the selected model is reproducible.
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator,
                    numFolds=3, seed=42)

# Fit every grid combination on the training data (returns a CrossValidatorModel)
model = cv.fit(training_data)

# The ALS model refit on all of training_data with the best-scoring parameter combination
best_model = model.bestModel

In [None]:
# Print the type of the best model (the CrossValidator estimator has no `best_model` attribute)
print(type(best_model))

# The fitted ALSModel does not expose the estimator's training params (getRank/getMaxIter/
# getRegParam), so read them from the winning entry of the cross-validation grid.
# avgMetrics[i] is the fold-averaged metric for getEstimatorParamMaps()[i].
fold_metrics = np.array(model.avgMetrics)
best_index = int(fold_metrics.argmax() if evaluator.isLargerBetter() else fold_metrics.argmin())
best_params = model.getEstimatorParamMaps()[best_index]

print("**Best Model**")

# Fall back to the estimator's value for any param that was not part of the grid
print("  Rank:", best_params.get(als.rank, als.getRank()))
print("  MaxIter:", best_params.get(als.maxIter, als.getMaxIter()))
print("  RegParam:", best_params.get(als.regParam, als.getRegParam()))

In [None]:
# Score the held-out test split with the best model from cross-validation.
# Rows with a null/NaN prediction (users or movies unseen in training) are dropped so the RMSE stays finite.
test_predictions = best_model.transform(test_data).na.drop(subset=["prediction"])

# View the predictions
test_predictions.show()

# Calculate and print the RMSE of test_predictions
RMSE = evaluator.evaluate(test_predictions)
print("Test RMSE:", RMSE)