In [1]:
""" 导入模块 """
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

### Create the SparkSession, or reuse one that is already running
# NOTE(review): the app name 'lin_reg' looks left over from a different notebook — confirm before renaming.
spark = (
    SparkSession.builder
    .appName('lin_reg')
    .getOrCreate()
)

In [2]:
""" 读取数据集 """
# Load the ratings CSV: the first row supplies the column names and the
# column types are inferred from the data.
# NOTE(review): absolute, machine-specific path — this cell only runs where that file exists.
df = spark.read.csv(
    r"G:\LKM\PySark机器学习、自然语言处理与推荐系统\推荐系统\movie_ratings_df.csv",
    header=True,
    inferSchema=True,
)

### Shape check: (number of rows, number of columns)
df.count(), len(df.columns)

(100000, 3)

In [3]:
### Print the inferred schema to check the data type of every input column
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



In [6]:
### Peek at 10 rows in random order (rand() as the sort key), titles shown untruncated
df.sort(rand()).show(n=10, truncate=False)

+------+--------------------------------------+------+
|userId|title                                 |rating|
+------+--------------------------------------+------+
|581   |Big Night (1996)                      |5     |
|725   |Leaving Las Vegas (1995)              |4     |
|16    |Aladdin (1992)                        |5     |
|830   |Toy Story (1995)                      |4     |
|750   |Devil's Own, The (1997)               |3     |
|149   |Cop Land (1997)                       |2     |
|561   |Interview with the Vampire (1994)     |1     |
|174   |Monty Python and the Holy Grail (1974)|1     |
|399   |Godfather, The (1972)                 |2     |
|416   |Godfather, The (1972)                 |5     |
+------+--------------------------------------+------+
only showing top 10 rows



In [7]:
### Ratings per user, most active users first
(
    df.groupBy('userId')
    .count()
    .sort('count', ascending=False)
    .show(n=10, truncate=False)
)

+------+-----+
|userId|count|
+------+-----+
|405   |737  |
|655   |685  |
|13    |636  |
|450   |540  |
|276   |518  |
|416   |493  |
|537   |490  |
|303   |484  |
|234   |480  |
|393   |448  |
+------+-----+
only showing top 10 rows



In [8]:
### Ratings per user, least active users first
(
    df.groupBy('userId')
    .count()
    .sort('count', ascending=True)
    .show(n=10, truncate=False)
)

+------+-----+
|userId|count|
+------+-----+
|732   |20   |
|631   |20   |
|572   |20   |
|93    |20   |
|300   |20   |
|636   |20   |
|926   |20   |
|596   |20   |
|685   |20   |
|34    |20   |
+------+-----+
only showing top 10 rows



In [None]:
### Ratings per title, most-rated titles first
(
    df.groupBy('title')
    .count()
    .sort('count', ascending=False)
    .show(n=10, truncate=False)
)

In [None]:
### Ratings per title, least-rated titles first
(
    df.groupBy('title')
    .count()
    .sort('count', ascending=True)
    .show(n=10, truncate=False)
)

In [None]:
""" 特征工程 """
### Encode every title string as a numeric index stored in a new column, title_new
stringIndexer = StringIndexer(outputCol='title_new', inputCol='title')
model = stringIndexer.fit(dataset=df)
indexed = model.transform(dataset=df)

# Show the first 10 rows, now carrying the extra title_new column
indexed.show(n=10)

In [None]:
### Re-check the per-movie rating counts, this time keyed by the numeric index
(
    indexed.groupBy('title_new')
    .count()
    .sort('count', ascending=False)
    .show(n=10, truncate=False)
)

In [None]:
""" 划分数据集 """
# Split the indexed ratings roughly 75% / 25% into a training and a test set.
# A fixed seed is passed so that re-running the notebook on the same input
# reproduces the same split; without it every run samples different rows and
# the row counts and evaluation metrics are not comparable between runs.
train, test = indexed.randomSplit([0.75, 0.25], seed=42)

# Number of rows that ended up in the training set
train.count()

In [None]:
""" 构建和训练推荐系统模型 """
# Configure the ALS recommender on (userId, title_new, rating).
# - nonnegative=True constrains the learned factors to be non-negative.
# - coldStartStrategy='drop' discards rows whose prediction would be NaN
#   (users/items not seen during fitting) so evaluation metrics stay defined.
# - seed fixes the random initialisation so that repeated fits on the same
#   training data are reproducible.
rec = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='userId',
    itemCol='title_new',
    ratingCol='rating',
    nonnegative=True,
    coldStartStrategy='drop',
    seed=42,
)

