In [1]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local") \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .getOrCreate()

In [2]:
import json
import pandas as pd
from pyspark.sql.window import Window

In [3]:
videos = spark.read.option('header', 'true').option("inferSchema", "true").csv('../datasets/USvideos.csv')

In [4]:
# берем только самые свежие данные по видео, основываясь на дате date
w = Window.partitionBy("video_id").orderBy(to_timestamp(substring(col("date"), 1, 5), 'dd.MM'))
videos_newest = videos \
    .withColumn('rn', row_number().over(w)) \
    .where(col('rn') == 1) \
    .drop('rn')

In [5]:
videos_newest.cache()
videos_newest.count()

2364

In [6]:
comments_schema = StructType([ \
    StructField("video_id", StringType(), True), \
    StructField("comment_text", StringType(), True), \
    StructField("likes", IntegerType(), True), \
    StructField("replies", IntegerType(), True)])
comments = spark.read.option('header', 'true') \
    .option("mode", "DROPMALFORMED") \
    .schema(comments_schema) \
    .csv('../datasets/UScomments.csv')

Если бы датасет comments был большого размера и возникла бы необходимость читать его с диска несколько раз, 
то эффективно было бы применить бакетирование 

In [7]:
comments_stat = comments.groupBy('video_id').agg(
    sum('likes').alias('comment_likes'), 
    sum('replies').alias('comment_replies'))

In [8]:
video_data = videos_newest.join(comments_stat, 'video_id')

video_data.cache()
video_data.count()

2266

## Добавление колонки score

In [9]:
def sum_func(likes: pd.Series, 
             dislikes: pd.Series,
             views: pd.Series,
             comment_likes: pd.Series,
             comment_replies: pd.Series,) -> pd.Series:
    return ((likes*100/(likes + dislikes + 1)) + 
            ((comment_replies + comment_likes)*100/(views + comment_likes + comment_replies)))/2

sum_func = pandas_udf(sum_func, returnType=LongType())

In [10]:
scored_videos = video_data\
    .withColumn("score",
                sum_func("likes", "dislikes", "views", "comment_likes", "comment_replies"))

In [11]:
scored_videos.cache()
scored_videos.count()

2266

In [12]:
scored_videos.select("video_id", "score").orderBy(desc("score")).show(10, False)

+-----------+-----+
|video_id   |score|
+-----------+-----+
|zAtHxJvSczA|77   |
|MRNqDbd2rdE|75   |
|50JL8DEeeS4|67   |
|gzh6tcMsyoQ|65   |
|uHYNSaW6Ttw|60   |
|J0sg_Au8zX8|58   |
|3QWQ4gN3j4E|58   |
|AYrTkoRr6hk|57   |
|6HXaooyXjds|55   |
|zyPIdeF4NFI|55   |
+-----------+-----+
only showing top 10 rows



## Медина score по категориям

In [13]:
with open("../datasets/US_category_id.json") as f:
    data = json.load(f)
    category_catalog = spark.createDataFrame(data['items'])

In [14]:
video_with_categories = scored_videos\
    .join(broadcast(category_catalog).selectExpr("id", "snippet.title as category"), 
          scored_videos.category_id == category_catalog.id)\
    .drop("id")

In [None]:
from numpy import median

@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def numpy_median(score: pd.Series) -> pd.Series:
    return median(score)

In [16]:
categories_score = video_with_categories.groupBy("category").agg(numpy_median("score"))
categories_score.show()

+--------------------+-------------------+
|            category|numpy_median(score)|
+--------------------+-------------------+
|               Shows|               47.0|
|           Education|               49.0|
|              Gaming|               48.0|
|       Entertainment|               48.0|
|     Travel & Events|               48.0|
|Science & Technology|               48.0|
|              Sports|               47.0|
|       Howto & Style|               49.0|
|Nonprofits & Acti...|               47.5|
|    Film & Animation|               48.0|
|      People & Blogs|               48.0|
|     News & Politics|               43.0|
|      Pets & Animals|               49.0|
|    Autos & Vehicles|               48.0|
|               Music|               49.0|
|              Comedy|               48.0|
+--------------------+-------------------+



## Самые популярные тэги

In [17]:
videos_newest.createOrReplaceTempView("videos_newest")

In [18]:
from pyspark.sql.column import Column, _to_java_column, _to_seq

sc = spark.sparkContext
def udfSplitTagsScalaWraper(tags_string):
    _udf = sc._jvm.CustomUDFs.splitTagsUDF()
    return Column(_udf.apply(_to_seq(sc, [tags_string], _to_java_column)))

In [19]:
%%timeit -r 5
videos_newest.select(udfSplitTagsScalaWraper(col('tags')).alias('tags_array')).count()

1.69 s ± 183 ms per loop (mean ± std. dev. of 5 runs, 1 loop each)


In [20]:
@pandas_udf(ArrayType(StringType()), functionType=PandasUDFType.SCALAR)
def pandas_split(tags_string: pd.Series) -> pd.Series:
    return tags_string.str.split('|')

In [21]:
%%timeit -r 5
videos_newest.select(pandas_split(col('tags')).alias('tags_array')).count()

1.48 s ± 156 ms per loop (mean ± std. dev. of 5 runs, 1 loop each)


In [22]:
popular_tags = videos_newest \
    .select("video_id", explode(pandas_split(col("tags"))).alias("tag"))\
    .distinct()\
    .groupBy("tag")\
    .count()

In [23]:
popular_tags.orderBy(desc("count")).show(10, False)

+------+-----+
|tag   |count|
+------+-----+
|funny |216  |
|comedy|163  |
|[none]|146  |
|humor |92   |
|2017  |92   |
|how to|84   |
|makeup|77   |
|vlog  |73   |
|music |73   |
|video |72   |
+------+-----+
only showing top 10 rows



## Топ-5 комментариев к видео про котов

в датафрейме comments есть популярные видео, на которые много комментариев

если мы будем делать join по ключу video_id, то получим перекос в партициях, 
что может привести к проблемам на экзельюторах, которые будут обрабатывать эти партиции

"посолим" датафрейм для создания более мелких партиций

In [24]:
comments_salted = comments.withColumn('salt', (rand() * 5).cast('int'))

videos_enriched = videos_newest \
    .select('video_id', 'tags', explode(array([lit(i) for i in range(5)])).alias('salt'))

df = comments_salted.join(videos_enriched, ['video_id', 'salt']).drop('salt')

In [25]:
df.select(pandas_split(col("tags")).alias("tags_array"),
            comments.comment_text,
            comments.likes)\
    .where(array_contains("tags_array", "cat")) \
    .groupBy("comment_text")\
    .agg(sum('likes').alias('likes_total')) \
    .orderBy(desc("likes_total")) \
    .show(5, False, True)

-RECORD 0--------------------------------------------------------------------------------------------------------------
 comment_text | talk about the ocean sunfish build                                                                     
 likes_total  | 3968                                                                                                   
-RECORD 1--------------------------------------------------------------------------------------------------------------
 comment_text | The second I read this title in my notification, I started to giggle.                                  
 likes_total  | 2355                                                                                                   
-RECORD 2--------------------------------------------------------------------------------------------------------------
 comment_text | Make sure to check back next Friday as we are launching our brand new animated HALLOWEEN special! 🐱🕷 
 likes_total  | 1329                      