In [3]:
# Initial import and starting the spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()

In [10]:
# Importing some pandas functionality we didn't end up using
from pyspark.sql.functions import pandas_udf

In [4]:
# Initiating the spark session??
spark = SparkSession\
        .builder\
        .appName("RecommendationEngine").config("spark.driver.host","localhost")\
        .getOrCreate()

In [5]:
# Reading in the first json file, ratings
ratings = spark.read.json("data/ratings.json")
ratings.take(10)

[Row(movie_id=858, rating=4, timestamp=956678732.0, user_id=6040),
 Row(movie_id=2384, rating=4, timestamp=956678754.0, user_id=6040),
 Row(movie_id=593, rating=5, timestamp=956678754.0, user_id=6040),
 Row(movie_id=1961, rating=4, timestamp=956678777.0, user_id=6040),
 Row(movie_id=1419, rating=3, timestamp=956678856.0, user_id=6040),
 Row(movie_id=213, rating=5, timestamp=956678856.0, user_id=6040),
 Row(movie_id=3111, rating=5, timestamp=956678856.0, user_id=6040),
 Row(movie_id=573, rating=4, timestamp=956678856.0, user_id=6040),
 Row(movie_id=3505, rating=4, timestamp=956678856.0, user_id=6040),
 Row(movie_id=1734, rating=2, timestamp=956678881.0, user_id=6040)]

In [7]:
type(ratings)

pyspark.sql.dataframe.DataFrame

In [28]:
ratings.head()

Row(movie_id=858, rating=4, timestamp=956678732.0, user_id=6040)

In [25]:
# Grouping the ratings by movie_id, to arrive at the average movie rating
# This is the most basic iteration, an unweighted average rating

grouped = ratings.groupby(["movie_id"]).mean()

In [34]:
grouped.head()

Row(movie_id=26, avg(movie_id)=26.0, avg(rating)=3.4788732394366195, avg(timestamp)=967825426.5070423, avg(user_id)=3564.774647887324)

In [32]:
# Sorting to find the movies with the highest average rating
grouped.sort(grouped["avg(rating)"].desc()).take(10)

[Row(movie_id=3881, avg(movie_id)=3881.0, avg(rating)=5.0, avg(timestamp)=972427747.0, avg(user_id)=2885.0),
 Row(movie_id=2964, avg(movie_id)=2964.0, avg(rating)=5.0, avg(timestamp)=962393248.0, avg(user_id)=5077.0),
 Row(movie_id=1830, avg(movie_id)=1830.0, avg(rating)=5.0, avg(timestamp)=972413840.0, avg(user_id)=2869.0),
 Row(movie_id=3607, avg(movie_id)=3607.0, avg(rating)=5.0, avg(timestamp)=957731408.0, avg(user_id)=5851.0),
 Row(movie_id=3522, avg(movie_id)=3522.0, avg(rating)=5.0, avg(timestamp)=959108978.0, avg(user_id)=5616.0),
 Row(movie_id=3656, avg(movie_id)=3656.0, avg(rating)=5.0, avg(timestamp)=960895192.0, avg(user_id)=5313.0),
 Row(movie_id=3233, avg(movie_id)=3233.0, avg(rating)=5.0, avg(timestamp)=966399373.5, avg(user_id)=3733.0),
 Row(movie_id=3382, avg(movie_id)=3382.0, avg(rating)=5.0, avg(timestamp)=960770959.0, avg(user_id)=5334.0),
 Row(movie_id=1471, avg(movie_id)=1471.0, avg(rating)=5.0, avg(timestamp)=975243157.0, avg(user_id)=1896.0),
 Row(movie_id=787, 

In [33]:
# sorted(grouped, key=lambda x: x[2])

In [35]:
# Creating a clean ratings dataframe, without the timestamp column
clean_ratings = ratings.drop("timestamp")

In [36]:
clean_ratings.head()

Row(movie_id=858, rating=4, user_id=6040)

In [37]:
# New imports! The recommendation engine and an evaluator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [38]:
# Creating a train/test split of our clean ratings dataframe
(train, test) = clean_ratings.randomSplit([.8, .2])

In [40]:
# Initiating our recommendation model, with just default hyperparameters
als = ALS(userCol="user_id", itemCol="movie_id",
          ratingCol="rating", coldStartStrategy="drop")

# Fitting the recommendation model as our base model (since it's so basic)
base_model = als.fit(train)

In [42]:
# Arriving at predictions for our test data
base_preds = base_model.transform(test)

# Initiating our evaluator
evaluator = RegressionEvaluator(
    metricName="rmse", labelCol="rating", predictionCol="prediction")

# Actually using our evaluator, and arriving at the RMSE
base_rmse = evaluator.evaluate(base_preds)
print("Root Mean Squared Error: " + str(base_rmse))

Root Mean Squared Error: 0.8756899586123921


An RMSE of nearly 1 is pretty bad - means that the average prediction is nearly one star off, out of five. But this is the baseline we'll try to improve upon!

In [44]:
# More imports
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Initiating a new recommendation model
als_tuned = ALS(userCol="user_id", itemCol="movie_id",
                ratingCol="rating", coldStartStrategy="drop")

# Setting up our parameter grid, to try different hyperparameters
params = ParamGridBuilder().addGrid(als_tuned.regParam, [0.01, 0.001, 0.1]).addGrid(
    als_tuned.rank, [4, 10, 50]).build()

# instantiating crossvalidator estimator, 
cv = CrossValidator(estimator=als_tuned, estimatorParamMaps=params,
                    evaluator=evaluator, parallelism=4)

best_model = cv.fit(ratings)

# This takes forever!
best_model.bestModel.rank

10

In [45]:
# Arriving at predictions for our test data for our newly-tuned model
tuned_preds = best_model.transform(test)

# Initiating our evaluator
evaluator = RegressionEvaluator(
    metricName="rmse", labelCol="rating", predictionCol="prediction")

# Actually using our evaluator, and arriving at the RMSE
tuned_rmse = evaluator.evaluate(tuned_preds)
print("Root Mean Squared Error: " + str(tuned_rmse))

Root Mean Squared Error: 0.8173528520014758


Slight improvement! But only slight, and testing the parameter grid and using the cross validator together took probably 20 minutes. Not enough of an improvement to be worth that.

pyspark.sql.dataframe.DataFrame