In [42]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType, IntegerType, DoubleType
import pandas as pd

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, sum

from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq

import timeit

In [1]:



spark = SparkSession.builder.master("local") \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .getOrCreate()

In [2]:
videos = spark.read.option('header', 'true').option("inferSchema", "true").csv('../datasets/USvideos.csv')
videos.show()

+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|               title|       channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|    Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
|K4wEI5zhHB0|iPhone X — Introd...|               Apple|         28|Apple|iPhone 10|i...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|
|cLdxuaxaQwc|         My Response|           PewDiePie|         22|              [none]|5845909|576597|   39774|       170708|https://i.ytimg.c...|13.09|
|WYYvHb03Eog|Apple iPhone X fi...|           The Verge|         28|apple iph

In [3]:
comments_schema = StructType([ \
    StructField("video_id", StringType(), True), \
    StructField("comment_text", StringType(), True), \
    StructField("likes", IntegerType(), True), \
    StructField("replies", IntegerType(), True)])
comments = spark.read.option('header', 'true').option("mode", "DROPMALFORMED").schema(comments_schema).csv('../datasets/UScomments.csv')
comments.show()

+-----------+--------------------+-----+-------+
|   video_id|        comment_text|likes|replies|
+-----------+--------------------+-----+-------+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|
|XpVt6Z1Gjjo|I've been followi...|    3|      0|
|XpVt6Z1Gjjo|Say hi to Kong an...|    3|      0|
|XpVt6Z1Gjjo| MY FAN . attendance|    3|      0|
|XpVt6Z1Gjjo|         trending 😉|    3|      0|
|XpVt6Z1Gjjo|#1 on trending AY...|    3|      0|
|XpVt6Z1Gjjo|The end though 😭...|    4|      0|
|XpVt6Z1Gjjo|#1 trending!!!!!!!!!|    3|      0|
|XpVt6Z1Gjjo|Happy one year vl...|    3|      0|
|XpVt6Z1Gjjo|You and your shit...|    0|      0|
|XpVt6Z1Gjjo|There should be a...|    0|      0|
|XpVt6Z1Gjjo|Dear Logan, I rea...|    0|      0|
|XpVt6Z1Gjjo|Honestly Evan is ...|    0|      0|
|XpVt6Z1Gjjo|Casey is still be...|    0|      0|
|XpVt6Z1Gjjo|aw geez rick this...|    0|      0|
|XpVt6Z1Gjjo|He happy cause he...|    0|      0|
|XpVt6Z1Gjjo|Ayyyyoooo Logang ...|    1|      0|
|XpVt6Z1Gjjo|Bro y did

In [4]:
%%time
videos.write \
    .bucketBy(16, 'video_id') \
    .saveAsTable('videos_bucketed4', format='csv', mode='overwrite')
    
comments.write \
    .bucketBy(16, 'video_id') \
    .saveAsTable('comments_bucketed4', format='csv', mode='overwrite')


CPU times: user 8.42 ms, sys: 8.38 ms, total: 16.8 ms
Wall time: 10.7 s


In [None]:
%%time
videos \
    .join(comments, videos['video_id'] == comments['video_id']) \
    .count()

In [5]:
%%time
videos_bucketed = spark.table('videos_bucketed4')
comments_bucketed = spark.table('comments_bucketed4')

CPU times: user 2.95 ms, sys: 0 ns, total: 2.95 ms
Wall time: 109 ms


In [6]:
%%time
comments_agg = comments_bucketed.select(col('video_id'),col('likes')) \
    .groupBy(col('video_id')) \
    .agg( sum(col('likes')).alias('likes_cnt'), count(col('video_id')).alias('comments_cnt'))
comments_agg.show()

+-----------+---------+------------+
|   video_id|likes_cnt|comments_cnt|
+-----------+---------+------------+
|zgLtEob6X-Q|       48|         700|
|B7YaMkCl3XA|       66|         299|
|6vGg-jJl30A|       13|         200|
|bp6uJJJMaLs|      645|         200|
|Pp19TkIU_fw|     2339|         173|
|u6iVspBWzZU|       15|         200|
|wGQtrwey-TI|      837|         200|
|ykvX-E1nuag|        2|         200|
|AR4UgRJOUQY|       28|         100|
|Zy6vBxqlapw|       58|         100|
|Lv5DFKceFac|       37|         100|
|9YyB6sQ4iwA|      369|         300|
|IYvEhgYy35I|      171|         600|
|JZDM1bLn7sM|       10|         200|
|tBN9kLaS-uw|       73|         700|
|bvim4rsNHkQ|       78|         700|
|zKriLekFPwg|      463|         700|
|4F2KWDQQMhY|       63|         500|
|z5eG8fD-hQw|       23|         598|
|FfRGxN2zeWU|      474|         500|
+-----------+---------+------------+
only showing top 20 rows

