In [1]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local") \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .getOrCreate()

# Данные

In [2]:
videos = spark.read.option('header', 'true').option("inferSchema", "true").csv('../datasets/USvideos.csv')
videos.show()

+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|               title|       channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|    Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
|K4wEI5zhHB0|iPhone X — Introd...|               Apple|         28|Apple|iPhone 10|i...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|
|cLdxuaxaQwc|         My Response|           PewDiePie|         22|              [none]|5845909|576597|   39774|       170708|https://i.ytimg.c...|13.09|
|WYYvHb03Eog|Apple iPhone X fi...|           The Verge|         28|apple iph

In [3]:
videos.count()

7998

In [4]:
videos.filter(col('comment_total') == 0).count()

253

In [5]:
comments_schema = StructType([ \
    StructField("video_id", StringType(), True), \
    StructField("comment_text", StringType(), True), \
    StructField("likes", IntegerType(), True), \
    StructField("replies", IntegerType(), True)])
comments = spark.read.option('header', 'true').option("mode", "DROPMALFORMED").schema(comments_schema).csv('../datasets/UScomments.csv')
comments.show()

+-----------+--------------------+-----+-------+
|   video_id|        comment_text|likes|replies|
+-----------+--------------------+-----+-------+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|
|XpVt6Z1Gjjo|I've been followi...|    3|      0|
|XpVt6Z1Gjjo|Say hi to Kong an...|    3|      0|
|XpVt6Z1Gjjo| MY FAN . attendance|    3|      0|
|XpVt6Z1Gjjo|         trending 😉|    3|      0|
|XpVt6Z1Gjjo|#1 on trending AY...|    3|      0|
|XpVt6Z1Gjjo|The end though 😭...|    4|      0|
|XpVt6Z1Gjjo|#1 trending!!!!!!!!!|    3|      0|
|XpVt6Z1Gjjo|Happy one year vl...|    3|      0|
|XpVt6Z1Gjjo|You and your shit...|    0|      0|
|XpVt6Z1Gjjo|There should be a...|    0|      0|
|XpVt6Z1Gjjo|Dear Logan, I rea...|    0|      0|
|XpVt6Z1Gjjo|Honestly Evan is ...|    0|      0|
|XpVt6Z1Gjjo|Casey is still be...|    0|      0|
|XpVt6Z1Gjjo|aw geez rick this...|    0|      0|
|XpVt6Z1Gjjo|He happy cause he...|    0|      0|
|XpVt6Z1Gjjo|Ayyyyoooo Logang ...|    1|      0|
|XpVt6Z1Gjjo|Bro y did

In [6]:
comments.count()

691722

In [7]:
# считаем файл, так как маленький то в пандас
import json
import pandas as pd

with open('../datasets/US_category_id.json') as json_data:
    category_json = json.load(json_data)

category = spark.createDataFrame(
    [ (int(c['id']), c['snippet']['title']) for c in category_json['items']],
    "id int, title string"
)

In [8]:
category.show(2)

+---+----------------+
| id|           title|
+---+----------------+
|  1|Film & Animation|
|  2|Autos & Vehicles|
+---+----------------+
only showing top 2 rows



# scored_videos. Посчитаем метрику, чисто потренироваться в группировках

1) расчет вовлеченности:
из comments:  comment_repl = sum(comment_likes) + count(comment_to_video) + sum(replies)

reactions = likes + dislikes + comment_repl

3) важность видео в категории:
engagement = 100 * views/sum(views)

4) score = engagement*reactions


Так как comments имеет очень разное количество по отношению к video_id, то буду использовать salt для расчета агрегатов при группировке
Мне кажется, что фильтр Блума тут не даст выигрыш, так как нет комментов у 253 видео (это 3%), а для броадкаста первый датафрейм считаем слишком большим.

## Расчет comment_repl

In [9]:
salt_count = 2
salted_comments = comments.withColumn('salt', (rand() * salt_count).cast('int'))
salted_comments.show()

+-----------+--------------------+-----+-------+----+
|   video_id|        comment_text|likes|replies|salt|
+-----------+--------------------+-----+-------+----+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|   0|
|XpVt6Z1Gjjo|I've been followi...|    3|      0|   0|
|XpVt6Z1Gjjo|Say hi to Kong an...|    3|      0|   1|
|XpVt6Z1Gjjo| MY FAN . attendance|    3|      0|   0|
|XpVt6Z1Gjjo|         trending 😉|    3|      0|   0|
|XpVt6Z1Gjjo|#1 on trending AY...|    3|      0|   1|
|XpVt6Z1Gjjo|The end though 😭...|    4|      0|   1|
|XpVt6Z1Gjjo|#1 trending!!!!!!!!!|    3|      0|   1|
|XpVt6Z1Gjjo|Happy one year vl...|    3|      0|   1|
|XpVt6Z1Gjjo|You and your shit...|    0|      0|   0|
|XpVt6Z1Gjjo|There should be a...|    0|      0|   1|
|XpVt6Z1Gjjo|Dear Logan, I rea...|    0|      0|   0|
|XpVt6Z1Gjjo|Honestly Evan is ...|    0|      0|   1|
|XpVt6Z1Gjjo|Casey is still be...|    0|      0|   0|
|XpVt6Z1Gjjo|aw geez rick this...|    0|      0|   1|
|XpVt6Z1Gjjo|He happy cause he

