# Spark Lens

A collaborative filtering movie recommender system built with Spark ML on the MovieLens dataset.

## Import libraries

In [1]:
import pyspark
from pyspark.sql.types import *
from pyspark.ml.feature import Imputer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

import numpy as np

## Initialize Spark

In [2]:
# Build (or reuse) a Spark session named "spark_lens" that runs locally on
# two worker threads, then keep a handle to its underlying SparkContext.
spark = (
    pyspark.sql.SparkSession.builder
    .master("local[2]")
    .appName("spark_lens")
    .getOrCreate()
)
sc = spark.sparkContext

## Load and clean the MovieLens ratings data

In [3]:
# Load data into an RDD.
# Each element is one raw line of the file as a string; the path is relative
# to the notebook's working directory.
ratings_rdd = sc.textFile('data/u.data')

In [4]:
# Peek at the first 10 raw lines to see the field layout and delimiter.
ratings_rdd.take(10)

['196\t242\t3\t881250949',
 '186\t302\t3\t891717742',
 '22\t377\t1\t878887116',
 '244\t51\t2\t880606923',
 '166\t346\t1\t886397596',
 '298\t474\t4\t884182806',
 '115\t265\t2\t881171488',
 '253\t465\t5\t891628467',
 '305\t451\t3\t886324817',
 '6\t86\t3\t883603013']

In [5]:
# Split rows into fields.
# Each tab-delimited line becomes a list of strings.
# NOTE: this rebinds `ratings_rdd`, so re-running the cell without re-running
# the load cell would try to call `.split` on lists and fail once evaluated.
ratings_rdd = ratings_rdd.map(lambda row: row.split('\t'))

In [6]:
# Peek at the first 10 rows after splitting; every field is still a string.
ratings_rdd.take(10)

[['196', '242', '3', '881250949'],
 ['186', '302', '3', '891717742'],
 ['22', '377', '1', '878887116'],
 ['244', '51', '2', '880606923'],
 ['166', '346', '1', '886397596'],
 ['298', '474', '4', '884182806'],
 ['115', '265', '2', '881171488'],
 ['253', '465', '5', '891628467'],
 ['305', '451', '3', '886324817'],
 ['6', '86', '3', '883603013']]

In [7]:
def cast(row):
    """Convert one split ratings row into a typed ``(user, movie, rating)`` tuple.

    Args:
        row: A sequence of exactly four string fields in the order
            user, movie, rating, timestamp.

    Returns:
        A tuple ``(int, int, float)`` holding the user id, movie id and
        rating. The timestamp field is discarded.

    Raises:
        ValueError: If ``row`` does not have exactly four fields, or a field
            cannot be parsed as a number.
    """
    user_id, movie_id, stars, _timestamp = row
    return int(user_id), int(movie_id), float(stars)
# Apply `cast` to every row. This rebinds `ratings_rdd` again, so the cell
# expects the split (list-of-strings) rows produced by the previous step.
ratings_rdd = ratings_rdd.map(cast)

In [8]:
# Peek at the first 10 typed (user, movie, rating) tuples.
ratings_rdd.take(10)

[(196, 242, 3.0),
 (186, 302, 3.0),
 (22, 377, 1.0),
 (244, 51, 2.0),
 (166, 346, 1.0),
 (298, 474, 4.0),
 (115, 265, 2.0),
 (253, 465, 5.0),
 (305, 451, 3.0),
 (6, 86, 3.0)]

In [9]:
# Schema for the ratings DataFrame: integer user and movie ids plus a
# double-precision rating. Every column is declared nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('user', IntegerType()),
        ('movie', IntegerType()),
        ('rating', DoubleType()),
    )
])

In [10]:
# Create our Spark DataFrame.
# The typed (user, movie, rating) tuples are mapped positionally onto the
# columns declared in `schema`.
ratings_df = spark.createDataFrame(ratings_rdd, schema)

In [11]:
# Show the first 10 rows of the ratings DataFrame.
ratings_df.show(10)

+----+-----+------+
|user|movie|rating|
+----+-----+------+
| 196|  242|   3.0|
| 186|  302|   3.0|
|  22|  377|   1.0|
| 244|   51|   2.0|
| 166|  346|   1.0|
| 298|  474|   4.0|
| 115|  265|   2.0|
| 253|  465|   5.0|
| 305|  451|   3.0|
|   6|   86|   3.0|
+----+-----+------+
only showing top 10 rows



In [12]:
# Describe DataFrame: count, mean, stddev, min and max for every column.
# Useful as a sanity check of the id ranges and the rating scale.
ratings_df.describe().show()

