In [1]:
#-*- coding=utf-8 -*-
"""
spark_itemCF
Author: shi zheyang
Date:2020/6/4
"""
from pyspark.sql import SparkSession
from pyspark import SparkContext
from operator import itemgetter
import numpy as np
from pyspark.sql.functions import *
import time
# 设置每部电影取至多前K相关部电影,及每个用户至多推荐N部没看过的电影
K = 30
N = 50

In [2]:
# 加载数据
start = time.time()
spark = SparkSession.builder.appName("spark_itemCF").\
config('spark.driver.memory', '12g').getOrCreate() # config('spark.executor.memory','.12g').
ratings = spark.read.option("header", "true").csv("../data/ml-20m/ratings.csv")
ratings.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
|     1|    112|   3.5|1094785740|
|     1|    151|   4.0|1094785734|
|     1|    223|   4.0|1112485573|
|     1|    253|   4.0|1112484940|
|     1|    260|   4.0|1112484826|
|     1|    293|   4.0|1112484703|
|     1|    296|   4.0|1112484767|
|     1|    318|   4.0|1112484798|
|     1|    337|   3.5|1094785709|
|     1|    367|   3.5|1112485980|
|     1|    541|   4.0|1112484603|
|     1|    589|   3.5|1112485557|
|     1|    593|   3.5|1112484661|
|     1|    653|   3.0|1094785691|
|     1|    919|   3.5|1094785621|
+------+-------+------+----------+
only showing top 20 rows



In [3]:
# 基本的数据统计
# 转成相应的类型
ratings_cast = ratings.select(col("userId").cast('int'), col("movieId").cast('int'), col("rating").cast('float'))
# 打印出dataframe的结构
ratings_cast.printSchema()
# 统计各列的大致情况
def count_distinct(dataframe):
    """
    Args:
        dataframe: spark dataframe 
    Returns:
    """
    for column in dataframe.columns:
        print(column, "个数为:", dataframe.select(column).distinct().count())
count_distinct(ratings_cast)
ratings_cast = ratings_cast.na.replace('','NA')

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)

userId 个数为: 138493
movieId 个数为: 26744
rating 个数为: 10


In [4]:
# 切割为训练集和测试集
train, test = ratings_cast.randomSplit([0.8, 0.2], seed=2018)
print("训练集长度为：", train.count())
print("测试集长度为：", test.count())

训练集长度为： 16001063
测试集长度为： 3999200


In [5]:
# 计算得出相关矩阵wab = abs(N(a) and N(b)) / sqrt(N(a) or N(b)),得到物品的相似矩阵
# 计算每部电影看过的人数，计算一个流行度
num_raters_item = train\
.groupBy("movieId")\
.count()\
.alias("nri")\
.orderBy("movieId", ascending=True)
num_raters_item.show()

+-------+-----+
|movieId|count|
+-------+-----+
|      1|39724|
|      2|17813|
|      3|10174|
|      4| 2213|
|      5| 9715|
|      6|19079|
|      7|10373|
|      8| 1132|
|      9| 3103|
|     10|23148|
|     11|14501|
|     12| 3065|
|     13| 1191|
|     14| 4815|
|     15| 2330|
|     16|14038|
|     17|16577|
|     18| 4138|
|     19|16729|
|     20| 3299|
+-------+-----+
only showing top 20 rows



