本篇演示如何用 Spark SQL 处理几类常见问题。先读取一份样本数据，再按 movie_id 过滤出两张相互重叠的表作为示例。

In [1]:
# Display the running Spark version (a notebook cell echoes its last expression).
spark.version

'2.4.0'

In [2]:
import pyspark.sql.functions as F

# Each raw text line is '::'-separated; its first three fields become
# integer user_id, movie_id and rating columns.
ratings_path = "/usr/local/spark/spark-2.4.0-bin-hadoop2.7/data/mllib/als/sample_movielens_ratings.txt"
raw_lines = spark.read.text(ratings_path)
split_lines = raw_lines.withColumn("info", F.expr("split(value,'::')"))
df = split_lines.selectExpr(
    "cast(info[0] as int) as user_id",
    "cast(info[1] as int) as movie_id",
    "cast(info[2] as int) as rating"
)
df.show()

+-------+--------+------+
|user_id|movie_id|rating|
+-------+--------+------+
|      0|       2|     3|
|      0|       3|     1|
|      0|       5|     2|
|      0|       9|     4|
|      0|      11|     1|
|      0|      12|     2|
|      0|      15|     1|
|      0|      17|     1|
|      0|      19|     1|
|      0|      21|     1|
|      0|      23|     1|
|      0|      26|     3|
|      0|      27|     1|
|      0|      28|     1|
|      0|      29|     1|
|      0|      30|     1|
|      0|      31|     1|
|      0|      34|     1|
|      0|      37|     1|
|      0|      41|     2|
+-------+--------+------+
only showing top 20 rows



In [3]:
# Dataset size: total number of rating rows, then distinct users and movies.
# print() is needed for the row count: a notebook cell only echoes its final
# expression, so a bare `df.count()` here would run a Spark job and be discarded.
print(df.count())
df.selectExpr("count(distinct user_id)", "count(distinct movie_id)").show()

+-----------------------+------------------------+
|count(DISTINCT user_id)|count(DISTINCT movie_id)|
+-----------------------+------------------------+
|                     30|                     100|
+-----------------------+------------------------+



In [4]:
# Register the temporary views used by the SQL examples below:
#   tdl_table_total - every rating row
#   tdl_table_1     - ratings with movie_id < 70
#   tdl_table_2     - ratings with movie_id > 30
# The two filtered views overlap for 31 <= movie_id <= 69, so rows in that
# range appear in both.
# createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0.
df.createOrReplaceTempView("tdl_table_total")
df.filter("movie_id<70").createOrReplaceTempView("tdl_table_1")
df.filter("movie_id>30").createOrReplaceTempView("tdl_table_2")

# 1、如何进行两个表的拼接（包括按行和按列；下面演示按行拼接 union / union all，按列拼接即 join，可参考第 2 节）

In [5]:
# Row-wise concatenation: both sides must have the same columns.
# UNION drops duplicate rows; UNION ALL (next cell) keeps them.
union_sql = """
    select * from tdl_table_1
    union
    select * from tdl_table_2
    """
spark.sql(union_sql).count()

1501

In [6]:
# UNION ALL keeps duplicate rows, so rows present in both views are counted twice.
union_all_sql = """
    select * from tdl_table_1
    union all
    select * from tdl_table_2
    """
spark.sql(union_all_sql).count()

2087

# 2、从表1中排除表2中存在的movie_id

In [7]:
# Anti-join via LEFT JOIN: keep the tdl_table_1 rows whose movie_id has no match
# in tdl_table_2 (the right-hand columns are NULL when nothing matched).
left_join_sql = """
    select a.* from 
      tdl_table_1 a
    left join
      tdl_table_2 b
    on a.movie_id=b.movie_id
    where b.movie_id is null
    """
spark.sql(left_join_sql).count()

471

In [8]:
# The same anti-join written with NOT EXISTS and a correlated subquery.
not_exists_sql = """
    select a.* from 
      tdl_table_1 a
    where not exists (select movie_id from tdl_table_2 b where a.movie_id=b.movie_id)
    """
spark.sql(not_exists_sql).count()

471

# 3、选择每个user_id评分最高的movie_id

In [9]:
# Window-function approach: rank each user's rows by rating (highest first) and keep rank 1.
# movie_id is a secondary sort key. When several movies share a user's top rating,
# the result is then deterministic (the smallest such movie_id). Without it,
# row_number() picks an arbitrary row among the ties.
spark.sql(
    """
    select user_id,movie_id as favorite_movie from
      (select *,row_number() over(partition by user_id order by rating desc, movie_id) as rn from tdl_table_1)
    where rn=1
    """).show()

+-------+--------------+
|user_id|favorite_movie|
+-------+--------------+
|     28|            12|
|     26|             7|
|     27|            18|
|     12|            17|
|     22|            22|
|      1|            62|
|     13|            18|
|      6|            25|
|     16|            51|
|      3|            51|
|     20|            22|
|      5|            55|
|     19|            32|
|     15|            46|
|      9|             7|
|     17|            17|
|      4|            29|
|      8|            29|
|     23|            27|
|      7|            25|
+-------+--------------+
only showing top 20 rows



# 4、去除重复项

In [10]:
# Deduplicate with SELECT DISTINCT over the UNION ALL of both views.
# NOTE(review): an earlier note warned that DISTINCT may cause data skew and be slow.
# Spark's optimizer usually plans DISTINCT as the same aggregate as a GROUP BY on
# all columns, so the two should perform alike. Confirm with explain().
distinct_sql = """
    select distinct * from
    (select * from tdl_table_1
    union all
    select * from tdl_table_2)t
    """
spark.sql(distinct_sql).count()

1501

In [11]:
# Deduplicate with a window function: number the rows inside each identical
# (user_id, movie_id, rating) group and keep only the first one.
# Whether this fits depends on the use case; the per-partition sort makes it relatively costly.
dedup_window_sql = """
    select user_id,movie_id,rating from
      (select *,row_number() over(partition by user_id,movie_id,rating order by rating) as rn from
        (select * from tdl_table_1
        union all
        select * from tdl_table_2)t
      )t1
    where rn=1
    """
spark.sql(dedup_window_sql).count()

1501

In [12]:
# Deduplicate with GROUP BY over every column: each distinct
# (user_id, movie_id, rating) row survives exactly once.
# No aggregate column or wrapping subquery is needed just to group.
# NOTE(review): it has been claimed that GROUP BY mitigates data skew compared with
# DISTINCT. Spark typically plans both the same way, so confirm with explain()
# before relying on that.
spark.sql(
    """
    select user_id,movie_id,rating from
      (select * from tdl_table_1
      union all
      select * from tdl_table_2)t
    group by user_id,movie_id,rating
    """).count()

1501

# 5、统计每对 movie_id 被同一用户共同看过的次数（即同时看过这两部电影的用户数）

In [13]:
# Pair co-occurrence via a self-join on user_id.
# Requiring a.movie_id < b.movie_id has two effects:
#   - each unordered movie pair is emitted once per user rather than twice;
#   - the pair key is already ordered, so no if()/concat normalisation is needed.
# Each user with k movies still produces k*(k-1)/2 joined rows. This approach is
# therefore unsuitable when users have very many movies; see the UDF approach in
# the next cell.
spark.sql(
    """
    select concat(a.movie_id,',',b.movie_id) as pair,
    count(distinct a.user_id) as cnt from
      tdl_table_total a
    join
      tdl_table_total b
    on a.user_id = b.user_id and a.movie_id < b.movie_id
    group by pair
    order by cnt desc
    """).show()

+-----+---+
| pair|cnt|
+-----+---+
|29,51| 17|
| 2,50| 15|
| 6,50| 15|
| 6,88| 15|
|22,51| 14|
| 6,94| 14|
|50,88| 14|
| 6,22| 14|
|50,85| 14|
|29,55| 14|
|15,29| 14|
| 6,63| 14|
| 6,82| 14|
|51,79| 14|
| 6,12| 14|
|22,66| 14|
| 2,14| 13|
|15,67| 13|
|29,85| 13|
|36,51| 13|
+-----+---+
only showing top 20 rows



In [14]:
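# Each user_id is linked to relatively few movie_ids (nobody watches every movie), so avoid the self-join:
# group by user_id, collect that user's movies, and emit every pair from the list once; counting a pair across users gives its co-watch count.
# In practice, first filter out users linked to a single movie_id (they produce no pairs), or partition the data further, to reduce the computational load.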
from itertools import combinations
from pyspark.sql.types import * 
def array_cross(x):
    """Return every 2-element combination of *x* as an "a,b" string.

    Within each string the smaller value comes first, so the same two ids
    always yield the same key regardless of their order in *x*. Pairs are
    returned in itertools.combinations order.

    x: sequence of integers (the SQL call site passes collect_set(movie_id)).
    """
    pairs = []
    for first, second in combinations(x, 2):
        low, high = (first, second) if first < second else (second, first)
        pairs.append("%d,%d" % (low, high))
    return pairs
   
# Expose array_cross to SQL as a UDF returning array<string>. Then, per user,
# build all movie pairs from collect_set(movie_id), explode them to one row per
# pair, and count how many users produced each pair.
spark.udf.register("spark_func_array_cross", array_cross, ArrayType(StringType()))
pair_count_sql = """
    select pair,count(pair) as cnt from
      (select user_id,spark_func_array_cross(collect_set(movie_id)) as movie_pair from tdl_table_total group by user_id)t
    lateral view explode(movie_pair) tf as pair
    group by pair order by cnt desc
    """
spark.sql(pair_count_sql).show()

+-----+---+
| pair|cnt|
+-----+---+
|29,51| 17|
| 2,50| 15|
| 6,50| 15|
| 6,88| 15|
|50,85| 14|
|51,79| 14|
|15,29| 14|
| 6,94| 14|
|22,51| 14|
|50,88| 14|
| 6,63| 14|
|29,55| 14|
| 6,82| 14|
| 6,12| 14|
| 6,22| 14|
|22,66| 14|
|50,94| 13|
|12,50| 13|
|36,51| 13|
|15,67| 13|
+-----+---+
only showing top 20 rows

