In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName('RS')
    .getOrCreate()
)
# Load the ratings CSV: the first row is the header, column types are inferred.
df = spark.read.csv(
    '/usr/dataset/ml-20m/ratings.csv',
    header=True,
    inferSchema=True,
)



In [3]:
# Preview the first 10 rows of the loaded ratings DataFrame.
df.show(10)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
|     1|    112|   3.5|1094785740|
|     1|    151|   4.0|1094785734|
|     1|    223|   4.0|1112485573|
|     1|    253|   4.0|1112484940|
|     1|    260|   4.0|1112484826|
+------+-------+------+----------+
only showing top 10 rows



In [4]:
# Number of ratings per user, most active users first.
userCount_df = (
    df.groupBy('userId')
    .count()
    .orderBy('count', ascending=False)
)
# Collect the per-user counts to the driver as a pandas DataFrame.
user_df = userCount_df.toPandas()

In [5]:
# Show the first rows of the per-user rating counts (sorted by count, descending).
user_df.head()

Unnamed: 0,userId,count
0,118205,9254
1,8405,7515
2,82418,5646
3,121535,5520
4,125794,5491


In [6]:
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, IndexToString
# Map each movieId to a numeric index column, movieId_num
# (used further down as the ALS item column).
stringIndexer = StringIndexer(
    inputCol='movieId',
    outputCol='movieId_num',
)
# Fit the indexer on the full ratings data, then append the index column.
model = stringIndexer.fit(df)
new_df = model.transform(df)
new_df.show(10)


+------+-------+------+----------+-----------+
|userId|movieId|rating| timestamp|movieId_num|
+------+-------+------+----------+-----------+
|     1|      2|   3.5|1112486027|      125.0|
|     1|     29|   3.5|1112484676|      564.0|
|     1|     32|   3.5|1112484819|       19.0|
|     1|     47|   3.5|1112484727|       23.0|
|     1|     50|   3.5|1112484580|       14.0|
|     1|    112|   3.5|1094785740|      370.0|
|     1|    151|   4.0|1094785734|      335.0|
|     1|    223|   4.0|1112485573|      106.0|
|     1|    253|   4.0|1112484940|       77.0|
|     1|    260|   4.0|1112484826|        5.0|
+------+-------+------+----------+-----------+
only showing top 10 rows



In [7]:
# Hold out roughly 30% of the ratings for evaluation.
# A fixed seed makes the split reproducible across kernel restarts; without it
# every run trains and evaluates on a different set of rows, so the reported
# RMSE cannot be reproduced.
train_df, test_df = new_df.randomSplit([0.7, 0.3], seed=42)

# Report (row count, column count) for each split; each count() runs a Spark job.
print('train_df, (%d, %d)'%(train_df.count(), len(train_df.columns)))
print('test_df, (%d, %d)'%(test_df.count(), len(test_df.columns)))


train_df, (14000694, 5)
test_df, (5999569, 5)


In [8]:
from pyspark.ml.recommendation import ALS
# Train the ALS recommender on the training split.
# - itemCol is the StringIndexer output (movieId_num), not the raw movieId.
# - coldStartStrategy='drop' removes rows whose prediction would be NaN
#   (users/items unseen during training) so they do not poison the RMSE.
# - seed pins the random factor initialisation so refits are reproducible.
rec = ALS(maxIter=10, regParam=0.01, userCol='userId', itemCol='movieId_num', ratingCol='rating',
          nonnegative=True, coldStartStrategy='drop', seed=42)
rs_model = rec.fit(train_df)


In [9]:
# Score the held-out (user, movie) pairs: keep only the id columns,
# then let the model append a `prediction` column.
test = test_df.select('userId', 'movieId', 'movieId_num')
test_pred = rs_model.transform(test)
test_pred.show()



+------+-------+-----------+----------+
|userId|movieId|movieId_num|prediction|
+------+-------+-----------+----------+
|  1645|     17|      148.0|  4.502316|
|  2142|     17|      148.0| 3.9637456|
| 10206|     17|      148.0| 3.7727768|
| 15957|     17|      148.0| 3.6664586|
| 16861|     17|      148.0|  3.666688|
| 21700|     17|      148.0| 4.0751953|
| 23364|     17|      148.0| 3.5971375|
| 24171|     17|      148.0| 3.4126422|
| 34061|     17|      148.0| 3.9972284|
| 34234|     17|      148.0|  3.999655|
| 37307|     17|      148.0|   3.05231|
| 38868|     17|      148.0| 4.1327543|
| 40107|     17|      148.0|  5.029877|
| 40653|     17|      148.0| 3.0894504|
| 43852|     17|      148.0| 3.9682527|
| 61051|     17|      148.0| 3.3542128|
| 64822|     17|      148.0|  3.730898|
| 67376|     17|      148.0| 3.1617599|
| 69048|     17|      148.0| 3.7926226|
| 70097|     17|      148.0| 3.5424604|
+------+-------+-----------+----------+
only showing top 20 rows



In [10]:
#test rmse for model
from pyspark.ml.evaluation import RegressionEvaluator

# Predict on the full test split so the true `rating` column is kept as the label.
test_pred = rs_model.transform(test_df)

