In [1]:
import pyspark as ps
from pyspark.sql import SparkSession
import random
import pandas as pd
import numpy as np
from pyspark.ml.recommendation import ALS
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt

In [2]:
# Reuse the active SparkSession if there is one; otherwise start a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

Load the ratings, movies, and links CSV files into pandas DataFrames

In [9]:
# Raw CSV inputs, read relative to the notebook's working directory.
ratings_df = pd.read_csv(filepath_or_buffer='../data/movies/ratings.csv')
movies_df = pd.read_csv(filepath_or_buffer='../data/movies/movies.csv')
links_df = pd.read_csv(filepath_or_buffer='../data/movies/links.csv')

Convert the ratings DataFrame into a Spark DataFrame

In [12]:
# Hand the pandas ratings frame to Spark; the resulting schema is shown below.
spark_df = spark.createDataFrame(data=ratings_df)

In [13]:
# Inspect the column names and types Spark assigned to the ratings data.
spark_df.printSchema()

root
 |-- userId: long (nullable = true)
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)



Train/test split (80/20) of the ratings Spark DataFrame

In [14]:
# 80/20 random split; the fixed seed makes the split repeatable for the same input partitioning.
split_weights = [0.8, 0.2]
train, test = spark_df.randomSplit(weights=split_weights, seed=427471138)

In [15]:
# Expose the training split to Spark SQL under the name `train_ratings`.
# createOrReplaceTempView supersedes the deprecated registerTempTable and is
# safe to re-run: an existing view with the same name is replaced.
train.createOrReplaceTempView('train_ratings')

What is the density of our ratings (utility) matrix, i.e. the fraction of (user, movie) pairs in the training set that have a rating?

In [17]:
# Density = number of observed ratings / (distinct movies * distinct users) in the training split.
density_query = "SELECT COUNT(rating)/(COUNT(DISTINCT(movieId))*COUNT(DISTINCT(userId))) AS train_density FROM train_ratings"
density = spark.sql(density_query)

In [18]:
# Print the single-row density result (Spark's default row limit and truncation).
density.show(n=20, truncate=True)

+--------------------+
|       train_density|
+--------------------+
|0.014229143803102185|
+--------------------+



Average rating of the training data — used below to fill predictions that ALS cannot make (cold-start users and movies)

In [19]:
# Mean rating over the training split, pulled back to the driver as a plain
# Python float. It is used later as the fallback value for NaN predictions.
# Done in a single cell with a single assignment so the cell is safe to
# re-run (no name is rebound from DataFrame -> pandas -> scalar).
avg_rating = spark.sql(
    "SELECT AVG(rating) AS avg_rating FROM train_ratings"
).first()['avg_rating']

Create and fit the ALS model

In [24]:
# ALS matrix factorization over (userId, movieId, rating).
# coldStartStrategy="nan": rows whose user or movie is absent from the training
# split get a NaN prediction (instead of being dropped), so they can be filled later.
# seed is set explicitly: PySpark's default seed is derived from Python's hash()
# of the class name, which is randomized per interpreter process unless
# PYTHONHASHSEED is fixed, so without it the factors and the RMSE below change
# between kernel restarts. Reuses the train/test split seed for consistency.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="nan", seed=427471138)


In [25]:
# Learn the factorization from the training split only.
recommender = als.fit(dataset=train)

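Predict movie ratings for the test data. Because `coldStartStrategy="nan"`, users or movies that never appear in the training split get a NaN prediction.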

In [26]:
# Score every held-out row with the fitted model, then collect the result to the driver as pandas.
test_predictions = recommender.transform(test)
test_recommender = test_predictions.toPandas()

Fill the NaN (cold-start) predictions with the average rating from the training data

In [28]:
# Fill only the `prediction` column, where ALS leaves NaN for cold-start rows.
# A frame-wide fillna would also replace any missing true ratings or ids with
# the average rating and silently bias the RMSE below.
test_recommender = test_recommender.fillna({'prediction': avg_rating})


Root-mean-squared error (RMSE) on the test set

In [29]:
# Ground-truth and predicted ratings as row-aligned numpy arrays for sklearn.
y_true_test, y_pred_test = (
    test_recommender['rating'].to_numpy(),
    test_recommender['prediction'].to_numpy(),
)

In [31]:
# RMSE = square root of the mean squared error between true and predicted ratings.
mse_test = mean_squared_error(y_true_test, y_pred_test)
rmse_test = np.sqrt(mse_test)

In [33]:
# Test-set RMSE, in the same units as the `rating` column.
rmse_test

1.1304923742951785