In [23]:

from pyspark.sql import SparkSession

# $example on$
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
# $example off$

In [26]:
# Obtain the Spark session explicitly: getOrCreate() returns the already
# active session when there is one and starts a new one otherwise, so this
# cell no longer relies on a kernel that happens to predefine `spark`.
spark = SparkSession.builder.getOrCreate()

# Each line of the input file is split on "::" into four fields, which are
# parsed as userId (int), movieId (int), rating (float) and timestamp (int).
lines = spark.read.text("sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)

# 80/20 train/test split. The seed is fixed so that the split -- and therefore
# every metric and recommendation computed below -- is the same on each run.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [27]:
# Build the recommendation model using ALS on the training data.
# coldStartStrategy="drop" is passed so that rows whose prediction would be
# NaN are dropped from the output of transform(), keeping the evaluation
# metric well-defined.
# seed is set explicitly so repeated runs fit the same model; without it the
# fitted factors (and the RMSE reported below) can differ between sessions.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", seed=42)
model = als.fit(training)

In [29]:
# Evaluate the fitted model on the held-out test set using RMSE between the
# "rating" column and the model's "prediction" column.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 1.5793116868660744


In [30]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(numItems=10)

# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(numUsers=10)


In [31]:
# Take three distinct user ids from the ratings data and generate the
# top 10 movie recommendations for just those users.
users = (
    ratings
    .select(als.getUserCol())
    .distinct()
    .limit(3)
)
userSubsetRecs = model.recommendForUserSubset(dataset=users, numItems=10)

In [39]:
# Take three distinct movie ids from the ratings data and generate the
# top 10 user recommendations for just those movies.
movies = (
    ratings
    .select(als.getItemCol())
    .distinct()
    .limit(3)
)
movieSubSetRecs = model.recommendForItemSubset(dataset=movies, numUsers=10)

In [34]:
# Preview the per-user recommendations. With no arguments, show() prints only
# the first 20 rows and truncates the long `recommendations` cells.
userRecs.show()


+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    20|[{22, 4.3497295},...|
|    10|[{92, 4.7074437},...|
|     0|[{92, 3.8185835},...|
|     1|[{62, 3.6579142},...|
|    21|[{29, 4.567669}, ...|
|    11|[{23, 5.183999}, ...|
|    12|[{17, 5.087547}, ...|
|    22|[{7, 5.6119094}, ...|
|     2|[{93, 4.9635673},...|
|    13|[{25, 4.1365027},...|
|     3|[{69, 4.6023016},...|
|    23|[{63, 6.6244283},...|
|     4|[{38, 4.203873}, ...|
|    24|[{85, 5.5591497},...|
|    14|[{29, 4.969773}, ...|
|     5|[{55, 5.127508}, ...|
|    15|[{46, 4.987389}, ...|
|    25|[{8, 5.48521}, {9...|
|    26|[{51, 5.9222693},...|
|     6|[{29, 5.6315074},...|
+------+--------------------+
only showing top 20 rows



In [36]:
# Preview the per-movie recommendations. With no arguments, show() prints only
# the first 20 rows and truncates the long `recommendations` cells.
movieRecs.show()


+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     20|[{17, 4.9352827},...|
|     40|[{28, 4.489238}, ...|
|     10|[{5, 4.3297353}, ...|
|     50|[{11, 3.9272325},...|
|     80|[{26, 5.471773}, ...|
|     70|[{28, 4.9403067},...|
|     60|[{28, 3.4940999},...|
|     90|[{17, 5.0580907},...|
|     30|[{16, 5.210422}, ...|
|      0|[{28, 2.7143505},...|
|     31|[{12, 3.9774463},...|
|     81|[{11, 3.677592}, ...|
|     91|[{12, 3.2960072},...|
|      1|[{15, 3.7009394},...|
|     41|[{28, 5.38506}, {...|
|     61|[{9, 2.8258836}, ...|
|     51|[{26, 5.9222693},...|
|     21|[{17, 3.373198}, ...|
|     11|[{18, 3.8815577},...|
|     71|[{25, 3.4732723},...|
+-------+--------------------+
only showing top 20 rows



In [35]:
# Display the recommendations computed for the sampled subset of users
# (long `recommendations` cells are truncated in the printed table).
userSubsetRecs.show()


+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    26|[{51, 5.9222693},...|
|    19|[{51, 4.170739}, ...|
|    29|[{46, 4.8911676},...|
+------+--------------------+



In [37]:
# Display the recommendations computed for the sampled subset of movies
# (long `recommendations` cells are truncated in the printed table).
movieSubSetRecs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     65|[{23, 4.8739395},...|
|     26|[{17, 4.9918084},...|
|     29|[{6, 5.6315074}, ...|
+-------+--------------------+

