In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

In [2]:
spark = SparkSession.builder.appName('spark_sql_demo').getOrCreate()

In [3]:
sc = spark.sparkContext

In [4]:
rating_data_body = spark.read.format('CSV').option('header','true').load(r"E:\dataset\ml-20m\ratings.csv")

In [5]:
print(rating_data_body.first())

Row(userId='1', movieId='2', rating='3.5', timestamp='1112486027')


In [9]:
rating_data_body.registerTempTable("user") #必须要注册一下表名称，不然sql会找不到表
rownum = spark.sql('select count(*) from user')
print(rownum)

DataFrame[count(1): bigint]


In [10]:
print(rownum.first())

Row(count(1)=20000263)


In [13]:
print(rownum.schema)

StructType(List(StructField(count(1),LongType,false)))


In [19]:
print(rating_data_body)

DataFrame[userId: string, movieId: string, rating: string, timestamp: string]


In [20]:
rating_data_body = rating_data_body.withColumn("rating", rating_data_body['rating'].cast('float'))
print(rating_data_body)

DataFrame[userId: string, movieId: string, rating: float, timestamp: string]


In [21]:
avg_rate=rating_data_body.groupby("movieId").avg("rating") #求每部电影的平均评分，但是报rating" is not a numeric column. 
#Aggregation function can only be applied on a numeric column
print(avg_rate)

DataFrame[movieId: string, avg(rating): double]


In [23]:
print(avg_rate.first())

Row(movieId='296', avg(rating)=4.174231169217055)


In [26]:
sc.setCheckpointDir("./chkpt")
avg_rate.checkpoint()#由于计算量比较大，不存快照每次都要重新计算，所以调用检查点来坐一下缓存，但是报没有缓存目录

DataFrame[movieId: string, avg(rating): double]

In [28]:
print(avg_rate.limit(20).show())#打印前多少行使用limit来做

+-------+------------------+
|movieId|       avg(rating)|
+-------+------------------+
|    296| 4.174231169217055|
|   1090| 3.919977226720648|
|   3959| 3.699372603694667|
|   2294| 3.303207714257601|
|   6731|3.5571184995737424|
|  48738| 3.895868364160461|
|   3210|3.6711219879518073|
|  88140|3.5536100302637266|
|    467|3.3832658569500675|
|   2088| 2.562729584628426|
|   2069| 3.806294326241135|
|  50802|  2.85519801980198|
|    829|2.6765513454146075|
|   2136| 2.849462365591398|
|  89864|3.8558174523570714|
|   2904|3.5884353741496597|
|   4821|3.1852010265183917|
|  62912|2.3253676470588234|
|  55498|2.9166666666666665|
|   2162|2.4223394055608822|
+-------+------------------+

None


In [30]:
print(avg_rate.count())

26744


In [37]:
print(avg_rate.orderBy("avg(rating)",descending=False).show())

+-------+-----------+
|movieId|avg(rating)|
+-------+-----------+
| 131152|        0.5|
| 115631|        0.5|
| 124368|        0.5|
| 117523|        0.5|
| 116608|        0.5|
| 107106|        0.5|
| 120222|        0.5|
| 111146|        0.5|
| 131062|        0.5|
| 105481|        0.5|
| 125085|        0.5|
| 106866|        0.5|
| 109359|        0.5|
| 109355|        0.5|
|  59775|        0.5|
| 121463|        0.5|
|  84162|        0.5|
| 120763|        0.5|
| 111046|        0.5|
| 117630|        0.5|
+-------+-----------+
only showing top 20 rows

None


In [41]:
print(avg_rate.orderBy(["avg(rating)"], ascending=[0]).show()) #这样才能降序排列

+-------+-----------+
|movieId|avg(rating)|
+-------+-----------+
| 113947|        5.0|
| 108527|        5.0|
|  94431|        5.0|
| 113790|        5.0|
| 109253|        5.0|
| 117606|        5.0|
| 105191|        5.0|
| 106113|        5.0|
|  93707|        5.0|
|  95979|        5.0|
| 127256|        5.0|
| 109715|        5.0|
| 106082|        5.0|
| 113860|        5.0|
|  95517|        5.0|
|  27914|        5.0|
| 129305|        5.0|
|  98761|        5.0|
| 111548|        5.0|
| 129905|        5.0|
+-------+-----------+
only showing top 20 rows

None


In [54]:
#min_max_of_movie = rating_data_body.groupby('movieId').agg({"rating":["min","max"]}) #java.util.ArrayList cannot be cast to java.lang.String

#我想知道每个电影最高评分和最低评分之箭，这样的分组方式要怎么写呢，这个agg函数究竟怎么用呢
min_max_of_movie = rating_data_body.groupby('movieId').agg({"rating":"max","rating":"min"}) #参数是一个字典，不过两个键值对只有一个起作用。。


In [55]:
print(min_max_of_movie.limit(20).show())

+-------+-----------+
|movieId|min(rating)|
+-------+-----------+
|    296|        0.5|
|   1090|        0.5|
|   3959|        0.5|
|   2294|        0.5|
|   6731|        0.5|
|  48738|        0.5|
|   3210|        0.5|
|  88140|        0.5|
|    467|        0.5|
|   2088|        0.5|
|   2069|        0.5|
|  50802|        0.5|
|    829|        0.5|
|   2136|        0.5|
|  89864|        0.5|
|   2904|        1.0|
|   4821|        0.5|
|  62912|        0.5|
|  55498|        0.5|
|   2162|        0.5|
+-------+-----------+

None


In [None]:
min_max_of_movie = rating_data_body.groupby('movieId').agg({"rating":"max,min"})  
#只能是定义好的一个函数，不能是多个函数写一起，不过源码里面说可以是一个列表Undefined function: 'max,min'

In [None]:
min_max_of_movie = rating_data_body.groupby('movieId').agg({"rating":["max","min"]})  #ArrayList cannot be cast to java.lang.String
#折腾这么写东西说实话还是直接转成sql查询就好，更加简单方便，spark sql 可以用来做大数据量的统计分析，小数据量的使用pandas来做，这样就形成了互补
#spark sql就暂时先了解到这吧，实际运用中遇到什么问题再搜索就行了

In [61]:
spark.stop()   