In [1]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

spark = SparkSession.builder.appName("SimpleApp3").getOrCreate()

ratings = spark.read.text("../data/sample_movielens_ratings.txt")\
.rdd.toDF()\
.selectExpr("split(value , '::') as col")\
.selectExpr(
"cast(col[0] as int) as userId",
"cast(col[1] as int) as movieId",
"cast(col[2] as float) as rating",
"cast(col[3] as long) as timestamp")


In [2]:
ratings.show(10)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|      2|   3.0|1424380312|
|     0|      3|   1.0|1424380312|
|     0|      5|   2.0|1424380312|
|     0|      9|   4.0|1424380312|
|     0|     11|   1.0|1424380312|
|     0|     12|   2.0|1424380312|
|     0|     15|   1.0|1424380312|
|     0|     17|   1.0|1424380312|
|     0|     19|   1.0|1424380312|
|     0|     21|   1.0|1424380312|
+------+-------+------+----------+
only showing top 10 rows



In [3]:
training, test = ratings.randomSplit([0.8, 0.2])

In [5]:
als = ALS()\
.setMaxIter(5)\
.setRegParam(0.01)\
.setUserCol("userId")\
.setItemCol("movieId")\
.setRatingCol("rating")
print (als.explainParams())
alsModel = als.fit(training)
predictions = alsModel.transform(test)

alpha: alpha for implicit preference (default: 1.0)
blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. (default: 4096)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
coldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'. (default: nan)
finalStorageLevel: StorageLevel for ALS model factors. (default: MEMORY_AND_DISK)
implicitPrefs: whether to use implicit preference (default: False)
intermediateStorageLevel: StorageLevel for interme

In [6]:
alsModel.recommendForAllUsers(10)\
.selectExpr("userId", "explode(recommendations)").show()
alsModel.recommendForAllItems(10)\
.selectExpr("movieId", "explode(recommendations)").show()

+------+---------------+
|userId|            col|
+------+---------------+
|    28|{90, 5.3888936}|
|    28|{92, 4.9344115}|
|    28|{12, 4.7196984}|
|    28|{81, 4.5641074}|
|    28| {2, 4.5159183}|
|    28|{49, 4.1611114}|
|    28| {8, 4.0457015}|
|    28|{41, 3.7586837}|
|    28| {32, 3.553004}|
|    28|{87, 3.4894001}|
|    26|{46, 6.4369264}|
|    26| {64, 5.467854}|
|    26|{79, 5.2971153}|
|    26|{24, 5.1968746}|
|    26|{23, 5.0114326}|
|    26| {7, 4.9094357}|
|    26|{94, 4.6878347}|
|    26|{88, 4.6268067}|
|    26|{80, 4.1739163}|
|    26|{68, 4.1596293}|
+------+---------------+
only showing top 20 rows

+-------+---------------+
|movieId|            col|
+-------+---------------+
|     31| {23, 4.384329}|
|     31|{17, 4.2648764}|
|     31|{12, 3.4872496}|
|     31| {10, 3.413122}|
|     31|{26, 3.2221746}|
|     31|{14, 3.0931547}|
|     31|  {6, 2.681456}|
|     31|  {7, 2.585241}|
|     31|  {8, 2.555776}|
|     31| {15, 2.511889}|
|     85|{16, 5.1661696}|
|     85| 

In [7]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator()\
.setMetricName("rmse")\
.setLabelCol("rating")\
.setPredictionCol("prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = %f" % rmse)

Root-mean-square error = 2.052589


In [8]:
from pyspark.mllib.evaluation import RankingMetrics, RegressionMetrics
from pyspark.sql.functions import col, expr
perUserActual = predictions\
.where("rating > 2.5")\
.groupBy("userId")\
.agg(expr("collect_set(movieId) as movies"))

In [9]:
perUserActual.show()

+------+--------------------+
|userId|              movies|
+------+--------------------+
|    28|         [0, 19, 89]|
|    26|    [21, 36, 22, 73]|
|    27|        [33, 19, 75]|
|    12|[16, 64, 17, 35, 94]|
|    22|        [70, 36, 87]|
|     1|        [21, 28, 77]|
|    13|            [72, 29]|
|    16|                [94]|
|     6|                 [2]|
|     3| [34, 52, 88, 24, 7]|
|     5|[13, 49, 20, 68, 55]|
|    19|[74, 32, 54, 94, 98]|
|    15|            [64, 26]|
|     9|        [70, 43, 95]|
|    17|    [49, 46, 56, 90]|
|     4|        [60, 52, 87]|
|     8|        [31, 58, 62]|
|    23|    [48, 30, 50, 18]|
|    10|            [89, 25]|
|    25|            [12, 34]|
+------+--------------------+
only showing top 20 rows

