In [4]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel

In [2]:
spark = SparkSession.builder.master("local").getOrCreate()

In [3]:
movie_ratings = spark.read.json('./data/ratings.json')
movie_ratings

DataFrame[movie_id: bigint, rating: bigint, timestamp: double, user_id: bigint]

In [20]:
sorted_ratings = movie_ratings.orderBy('timestamp')

In [None]:
sorted_ratings.persist()

In [None]:
import pandas as pd
sorted_rates = sorted_ratings.toPandas()

In [70]:
sorted_rates.loc[sorted_rates['user_id']==4169]

Unnamed: 0,movie_id,rating,timestamp,user_id
235213,2653,4,965308192.0,4169
235214,1268,5,965308192.0,4169
235215,423,3,965308192.0,4169
235216,2617,4,965308192.0,4169
235230,3788,3,965308281.0,4169
235231,3841,2,965308281.0,4169
235232,3801,5,965308281.0,4169
235242,3833,1,965308319.0,4169
235243,3794,4,965308319.0,4169
235244,3811,5,965308319.0,4169


In [50]:
als = ALS(maxIter=5,rank=4, regParam=0.01, userCol="user_id", itemCol="movie_id", ratingCol="rating")

# fit the ALS model to the training set
model = als.fit(sorted_ratings)

In [51]:
test_ratings = spark.read.json('./data/requests.json')

In [52]:
predictions = model.transform(test_ratings)
predictions.persist()

DataFrame[movie_id: bigint, rating: double, timestamp: double, user_id: bigint, prediction: float]

In [53]:
predictions.show(10)

+--------+------+-------------+-------+----------+
|movie_id|rating|    timestamp|user_id|prediction|
+--------+------+-------------+-------+----------+
|     148|   NaN| 9.77959026E8|     53|       NaN|
|     148|   NaN| 9.76559602E8|   4169|  3.618781|
|     148|   NaN| 9.89024856E8|   5333| 2.3874927|
|     148|   NaN| 9.77005381E8|   4387|  2.011783|
|     148|   NaN| 9.66907208E8|   3539| 2.1715593|
|     148|   NaN| 9.76266538E8|    840| 2.3249354|
|     148|   NaN| 9.76841639E8|    216|       NaN|
|     148|   NaN| 9.76191154E8|    482|       NaN|
|     148|   NaN|1.029283935E9|    752| 3.6588159|
|     148|   NaN|1.026978024E9|    424|       NaN|
+--------+------+-------------+-------+----------+
only showing top 10 rows



In [33]:
recs = model.recommendForAllUsers(numItems=10)

In [35]:
recs.persist().show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|   1580|[[3382, 7.1791215...|
|   4900|[[1369, 11.796747...|
|   5300|[[3906, 9.129992]...|
|   1591|[[2342, 10.337679...|
|   4101|[[776, 9.843668],...|
|   1342|[[776, 8.866562],...|
|   2122|[[1369, 8.457092]...|
|   2142|[[776, 7.9119825]...|
|    833|[[2342, 14.143307...|
|   5803|[[2765, 11.610508...|
|   3794|[[776, 9.103578],...|
|   1645|[[2342, 14.344983...|
|   3175|[[980, 12.535967]...|
|   4935|[[1369, 11.401782...|
|   2366|[[3140, 10.158358...|
|   2866|[[980, 7.4728885]...|
|   5156|[[3906, 14.380634...|
|   3997|[[2825, 10.380199...|
|   1088|[[980, 9.787393],...|
|   1238|[[729, 12.920483]...|
+-------+--------------------+
only showing top 20 rows



In [71]:
recs[recs['user_id'] == 53].first()['recommendations']

TypeError: 'NoneType' object is not subscriptable

In [45]:
re = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = re.evaluate(model.transform(sorted_ratings))
print("Root Mean-squared error = " + str(rmse))

Mean-square error = 0.8274513863316048