CPU times: user 12.1 ms, sys: 2.93 ms, total: 15.1 ms
Wall time: 915 ms


In [None]:
# Для оптимизации выбрано бакетирование, так как почти все записи из USvideos.csv присутствуют в аггрегированном датасете по комментариям comments_agg на основе  UScomments.csv (7728 из 7998), а значит фильтр Блума не логично применять, 
# так как он больше ориентирован на оптимизацию при запросе несуществующих значений из множества, а у нас слишком маленький процент таких случаев.
# "Солить" не имеет смысла, так как соединение будет практически 1 к 1, т.е. без значительного перекоса в партициях, когда такая оптимизация имеет смысл.
# Broadcast по условиям задачи нельзя применять, поэтому остается бакетирование.

In [24]:
@pandas_udf(DoubleType())
def score_calculation( views: pd.Series , likes: pd.Series , dislikes: pd.Series , likes_cnt: pd.Series , comments_cnt: pd.Series ) ->  pd.Series :
    return (views + 2 * (likes - dislikes) + 3 * likes_cnt + 4 * comments_cnt) 

In [26]:
%%time        
scored_videos = videos_bucketed \
    .join(comments_agg, videos_bucketed['video_id'] == comments_agg['video_id'],'left') \
    .withColumn('score',round(score_calculation("views", "likes", "dislikes", "likes_cnt", "comments_cnt"), 0).cast(IntegerType())) \
    .select(videos_bucketed['video_id'],'title','channel_title','category_id','tags','views','likes','dislikes','score') \

scored_videos.show()    

+-----------+--------------------+------------------+-----------+--------------------+-------+-----+--------+-------+
|   video_id|               title|     channel_title|category_id|                tags|  views|likes|dislikes|  score|
+-----------+--------------------+------------------+-----------+--------------------+-------+-----+--------+-------+
|6vGg-jJl30A|THIS MADE MY DAD ...|       Nile Wilson|         17|nile wilson|nile ...| 185541|12179|     115| 210508|
|6vGg-jJl30A|THIS MADE MY DAD ...|       Nile Wilson|         17|nile wilson|nile ...| 214484|12908|     125| 240889|
|9YyB6sQ4iwA|iPhone X and iPho...|          iJustine|         28|ijustine|iphone x...| 761705|32608|    1562| 826104|
|AR4UgRJOUQY|What Does Your Se...|       AsapSCIENCE|         28|Search History|De...|1244953|29748|    2639|1299655|
|B7YaMkCl3XA|Hurricane Irma de...|Al Jazeera English|         25|5573051142001|ame...| 382525| 1521|     270| 386421|
|B7YaMkCl3XA|Hurricane Irma de...|Al Jazeera English|   

In [None]:
scored_videos.select('category_id').distinct().count()

In [None]:
# Задание 2.

In [8]:
categories = (spark.read.option("multiline","true") 
    .json("../datasets/US_category_id.json") 
    .select(explode("items"))
    .select(col("col.id").alias("id"),col("col.snippet.title").alias("category")))
categories.show()

In [40]:
scored_videos.groupBy('category_id').count().show()

+-----------+-----+
|category_id|count|
+-----------+-----+
|         28|  512|
|         27|  334|
|         26|  870|
|         22|  882|
|          1|  378|
|         20|   82|
|         19|   48|
|         15|  116|
|         43|    2|
|         17|  410|
|         23|  755|
|         10| 1252|
|         25|  626|
|         24| 1601|
|         29|   14|
|          2|  116|
+-----------+-----+



In [None]:
# Большой перекос в кол-ве строк при соединении scored_videos по category_id с category, чтобы не допустить перекоса в партициях, будем "подсаливать" оба датафрейма.

In [31]:
categories_salt= categories \
            .select(col('id'), col('category'), explode(array([lit(i) for i in range(10)])).alias('salt'))
categories_salt.show()

+---+----------------+----+
| id|        category|salt|
+---+----------------+----+
|  1|Film & Animation|   0|
|  1|Film & Animation|   1|
|  1|Film & Animation|   2|
|  1|Film & Animation|   3|
|  1|Film & Animation|   4|
|  1|Film & Animation|   5|
|  1|Film & Animation|   6|
|  1|Film & Animation|   7|
|  1|Film & Animation|   8|
|  1|Film & Animation|   9|
|  2|Autos & Vehicles|   0|
|  2|Autos & Vehicles|   1|
|  2|Autos & Vehicles|   2|
|  2|Autos & Vehicles|   3|
|  2|Autos & Vehicles|   4|
|  2|Autos & Vehicles|   5|
|  2|Autos & Vehicles|   6|
|  2|Autos & Vehicles|   7|
|  2|Autos & Vehicles|   8|
|  2|Autos & Vehicles|   9|
+---+----------------+----+
only showing top 20 rows



