In [1]:
# All imports for the notebook, grouped ahead of any executable statement.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession

# Reuse an already-running session if there is one; otherwise start one
# registered under the application name 'recommender'.
spark = (
    SparkSession.builder
    .appName('recommender')
    .getOrCreate()
)

In [3]:
# Load the ratings file: the first row supplies the column names and Spark
# infers each column's type from the data instead of reading everything as text.
df = spark.read.csv(
    './data/ratings.csv',
    header=True,
    inferSchema=True,
)

In [4]:
# Sanity-check the inferred column names and types before modelling.
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [5]:
# List the column names as a plain Python list.
df.columns

['userId', 'movieId', 'rating', 'timestamp']

In [6]:
# Keep only the user, item and rating columns; 'timestamp' is dropped here.
df = df.select('userId', 'movieId', 'rating')

In [7]:
# Pull the first five rows back to the driver as a list of Row objects.
df.head(5)


[Row(userId=1, movieId=1, rating=4.0),
 Row(userId=1, movieId=3, rating=4.0),
 Row(userId=1, movieId=6, rating=4.0),
 Row(userId=1, movieId=47, rating=5.0),
 Row(userId=1, movieId=50, rating=5.0)]

In [8]:
# Print the first 20 rows as a text table (these are show()'s defaults, spelled out).
df.show(n=20, truncate=True)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



In [9]:
# Summary statistics (count, mean, stddev, min, max) for every column.
df.describe().show()

+-------+------------------+----------------+------------------+
|summary|            userId|         movieId|            rating|
+-------+------------------+----------------+------------------+
|  count|            100836|          100836|            100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|
| stddev| 182.6184914635004|35530.9871987003|1.0425292390606342|
|    min|                 1|               1|               0.5|
|    max|               610|          193609|               5.0|
+-------+------------------+----------------+------------------+



In [10]:
# Randomly split the ratings roughly 80% / 20% into training and test sets.
# The seed is fixed so that re-running the notebook reproduces the same split
# (and therefore comparable metrics) instead of a fresh random one each time.
# NOTE(review): repeatability also assumes the input file and its
# partitioning are unchanged between runs.
training, test = df.randomSplit([0.8, 0.2], seed=42)

In [11]:
# Configure the ALS collaborative-filtering estimator.
# The seed is set explicitly so the random initialisation of the factor
# matrices does not vary from one kernel session to the next.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    seed=42,
)

# Learn the user and movie factors from the training split only.
model = als.fit(training)

# Score the held-out ratings; this adds a 'prediction' column.
# NOTE(review): some predictions come back as NaN — presumably users or
# movies absent from `training` (ALS's default cold-start behaviour). They
# must be removed before computing a metric; coldStartStrategy='drop' on the
# estimator is an alternative to filtering them afterwards.
predictions = model.transform(test)

In [12]:
predictions.describe().show()

+-------+------------------+-----------------+------------------+----------+
|summary|            userId|          movieId|            rating|prediction|
+-------+------------------+-----------------+------------------+----------+
|  count|             20046|            20046|             20046|     20046|
|   mean|325.51187269280655|19015.47386012172|3.4952608999301606|       NaN|
| stddev| 182.6525759900593|35179.20736270911|1.0458210032029869|       NaN|
|    min|                 1|                1|               0.5|-2.9337544|
|    max|               610|           193587|               5.0|       NaN|
+-------+------------------+-----------------+------------------+----------+



In [13]:
# Discard every row that contains a null/NaN value (here: the NaN
# predictions), then re-summarise the remaining rows.
predictions = predictions.na.drop(how='any')
predictions.describe().show()

+-------+------------------+-----------------+------------------+------------------+
|summary|            userId|          movieId|            rating|        prediction|
+-------+------------------+-----------------+------------------+------------------+
|  count|             19244|            19244|             19244|             19244|
|   mean| 323.5494180004157|17334.90927042195|3.5078466015381418|3.3723500878183272|
| stddev|181.89029383104375|32830.74639747647| 1.041400325127412| 0.981514489861878|
|    min|                 1|                1|               0.5|        -2.9337544|
|    max|               610|           187595|               5.0|            8.5307|
+-------+------------------+-----------------+------------------+------------------+



In [14]:
# Root-mean-squared error between the true 'rating' and the model's
# 'prediction' column (the evaluator's default prediction column, made explicit).
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(predictions)
rmse

1.0709748087886573