# Fit the model on the training split
rec_model = rec.fit(train)

In [None]:
""" 基于测试数据进行预测和评估 """
# Score the held-out test rows with the fitted model, then inspect the resulting schema
predicted_ratings = rec_model.transform(dataset=test)
predicted_ratings.printSchema()

In [None]:
""" 评估模型 """
# Root-mean-square error between the actual 'rating' and the model's 'prediction'
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(dataset=predicted_ratings)
rmse

In [None]:
""" 创建存储独立的电影的DataFrame """
# One row per distinct movie index, followed by how many distinct movies there are
unique_movies = (
    indexed
    .select('title_new')
    .distinct()
)
unique_movies.count()

In [None]:
# Alias the full movie list as 'a' so its column can be told apart in the join below
a = unique_movies.alias('a')
a.show(n=20, truncate=True)

In [None]:
""" 以userId=85为例子 """
# Example user for whom recommendations are generated in the cells that follow
user_id = 85

In [None]:
""" 过滤活动用户已经评过分或已经观看过的电影 """
# Distinct movie indices the selected user has already rated, and how many there are
watched_movies = (
    indexed
    .where(indexed.userId == user_id)
    .select('title_new')
    .distinct()
)
watched_movies.count()

In [None]:
# Alias the user's rated movies as 'b' so its column can be told apart in the join below
b = watched_movies.alias('b')
b.show(n=20, truncate=True)

In [None]:
""" 通过过滤空值找出推荐的电影 """
# Left join all movies (a) against the user's rated movies (b); rows whose
# b-side column is null are movies the user has not rated yet.
total_movies = a.join(
    other=b,
    on=(a.title_new == b.title_new),
    how='left',
)
total_movies.show(n=10, truncate=False)

In [None]:
# Keep only the movies with no match on the user's side of the join,
# i.e. the candidates that can still be recommended, and count them.
remaining_movies = (
    total_movies
    .filter(col('b.title_new').isNull())
    .select(a.title_new)
    .distinct()
)
remaining_movies.count()

In [None]:
# Attach the user's id as a constant column so every row is a (movie, user) pair
remaining_movies = remaining_movies.withColumn(
    'userId',
    lit(int(user_id)),
)
remaining_movies.show(n=10, truncate=False)

In [None]:
""" 过滤具有最高预测评分的一些排在前面的推荐影片 """
# Predict a rating for every not-yet-rated movie and rank the highest predictions first
recommendations = (
    rec_model
    .transform(dataset=remaining_movies)
    .sort('prediction', ascending=False)
)
recommendations.show(n=5, truncate=False)

In [None]:
""" 使用IndexToString函数来创建一个可以返回电影名称的额外列 """
# Map the numeric movie index back to its title string, using the labels
# held by the fitted StringIndexer model.
movie_title = IndexToString(
    labels=model.labels,
    inputCol='title_new',
    outputCol='title',
)
final_recommendations = movie_title.transform(dataset=recommendations)
final_recommendations.show(n=10, truncate=False)

In [9]:
""" 使用IndexToString函数来创建一个可以返回电影名称的额外列 """
movie_title = IndexToString(inputCol='title_new', outputCol='title', labels=model.labels)
final_recommendations = movie_title.transform(recommendations)
final_recommendations.show(10, False)

+-----------------------------+-----+
|title                        |count|
+-----------------------------+-----+
|Star Wars (1977)             |583  |
|Contact (1997)               |509  |
|Fargo (1996)                 |508  |
|Return of the Jedi (1983)    |507  |
|Liar Liar (1997)             |485  |
|English Patient, The (1996)  |481  |
|Scream (1996)                |478  |
|Toy Story (1995)             |452  |
|Air Force One (1997)         |431  |
|Independence Day (ID4) (1996)|429  |
+-----------------------------+-----+
only showing top 10 rows



In [10]:
### Ratings per title, least-rated titles first
(
    df.groupBy('title')
    .count()
    .sort('count', ascending=True)
    .show(n=10, truncate=False)
)