In [32]:
scored_videos_salt =  scored_videos.withColumn('salt', round((rand() * 10),0).cast(IntegerType()))
scored_videos_salt.show()

+-----------+--------------------+--------------------+-----------+--------------------+-------+-----+--------+-------+----+
|   video_id|               title|       channel_title|category_id|                tags|  views|likes|dislikes|  score|salt|
+-----------+--------------------+--------------------+-----------+--------------------+-------+-----+--------+-------+----+
|--JinobXWPk|DANGEROUS Jungle ...|    Brave Wilderness|         15|adventure|adventu...|1319945|38949|     533|1397234|   2|
|-LoSw4o2zDQ|How to Make Pushe...|    kawaiisweetworld|         26|recipe|recipes|ho...|  47204| 3942|      24|  57941|   7|
|-LoSw4o2zDQ|How to Make Pushe...|    kawaiisweetworld|         26|recipe|recipes|ho...|  56094| 4430|      27|  67801|   7|
|-LoSw4o2zDQ|How to Make Pushe...|    kawaiisweetworld|         26|recipe|recipes|ho...|  62600| 4715|      29|  74873|   6|
|0lDRz8qmXpE|Standing Up For Y...|        CollegeHumor|         23|Collegehumor|CH o...| 261158|17223|    1506| 294098|   3|


