In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel

In [2]:
spark = SparkSession.builder.master("local").getOrCreate()

In [3]:
movie_ratings = spark.read.json('./data/ratings.json')
movie_ratings

DataFrame[movie_id: bigint, rating: bigint, timestamp: double, user_id: bigint]

In [4]:
sorted_ratings = movie_ratings.orderBy('timestamp')

In [5]:
df = sorted_ratings.toPandas()
df

Unnamed: 0,movie_id,rating,timestamp,user_id
0,858,4,956678732.0,6040
1,2384,4,956678754.0,6040
2,593,5,956678754.0,6040
3,1961,4,956678777.0,6040
4,1419,3,956678856.0,6040
5,213,5,956678856.0,6040
6,3111,5,956678856.0,6040
7,573,4,956678856.0,6040
8,3505,4,956678856.0,6040
9,1734,2,956678881.0,6040


In [6]:
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window

df = sorted_ratings.withColumn("rank", percent_rank().over(Window.partitionBy().orderBy("timestamp")))

In [7]:
train_df = df.where("rank <= .9").drop("rank")
train_df.count()

647956

In [8]:
test_df = df.where("rank > .9").drop("rank")
test_df.count()

71993

In [9]:
train_df.persist()
test_df.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: double, user_id: bigint]

In [10]:
als = ALS(maxIter=10,rank=10, regParam=0.1, userCol="user_id", 
          itemCol="movie_id", ratingCol="rating", coldStartStrategy='drop')

# fit the ALS model to the training set
model = als.fit(train_df)

In [11]:
predictions = model.transform(test_df)
predictions.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: double, user_id: bigint, prediction: float]

In [12]:
predictions.orderBy('prediction', ascending=False).show(10)

+--------+------+------------+-------+----------+
|movie_id|rating|   timestamp|user_id|prediction|
+--------+------+------------+-------+----------+
|     260|     3| 9.7516759E8|   1343|  5.222582|
|    1198|     4|9.75167516E8|   1343|  5.169668|
|     745|     5|9.75106081E8|   1917|  5.166789|
|     858|     5|9.75166908E8|   1343|   5.14365|
|    2858|     5|9.75107749E8|   1917|  5.107285|
|    1148|     5|9.75106081E8|   1917| 5.0931473|
|    1136|     5|9.75107837E8|   1917|  5.069777|
|    3787|     4|9.75142374E8|   1315| 5.0483994|
|     912|     5|9.75167344E8|   1343|  5.027323|
|    1223|     5|9.75106081E8|   1917|  5.023661|
+--------+------+------------+-------+----------+
only showing top 10 rows



In [13]:
recs = model.recommendForAllUsers(numItems=10)