+-------+------------------+-----------------+------------------+
|summary|              user|            movie|            rating|
+-------+------------------+-----------------+------------------+
|  count|            100000|           100000|            100000|
|   mean|         462.48475|        425.53013|           3.52986|
| stddev|266.61442012750945|330.7983563255858|1.1256735991443179|
|    min|                 1|                1|               1.0|
|    max|               943|             1682|               5.0|
+-------+------------------+-----------------+------------------+



In [13]:
# Expose the DataFrame to Spark SQL under the name `ratings`.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable.
ratings_df.createOrReplaceTempView('ratings')
# Count the rows whose rating is missing.
# - NULL must be tested with IS NULL: `rating = null` evaluates to NULL
#   (never TRUE) for every row, so that filter can never match anything.
# - COUNT(*) counts rows; COUNT(rating) ignores NULL values and would
#   therefore always report 0 for exactly the rows we are looking for.
spark.sql('''
    SELECT COUNT(*) AS nulls
    FROM ratings
    WHERE rating IS NULL
''').show()

+-----+
|nulls|
+-----+
|    0|
+-----+



## Train/test split

In [14]:
# Create a random 80/20 train/test split.
# The seed is fixed so that a fresh "Restart & Run All" produces the same
# split, and therefore comparable row counts and evaluation metrics.
# (Assumes the input file and its partitioning are unchanged between runs.)
train_df, test_df = ratings_df.randomSplit([0.8, 0.2], seed=42)

In [15]:
# Number of ratings in the training split (roughly 80% of the data).
train_df.count()

80163

In [16]:
# Number of ratings in the held-out test split (roughly 20% of the data).
test_df.count()

19837

## Model

In [17]:
# Use Spark's implementation of an Alternating Least Squares (ALS) matrix
# factorization model.
# - rank: number of latent factors learned per user and per movie.
# - regParam: regularization strength.
# - nonnegative: constrain the learned factors to be non-negative.
# - seed: fixed so that factor initialisation, and therefore the trained
#   model, is reproducible across runs.
# Line continuations are not needed inside the parentheses.
als_model = ALS(
    userCol='user',
    itemCol='movie',
    ratingCol='rating',
    nonnegative=True,
    regParam=0.1,
    rank=10,
    seed=42,
)

In [18]:
# Train recommender.
# Fits the ALS estimator on the training split only; the result is the
# fitted model used for prediction below.
recommender = als_model.fit(train_df)

## Predict

In [19]:
# Make predictions for entire test set.
# The result adds a `prediction` column next to the observed `rating`.
# NOTE: this overwrites `test_df` with the transformed frame, so re-running
# this cell alone feeds an already-transformed DataFrame back into
# `transform`; re-run from the train/test split cell instead.
test_df = recommender.transform(test_df)

In [20]:
# Show the first 10 test rows with the observed rating and the prediction.
test_df.show(10)

+----+-----+------+----------+
|user|movie|rating|prediction|
+----+-----+------+----------+
| 633|  148|   1.0| 3.4631267|
| 224|  148|   3.0| 3.3561032|
| 328|  148|   3.0| 3.1503906|
| 120|  148|   3.0| 2.7938714|
|  92|  148|   2.0| 2.7115803|
| 374|  148|   4.0| 3.2940223|
| 834|  148|   4.0| 3.1915097|
|  59|  148|   3.0|  2.879704|
| 244|  148|   2.0|  2.708571|
| 706|  148|   4.0| 3.4192502|
+----+-----+------+----------+
only showing top 10 rows



## Evaluate

In [21]:
# Collect the scored test set to the driver as a pandas DataFrame, once.
# (Calling toPandas() a second time would re-run the Spark job and collect
# the same rows again.)
test_pd = test_df.toPandas()
# Replace missing values with the mean observed rating of the test set.
# Presumably this targets NaN predictions for users or movies the model did
# not see during training. TODO confirm.
test_pd = test_pd.fillna(test_pd['rating'].mean())

In [22]:
# Per-row squared difference between the observed rating and the prediction.
test_pd['squared_error'] = (test_pd['rating'] - test_pd['prediction'])**2

In [23]:
# Calculate RMSE: the square root of the mean squared error.
# The mean is taken on the underlying NumPy array so the reduction is
# vectorised rather than looping over the Series in Python. Any NaN still
# propagates to the result, as it did with the built-in sum().
np.sqrt(test_pd['squared_error'].values.mean())

0.920786933145924