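## Task 4: Read Hive tables with PySpark and find the active users of the last 3 months

An active user is defined as a user who has rated movies at least 3 times and has tagged a movie at least once.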

In [30]:
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().appName("test").getOrCreate()

In [33]:
# Read the Hive tables
movie = spark.read.table('test.movie')
rating = spark.read.table('test.rating')
tag = spark.read.table('test.tag')

# Drop the header row (column names) that was loaded as a data row
movie = movie.where(movie['movieid'] > 0)
rating = rating.where(rating['time_stamp'] > 1)
tag = tag.where(tag['time_stamp'] > 1)

In [46]:
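# Convert a Unix timestamp to a yyyyMMddHHmmss string
import time
def timestamp2datatime(time_stamp):
    # time.localtime uses the time zone of the machine that runs this function
    timeArray = time.localtime(int(time_stamp))
    otherStyleTime = time.strftime("%Y%m%d%H%M%S", timeArray)
    return otherStyleTime

# Wrap the function as a UDF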
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
timestamp2datatime = udf(timestamp2datatime, returnType=StringType())
rating1 = rating.withColumn("data_time", timestamp2datatime(rating["time_stamp"]))
tag1 = tag.withColumn("data_time", timestamp2datatime(tag["time_stamp"]))
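
Because the conversion above relies on `time.localtime`, the resulting string depends on the time zone of the machine that runs it. A minimal sketch of the same conversion with an explicit offset follows. It assumes UTC+8, which is inferred from the sample output in this notebook rather than stated anywhere, and the helper name `to_compact_datetime` is hypothetical:

```python
from datetime import datetime, timedelta, timezone

# Assumed offset: UTC+8. Adjust it to the zone the data should be read in.
ASSUMED_TZ = timezone(timedelta(hours=8))

def to_compact_datetime(time_stamp, tz=ASSUMED_TZ):
    """Format a Unix timestamp (int or numeric string) as yyyyMMddHHmmss in tz."""
    moment = datetime.fromtimestamp(int(time_stamp), tz=tz)
    return moment.strftime("%Y%m%d%H%M%S")

print(to_compact_datetime("1574327703"))  # prints 20191121171503
```

Spark also provides a built-in `from_unixtime` in `pyspark.sql.functions`, which formats in the session time zone and may be worth considering in place of a Python UDF.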

In [42]:
rating1.orderBy(rating1.data_time.desc()).show(10)

+------+-------+------+----------+--------------+
|userid|movieid|rating|time_stamp|     data_time|
+------+-------+------+----------+--------------+
| 85523| 149406|   4.5|1574327703|20191121171503|
|  8642| 122914|   4.0|1574327549|20191121171229|
| 85523| 204704|   4.0|1574327533|20191121171213|
|  8642| 202101|   2.0|1574327512|20191121171152|
| 85523| 168252|   4.0|1574327479|20191121171119|
| 85523| 109487|   4.5|1574327445|20191121171045|
| 85523| 195165|   3.0|1574327424|20191121171024|
| 85523|  60069|   4.5|1574327406|20191121171006|
| 85523| 122912|   4.5|1574327399|20191121170959|
| 85523|  59315|   4.0|1574327393|20191121170953|
+------+-------+------+----------+--------------+
only showing top 10 rows



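### The latest date in the rating table is 20191121, so we treat it as the most recent day and look for active users in the 3 months before it
 As above, an active user is a user who has rated movies at least 3 times and has tagged a movie at least once.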
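
The cutoff string `20190821000000` used in the following cells is three calendar months before the latest date. A minimal sketch of how it could be derived instead of typed by hand is shown here; the helper `months_before` is hypothetical, and clamping the day of month (for example, May 31 becomes Feb 28) is an assumption about the intended behaviour:

```python
import calendar
from datetime import datetime

def months_before(day, months):
    """Return the date `months` calendar months before `day`, clamping the day of month."""
    index = day.year * 12 + (day.month - 1) - months
    year, month = divmod(index, 12)
    month += 1
    last_day = calendar.monthrange(year, month)[1]
    return day.replace(year=year, month=month, day=min(day.day, last_day))

latest = datetime.strptime("20191121", "%Y%m%d")
cutoff = months_before(latest, 3).strftime("%Y%m%d%H%M%S")
print(cutoff)  # prints 20190821000000
```

Since every `data_time` value is a fixed-width digit string, comparing it to this cutoff as a string gives the same ordering as comparing the underlying dates.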

In [45]:
rating2 = rating1.where(rating1['data_time']>'20190821000000')
rating2.count()

328573

In [53]:
# IDs of the users who tagged a movie in the last three months
tag2 = tag1.where(tag1['data_time']>'20190821000000')
from pyspark.sql import functions as F 
userid_list = tag2.select(F.collect_set('userid').alias('userid')).first()['userid']
print(type(userid_list))
print(userid_list[:10])

<class 'list'>
['61920', '144421', '138288', '8164', '28583', '3150', '90103', '65716', '46110', '46633']


In [56]:
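# 1. In the rating table, flag the users who tagged a movie in the last three months
userid_set = set(userid_list)  # a set makes each membership test a constant-time lookup

def tag_user(userid):
    if userid in userid_set:
        return 1
    else:
        return 0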

from pyspark.sql.types import IntegerType
tag_user = udf(tag_user, returnType=IntegerType())
rating3 = rating2.withColumn("tag", tag_user(rating2["userid"]))

In [68]:
# 2. Keep the users with at least three ratings and at least one tag in the last three months; these are the active users
result = rating3.groupBy("userid").agg(
    F.count("userid").alias("user_count"),
    F.sum("tag").alias("tag_sum")
)

final_result = result.where(result['user_count']>=3).where(result['tag_sum'] >= 1)
final_result.show()

+------+----------+-------+
|userid|user_count|tag_sum|
+------+----------+-------+
| 38672|         8|      8|
| 57085|        20|     20|
|100735|         9|      9|
|101272|       199|    199|
|116194|       141|    141|
| 35095|         6|      6|
| 39692|        31|     31|
| 77965|        33|     33|
|114553|        94|     94|
| 29301|        35|     35|
|134633|        33|     33|
| 30815|       343|    343|
| 36159|        23|     23|
| 50929|       280|    280|
|113800|        83|     83|
|116582|       165|    165|
|   647|        16|     16|
| 45004|         4|      4|
| 98738|        35|     35|
|137652|        67|     67|
+------+----------+-------+
only showing top 20 rows
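
The rule applied above can be checked on a small in-memory example. This is a minimal sketch in plain Python; the user IDs and the helper `active_users` are made up for illustration and do not come from the Hive tables:

```python
from collections import Counter

def active_users(rating_userids, tag_userids, min_ratings=3):
    """Return the users with at least min_ratings ratings and at least one tag."""
    rating_counts = Counter(rating_userids)  # number of ratings per user
    taggers = set(tag_userids)               # users with one or more tags
    return sorted(
        user for user, count in rating_counts.items()
        if count >= min_ratings and user in taggers
    )

# One entry per rating row and per tag row inside the three-month window
ratings = ["a", "a", "a", "b", "b", "c", "c", "c", "c"]
tags = ["a", "b", "b"]

print(active_users(ratings, tags))  # prints ['a']
```

User `b` is excluded for having only two ratings and user `c` for having no tag. In Spark, a similar result could likely be obtained by joining the grouped ratings with the distinct tagging users, which would avoid collecting the ID list to the driver.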



In [69]:
# Final list of active users
final_result.select("userid").show()

+------+
|userid|
+------+
| 38672|
| 57085|
|100735|
|101272|
|116194|
| 35095|
| 39692|
| 77965|
|114553|
| 29301|
|134633|
| 30815|
| 36159|
| 50929|
|113800|
|116582|
|   647|
| 45004|
| 98738|
|137652|
+------+
only showing top 20 rows