In [14]:
recs.persist().show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|   1580|[[557, 4.3915277]...|
|   4900|[[811, 5.7561865]...|
|   5300|[[3906, 5.609938]...|
|   1591|[[572, 5.9452987]...|
|   4101|[[3867, 5.409759]...|
|   1342|[[572, 4.6534057]...|
|   2122|[[572, 4.894262],...|
|   2142|[[572, 4.9363036]...|
|   5803|[[2675, 4.7327394...|
|   3794|[[572, 4.4466996]...|
|   1645|[[2512, 5.3855686...|
|   3175|[[3906, 5.6255136...|
|   4935|[[3906, 5.067027]...|
|   2366|[[572, 4.7037377]...|
|   2866|[[572, 4.4338784]...|
|   5156|[[557, 6.031303],...|
|   3997|[[2569, 4.913532]...|
|   1088|[[572, 5.0900693]...|
|   1238|[[557, 5.2911787]...|
|   3918|[[3906, 5.202706]...|
+-------+--------------------+
only showing top 20 rows



In [15]:
re = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = re.evaluate(model.transform(test_df))
print("Root Mean-squared error = " + str(rmse))

Root Mean-squared error = 0.8988820842637529


In [16]:
# import pickle

# pickle.dump(model2, open('als_model.pkl', 'wb'))

# als_model = pickle.load(open('als_model.pkl', 'rb'))

In [17]:
test_ratings = spark.read.json('./data/requests.json')

In [31]:
als_final = ALS(maxIter=10,rank=10, regParam=0.1, userCol="user_id", 
          itemCol="movie_id", ratingCol="rating")

# fit the ALS model to the training set
model_als = als_final.fit(train_df)

In [34]:
predictions_als = model_als.transform(test_df)
predictions_als.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: double, user_id: bigint, prediction: float]

In [35]:
predictions_als.show(40)

+--------+------+------------+-------+----------+
|movie_id|rating|   timestamp|user_id|prediction|
+--------+------+------------+-------+----------+
|     148|     5|9.75592024E8|    673|       NaN|
|     463|     3|9.75661389E8|    660|       NaN|
|     463|     1|9.75441954E8|    746|       NaN|
|     463|     2|9.75232363E8|   1980| 2.2652779|
|     471|     5|9.74976667E8|   1395| 4.2990117|
|     471|     5|9.75500361E8|    731|       NaN|
|     471|     4|9.74981872E8|   1019|       NaN|
|     471|     4|9.75088817E8|    984|       NaN|
|     471|     3|9.75505373E8|   4653| 3.1576204|
|     471|     4|9.75438644E8|    855|       NaN|
|     471|     5|9.75620812E8|    671|       NaN|
|     471|     3|9.75665282E8|    660|       NaN|
|     471|     5|9.75414322E8|    770|       NaN|
|     471|     4|9.75529506E8|    699|       NaN|
|     471|     5|9.75233907E8|    881|       NaN|
|     471|     4|9.75334778E8|   2185|  3.534171|
|     471|     2|9.75133373E8|   1628| 3.5638027|


In [None]:
als_preds_df = predictions_als.toPandas()

In [141]:
als_preds_df.head(10)

Unnamed: 0,movie_id,rating,timestamp,user_id,prediction
0,148,5,975592024.0,673,
1,463,3,975661389.0,660,
2,463,1,975441954.0,746,
3,463,2,975232363.0,1980,2.265278
4,471,5,974976667.0,1395,4.299012
5,471,5,975500361.0,731,
6,471,4,974981872.0,1019,
7,471,4,975088817.0,984,
8,471,3,975505373.0,4653,3.15762
9,471,4,975438644.0,855,


In [80]:
recs = model_als.recommendForAllUsers(numItems=10)
recs_als = recs.toPandas()
recs_als.head()

Unnamed: 0,user_id,recommendations
0,1580,"[(557, 4.3915276527404785), (3906, 4.380768775..."
1,4900,"[(811, 5.756186485290527), (572, 5.62046861648..."
2,5300,"[(3906, 5.609938144683838), (557, 5.5178728103..."
3,1591,"[(572, 5.945298671722412), (3906, 5.7249207496..."
4,4101,"[(3867, 5.409759044647217), (3523, 5.239202022..."


In [96]:
movie_id = recs_als.loc[recs_als['user_id'] == 1580]['recommendations'][0][0]['movie_id']
movie_id

557

In [162]:
def get_top_n_movies(user_id, pred_df, n):
    "Get the top n movies from the highest predicted values"
    
    user = pred_df.loc[pred_df['user_id']==user_id]
    sorted_user = user.sort_values('prediction', ascending=False)
    
    movies = []
    count = pred_df.loc[pred_df['user_id'] == user_id]['movie_id'].count()
    
    if n > count:
        n = count
        
    for i in range(0,n):
        movie = sorted_user.iloc[i]['movie_id']
        movies.append(movie.astype('int'))
        
    return movies


In [167]:
get_top_n_movies(673, als_preds_df, 10)

[148, 1580, 3749, 3794, 858, 1084, 1483, 1507, 1721, 296]

In [168]:
get_top_n_movies(1980, als_preds_df, 10)

[3178, 164, 349, 2280, 2341, 517, 259, 2605, 3257, 2410]