In [10]:
import pyspark.sql.functions as F


comments_grouped = salted_comments.groupBy( "salt", "video_id") \
    .agg(F.sum("likes").alias("sum_likes"), \
         F.sum("replies").alias("sum_replies"), \
         F.count("likes").alias("counts")) \
# тут можно было бы сделать еще группировку, но я хочу посчитать еще engagement, поэтому video тоже буду солить

comments_grouped = comments_grouped.withColumn('comment_repl', F.col('sum_likes') + F.col('sum_replies') + F.col('counts'))

In [11]:
comments_grouped.show(2)

+----+-----------+---------+-----------+------+------------+
|salt|   video_id|sum_likes|sum_replies|counts|comment_repl|
+----+-----------+---------+-----------+------+------------+
|   1|S6eJbe5GyDo|      135|         39|   106|         280|
|   0|gQvHtXWlXDE|      500|         41|   104|         645|
+----+-----------+---------+-----------+------+------------+
only showing top 2 rows



In [12]:
videos_salt = videos.withColumn('salt', explode(array([lit(i) for i in list(range(salt_count))])))

In [13]:
videos_comment_repl = videos_salt.join(comments_grouped.select('video_id', 'salt', 'comment_repl') , on=['video_id', 'salt'], how='left')

In [14]:
videos_comment_repl = videos_comment_repl.fillna(value=0, subset = 'comment_repl')

In [15]:
videos_comment_repl = videos_comment_repl.withColumn('reactions', col('likes') + col('dislikes') + col('comment_repl')).drop('likes', 'dislikes', 'comment_repl')

In [16]:
videos_comment_repl.count()

15996

In [17]:
engagement_group = videos_comment_repl.groupBy( "salt", "category_id").agg(F.sum("views").alias("sum_views"))

In [18]:
# видно, что категория 43 очень мелкая, по сравнению с остальными
engagement_group.show()

+----+-----------+----------+
|salt|category_id| sum_views|
+----+-----------+----------+
|   0|         25| 337817005|
|   1|         25| 337817005|
|   0|         20|  55848709|
|   1|         20|  55848709|
|   1|          2|  70492489|
|   1|         22| 856891892|
|   1|         43|     16985|
|   1|          1| 392920664|
|   0|         22| 856891892|
|   0|         10|1475992648|
|   1|         26| 467360108|
|   0|         27| 182892545|
|   0|         28| 473462041|
|   0|         17| 298658073|
|   1|         19|  22273972|
|   1|         23| 936255337|
|   1|         27| 182892545|
|   0|         15|  75562909|
|   0|         24|1848944372|
|   0|          1| 392920664|
+----+-----------+----------+
only showing top 20 rows



In [19]:
# так как маленький, то broadcast, только работает ли он с left
videos_comment_engagement = videos_comment_repl.join(broadcast(engagement_group),  on=['category_id', 'salt'], how='left')

In [20]:
videos_comment_engagement.columns

['category_id',
 'salt',
 'video_id',
 'title',
 'channel_title',
 'tags',
 'views',
 'comment_total',
 'thumbnail_link',
 'date',
 'reactions',
 'sum_views']

In [21]:
# избавляемся от соли. 
videos_with_score = (
    videos_comment_engagement
        .groupBy( "video_id")
        .agg(F.max("views").alias("views"),
             F.sum("reactions").alias("reactions"),
             F.sum("sum_views").alias("sum_views_category"),
            )
)
videos_with_score.show()

+-----------+-------+---------+------------------+
|   video_id|  views|reactions|sum_views_category|
+-----------+-------+---------+------------------+
|4yCkkOvIkUI|   5662|      381|       11093666232|
|K7pQsR8WFSo| 633374|   225466|        1463140360|
|g_ekn1gjBq0|   9534|     1024|        1869440432|
|TzyraAp3jaY|  84257|    10992|        1463140360|
|xPS7bqBePSs|  35945|    15533|        8855955888|
|xNddRhpx5tA| 969749|   225426|        3929206640|
|Bo-qp-Zu0OY|  21654|      461|          44547944|
|tUXLO8Dtvq4| 445638|    29748|         731570180|
|_r5eTelhpmQ|  51914|    21611|       14759926480|
|aRgTLb5EbiQ|  22165|    23336|        3738880864|
|JkqTeQHFoBY| 138900|    13424|        3929206640|
|rn5Xgak1zzA| 283819|    83442|        3427567568|
|7TN09IP5JuI|5066207|  1758564|        4673601080|
|dInwVhRtN4E|2629650|   826663|        5500889296|
|WQjO1mMCPg4|2318203|   677816|        9362553370|
|RE-far-FvRs| 323280|   132044|        8568918920|
|eHq6ZA6uKOg|  50747|     6378|