+-----------------------------------------+-----+
|title                                    |count|
+-----------------------------------------+-----+
|Aiqing wansui (1994)                     |1    |
|Mad Dog Time (1996)                      |1    |
|Lashou shentan (1992)                    |1    |
|Fear, The (1995)                         |1    |
|Next Step, The (1995)                    |1    |
|Target (1995)                            |1    |
|Leopard Son, The (1996)                  |1    |
|Vie est belle, La (Life is Rosey) (1987) |1    |
|Modern Affair, A (1995)                  |1    |
|JLG/JLG - autoportrait de d�cembre (1994)|1    |
+-----------------------------------------+-----+
only showing top 10 rows



In [11]:
""" 特征工程 """
### Encode every title string as a numeric index stored in a new column, title_new
stringIndexer = StringIndexer(outputCol='title_new', inputCol='title')
model = stringIndexer.fit(dataset=df)
indexed = model.transform(dataset=df)

# Show the first 10 rows, now carrying the extra title_new column
indexed.show(n=10)

+------+------------+------+---------+
|userId|       title|rating|title_new|
+------+------------+------+---------+
|   196|Kolya (1996)|     3|    287.0|
|    63|Kolya (1996)|     3|    287.0|
|   226|Kolya (1996)|     5|    287.0|
|   154|Kolya (1996)|     3|    287.0|
|   306|Kolya (1996)|     5|    287.0|
|   296|Kolya (1996)|     4|    287.0|
|    34|Kolya (1996)|     5|    287.0|
|   271|Kolya (1996)|     4|    287.0|
|   201|Kolya (1996)|     4|    287.0|
|   209|Kolya (1996)|     4|    287.0|
+------+------------+------+---------+
only showing top 10 rows



In [12]:
### Re-check the per-movie rating counts, this time keyed by the numeric index
(
    indexed.groupBy('title_new')
    .count()
    .sort('count', ascending=False)
    .show(n=10, truncate=False)
)

+---------+-----+
|title_new|count|
+---------+-----+
|0.0      |583  |
|1.0      |509  |
|2.0      |508  |
|3.0      |507  |
|4.0      |485  |
|5.0      |481  |
|6.0      |478  |
|7.0      |452  |
|8.0      |431  |
|9.0      |429  |
+---------+-----+
only showing top 10 rows



In [13]:
""" 划分数据集 """
# Split the indexed ratings roughly 75% / 25% into a training and a test set.
# A fixed seed is passed so that re-running the notebook on the same input
# reproduces the same split; without it every run samples different rows and
# the row counts and evaluation metrics are not comparable between runs.
train, test = indexed.randomSplit([0.75, 0.25], seed=42)

# Number of rows that ended up in the training set
train.count()

75012

In [14]:
""" 构建和训练推荐系统模型 """
# Configure the ALS recommender on (userId, title_new, rating).
# - nonnegative=True constrains the learned factors to be non-negative.
# - coldStartStrategy='drop' discards rows whose prediction would be NaN
#   (users/items not seen during fitting) so evaluation metrics stay defined.
# - seed fixes the random initialisation so that repeated fits on the same
#   training data are reproducible.
rec = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='userId',
    itemCol='title_new',
    ratingCol='rating',
    nonnegative=True,
    coldStartStrategy='drop',
    seed=42,
)

# Fit the model on the training split
rec_model = rec.fit(train)

In [15]:
""" 基于测试数据进行预测和评估 """
# Score the held-out test rows with the fitted model, then inspect the resulting schema
predicted_ratings = rec_model.transform(dataset=test)
predicted_ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title_new: double (nullable = false)
 |-- prediction: float (nullable = false)



In [17]:
""" 评估模型 """
# Root-mean-square error between the actual 'rating' and the model's 'prediction'
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(dataset=predicted_ratings)
rmse

1.0223097211337286

In [18]:
""" 创建存储独立的电影的DataFrame """
# One row per distinct movie index, followed by how many distinct movies there are
unique_movies = (
    indexed
    .select('title_new')
    .distinct()
)
unique_movies.count()

1664

In [19]:
# Alias the full movie list as 'a' so its column can be told apart in the join below
a = unique_movies.alias('a')
a.show(n=20, truncate=True)

+---------+
|title_new|
+---------+
|    305.0|
|    596.0|
|    299.0|
|    769.0|
|    692.0|
|    934.0|
|   1051.0|
|    496.0|
|    558.0|
|    170.0|
|    184.0|
|    576.0|
|    147.0|
|    810.0|
|    720.0|
|    782.0|
|   1369.0|
|   1587.0|
|    160.0|
|    608.0|
+---------+
only showing top 20 rows



In [20]:
""" 以userId=85为例子 """
# Example user for whom recommendations are generated in the cells that follow
user_id = 85

