In [28]:
import pandas as pd
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [23]:
df = spark.read.format("csv").load("ml-latest-small/ratings.csv", inferSchema=True, header=True)
df.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
+------+-------+------+----------+
only showing top 5 rows



In [26]:
df.dtypes

[('userId', 'int'),
 ('movieId', 'int'),
 ('rating', 'double'),
 ('timestamp', 'int')]

In [24]:
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating")

In [25]:
model = als.fit(df)

In [29]:
def show_recommend(results, rbase, rtarget):
    df_show = []
    for result in results:
        for rec in result['recommendations']:
            df_show.append([result[rbase], rec[rtarget], rec['rating']])
    return pd.DataFrame(df_show, columns=[rbase, rtarget, "rating"])

ユーザーベースのレコメンド

In [30]:
show_recommend(model.recommendForAllUsers(5).collect()[:2], 'userId', 'movieId')

Unnamed: 0,userId,movieId,rating
0,471,83318,5.090275
1,471,67504,5.090275
2,471,83411,5.090275
3,471,83359,5.090275
4,471,59684,4.822518
5,463,67504,5.158423
6,463,83318,5.158423
7,463,83411,5.158423
8,463,83359,5.158423
9,463,26400,4.642581


アイテムベースのレコメンド

In [31]:
show_recommend(model.recommendForAllItems(5).collect()[:2], 'movieId', 'userId')

Unnamed: 0,movieId,userId,rating
0,1580,46,5.295887
1,1580,113,4.964395
2,1580,543,4.841971
3,1580,145,4.776155
4,1580,656,4.758034
5,5300,106,5.190446
6,5300,296,4.872449
7,5300,558,4.851921
8,5300,4,4.810745
9,5300,530,4.805775