In [22]:
videos_with_score = videos_with_score.withColumn('scored_videos', col('reactions') * col('views')/(col('sum_views_category')+1))

In [23]:
scored_videos = videos.join(videos_with_score.select('video_id', 'scored_videos'), on='video_id', how='inner')

In [24]:
scored_videos.show(2)

+-----------+--------------------+-------------+-----------+------+-----+-----+--------+-------------+--------------------+-----+--------------------+
|   video_id|               title|channel_title|category_id|  tags|views|likes|dislikes|comment_total|      thumbnail_link| date|       scored_videos|
+-----------+--------------------+-------------+-----------+------+-----+-----+--------+-------------+--------------------+-----+--------------------+
|4yCkkOvIkUI|EXCLUSIVE: Zonniq...|     YBF Chic|         24|[none]| 2306|    7|       1|            0|https://i.ytimg.c...|04.10|1.944552823829308...|
|4yCkkOvIkUI|EXCLUSIVE: Zonniq...|     YBF Chic|         24|[none]| 4937|   19|      21|           12|https://i.ytimg.c...|05.10|1.944552823829308...|
+-----------+--------------------+-------------+-----------+------+-----+-----+--------+-------------+--------------------+-----+--------------------+
only showing top 2 rows



In [25]:
scored_videos.count()

7998

# categories_score
датасет по категориям, в котором присутствуют следующие поля: Название категории (не id, он непонятный для аналитиков!). Медиана показателя score из датасета scored_videos по каждой категории.

In [26]:
# расчет медианы
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def pd_median(pdf):
    return pdf.median()



In [27]:
median_score = scored_videos.select('category_id', 'scored_videos').groupby("category_id").agg(pd_median(col('scored_videos')).alias('score_median'))

In [28]:
median_score = scored_videos.select('category_id', 'scored_videos').groupby("category_id").agg(pd_median(col('scored_videos')).alias('score_median'))

так как таблица с category маленькая, то сдалаем бродкаст

In [29]:
categories_score = median_score.join(broadcast(category.withColumnRenamed('id', 'category_id')), 'category_id').drop('category_id')

In [30]:
categories_score.show()

+------------------+--------------------+
|      score_median|               title|
+------------------+--------------------+
|10.865004454315752|Science & Technology|
| 8.836079961055406|       Howto & Style|
|13.710283821498654|           Education|
| 4.379790432612726|      People & Blogs|
| 9.154454749075859|    Film & Animation|
|19.979364803259166|              Gaming|
|118.57770709587794|     Travel & Events|
|  39.6904969663218|      Pets & Animals|
| 105.3166718181952|               Shows|
|1.6719997335631602|              Sports|
| 36.93927473818007|              Comedy|
|2.2245003633497435|               Music|
|1.2663582742774129|     News & Politics|
|2.8589205865131477|       Entertainment|
| 4.450700413833994|Nonprofits & Acti...|
|  5.55108388678234|    Autos & Vehicles|
+------------------+--------------------+



# popular_tags - датасет по самым популярным тэгам 
(название тэга + количество видео с этим тэгом). В исходном датасете тэги лежат строкой в поле tags. Другие разработчики уже сталкивались с подобной задачей, поэтому написали Scala-функцию для разбиения тегов (код функции - в разделе Примечания). Но не доверяйте им вслепую! Обязательно напишите свою UDF-функцию разбиения строки на тэги и сравните время работы с её Scala-версией. Можно замерять своими силами, а можно воспользоваться библиотекой timeit. Стандартные функции Spark из пакета pyspark.sq.functions использовать нельзя, нужно написать свою функцию.

In [31]:
videos.show(1)

+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|               title|   channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
only showing top 1 row



In [32]:
from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq
from pyspark.sql import SparkSession
import timeit

#sc = spark.sparkContext
def splitTags(tagsString):   
    sc = spark.sparkContext
    _ipToIntUDF = sc._jvm.CustomUDFs.splitTagsUDF()
    return Column(_ipToIntUDF.apply(_to_seq(sc, [tagsString], _to_java_column)))


In [33]:
@pandas_udf('array<string>', PandasUDFType.SCALAR) 
def pd_splitTags(tagsString): 
    return tagsString.str.split('|') 