In [6]:
# 记录看过电影A，有看过电影B的数量
# 把每个用户的item集合收集起来
import math 
group_user = train\
.groupBy("userId")\
.agg(collect_set("movieId").alias("movieId_set"))
group_user.show()
group_user.printSchema()
# 两两取出itemlist中的物品进行，组合，然后标志为1，最后再进行相加
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructType, StructField, FloatType
def compute_group(item_list):
    for i in item_list:
        for j in item_list:
            if i == j:
                continue
            else:
                count = 1 / math.log(1 + len(item_list)
                print(count)
                return i, j, 1 / math.log(1 + len(item_list)) # 惩罚热门的user
array_schema = StructType([
    StructField('movieId_i', StringType(), nullable=False),
    StructField('movieId_j', StringType(), nullable=False),
    StructField('count', IntegerType(), nullable=False)
])
item_pair_score = udf(lambda x: compute_group(x), array_schema)
# 定义一个结构列进行返回，然后使用下面方式，可以直接重命名为Strutype里面定义的名字
train_item_pair = group_user.withColumn("newCol", item_pair_score("movieId_set")).select("newCol.*")
train_item_pair.show()
# 将刚才的物品对进行合并
train_item_pair_count = train_item_pair.groupBy("movieId_i", "movieId_j").agg(sum("count").alias("score")).orderBy("score", ascending=False)
train_item_pair_count.show()
# 然后计算共现矩阵N ∩ M / srqt(N * M),
# 把每个电影的数量join到表中
final_train_ip_count_i = train_item_pair_count.join(num_raters_item.withColumnRenamed("movieId", "movieId_i").withColumnRenamed("count", "count_i"), "movieId_i")
final_train_ip_count = final_train_ip_count_i.join(num_raters_item.withColumnRenamed("movieId", "movieId_j").withColumnRenamed("count", "count_j"), "movieId_j")
final_train_ip_count = final_train_ip_count.filter("count_i > 0").filter("count_j > 0")
final_train_ip_count.show()
# 然后计算公式
from pyspark.sql.window import *
final_sim = final_train_ip_count.withColumn("result", col("score") / sqrt(col("count_i") * col("count_j"))).select("movieId_i", "movieId_j", "result")
final_sim = final_sim.withColumn("rank", row_number().over(Window.partitionBy("movieId_i").orderBy(desc("result")))).filter("rank <="+ str(K)).drop("rank")
final_sim = final_sim.orderBy("movieId_i", desc("result"))
final_sim.show()

+------+--------------------+
|userId|         movieId_set|
+------+--------------------+
|   148|[1453, 3987, 916,...|
|   463|[277, 235, 329, 4...|
|   471|[4189, 356, 3702,...|
|   496|[356, 1953, 1476,...|
|   833|[161, 589, 153, 3...|
|  1088|[102, 9, 640, 662...|
|  1238|[2186, 60, 2203, ...|
|  1342|[2019, 168, 4226,...|
|  1580|[915, 356, 4823, ...|
|  1591|[356, 1446, 32, 1...|
|  1645|[356, 589, 306, 5...|
|  1829|[356, 1222, 4643,...|
|  1959|[466, 3173, 2340,...|
|  2122|[44195, 589, 6542...|
|  2142|[996, 1083, 1104,...|
|  2366|[161, 539, 357, 5...|
|  2659|[2919, 204, 1222,...|
|  2866|[3660, 1809, 306,...|
|  3175|[2114, 6332, 7494...|
|  3749|[2716, 3093, 1221...|
+------+--------------------+
only showing top 20 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId_set: array (nullable = true)
 |    |-- element: integer (containsNull = true)

+---------+---------+-----+
|movieId_i|movieId_j|count|
+---------+---------+-----+
|     1453|     3987|    0|
|     

In [7]:
# 通过相关矩阵来进行推荐
# 对于每一个用户，把他看过的电影以及相应评价取出来，然后找出对应的30部电影，每部相关电影得分是原始评分x相应评价
user_movie = ratings.select(col("userId").cast('int'), col("movieId").cast('int'), col("rating").cast('float')).select("userId", "movieId", "rating").withColumnRenamed("movieId", "movieId_i").join(final_sim, "movieId_i", "right").select("userId", "movieId_i", "rating","movieId_j", "result").orderBy("userId", "movieId_i", desc("result"))
user_movie.show()
# 通过原始评分x相应评价，得到新的ratings
compute_rating = udf(lambda x, y: x*y, FloatType())
final_sim_new = user_movie.withColumn("final_result", compute_rating('rating', 'result')).drop("rating", "result").orderBy("userId", desc("final_result"))
final_sim_new.show()
# left_anti 过滤出左表中，不在右表的部分，即去除掉已经看过的电影
final_sim_recommendation = final_sim_new.drop("movieId_i").withColumnRenamed("movieId_j", "movieId").join(ratings_cast.select("userId", "movieId"), ["userId", "movieId"], "left_anti")
print(final_sim_new.count())
final_sim_recommendation.show()
# 然后从中选取N个作为最后的推荐结果
final = final_sim_recommendation.withColumn("rank", row_number().over(Window.partitionBy("userId").orderBy(desc("final_result")))).filter("rank <=" + str(N)).drop("rank")
print(final.count())
final.show()


+------+---------+------+---------+------+
|userId|movieId_i|rating|movieId_j|result|
+------+---------+------+---------+------+
|     1|     1090|   4.0|     1374|   0.0|
|     1|     1090|   4.0|     2717|   0.0|
|     1|     1090|   4.0|     5418|   0.0|
|     1|     1090|   4.0|      648|   0.0|
|     1|     1090|   4.0|     1722|   0.0|
|     1|     1090|   4.0|     1222|   0.0|
|     1|     1090|   4.0|      800|   0.0|
|     1|     1090|   4.0|     2092|   0.0|
|     1|     1090|   4.0|      380|   0.0|
|     1|     1090|   4.0|     1207|   0.0|
|     1|     1090|   4.0|     2819|   0.0|
|     1|     1090|   4.0|    46578|   0.0|
|     1|     1090|   4.0|      836|   0.0|
|     1|     1090|   4.0|     1396|   0.0|
|     1|     1090|   4.0|     2688|   0.0|
|     1|     1090|   4.0|     3624|   0.0|
|     1|     1090|   4.0|     1084|   0.0|
|     1|     1090|   4.0|     3450|   0.0|
|     1|     1090|   4.0|      432|   0.0|
|     1|     1090|   4.0|     3654|   0.0|
+------+---

In [8]:
# 保存中间结果和最后的推荐列表
# 可以考虑Hbase，HDFS，及其他存储方式
# 中间结果
# df_csv.write.csv('../output/rating.csv', header = True, mode = 'error') #保存数据
# df_csv.write.json('../output/rating.json',mode = 'overwrite')
# final_sim.write.csv("final_sim.csv", header=True, mode="error")
# final\
# .groupBy("userId")\
# .agg(collect_set("movieId").alias("movie_list"))\
# .select("userId", col("movie_list").cast("string"))\
# .write\
# .csv("final_recommendation.csv", header=True, mode="error")

In [9]:
# 总共耗时
print("总时长：", time.time() - start, "s")

总时长： 369.71377754211426 s
