In [10]:
from pyspark.sql import SparkSession

from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [55]:
spark = SparkSession.builder.appName('als_movielens').getOrCreate()
# data = spark.read.csv('data/movielens/ml-latest-small/ratings.csv', inferSchema=True, header=True)

# # Try on 1m ratings dataset.
# data = spark.read.csv('data/movielens/ml-1m/ratings.dat', inferSchema=True, header=False, sep=':')
# data = data.drop('_c1', '_c3', '_c5')
# data = data.toDF('userId', 'movieId', 'rating', 'timestamp')

# Try on 10m ratings dataset! (265 Mb)
data = spark.read.csv('data/movielens/ml-10m/ratings.dat', inferSchema=True, header=False, sep=':')
data = data.drop('_c1', '_c3', '_c5')
data = data.toDF('userId', 'movieId', 'rating', 'timestamp')

In [56]:
data.describe().show()

+-------+-----------------+-----------------+------------------+--------------------+
|summary|           userId|          movieId|            rating|           timestamp|
+-------+-----------------+-----------------+------------------+--------------------+
|  count|         10000054|         10000054|          10000054|            10000054|
|   mean|35869.85940925919|4120.291476526027| 3.512421932921562|1.0326063546983086E9|
| stddev|20585.33735467861|8938.402117920672|1.0604184716263516|1.1596396220252858E8|
|    min|                1|                1|               0.5|           789652009|
|    max|            71567|            65133|               5.0|          1231131736|
+-------+-----------------+-----------------+------------------+--------------------+



In [57]:
training, test = data.randomSplit([0.7, 0.3])

In [58]:
model = ALS(maxIter=5, regParam=0.01, userCol='userId', itemCol='movieId', ratingCol='rating')
model_fit = model.fit(training)

In [59]:
predictions = model_fit.transform(test)

In [60]:
predictions.show(5)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
| 50581|    148|   2.0| 838308516| 2.7593236|
| 38139|    148|   2.0| 965940170|  2.451095|
| 24254|    148|   3.0|1049216998| 2.7592301|
| 27042|    148|   3.0| 831639909| 2.8193898|
| 24407|    148|   4.0| 998799394|  2.771439|
+------+-------+------+----------+----------+
only showing top 5 rows



In [62]:
predictions.describe().show()

+-------+------------------+------------------+------------------+--------------------+----------+
|summary|            userId|           movieId|            rating|           timestamp|prediction|
+-------+------------------+------------------+------------------+--------------------+----------+
|  count|           2999463|           2999463|           2999463|             2999463|   2999463|
|   mean| 35886.23962956036|4125.3596873840415| 3.512418056165387|1.0326756724195204E9|       NaN|
| stddev|20581.840605106532| 8949.521545343912|1.0606260603252557|1.1594009282944754E8|       NaN|
|    min|                 1|                 1|               0.5|           789652009|-5.5549273|
|    max|             71567|             65133|               5.0|          1231131137|       NaN|
+-------+------------------+------------------+------------------+--------------------+----------+



In [63]:
predictions = predictions.dropna(subset=['prediction'])

In [64]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)

# With the ml-latest-small dataset, rmse was about 1.1, but for the ml-1m dataset, rmse is around 0.9, an improvement, and rmse for ml-10m is 0.83, yet another minor improvement.
print('Root-mean-square error = ' + str(rmse))

Root-mean-square error = 0.8301464608086143


In [66]:
# Serve recommendations for a specific user.  This would normally be done on movies they haven't seen, but rather than generate that list here it's simply done on the test set.
userId = 24254
single_user = test.filter(test['userId']==userId).select(['movieId','userId','rating'])

recommendations = model_fit.transform(single_user)
recommendations.orderBy('prediction', ascending=False).show()

+-------+------+------+----------+
|movieId|userId|rating|prediction|
+-------+------+------+----------+
|   2997| 24254|   5.0|  4.537716|
|    541| 24254|   3.0| 4.4854784|
|   1251| 24254|   3.0| 4.4721036|
|   5893| 24254|   5.0| 4.3591785|
|   6001| 24254|   4.0| 4.3497186|
|   2599| 24254|   5.0| 4.3256216|
|   4973| 24254|   3.0| 4.3173676|
|   1244| 24254|   5.0|  4.300537|
|   2289| 24254|   5.0|  4.288149|
|   3019| 24254|   4.0| 4.2383404|
|   4378| 24254|   3.0| 4.2343783|
|   2064| 24254|   5.0|  4.231061|
|   1095| 24254|   5.0|  4.206632|
|   2020| 24254|   5.0| 4.1950746|
|    800| 24254|   5.0| 4.1841664|
|   2973| 24254|   5.0|    4.1702|
|   2300| 24254|   5.0| 4.1480427|
|   5617| 24254|   4.0| 4.1236672|
|   1635| 24254|   5.0|  4.122512|
|   5902| 24254|   4.0| 4.1212077|
+-------+------+------+----------+
only showing top 20 rows