In [34]:
def scala_split():
    videos.withColumn("split_tags", splitTags(col("tags")))

def pandas_split():
    videos.withColumn("split_tags", pd_splitTags(col("tags")))


In [35]:
# создала чисто для проверки
temp = (
    videos
    .select(splitTags(col("tags")).alias("scala_split_tags"), pd_splitTags(col("tags")).alias("pandas_split_tags"))
)

In [36]:
from chispa.column_comparer import assert_column_equality

# тут ошибок нет, значит все ок
assert_column_equality(temp, "scala_split_tags", "scala_split_tags")

In [37]:
print(f'Scala: {timeit.timeit(stmt=scala_split, number=1000)}')
print(f'pandas_udf: {timeit.timeit(stmt=pandas_split, number=1000)}')

Scala: 7.90579439800058
pandas_udf: 4.358572983997874


Получен странный результат - pandas_udf быстрее скалы

In [38]:
v_tags = videos.select('video_id', explode(splitTags(col("tags"))).alias('tag'))

In [39]:
popular_tags = v_tags.groupBy('tag').agg(F.count("video_id").alias("count_video_id"))

In [40]:
popular_tags.sort(desc("count_video_id")).show(5)

+------+--------------+
|   tag|count_video_id|
+------+--------------+
| funny|           722|
|comedy|           572|
|[none]|           491|
|  2017|           309|
|how to|           284|
+------+--------------+
only showing top 5 rows



# Просьба от Марка: 
он любит котов (а кто не их не любит!) и хочет найти самые интересные комментарии (топ-5) к видео про котов. “Видео про котов” - видео, у которого есть тэг “cat”.

In [41]:
video_cats = videos.withColumn("split_tags", splitTags(col("tags"))).filter(array_contains(col('split_tags'), "cat"))

In [42]:
video_cats.show()

+-----------+------------------------------------+-------------+-----------+--------------------+------+-----+--------+-------------+--------------------+-----+--------------------+
|   video_id|                               title|channel_title|category_id|                tags| views|likes|dislikes|comment_total|      thumbnail_link| date|          split_tags|
+-----------+------------------------------------+-------------+-----------+--------------------+------+-----+--------+-------------+--------------------+-----+--------------------+
|Vjc459T6wX8|更なるフィット感を追求するねこ。-...|     mugumogu|         15|Maru|cat|kitty|pe...| 43199| 3263|       7|          294|https://i.ytimg.c...|13.09|[Maru, cat, kitty...|
|0Yhaei1S5oQ|                Japan's Ominous D...|      SciShow|         27|SciShow|science|H...|295156|10918|     149|          781|https://i.ytimg.c...|13.09|[SciShow, science...|
|-1fzGnFwz9M|                9 Things You Need...|  Simon's Cat|         15|cartoon|simons ca...|189414| 7

In [43]:
video_cats.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_total: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- date: string (nullable = true)
 |-- split_tags: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [44]:
# get top-3
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

#совсем маленький датафрейм на 3 значения
cats_most_liked = video_cats.groupBy('video_id').agg(F.sum('likes').alias('sum_likes')).orderBy(col('sum_likes').desc()).limit(4).select('video_id')


In [45]:
cats_comment = comments.join(broadcast(cats_most_liked), 'video_id')

In [46]:
# интересные длинные
cats_comment = cats_comment.filter(length('comment_text') > 30)

In [47]:
# интересные цитируемые
cats_comment.orderBy((col('likes')+col('replies')).desc()).distinct().select('comment_text').show(5, False)

+---------------------------------------------------------------------------------------+
|comment_text                                                                           |
+---------------------------------------------------------------------------------------+
|This is what California needs right now                                                |
|This is just like my cats is in hell that show 😂😂😂                                  |
|What the “Maned Wolf” class. Are the long legs an ability or some kind of model glitch.|
|lol Gus is like MOTHER U BETRAYED ME!                                                  |
|A rare soul? A life well done? Someone call a medium!                                  |
+---------------------------------------------------------------------------------------+
only showing top 5 rows



**Вопросы, которые возникли**:

- https://stackoverflow.com/questions/58110207/spark-how-does-salting-work-in-dealing-with-skewed-data тут есть картинка, в которой написован план. Правильно ли я понимаю, что repartion явно перед группировкой указывать не нужно, спарк сделает это автоматически?
- есть ли какая-то эвристическая формула для расчета соли?
- Следовало ли делать кэширование? Я думаю, что если бы задача звучала, как вывести отсортированные по скору топ-100 значений, то выгодно было бы закэшировать в видео video_id, category_id и используемые views| likes|dislikes|comment_total|. Посчитать на них score, найти топ, а потом еще раз считать весь video, но уже с фильтром по top-score. 

## Расчет engagement

In [None]:
# engagement = 100 * views/sum(views)
category_grouped = 