# 电影评分数据分析案例

电影评分数据集：
```
u.data -- 
The full u data set, 100000 ratings by 943 users on 1682 items.
Each user has rated at least 20 movies.
Users and items are numbered consecutively from 1.
The data is randomly ordered.
This is a tab separated list of:
user id | item id | rating | timestamp.
```
我们可以了解到：
* 数据集包含 943个用户对1682部电影的100000条评分数据
* 每个用户至少对20部电影进行了评分
* 电影ID和用户ID都是从1开始的连续编号
* 数据是随机排序的
* 数据用tab进行分割，并且包含：用户ID、电影ID、评分、时间

需求：
* 查询用户平均分
* 查询电影平均分
* 查询大于平均分的电影的数量
* 查询高分电影（>3）中打分次数最多的用户，并求出此人打的平均分
* 查询每个用户的平均打分、最低打分、最高打分
* 查询被评分超过100次的电影的平均分排名前10的电影

## 加载数据并简单看一下

In [None]:
from pyspark.sql.types import StructType, StringType, IntegerType

schema = StructType().add("user_id", StringType(), nullable=True).\
add("movie_id", StringType(), nullable=True).\
add("rank", IntegerType(), nullable=True).\
add("ts", StringType(), nullable=True)

df = spark.read.csv("/mnt/databrickscontainer1/u.data", schema=schema, sep="\t")

df.show()

## 使用DSL语法风格

In [None]:
import pyspark.sql.functions as F

* 查询用户平均分

In [None]:
display(df.groupBy("user_id").agg(F.mean("rank"), F.round(F.mean("rank"), 2).alias("avg_rank")).orderBy("avg_rank",ascending=False))

* 查询电影平均分

In [None]:
display(df.groupBy("movie_id").agg(F.mean("rank"), F.round(F.mean("rank"), 2).alias("avg_rank")).orderBy("avg_rank",ascending=False))

* 查询大于平均分的电影的数量

In [None]:
print(df.where(df["rank"] > df.select(F.mean("rank").alias("rank")).first()["rank"]).groupBy("movie_id").count().count())

display(df.where(df["rank"] > df.select(F.mean("rank").alias("rank")).first()["rank"]).groupBy("movie_id").count())

* 查询高分电影（>3）中打分次数最多的用户，并求出此人打的平均分

In [None]:
print(df.where("rank > 3").groupBy("user_id").count().orderBy("count", ascending=False).first()["user_id"])

display(df.where(df["user_id"] == df.where("rank > 3").groupBy("user_id").count().orderBy("count", ascending=False).first()["user_id"]).agg(F.mean("rank")))

* 查询每个用户的平均打分、最低打分、最高打分

In [None]:
display(df.groupBy("user_id").agg(F.mean("rank"), F.min("rank"), F.max("rank")))

* 查询被评分超过100次的电影的平均分排名前10的电影

In [None]:
df.groupBy("movie_id").agg(F.count("movie_id").alias("count"),F.mean("rank").alias("mean_rank")).show()

In [None]:
df.groupBy("movie_id").agg(F.count("movie_id").alias("count"),F.mean("rank").alias("mean_rank")).where("count > 100").show()

In [None]:
display(df.groupBy("movie_id").agg(F.count("movie_id").alias("count"),F.mean("rank").alias("mean_rank")).where("count > 100").orderBy("mean_rank", ascending=False).limit(10))

## 使用SQL语法风格

In [None]:
df.createOrReplaceTempView("movie_rank")

* 查询用户平均分

In [None]:
%sql
select user_id, mean(rank), round(mean(rank),2) from movie_rank group by user_id order by mean(rank) desc

* 查询电影平均分

In [None]:
%sql
select movie_id, mean(rank), round(mean(rank),2) from movie_rank group by movie_id order by mean(rank) desc

* 查询大于平均分的电影的数量

In [None]:
%sql

select avg(rank) from movie_rank

In [None]:
%sql
select movie_id,count(*) from movie_rank where rank > (select avg(rank) from movie_rank) group by movie_id

In [None]:
%sql
select count(*) from (
select movie_id,count(*) from movie_rank where rank > (select avg(rank) from movie_rank) group by movie_id
)

* 查询高分电影（>3）中打分次数最多的用户，并求出此人打的平均分

In [None]:
%sql

select user_id from movie_rank where rank > 3 group by user_id order by count(1) desc limit 1

In [None]:
%sql
select user_id, avg(rank), round(avg(rank), 2) from movie_rank where user_id = (select user_id from movie_rank where rank > 3 group by user_id order by count(1) desc limit 1) group by user_id

* 查询每个用户的平均打分、最低打分、最高打分

In [None]:
%sql
select user_id, avg(rank), min(rank), max(rank) from movie_rank group by user_id

* 查询被评分超过100次的电影的平均分排名前10的电影

In [None]:
%sql
select movie_id, count(*), mean(rank) from movie_rank group by movie_id having count(1) > 100 order by 3 desc limit 10