In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

In [2]:
# Start (or reuse, if one is already running in this kernel) the SparkSession
# that every following cell reads data through.
spark = (
    SparkSession.builder
    .appName('25m_movie_lens')
    .getOrCreate()
)

In [3]:
# Load the ratings and movies CSVs into Spark (not pandas) DataFrames.
# Explicit schemas replace inferSchema=True, which makes Spark scan each file
# an extra time just to guess column types — costly on the 25M-row ratings.csv.
# The schema is applied by column position (the header row is only skipped),
# so it assumes the MovieLens 25M layouts:
#   ratings.csv: userId,movieId,rating,timestamp
#   movies.csv:  movieId,title,genres
ratings_df = spark.read.csv(
    './ml-25m/ratings.csv',
    header=True,
    schema='userId INT, movieId INT, rating DOUBLE, `timestamp` LONG',
)
movies_df = spark.read.csv(
    './ml-25m/movies.csv',
    header=True,
    schema='movieId INT, title STRING, genres STRING',
)

TypeError: csv() got an unexpected keyword argument 'compression'

In [141]:
# Left join ratings onto movies so each rating row carries its movie title
# (ratings whose movieId has no match in movies_df get a null title).
# Only the columns used downstream are selected; `genres` is never needed,
# so it is not pulled in just to be dropped again.
join_df = (
    ratings_df
    .join(movies_df, on='movieId', how='left')
    .select('userId', 'movieId', 'rating', 'title')
)

# Order by userId only for this preview. Sorting join_df itself would add a
# full sort of every rating ahead of randomSplit / ALS, neither of which
# needs the rows in any particular order.
join_df.orderBy('userId').show()

+------+-------+------+--------------------+
|userId|movieId|rating|               title|
+------+-------+------+--------------------+
|     1|    296|   5.0| Pulp Fiction (1994)|
|     1|    306|   3.5|Three Colors: Red...|
|     1|    307|   5.0|Three Colors: Blu...|
|     1|    665|   5.0|  Underground (1995)|
|     1|    899|   3.5|Singin' in the Ra...|
|     1|   1088|   4.0|Dirty Dancing (1987)|
|     1|   1175|   3.5| Delicatessen (1991)|
|     1|   1217|   3.5|          Ran (1985)|
|     1|   1237|   5.0|Seventh Seal, The...|
|     1|   1250|   4.0|Bridge on the Riv...|
|     1|   1260|   3.5|            M (1931)|
|     1|   1653|   4.0|      Gattaca (1997)|
|     1|   2011|   2.5|Back to the Futur...|
|     1|   2012|   2.5|Back to the Futur...|
|     1|   2068|   2.5|Fanny and Alexand...|
|     1|   2161|   3.5|NeverEnding Story...|
|     1|   2351|   4.5|Nights of Cabiria...|
|     1|   2573|   4.0|        Tango (1998)|
|     1|   2632|   5.0|Saragossa Manuscr...|
|     1|  

In [142]:
# Hold out 20% of the ratings for evaluation. A fixed seed makes the split
# reproducible across runs (given the same input data and partitioning).
(train_df, test_df) = join_df.randomSplit([0.8, 0.2], seed=42)

In [143]:
# Define the ALS matrix-factorisation model.
# - coldStartStrategy="drop": prediction rows that come out NaN (users/movies
#   not seen during training) are dropped, so the RMSE below stays finite.
# - nonnegative=True: constrain the learned latent factors to be >= 0.
# - seed=42: pin factor initialisation so re-runs produce the same model.
# TODO: regParam=0.01 is a light penalty; the held-out RMSE reported later is
#   high, so tune regParam/rank (e.g. with pyspark.ml.tuning.CrossValidator).
als = ALS(maxIter=20, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True, seed=42)

In [144]:
# Train on the 80% split, then score the held-out 20% with the fitted model.
model = als.fit(dataset=train_df)
predictions = model.transform(dataset=test_df)

In [145]:
# Compare the model's `prediction` column with the true `rating` on the
# held-out rows, summarised as root-mean-squared error.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error = {rmse:.4f}")

Root Mean Squared Error = 1.4716


In [146]:
def get_title(movie_id):
    """Return the title of the movie with the given ``movieId``.

    Looks the id up in the notebook-level ``movies_df``. Only the ``title``
    column of a single matching row is fetched to the driver, and the column
    is read by name rather than by position.

    Args:
        movie_id: value compared against ``movies_df.movieId``.

    Returns:
        The ``title`` of the first row whose ``movieId`` matches.

    Raises:
        IndexError: if no row in ``movies_df`` has this ``movieId``.
    """
    row = (
        movies_df
        .filter(movies_df.movieId == movie_id)
        .select('title')
        .first()  # None when nothing matches
    )
    if row is None:
        raise IndexError(f"movieId {movie_id!r} not found in movies_df")
    return row['title']

In [None]:
# Stop the SparkSession (and its underlying SparkContext). Run this last:
# every cell above uses `spark` or DataFrames created from it.
spark.stop()