In [21]:
""" 过滤活动用户已经评过分或已经观看过的电影 """
# Distinct movie indices the selected user has already rated, and how many there are
watched_movies = (
    indexed
    .where(indexed.userId == user_id)
    .select('title_new')
    .distinct()
)
watched_movies.count()

287

In [22]:
# Alias the user's rated movies as 'b' so its column can be told apart in the join below
b = watched_movies.alias('b')
b.show(n=20, truncate=True)

+---------+
|title_new|
+---------+
|    305.0|
|    558.0|
|    147.0|
|     70.0|
|     67.0|
|      8.0|
|    168.0|
|     69.0|
|      0.0|
|    249.0|
|    365.0|
|    142.0|
|    724.0|
|   1131.0|
|    154.0|
|    112.0|
|    124.0|
|    997.0|
|    253.0|
|    331.0|
+---------+
only showing top 20 rows



In [23]:
""" 通过过滤空值找出推荐的电影 """
# Left join all movies (a) against the user's rated movies (b); rows whose
# b-side column is null are movies the user has not rated yet.
total_movies = a.join(
    other=b,
    on=(a.title_new == b.title_new),
    how='left',
)
total_movies.show(n=10, truncate=False)

+---------+---------+
|title_new|title_new|
+---------+---------+
|305.0    |305.0    |
|596.0    |null     |
|299.0    |null     |
|769.0    |null     |
|692.0    |null     |
|934.0    |null     |
|1051.0   |null     |
|496.0    |null     |
|558.0    |558.0    |
|170.0    |null     |
+---------+---------+
only showing top 10 rows



In [25]:
# Keep only the movies with no match on the user's side of the join,
# i.e. the candidates that can still be recommended, and count them.
remaining_movies = (
    total_movies
    .filter(col('b.title_new').isNull())
    .select(a.title_new)
    .distinct()
)
remaining_movies.count()

1377

In [26]:
# Attach the user's id as a constant column so every row is a (movie, user) pair
remaining_movies = remaining_movies.withColumn(
    'userId',
    lit(int(user_id)),
)
remaining_movies.show(n=10, truncate=False)

+---------+------+
|title_new|userId|
+---------+------+
|596.0    |85    |
|299.0    |85    |
|769.0    |85    |
|692.0    |85    |
|934.0    |85    |
|1051.0   |85    |
|496.0    |85    |
|170.0    |85    |
|184.0    |85    |
|576.0    |85    |
+---------+------+
only showing top 10 rows



In [27]:
""" 过滤具有最高预测评分的一些排在前面的推荐影片 """
# Predict a rating for every not-yet-rated movie and rank the highest predictions first
recommendations = (
    rec_model
    .transform(dataset=remaining_movies)
    .sort('prediction', ascending=False)
)
recommendations.show(n=5, truncate=False)

+---------+------+----------+
|title_new|userId|prediction|
+---------+------+----------+
|1207.0   |85    |5.16905   |
|1198.0   |85    |4.6603785 |
|928.0    |85    |4.658463  |
|1411.0   |85    |4.649621  |
|1289.0   |85    |4.638052  |
+---------+------+----------+
only showing top 5 rows



In [28]:
""" 使用IndexToString函数来创建一个可以返回电影名称的额外列 """
# Map the numeric movie index back to its title string, using the labels
# held by the fitted StringIndexer model.
movie_title = IndexToString(
    labels=model.labels,
    inputCol='title_new',
    outputCol='title',
)
final_recommendations = movie_title.transform(dataset=recommendations)
final_recommendations.show(n=10, truncate=False)

+---------+------+----------+------------------------------------------------------------------+
|title_new|userId|prediction|title                                                             |
+---------+------+----------+------------------------------------------------------------------+
|1207.0   |85    |5.16905   |Aparajito (1956)                                                  |
|1198.0   |85    |4.6603785 |Pather Panchali (1955)                                            |
|928.0    |85    |4.658463  |Paradise Lost: The Child Murders at Robin Hood Hills (1996)       |
|1411.0   |85    |4.649621  |Boys, Les (1997)                                                  |
|1289.0   |85    |4.638052  |World of Apu, The (Apur Sansar) (1959)                            |
|1347.0   |85    |4.5744815 |Angel Baby (1995)                                                 |
|1393.0   |85    |4.5722694 |Schizopolis (1996)                                                |
|1518.0   |85    |4.4683747 |S