In [37]:
def median_udf(df: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame(df.groupby(["category"])["score"].median()).reset_index() 
    

In [39]:
%%time
categories_score = scored_videos_salt \
                .join(categories_salt,(scored_videos_salt['category_id'] == categories_salt['id']) & (scored_videos_salt['salt'] == categories_salt['salt'] ) ) \
                .drop("salt") \
                .groupBy('category') \
                .applyInPandas(median_udf, "category string, score int")                

categories_score.show()

+--------------------+------+
|            category| score|
+--------------------+------+
|               Shows|  9193|
|           Education|265824|
|              Gaming|285004|
|       Entertainment|422233|
|     Travel & Events|264704|
|Science & Technology|422791|
|              Sports|177493|
|       Howto & Style|304694|
|Nonprofits & Acti...| 59973|
|    Film & Animation|484151|
|      People & Blogs|304870|
|     News & Politics|188508|
|      Pets & Animals|268432|
|    Autos & Vehicles|239039|
|               Music|259075|
|              Comedy|994015|
+--------------------+------+

CPU times: user 90 ms, sys: 0 ns, total: 90 ms
Wall time: 14 s


In [None]:
# Задание 3.

In [43]:
sc = spark.sparkContext
n = 20

def udfSplitTagsUDFScalaWrapper(ipString):
    _splitTagsUDF = sc._jvm.CustomUDFs.splitTagsUDF()
    return Column(_splitTagsUDF.apply(_to_seq(sc, [ipString], _to_java_column)))

split_tags = videos.select("tags",udfSplitTagsUDFScalaWrapper(col("tags")).alias("split_tags_scala"))

timeit.timeit('split_tags.count()', number=n, globals=globals())/n

0.11648250304933754

In [44]:
@pandas_udf(ArrayType(StringType()))
def split_string(column: pd.Series) -> pd.Series:
    return column.str.split("|")


split_tags = videos.select(split_string("tags").alias("tags"))

timeit.timeit('split_tags.count()', number=n, globals=globals())/n

0.13988081095012603

In [None]:
# Функция на питоне проигрывает аналогичной на Скале, но разница небольшая на данном объеме. 

In [53]:
popular_tags = (split_tags
 .select(explode("tags").alias("tag"))
 .where("tag != '[none]'")
 .groupBy("tag").count()
 .sort(col("count").desc())
 )

popular_tags.show()

+---------+-----+
|      tag|count|
+---------+-----+
|    funny|  722|
|   comedy|  572|
|     2017|  309|
|   how to|  284|
|     vlog|  273|
|    humor|  258|
|   makeup|  254|
|    music|  250|
| tutorial|  235|
|     food|  224|
|    video|  219|
|   review|  218|
|celebrity|  211|
|     news|  211|
|   beauty|  210|
|interview|  209|
|  science|  197|
|      Pop|  190|
|  trailer|  180|
|halloween|  161|
+---------+-----+
only showing top 20 rows



In [None]:
# Задание 4.

In [47]:
!pip install --no-cache-dir mmh3 bitarray

Collecting mmh3
  Downloading mmh3-4.0.0-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (68 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m68.3/68.3 kB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting bitarray
  Downloading bitarray-2.7.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (282 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m282.5/282.5 kB[0m [31m7.4 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: mmh3, bitarray
Successfully installed bitarray-2.7.6 mmh3-4.0.0


In [48]:
import math
import mmh3
from bitarray import bitarray
  
  
class BloomFilter(object):
  
    '''
    Class for Bloom filter, using murmur3 hash function
    '''
  
    def __init__(self, items_count, fp_prob):
        '''
        items_count : int
            Number of items expected to be stored in bloom filter
        fp_prob : float
            False Positive probability in decimal
        '''
        self.items_count = items_count
        
        # False possible probability in decimal
        self.fp_prob = fp_prob
  
        # Size of bit array to use
        self.size = self.get_size(items_count, fp_prob)
  
        # number of hash functions to use
        self.hash_count = self.get_hash_count(self.size, items_count)
  
        # Bit array of given size
        self.bit_array = bitarray(self.size)
  
        # initialize all bits as 0
        self.bit_array.setall(0)
  
    def add(self, item):
        '''
        Add an item in the filter
        '''
        digests = []
        for i in range(self.hash_count):
  
            # create digest for given item.
            # i work as seed to mmh3.hash() function
            # With different seed, digest created is different
            digest = mmh3.hash(item, i) % self.size
            digests.append(digest)
  
            # set the bit True in bit_array
            self.bit_array[digest] = True
        
    def union(self, other):
        """ Calculates the union of the two underlying bitarrays and returns
        a new bloom filter object."""
        new_bloom = self.copy()
        new_bloom.bit_array = new_bloom.bit_array | other.bit_array
        return new_bloom
  
    def check(self, item):
        '''
        Check for existence of an item in filter
        '''
        for i in range(self.hash_count):
            digest = mmh3.hash(item, i) % self.size
            if self.bit_array[digest] == False:
  
                # if any of bit is False then,its not present
                # in filter
                # else there is probability that it exist
                return False
        return True
    
    def copy(self):
        """Return a copy of this bloom filter.
        """
        new_filter = BloomFilter(self.items_count, self.fp_prob)
        new_filter.bit_array = self.bit_array.copy()
        return new_filter
    
    def set_bit_array(self, bit_array):
        self.bit_array = bit_array
  
    @classmethod
    def get_size(self, n, p):
        '''
        Return the size of bit array(m) to used using
        following formula
        m = -(n * lg(p)) / (lg(2)^2)
        n : int
            number of items expected to be stored in filter
        p : float
            False Positive probability in decimal
        '''
        m = -(n * math.log(p))/(math.log(2)**2)
        return int(m)
  
    @classmethod
    def get_hash_count(self, m, n):
        '''
        Return the hash function(k) to be used using
        following formula
        k = (m/n) * lg(2)
  
        m : int
            size of bit array
        n : int
            number of items expected to be stored in filter
        '''
        k = (m/n) * math.log(2)
        return int(k)

In [49]:
videos_cats = (videos
 .select("video_id",split_string("tags").alias("tags"))
 .select("video_id",explode("tags").alias("tag"))
 .where("tag = 'cat'")
)

videos_cats.count()

48

In [50]:
# В данном случае для соединения лучше подойдет фильтр Блума, так как video_id после фильтрации по тегу cats становится высоко селективной колонкой и при поиске нужного video_id 
# будут преобладать неуспешные попытки.
# Создаем фильтр

filterSize = 31771
prob = 0.05

def fill_bloom_filter(bf, items):
    for i in items:
        bf.add(str(i[0]))
    return bf

bloom_filter = BloomFilter(filterSize, prob)

general_bit_array = videos_cats.select(col('video_id')).rdd \
    .mapPartitions(lambda p: [fill_bloom_filter(BloomFilter(filterSize, prob), p).bit_array]) \
    .reduce(lambda a, b: a.bit_array | b.bit_array)

bloom_filter.set_bit_array(general_bit_array)

maybe_in_bf = udf(lambda video_id: bloom_filter.check(str(video_id)))


In [None]:
top_comments_cat = (comments
    .filter(maybe_in_bf(col('video_id')) == True)
    .join(videos_cats, videos_cats['video_id'] == comments['video_id'])
    .select(comments["comment_text"],comments["likes"])
    .groupBy("comment_text").count()
    .sort(col("count").desc())
)


In [52]:
top_comments_cat.show(5)

+--------------------+-----+
|        comment_text|count|
+--------------------+-----+
|SIMON...PLEASE PL...|   25|
|During the winter...|   25|
|Ah! My cat loves ...|   25|
|My 1 year old bla...|   25|
|No need to buy to...|   25|
+--------------------+-----+
only showing top 5 rows

