## Movie Recommendation in Python with Spark ALS Matrix Factorization

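Background:
* Collaborative filtering: in short, recommending information a user is likely to find interesting by drawing on the preferences of a group of people who share similar interests or experiences, i.e. the wisdom of the crowd.
* Matrix factorization: decompose the (user, item, interaction) matrix into two sub-matrices, (user, latent vector) and (item, latent vector); recommendations are then made by comparing these latent vectors.
* ALS (Alternating Least Squares): with one factor matrix held fixed, solving for the other is an ordinary regularized least-squares problem. So start from an initial guess U(0) for the user matrix U, compute the item matrix V(0) from U(0), then compute U(1) from V(0), and keep alternating until convergence.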
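
The alternation in the last bullet can be written out explicitly. This is a sketch for the explicit-rating setting used in this notebook: $\Omega$ is the set of observed (user, movie) pairs, $\mathbf{x}_u$ and $\mathbf{y}_i$ are the rows of $U$ and $V$ (each of length $k$, the rank), and $\lambda$ corresponds to Spark's `regParam`. Spark's documentation states that it scales $\lambda$ by the number of ratings each user gave or each item received (the "ALS-WR" approach), which is why $n_u$ and $n_i$ appear:

$$
\min_{U,V}\ \sum_{(u,i)\in\Omega}\left(r_{ui}-\mathbf{x}_u^\top\mathbf{y}_i\right)^2+\lambda\left(\sum_u n_u\lVert\mathbf{x}_u\rVert^2+\sum_i n_i\lVert\mathbf{y}_i\rVert^2\right)
$$

Holding $U$ fixed, setting the gradient with respect to each $\mathbf{y}_i$ to zero gives a closed-form update, where $U_{\Omega_i}$ stacks the vectors of the users who rated movie $i$ and $\mathbf{r}_{\Omega_i}$ holds their ratings:

$$
\mathbf{y}_i=\left(U_{\Omega_i}^\top U_{\Omega_i}+\lambda n_i I_k\right)^{-1}U_{\Omega_i}^\top\mathbf{r}_{\Omega_i}
$$

Holding $V$ fixed, the user update is symmetric, with $V_{\Omega_u}$ stacking the vectors of the movies user $u$ rated:

$$
\mathbf{x}_u=\left(V_{\Omega_u}^\top V_{\Omega_u}+\lambda n_u I_k\right)^{-1}V_{\Omega_u}^\top\mathbf{r}_{\Omega_u}
$$

After convergence, the predicted rating is $\hat r_{ui}=\mathbf{x}_u^\top\mathbf{y}_i$.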

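Goals of this demo:
1. Run matrix factorization to obtain user embeddings and item embeddings.
2. For a target user, run a nearest-neighbor search to get a list of recommended items (movies the user has already watched must be excluded, and the movie titles must be looked up).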

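Extensions:
1. Searching within the user embeddings finds users with similar tastes, which can be used to recommend like-minded people.
2. Searching within the item embeddings finds similar items, which enables related-item recommendations.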
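
Both extensions reduce to the same operation: a nearest-neighbor search inside a single embedding table. A minimal NumPy sketch, assuming the embeddings have been stacked into an (n, rank) array with a parallel list of ids; `top_k_similar` is a hypothetical helper, and cosine similarity is used to match the user-to-movie search later in this notebook. It assumes no all-zero vectors.

```python
import numpy as np

def top_k_similar(query_id, ids, vectors, k=5):
    """Return the k (id, cosine) pairs most similar to query_id, excluding query_id itself."""
    ids = np.asarray(ids)
    mat = np.asarray(vectors, dtype=float)          # (n, rank) embedding table
    norms = np.linalg.norm(mat, axis=1)             # assumes no all-zero rows
    q = int(np.flatnonzero(ids == query_id)[0])     # row index of the query entity
    sims = mat @ mat[q] / (norms * norms[q])        # cosine similarity to every row
    # Highest similarity first, with the query row itself removed
    order = [j for j in np.argsort(-sims) if j != q][:k]
    return [(ids[j].item(), float(sims[j])) for j in order]


# Toy 2-d "embeddings" for four movies (made-up values, not MovieLens output)
movie_ids = [10, 20, 30, 40]
movie_vecs = [[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [-1.0, 0.0]]
print(top_k_similar(10, movie_ids, movie_vecs, k=2))
```

Here movie 20 comes first because its vector points almost the same way as movie 10's. For related movies in this notebook, one would pass the `id` column of the item-embedding DataFrame together with `np.vstack(df_movie_embedding["features"].to_list())` once the `features` column has been parsed into arrays (section 4.1); for like-minded users, the same call works on the user-embedding table. At MovieLens-small scale a brute-force matrix product is cheap enough.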

In [1]:
import pandas as pd
import numpy as np
import json

import findspark
findspark.init()

from pyspark.sql import SparkSession

### 1. Reading the CSV data with PySpark

In [2]:
spark = SparkSession \
    .builder \
    .appName("PySpark ALS") \
    .getOrCreate()

sc = spark.sparkContext

In [4]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [5]:
# Specify the column types used to parse the CSV file
customSchema = T.StructType([
    T.StructField("userId", T.IntegerType(), True),        
    T.StructField("movieId", T.IntegerType(), True),
    T.StructField("rating", T.FloatType(), True),
    T.StructField("timestamp", T.LongType(), True),
])

In [6]:
df = spark.read.csv(
    "./datas/ml-latest-small/ratings.csv", 
    header=True,
    schema=customSchema
)
df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [7]:
df.select("userId").distinct().count()

610

In [8]:
df.select("movieId").distinct().count()

9724

In [9]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: long (nullable = true)



### 2. Matrix factorization with Spark ALS

In [10]:
from pyspark.ml.recommendation import ALS

In [11]:
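als = ALS(
    maxIter=5,                 # number of alternating iterations
    regParam=0.01,             # L2 regularization strength
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop")  # drop NaN predictions for users/movies unseen during training
# rank (the embedding dimension) is left at its default of 10

# Train the model
model = als.fit(df)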
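
To make the parameters in this cell concrete, here is a small dense NumPy sketch of the same alternation on a toy matrix. It is not Spark's distributed implementation; the function name `als_factorize` and its arguments are hypothetical, chosen to mirror `rank`, `maxIter` and `regParam`, and the regularization is scaled by rating counts as in the ALS-WR scheme that Spark documents.

```python
import numpy as np

def als_factorize(ratings, mask, rank=2, max_iter=10, reg_param=0.01, seed=0):
    """Toy ALS: ratings is an (m, n) array, mask is True where a rating is observed."""
    rng = np.random.default_rng(seed)
    m, n = ratings.shape
    U = rng.normal(scale=0.1, size=(m, rank))   # user factors: random start U(0)
    V = np.zeros((n, rank))                     # item factors: filled by the first half-step
    eye = np.eye(rank)
    for _ in range(max_iter):
        # Hold U fixed: each item vector is a ridge-regression solution
        for i in range(n):
            rated_by = mask[:, i]
            n_i = rated_by.sum()
            if n_i == 0:
                continue  # an item with no ratings keeps its current vector
            Ui = U[rated_by]
            V[i] = np.linalg.solve(Ui.T @ Ui + reg_param * n_i * eye,
                                   Ui.T @ ratings[rated_by, i])
        # Hold V fixed: each user vector is a ridge-regression solution
        for u in range(m):
            rated = mask[u]
            n_u = rated.sum()
            if n_u == 0:
                continue
            Vu = V[rated]
            U[u] = np.linalg.solve(Vu.T @ Vu + reg_param * n_u * eye,
                                   Vu.T @ ratings[u, rated])
    return U, V


# Toy example: 4 users x 4 movies, 0 means "not rated"
R = np.array([[5, 4, 0, 1],
              [4, 5, 1, 0],
              [1, 0, 5, 4],
              [0, 1, 4, 5]], dtype=float)
M = R > 0
U, V = als_factorize(R, M, rank=2, max_iter=20, reg_param=0.01)
pred = U @ V.T                      # predicted rating = dot product of the factors
rmse = np.sqrt(np.mean((pred[M] - R[M]) ** 2))
print(f"train RMSE: {rmse:.3f}")
```

Because each half-step solves its subproblem exactly, the regularized objective never increases from one iteration to the next; `maxIter` simply caps how many such rounds are run.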

#### Saving the user embeddings

In [12]:
model.userFactors.show(5)

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-0.8156561, -0.4...|
| 20|[-0.75606096, -0....|
| 30|[-0.46179363, 0.0...|
| 40|[0.58260286, -0.5...|
| 50|[0.13523939, -0.5...|
+---+--------------------+
only showing top 5 rows



In [13]:
model.userFactors.count()

610

In [14]:
model.userFactors.select("id", "features") \
           .toPandas() \
           .to_csv('./datas/movielens_sparkals_user_embedding.csv', index=False)

#### Saving the item embeddings

In [15]:
model.itemFactors.show(5)

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-0.34947553, -2....|
| 20|[0.28214583, -1.0...|
| 30|[0.5308755, -3.25...|
| 40|[0.019388972, -0....|
| 50|[-0.32320693, -3....|
+---+--------------------+
only showing top 5 rows



In [16]:
model.itemFactors.count()

9724

In [17]:
model.itemFactors.select("id", "features") \
           .toPandas() \
           .to_csv('./datas/movielens_sparkals_item_embedding.csv', index=False)

### 4. Finding the 10 movies a given user is most likely to enjoy

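Approach:
1. Look up the target user's embedding.
2. Compute the similarity between the target user's embedding and every movie embedding.
3. Collect the set of movies the user has already watched.
4. Remove the watched movies from the results of step 2, then pick the top 10.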
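
Steps 2 to 4 can be condensed into one vectorized function. This is a sketch assuming the movie embeddings are stacked into a NumPy matrix; `recommend_for_user` is a hypothetical name, and step 1 (looking up the user vector) is left to the caller. One matrix product yields the same cosine scores as calling `scipy`'s `distance.cosine` once per movie.

```python
import numpy as np

def recommend_for_user(user_vec, item_ids, item_vecs, watched_ids, n=10):
    """Steps 2-4: cosine-score every item, drop watched ones, return the top n (id, score) pairs."""
    item_ids = np.asarray(item_ids)
    item_mat = np.asarray(item_vecs, dtype=float)   # (num_items, rank)
    user_vec = np.asarray(user_vec, dtype=float)    # (rank,)
    # Step 2: cosine similarity between the user and every item in one matrix product
    sims = item_mat @ user_vec / (np.linalg.norm(item_mat, axis=1) * np.linalg.norm(user_vec))
    # Steps 3-4: keep only items that are not in the watched set
    candidates = np.flatnonzero(~np.isin(item_ids, list(watched_ids)))
    # Sort the remaining candidates by similarity, highest first, and keep n
    top = candidates[np.argsort(-sims[candidates])[:n]]
    return [(item_ids[j].item(), float(sims[j])) for j in top]


# Toy data (made-up 2-d vectors): the user has already watched movie 1
recs = recommend_for_user(
    user_vec=[1.0, 0.0],
    item_ids=[1, 2, 3, 4],
    item_vecs=[[1.0, 0.0], [0.8, 0.6], [0.0, 1.0], [-1.0, 0.0]],
    watched_ids={1},
    n=2,
)
print(recs)
```

With the notebook's own variables, the call would look roughly like `recommend_for_user(user_embedding, df_movie_embedding["id"].to_numpy(), np.vstack(df_movie_embedding["features"].to_list()), watched_ids)`.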

In [18]:
# Target user ID
target_user_id = 1

#### 4.1 Loading the data files

In [19]:
df_movie = pd.read_csv("./datas/ml-latest-small/movies.csv")
df_movie_embedding = pd.read_csv("./datas/movielens_sparkals_item_embedding.csv")
df_rating = pd.read_csv("./datas/ml-latest-small/ratings.csv")
df_user_embedding = pd.read_csv("./datas/movielens_sparkals_user_embedding.csv")

In [20]:
# Parse the embeddings from strings back into numeric vectors
df_movie_embedding["features"] = df_movie_embedding["features"].map(lambda x : np.array(json.loads(x)))
df_user_embedding["features"] = df_user_embedding["features"].map(lambda x : np.array(json.loads(x)))

#### 4.2 Looking up the user's embedding

In [21]:
df_user_embedding.head(3)

|   | id | features |
|---|---|---|
| 0 | 10 | [-0.8156561255455017, -0.43886810541152954, 0.... |
| 1 | 20 | [-0.7560609579086304, -0.798147439956665, 0.19... |
| 2 | 30 | [-0.4617936313152313, 0.014611267484724522, 0.... |


In [22]:
user_embedding = df_user_embedding[df_user_embedding["id"] == target_user_id].iloc[0, 1]
user_embedding

array([-0.67742395, -0.35053211,  0.28796506, -0.15564868,  0.22799958,
       -0.17749633,  0.31119213,  0.29915455, -0.46975872,  0.7388013 ])

#### 4.3 Computing the similarity between the user embedding and every item embedding

In [23]:
df_movie_embedding.head(3)

|   | id | features |
|---|---|---|
| 0 | 10 | [-0.34947553277015686, -2.3883156776428223, 0.... |
| 1 | 20 | [0.28214582800865173, -1.0096689462661743, 0.8... |
| 2 | 30 | [0.5308755040168762, -3.254995107650757, 0.523... |


In [24]:
# Cosine similarity
from scipy.spatial import distance
df_movie_embedding["sim_value"] = (
    df_movie_embedding["features"].map(lambda x : 1 - distance.cosine(user_embedding, x)))
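
One point worth keeping in mind: the rating the trained ALS model itself predicts for a (user, movie) pair is the dot product of the two factor vectors, whereas this cell ranks by cosine similarity, which discards vector length. The two criteria can order movies differently, as this toy NumPy example (made-up 2-d vectors, not MovieLens data) shows:

```python
import numpy as np

user = np.array([1.0, 0.0])
items = np.array([
    [0.5, 0.0],   # item A: same direction as the user, short vector
    [3.0, 3.0],   # item B: 45 degrees away, long vector
])

# Cosine similarity: the ranking criterion used in this notebook
cosine = items @ user / (np.linalg.norm(items, axis=1) * np.linalg.norm(user))
# Dot product: what ALS uses as the predicted rating
dot = items @ user

print("cosine:", cosine.round(3))
print("dot:   ", dot)
print("best by cosine:", "AB"[cosine.argmax()], "| best by dot:", "AB"[dot.argmax()])
```

Cosine similarity prefers item A (1.0 vs about 0.707), while the dot product prefers item B (3.0 vs 0.5). Ranking by the dot product, which is also what `ALSModel.recommendForUserSubset` does, follows the model's own scoring; which criterion works better is a design choice, and this notebook uses cosine.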

In [25]:
df_movie_embedding.head(3)

|   | id | features | sim_value |
|---|---|---|---|
| 0 | 10 | [-0.34947553277015686, -2.3883156776428223, 0.... | 0.804382 |
| 1 | 20 | [0.28214582800865173, -1.0096689462661743, 0.8... | 0.679451 |
| 2 | 30 | [0.5308755040168762, -3.254995107650757, 0.523... | 0.477883 |


#### 4.4 Collecting the set of movieIds the user has watched

In [26]:
df_rating.head(3)

|   | userId | movieId | rating | timestamp |
|---|---|---|---|---|
| 0 | 1 | 1 | 4.0 | 964982703 |
| 1 | 1 | 3 | 4.0 | 964981247 |
| 2 | 1 | 6 | 4.0 | 964982224 |


In [27]:
# Filter the user's rows, select one column, deduplicate, and convert to a set
watched_ids = set(df_rating[df_rating["userId"] == target_user_id]["movieId"].unique())
len(watched_ids)

232

#### 4.5 Selecting the 10 recommended movie IDs

In [28]:
df_movie_embedding.head(3)

|   | id | features | sim_value |
|---|---|---|---|
| 0 | 10 | [-0.34947553277015686, -2.3883156776428223, 0.... | 0.804382 |
| 1 | 20 | [0.28214582800865173, -1.0096689462661743, 0.8... | 0.679451 |
| 2 | 30 | [0.5308755040168762, -3.254995107650757, 0.523... | 0.477883 |


In [29]:
# Exclude watched movies, then keep the 10 most similar IDs
df_target_movieIds = (
    df_movie_embedding[~df_movie_embedding["id"].isin(watched_ids)]
        .sort_values(by="sim_value", ascending=False)
        .head(10)
        [["id", "sim_value"]]
)
df_target_movieIds

|   | id | sim_value |
|---|---|---|
| 7835 | 198 | 0.967769 |
| 3862 | 174053 | 0.963555 |
| 2438 | 7362 | 0.936003 |
| 4753 | 101864 | 0.934407 |
| 4601 | 61024 | 0.933375 |
| 7027 | 2517 | 0.932116 |
| 4001 | 1544 | 0.931911 |
| 4254 | 4954 | 0.9307 |
| 6731 | 118696 | 0.928198 |
| 9010 | 3259 | 0.928142 |


#### 4.6 Looking up the movie titles to show the user

In [30]:
pd.merge(
    left=df_target_movieIds,
    right=df_movie,
    left_on="id",
    right_on="movieId"
)[["movieId", "title", "genres", "sim_value"]]

|   | movieId | title | genres | sim_value |
|---|---|---|---|---|
| 0 | 198 | Strange Days (1995) | Action\|Crime\|Drama\|Mystery\|Sci-Fi\|Thriller | 0.967769 |
| 1 | 174053 | Black Mirror: White Christmas (2014) | Drama\|Horror\|Mystery\|Sci-Fi\|Thriller | 0.963555 |
| 2 | 7362 | Taking Lives (2004) | Crime\|Drama\|Thriller | 0.936003 |
| 3 | 101864 | Oblivion (2013) | Action\|Adventure\|Sci-Fi\|IMAX | 0.934407 |
| 4 | 61024 | Pineapple Express (2008) | Action\|Comedy\|Crime | 0.933375 |
| 5 | 2517 | Christine (1983) | Horror | 0.932116 |
| 6 | 1544 | Lost World: Jurassic Park, The (1997) | Action\|Adventure\|Sci-Fi\|Thriller | 0.931911 |
| 7 | 4954 | Ocean's Eleven (a.k.a. Ocean's 11) (1960) | Comedy\|Crime | 0.9307 |
| 8 | 118696 | The Hobbit: The Battle of the Five Armies (2014) | Adventure\|Fantasy | 0.928198 |
| 9 | 3259 | Far and Away (1992) | Adventure\|Drama\|Romance | 0.928142 |
