In [26]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local") \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .getOrCreate()

spark

In [27]:
# Imports
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType, IntegerType, DoubleType
import pandas as pd

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, sum

from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType
import timeit

In [28]:
videos = spark.read.option('header', 'true').option("inferSchema", "true").csv('../datasets/USvideos.csv')
# videos.show()

In [29]:
comments_schema = StructType([ \
    StructField("video_id", StringType(), True), \
    StructField("comment_text", StringType(), True), \
    StructField("likes", IntegerType(), True), \
    StructField("replies", IntegerType(), True)])
comments = spark.read.option('header', 'true').option("mode", "DROPMALFORMED").schema(comments_schema).csv('../datasets/UScomments.csv')
# comments.show()

In [30]:
category = (spark.read.option("multiline","true")
                .json("../datasets/US_category_id.json")
                .select(explode("items"))
                .select(col("col.id").alias("id"),col("col.snippet.title").alias("category"))
     )

In [31]:
# !pip install --no-cache-dir matplotlib
# !pip install --no-cache-dir seaborn

# Пункт 1

In [32]:
# !mkdir -p /home/jovyan/notebooks/spark-warehouse
# !pwd
# !ls -lah /home/jovyan/notebooks/
# !cd ..
# !chmod -R 777 /home/jovyan/notebooks


In [33]:
videos.write \
    .bucketBy(16, "video_id") \
    .saveAsTable('videos_bucketed', format='csv', mode='overwrite',path='file:///home/jovyan/tables/videos_bucketed')
    
comments.write \
    .bucketBy(16, "video_id") \
    .saveAsTable('comments_bucketed', format='csv', mode='overwrite',path='file:///home/jovyan/tables/comments_bucketed')


videos_bucketed = spark.table('videos_bucketed')
comments_bucketed = spark.table('comments_bucketed')

In [34]:
scored_videos_bfr_agg = (videos_bucketed
                .join(comments_bucketed, ['video_id'],"left")
                .select(
                       videos_bucketed['video_id']
                       , videos_bucketed['title']
                       , videos_bucketed['channel_title']
                       , videos_bucketed['category_id']
                       , videos_bucketed['tags']
                       , videos_bucketed['views']
                       , videos_bucketed['likes']
                       , videos_bucketed['dislikes']
                       , videos_bucketed['comment_total']
                       , videos_bucketed['thumbnail_link']
                       , to_date(concat(videos_bucketed['date'],lit(".2023")),"dd.MM.yyyy").alias("date")
                       , comments_bucketed['comment_text']
                       , comments_bucketed['likes'].alias("commnet_likes")
                       , comments_bucketed['replies'].alias("commnet_replies")
                )
                )

In [35]:
@pandas_udf(DoubleType())
def calc_score(views: pd.Series, likes: pd.Series, dislikes: pd.Series, commnet_likes: pd.Series, commnet_replies: pd.Series) -> pd.Series:
    return (views + commnet_likes + commnet_replies) / dislikes * likes

# Using UDF with select()
# scored_videos.select(to_upper("views","likes","dislikes","commnet_likes","commnet_replies")).show()

# Using UDF with withColumn()
# scored_videos.withColumn("upper_col",to_upper("views","likes","dislikes","commnet_likes","commnet_replies")).show()

In [36]:
windowDate = Window.partitionBy("video_id").orderBy(col("date").desc())


scored_videos = (scored_videos_bfr_agg.groupBy("video_id"
                       , "title"
                       , "channel_title"
                       , "category_id"
                       , "tags"
                       , "views"
                       , "likes"
                       , "dislikes"
                       , "comment_total"
                       , "thumbnail_link"
                       , "date")
                       .agg(sum("commnet_likes").alias("sum_likes"),         
                            sum("commnet_replies").alias("sum_replies")
                           )
                       .withColumn("row",row_number().over(windowDate))
                       .filter(col("row") == 1)
                       .drop("row")
                       .withColumn("score" 
                                   , round(calc_score("views"
                                                , "likes"
                                                , "dislikes"
                                                , "sum_likes"
                                                , "sum_likes"
                                           ), 0).cast(IntegerType())
                       )
                       .drop("sum_likes")
                       .drop("sum_replies")
)