# Root-mean-square error between the `prediction` and `rating` columns.
evaluate_result = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluate_result.evaluate(test_pred)
print('test rmse is %f' % rmse)


test rmse is 0.812720


In [11]:
# e.g. recommend movies for user 66
user_id = 66
# Every distinct movie that appears in the ratings data.
nunique_movies = new_df.select('movieId').distinct()
# Movies that user `user_id` has already rated (treated here as "watched").
watched_movies = new_df.filter(new_df['userId'] == user_id).select('movieId').distinct()
# Anti-join: keep the movies with no match among the watched ones. This replaces
# the left join + "right side is null" filter + second distinct() with a single
# operation; the left side is already distinct, so the result is too.
user_66_not_watched_movies = nunique_movies.join(watched_movies, on='movieId', how='left_anti')
# Attach the user id as a constant column so every row is a (movieId, userId) pair.
user_66_not_watched_movies = user_66_not_watched_movies.withColumn('userId', lit(int(user_id)))
user_66_not_watched_movies.show(10, False)

+-------+------+
|movieId|userId|
+-------+------+
|148    |66    |
|463    |66    |
|471    |66    |
|496    |66    |
|833    |66    |
|1088   |66    |
|1238   |66    |
|1342   |66    |
|1591   |66    |
|1645   |66    |
+-------+------+
only showing top 10 rows



In [12]:
# Top-10 recommendations for every user known to the model.
# NOTE(review): this rebinds `rec`, which previously held the ALS estimator.
rec = rs_model.recommendForAllUsers(10)

In [13]:
# Collect the per-user recommendations to the driver as a pandas DataFrame.
df_rec= rec.toPandas()

In [14]:
# Order the rows by user id (the original row index is kept).
df_rec = df_rec.sort_values(by='userId')

In [17]:
# Display the recommendations table.
df_rec

Unnamed: 0,userId,recommendations
29648,1,"[(22066, 8.84438419342041), (20707, 8.31094455..."
120632,2,"[(20707, 12.660033226013184), (22066, 11.45084..."
35169,3,"[(16722, 9.65412712097168), (17848, 9.25004005..."
70733,4,"[(22228, 9.858083724975586), (20707, 9.6429424..."
45554,5,"[(20707, 11.174495697021484), (22617, 9.709850..."
...,...,...
107366,138489,"[(20707, 9.303915977478027), (22617, 7.7146339..."
17942,138490,"[(15122, 7.545895576477051), (20707, 7.2241487..."
17943,138491,"[(18904, 18.075780868530273), (17444, 13.94156..."
110210,138492,"[(17848, 11.822572708129883), (16722, 9.032630..."


In [16]:
# Raw row values of df_rec; each row is read below as [userId, recommendations].
val = df_rec.values

In [30]:
# Map each user id to the list of recommended item ids, dropping the scores.
# NOTE(review): the model was fit with itemCol='movieId_num', so these ids look
# like StringIndexer indices rather than original movieId values — confirm
# whether consumers need them mapped back.
recommendation = {
    line[0]: [r[0] for r in line[1]]
    for line in val
}

In [31]:
import pickle
# NOTE(review): `topk_tl` is not defined in any cell visible here — confirm it
# exists on a fresh kernel (the `recommendation` dict may have been intended).
# NOTE(review): `pickle_topk` is built but not used in the visible cells.
pickle_topk = {i:line for i,line in enumerate(topk_tl)}
# Write through a context manager so the file is flushed and closed even if
# dump() raises; the original left the handle open.
with open('recommendation1.pkl','wb') as fw:
    # HIGHEST_PROTOCOL is what the original's magic -1 selected.
    pickle.dump(topk_tl, fw, pickle.HIGHEST_PROTOCOL)


{1: [22066, 20707, 21242, 21097, 19264, 18631, 17848, 19262, 22228, 16722],
 2: [20707, 22066, 22617, 19264, 18731, 20163, 20459, 17453, 18763, 18555],
 3: [16722, 17848, 22066, 20707, 15756, 20766, 19793, 19264, 21242, 15293],
 4: [22228, 20707, 19264, 16024, 17848, 22222, 17113, 16736, 22617, 19262],
 5: [20707, 22617, 22228, 21995, 19264, 18979, 20459, 16024, 17442, 17113],
 6: [20707, 22228, 22617, 20459, 21097, 18631, 22681, 21995, 16736, 20673],
 7: [20707, 22228, 22617, 17453, 16632, 14956, 13819, 16305, 17823, 21784],
 8: [20707, 22617, 22228, 17848, 16722, 21995, 16632, 17658, 20459, 18183],
 9: [17453, 14333, 13211, 16426, 14956, 16722, 14633, 16632, 17650, 18068],
 10: [20707, 17453, 20459, 14956, 22617, 16632, 14887, 16305, 13211, 15943],
 11: [22066, 22228, 18904, 21242, 22617, 17113, 21097, 19264, 13282, 19262],
 12: [17848, 16722, 17312, 21066, 16736, 13979, 21097, 14919, 18904, 22228],
 13: [22066, 22228, 22617, 19264, 16024, 17848, 20707, 17113, 16722, 18904],
 14: [20