scored_videos.show(10)

+-----------+--------------------+--------------------+-----------+--------------------+-------+-----+--------+-------------+--------------------+----------+--------+
|   video_id|               title|       channel_title|category_id|                tags|  views|likes|dislikes|comment_total|      thumbnail_link|      date|   score|
+-----------+--------------------+--------------------+-----------+--------------------+-------+-----+--------+-------------+--------------------+----------+--------+
|--JinobXWPk|DANGEROUS Jungle ...|    Brave Wilderness|         15|adventure|adventu...|1319945|38949|     533|         6768|https://i.ytimg.c...|2023-10-20|96457820|
|-LoSw4o2zDQ|How to Make Pushe...|    kawaiisweetworld|         26|recipe|recipes|ho...|  62600| 4715|      29|          483|https://i.ytimg.c...|2023-10-14|10362269|
|0lDRz8qmXpE|Standing Up For Y...|        CollegeHumor|         23|Collegehumor|CH o...| 467127|24509|    2302|         3058|https://i.ytimg.c...|2023-10-15| 4975593

# Пункт 2

In [37]:
def get_median(df: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame(df.groupby(["category"])["score"].median()).reset_index() 

In [38]:
# так как таблица category маленькая ее можно заброадкастить
bc_category = broadcast(category)

categories_score = (scored_videos 
    .join(bc_category, scored_videos['category_id'] == bc_category['id']) 
    .select(bc_category["category"],scored_videos["score"])
    .groupby("category").applyInPandas(get_median, "category string, score int")
)

categories_score.show(100)

# categories_score.show()

+--------------------+----------+
|            category|     score|
+--------------------+----------+
|               Shows|    113341|
|           Education|  14420106|
|              Gaming|   7802270|
|       Entertainment|  11426582|
|     Travel & Events|  12359104|
|Science & Technology|  11559558|
|              Sports|   3406357|
|       Howto & Style|  13730344|
|Nonprofits & Acti...|1074189006|
|    Film & Animation|  13132268|
|      People & Blogs|  10955716|
|     News & Politics|    926484|
|      Pets & Animals|  15239595|
|    Autos & Vehicles|   5718967|
|               Music|  16500100|
|              Comedy|  23844659|
+--------------------+----------+



# Пункт 3

In [39]:
sc = spark.sparkContext
n = 20

def udfSplitTagsUDFScalaWrapper(ipString):
    _splitTagsUDF = sc._jvm.CustomUDFs.splitTagsUDF()
    return Column(_splitTagsUDF.apply(_to_seq(sc, [ipString], _to_java_column)))

split_tags = videos.select("tags",udfSplitTagsUDFScalaWrapper(col("tags")).alias("split_tags_scala"))

timeit.timeit('split_tags.count()', number=n, globals=globals())/n

0.17926490550016752

In [40]:
@pandas_udf(ArrayType(StringType()))
def split_string(column: pd.Series) -> pd.Series:
    return column.str.split("|")


split_tags = videos.select(split_string("tags").alias("tags"))

timeit.timeit('split_tags.count()', number=n, globals=globals())/n

0.18536641635000706

In [41]:
popular_tags = (split_tags
 .select(explode("tags").alias("tag"))
 .where("tag != '[none]'")
 .groupBy("tag").count()
 .sort(col("count").desc())
 )

popular_tags.show(10)


+--------+-----+
|     tag|count|
+--------+-----+
|   funny|  722|
|  comedy|  572|
|    2017|  309|
|  how to|  284|
|    vlog|  273|
|   humor|  258|
|  makeup|  254|
|   music|  250|
|tutorial|  235|
|    food|  224|
+--------+-----+
only showing top 10 rows



# Пункт 4

In [42]:
# videos.count()
# comments.count()
videos.show(5)

+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|               title|   channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+----------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
|K4wEI5zhHB0|iPhone X — Introd...|           Apple|         28|Apple|iPhone 10|i...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|
|cLdxuaxaQwc|         My Response|       PewDiePie|         22|              [none]|5845909|576597|   39774|       170708|https://i.ytimg.c...|13.09|
|WYYvHb03Eog|Apple iPhone X fi...|       The Verge|         28|apple iphone x ha...|2642103| 24975| 

In [43]:
!pip install --no-cache-dir mmh3 bitarray



In [44]:
import math
import mmh3
from bitarray import bitarray
  
  
class BloomFilter(object):
  
    '''
    Class for Bloom filter, using murmur3 hash function
    '''
  
    def __init__(self, items_count, fp_prob):
        '''
        items_count : int
            Number of items expected to be stored in bloom filter
        fp_prob : float
            False Positive probability in decimal
        '''
        self.items_count = items_count
        
        # False possible probability in decimal
        self.fp_prob = fp_prob
  
        # Size of bit array to use
        self.size = self.get_size(items_count, fp_prob)
  
        # number of hash functions to use
        self.hash_count = self.get_hash_count(self.size, items_count)
  
        # Bit array of given size
        self.bit_array = bitarray(self.size)
  
        # initialize all bits as 0
        self.bit_array.setall(0)
  
    def add(self, item):
        '''
        Add an item in the filter
        '''
        digests = []
        for i in range(self.hash_count):
  
            # create digest for given item.
            # i work as seed to mmh3.hash() function
            # With different seed, digest created is different
            digest = mmh3.hash(item, i) % self.size
            digests.append(digest)
  
            # set the bit True in bit_array
            self.bit_array[digest] = True
        
    def union(self, other):
        """ Calculates the union of the two underlying bitarrays and returns
        a new bloom filter object."""
        new_bloom = self.copy()
        new_bloom.bit_array = new_bloom.bit_array | other.bit_array
        return new_bloom
  
    def check(self, item):
        '''
        Check for existence of an item in filter
        '''
        for i in range(self.hash_count):
            digest = mmh3.hash(item, i) % self.size
            if self.bit_array[digest] == False:
  
                # if any of bit is False then,its not present
                # in filter
                # else there is probability that it exist
                return False
        return True
    
    def copy(self):
        """Return a copy of this bloom filter.
        """
        new_filter = BloomFilter(self.items_count, self.fp_prob)
        new_filter.bit_array = self.bit_array.copy()
        return new_filter
    
    def set_bit_array(self, bit_array):
        self.bit_array = bit_array
  
    @classmethod
    def get_size(self, n, p):
        '''
        Return the size of bit array(m) to used using
        following formula
        m = -(n * lg(p)) / (lg(2)^2)
        n : int
            number of items expected to be stored in filter
        p : float
            False Positive probability in decimal
        '''
        m = -(n * math.log(p))/(math.log(2)**2)
        return int(m)
  
    @classmethod
    def get_hash_count(self, m, n):
        '''
        Return the hash function(k) to be used using
        following formula
        k = (m/n) * lg(2)
  
        m : int
            size of bit array
        n : int
            number of items expected to be stored in filter
        '''
        k = (m/n) * math.log(2)
        return int(k)

In [45]:
cat_videos = (videos
 .select("video_id",split_string("tags").alias("tags"))
 .select("video_id",explode("tags").alias("tag"))
 .where("tag = 'cat'")
)

cat_videos.count()

48

In [46]:
# Для соединения лучше подойдет фильтр блума, так как video_id после фильтрации по тегу cats становится высоко селективной колонкой
# Создаем фильтр

filterSize = 31771
prob = 0.05

def fill_bloom_filter(bf, items):
    for i in items:
        bf.add(str(i[0]))
    return bf

bloom_filter = BloomFilter(filterSize, prob)

general_bit_array = cat_videos.select(col('video_id')).rdd \
    .mapPartitions(lambda p: [fill_bloom_filter(BloomFilter(filterSize, prob), p).bit_array]) \
    .reduce(lambda a, b: a.bit_array | b.bit_array)

bloom_filter.set_bit_array(general_bit_array)

maybe_in_bf = udf(lambda video_id: bloom_filter.check(str(video_id)))


In [47]:
top_cat_comments = (comments
    .filter(maybe_in_bf(col('video_id')) == True)
    .join(cat_videos, cat_videos['video_id'] == comments['video_id'])
    .select(comments["comment_text"],comments["likes"])
    .groupBy("comment_text").count()
    .sort(col("count").desc())
)

top_cat_comments.show(5,False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|comment_text                                                                                                                                                               