In [156]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local") \
    .config('spark.sql.autoBroadcastJoinThreshold', 0) \
    .config('spark.sql.adaptive.enabled', 'false') \
    .getOrCreate()

In [157]:
videos = spark.read.option('header', 'true').option("inferSchema", "true").csv('../datasets/USvideos.csv')
videos.show()

+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|   video_id|               title|       channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+
|XpVt6Z1Gjjo|1 YEAR OF VLOGGIN...|    Logan Paul Vlogs|         24|logan paul vlog|l...|4394029|320053|    5931|        46245|https://i.ytimg.c...|13.09|
|K4wEI5zhHB0|iPhone X ‚Äî Introd...|               Apple|         28|Apple|iPhone 10|i...|7860119|185853|   26679|            0|https://i.ytimg.c...|13.09|
|cLdxuaxaQwc|         My Response|           PewDiePie|         22|              [none]|5845909|576597|   39774|       170708|https://i.ytimg.c...|13.09|
|WYYvHb03Eog|Apple iPhone X fi...|           The Verge|         28|apple i

In [158]:
comments_schema = StructType([ \
    StructField("video_id", StringType(), True), \
    StructField("comment_text", StringType(), True), \
    StructField("likes", IntegerType(), True), \
    StructField("replies", IntegerType(), True)])
comments = spark.read.option('header', 'true').option("mode", "DROPMALFORMED").schema(comments_schema).csv('../datasets/UScomments.csv')
comments.show()

+-----------+--------------------+-----+-------+
|   video_id|        comment_text|likes|replies|
+-----------+--------------------+-----+-------+
|XpVt6Z1Gjjo|Logan Paul it's y...|    4|      0|
|XpVt6Z1Gjjo|I've been followi...|    3|      0|
|XpVt6Z1Gjjo|Say hi to Kong an...|    3|      0|
|XpVt6Z1Gjjo| MY FAN . attendance|    3|      0|
|XpVt6Z1Gjjo|         trending üòâ|    3|      0|
|XpVt6Z1Gjjo|#1 on trending AY...|    3|      0|
|XpVt6Z1Gjjo|The end though üò≠...|    4|      0|
|XpVt6Z1Gjjo|#1 trending!!!!!!!!!|    3|      0|
|XpVt6Z1Gjjo|Happy one year vl...|    3|      0|
|XpVt6Z1Gjjo|You and your shit...|    0|      0|
|XpVt6Z1Gjjo|There should be a...|    0|      0|
|XpVt6Z1Gjjo|Dear Logan, I rea...|    0|      0|
|XpVt6Z1Gjjo|Honestly Evan is ...|    0|      0|
|XpVt6Z1Gjjo|Casey is still be...|    0|      0|
|XpVt6Z1Gjjo|aw geez rick this...|    0|      0|
|XpVt6Z1Gjjo|He happy cause he...|    0|      0|
|XpVt6Z1Gjjo|Ayyyyoooo Logang ...|    1|      0|
|XpVt6Z1Gjjo|Bro

In [159]:
# –ü–æ–¥–≥–æ—Ç–æ–≤–∫–∞ –¥–∞—Ç–∞—Å–µ—Ç–æ–≤
# –≤—ã–±–∏—Ä–∞–µ–º –∏–Ω—Ñ–æ—Ä–º–∞—Ü–∏—é –ø–æ –≤–∏–¥–µ–æ —Ç–æ–ª—å–∫–æ –∑–∞ –ø–æ—Å–ª–µ–¥–Ω–∏–π –¥–µ–Ω—å

from pyspark.sql.window import Window

by_date = Window.partitionBy('video_id').orderBy(desc('date'))
last_video_stats = videos.withColumn('row', row_number().over(by_date).alias('row'))\
  .where(col('row') == 1).drop("row")

# –∏—Å–ø–æ–ª—å–∑—É–µ–º bucketing –¥–ª—è –∏—Å—Ö–æ–¥–Ω—ã—Ö –¥–∞—Ç–∞—Å–µ—Ç–æ–≤ —á—Ç–æ–±—ã –æ–±–ª–µ–≥—á–∏—Ç—å join,
# —Ç–∞–∫ –∫–∞–∫ partion skewing –Ω–µ –æ–±–Ω–∞—Ä—É–∂–µ–Ω
# partitioning –Ω–µ –ø—Ä–∏–º–µ–Ω–∏–º (—Ç.–∫. –∫–ª—é—á –≥—Ä—É–ø–ø–∏—Ä–æ–≤–∫–∏ 'video_id' –Ω–µ —Ñ–∏–∫—Å–∏—Ä–æ–≤–∞–Ω)
buckets_num = 16
last_video_stats\
        .write.bucketBy(buckets_num, 'video_id')\
        .saveAsTable('videos', format='parquet', mode='overwrite')
bucketed_videos = spark.sql('select * from videos')
bucketed_videos.show()

comments\
        .write.bucketBy(buckets_num, 'video_id')\
        .saveAsTable('comments', format='parquet', mode='overwrite')
bucketed_comments = spark.sql('select * from comments')
bucketed_comments.show()


+-----------+--------------------+--------------------+-----------+--------------------+--------+------+--------+-------------+--------------------+-----+
|   video_id|               title|       channel_title|category_id|                tags|   views| likes|dislikes|comment_total|      thumbnail_link| date|
+-----------+--------------------+--------------------+-----------+--------------------+--------+------+--------+-------------+--------------------+-----+
|5ggZ9jIHnr8|Alesso & Anitta -...|              Alesso|         10|alesso anitta is ...|14849524|618436|  108966|        84942|https://i.ytimg.c...|20.10|
|7Lyo5dCig-U|BBC Anchor Gets L...|         NewsFunnies|         25|news bloopers|blo...|  273256|  2408|     121|          263|https://i.ytimg.c...|14.09|
|B_CHjYoqPUU|Casually Explaine...|  Casually Explained|         23|is she into you|d...| 1516624| 76278|    1541|         3989|https://i.ytimg.c...|15.09|
|Km_u51OE3VA|Mosaic: Official ...|                 HBO|          1|HBO

In [160]:
#1. scored_videos - –¥–∞—Ç–∞—Å–µ—Ç –Ω–∞ –æ—Å–Ω–æ–≤–µ —Ñ–∞–π–ª–∞ USvideos.csv —Å –¥–æ–±–∞–≤–ª–µ–Ω–∏–µ–º –∫–æ–ª–æ–Ω–∫–∏,
# —Å–æ–¥–µ—Ä–∂–∞—â–µ–π —Å–∫–æ—Ä (–ø–æ–∫–∞–∑–∞—Ç–µ–ª—å –∫–∞—á–µ—Å—Ç–≤–∞) –≤–∏–¥–µ–æ. –ù–∏–∫—Ç–æ –Ω–µ –∑–Ω–∞–µ—Ç, –∫–∞–∫ —Å—á–∏—Ç–∞—Ç—å —Å–∫–æ—Ä,
# –ø–æ—ç—Ç–æ–º—É —Ñ–æ—Ä–º—É–ª—É –ø—Ä–µ–¥–ª–∞–≥–∞–µ—Ç—Å—è –ø—Ä–∏–¥—É–º–∞—Ç—å –≤–∞–º. –ù–æ –æ–Ω–∞ –¥–æ–ª–∂–Ω–∞ –≤–∫–ª—é—á–∞—Ç—å –≤ —Å–µ–±—è –ø—Ä–æ—Å–º–æ—Ç—Ä—ã,
# –ª–∞–π–∫–∏, –¥–∏–∑–ª–∞–π–∫–∏ –≤–∏–¥–µ–æ, –ª–∞–π–∫–∏ –∏ –¥–∏–∑–ª–∞–π–∫–∏ –∫ –∫–æ–º–º–µ–Ω—Ç–∞—Ä–∏—è–º –∫ —ç—Ç–æ–º—É –≤–∏–¥–µ–æ.

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('double')
def video_score_udf(views: pd.Series, likes: pd.Series, dislikes: pd.Series, comment_likes: pd.Series, comment_replies: pd.Series) -> pd.Series:
    # some 'magic' score formula
    return views/10 + likes*100 - dislikes*10 + comment_likes*5 + comment_replies*10

agg_comments = bucketed_comments\
                .groupby('video_id')\
                .agg(sum('likes').alias('comment_likes'), sum('replies').alias('comment_replies'))

scored_videos = bucketed_videos.join(agg_comments, 'video_id', 'left')\
    .withColumn('score', video_score_udf('views', 'likes', 'dislikes', 'comment_likes', 'comment_replies'))
scored_videos.show()

+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+-------------+---------------+------------+
|   video_id|               title|       channel_title|category_id|                tags|  views| likes|dislikes|comment_total|      thumbnail_link| date|comment_likes|comment_replies|       score|
+-----------+--------------------+--------------------+-----------+--------------------+-------+------+--------+-------------+--------------------+-----+-------------+---------------+------------+
|4F2KWDQQMhY|Riverdale: Betwee...|    Madelaine Petsch|         22|madelaine|madelai...| 284397| 23482|      56|         1674|https://i.ytimg.c...|18.09|           63|             22|   2376614.7|
|4yCkkOvIkUI|EXCLUSIVE: Zonniq...|            YBF Chic|         24|              [none]|   5662|    33|      21|           13|https://i.ytimg.c...|06.10|           36|              4|      3876.2|
|7KS2oJPzeZk|Da

In [161]:
# 2. categories_score - –¥–∞—Ç–∞—Å–µ—Ç –ø–æ –∫–∞—Ç–µ–≥–æ—Ä–∏—è–º, –≤ –∫–æ—Ç–æ—Ä–æ–º –ø—Ä–∏—Å—É—Ç—Å—Ç–≤—É—é—Ç —Å–ª–µ–¥—É—é—â–∏–µ –ø–æ–ª—è:
# –ù–∞–∑–≤–∞–Ω–∏–µ –∫–∞—Ç–µ–≥–æ—Ä–∏–∏ (–Ω–µ id, –æ–Ω –Ω–µ–ø–æ–Ω—è—Ç–Ω—ã–π –¥–ª—è –∞–Ω–∞–ª–∏—Ç–∏–∫–æ–≤!).
# –ú–µ–¥–∏–∞–Ω–∞ –ø–æ–∫–∞–∑–∞—Ç–µ–ª—è score –∏–∑ –¥–∞—Ç–∞—Å–µ—Ç–∞ scored_videos –ø–æ –∫–∞–∂–¥–æ–π –∫–∞—Ç–µ–≥–æ—Ä–∏–∏.

categories_schema = StructType([\
    StructField('items', ArrayType(\
        StructType([\
            StructField('id', StringType(), True),\
            StructField('snippet', StructType([\
                StructField('title', StringType(), True)\
            ]), True)\
        ]),\
    ), True)])

categories = spark.read.option('multiline', 'true')\
                  .schema(categories_schema)\
                  .json('../datasets/US_category_id.json')\
                  .select(explode('items').alias('category'))\
                  .select(col('category.id').alias('category_id'), col('category.snippet.title').alias('category_title'))
# categories.show()

@pandas_udf("double")
def median_udf(v: pd.Series) -> float:
    return v.median()

# broadcast join –∑–¥–µ—Å—å —É–º–µ—Å—Ç–µ–Ω —Ç–∞–∫ –∫–∞–∫ –∫–∞—Ç–µ–≥–æ—Ä–∏–π –æ—Ç–Ω–æ—Å–∏—Ç–µ–ª—å–Ω–æ –º–∞–ª–æ, –∏ –≤—Ä—è–¥–ª–∏ —ç—Ç–æ—Ç –¥–∞—Ç–∞—Å–µ—Ç —Å–∫–ª–æ–Ω–µ–Ω –∫ —Å–∏–ª—å–Ω–æ–º—É —Ä–æ—Å—Ç—É.
scored_videos.select('category_id', 'score')\
             .groupby('category_id').agg(median_udf('score').alias('median_score'))\
             .join(broadcast(categories), 'category_id', 'left')\
             .select('category_title', 'median_score')\
             .sort(desc('median_score'))\
             .show()


+--------------------+------------+
|      category_title|median_score|
+--------------------+------------+
|              Comedy|  2514279.95|
|       Howto & Style|   1242152.2|
|               Music|   1062702.8|
|      People & Blogs|   994081.15|
|Science & Technology|   953925.45|
|           Education|    907899.1|
|       Entertainment|    843028.0|
|     Travel & Events|    786572.8|
|      Pets & Animals|    622508.6|
|    Film & Animation|    595508.5|
|              Gaming|    431630.9|
|    Autos & Vehicles|    188666.5|
|              Sports|    143808.1|
|     News & Politics|    116656.8|
|Nonprofits & Acti...|    46172.65|
|               Shows|     12907.6|
+--------------------+------------+



In [162]:
# 3. popular_tags - –¥–∞—Ç–∞—Å–µ—Ç –ø–æ —Å–∞–º—ã–º –ø–æ–ø—É–ª—è—Ä–Ω—ã–º —Ç—ç–≥–∞–º (–Ω–∞–∑–≤–∞–Ω–∏–µ —Ç—ç–≥–∞ + –∫–æ–ª–∏—á–µ—Å—Ç–≤–æ –≤–∏–¥–µ–æ —Å —ç—Ç–∏–º —Ç—ç–≥–æ–º).
# –í –∏—Å—Ö–æ–¥–Ω–æ–º –¥–∞—Ç–∞—Å–µ—Ç–µ —Ç—ç–≥–∏ –ª–µ–∂–∞—Ç —Å—Ç—Ä–æ–∫–æ–π –≤ –ø–æ–ª–µ tags. –î—Ä—É–≥–∏–µ —Ä–∞–∑—Ä–∞–±–æ—Ç—á–∏–∫–∏ —É–∂–µ —Å—Ç–∞–ª–∫–∏–≤–∞–ª–∏—Å—å —Å –ø–æ–¥–æ–±–Ω–æ–π –∑–∞–¥–∞—á–µ–π,
# –ø–æ—ç—Ç–æ–º—É –Ω–∞–ø–∏—Å–∞–ª–∏ Scala-—Ñ—É–Ω–∫—Ü–∏—é –¥–ª—è —Ä–∞–∑–±–∏–µ–Ω–∏—è —Ç–µ–≥–æ–≤. –ù–æ –Ω–µ –¥–æ–≤–µ—Ä—è–π—Ç–µ –∏–º –≤—Å–ª–µ–ø—É—é!
# –û–±—è–∑–∞—Ç–µ–ª—å–Ω–æ –Ω–∞–ø–∏—à–∏—Ç–µ —Å–≤–æ—é —Ñ—É–Ω–∫—Ü–∏—é —Ä–∞–∑–±–∏–µ–Ω–∏—è —Å—Ç—Ä–æ–∫–∏ –Ω–∞ —Ç—ç–≥–∏ –∏ —Å—Ä–∞–≤–Ω–∏—Ç–µ –≤—Ä–µ–º—è —Ä–∞–±–æ—Ç—ã —Å –µ—ë Scala-–≤–µ—Ä—Å–∏–µ–π.
# –ú–æ–∂–Ω–æ –∑–∞–º–µ—Ä—è—Ç—å —Å–≤–æ–∏–º–∏ —Å–∏–ª–∞–º–∏, –∞ –º–æ–∂–Ω–æ –≤–æ—Å–ø–æ–ª—å–∑–æ–≤–∞—Ç—å—Å—è –±–∏–±–ª–∏–æ—Ç–µ–∫–æ–π timeit.
from pyspark.sql.column import Column, _to_java_column, _to_seq
import pandas as pd
from pyspark.sql.functions import pandas_udf
from timeit import Timer

def split_tags_scala(col):
    sc = spark.sparkContext
    _split_tags_udf = sc._jvm.CustomUDFs.splitTagsUDF()
    return Column(_split_tags_udf.apply(_to_seq(sc, [col], _to_java_column)))

popular_tags_scala = bucketed_videos\
                          .select(split_tags_scala('tags').alias('tags'))\
                          .select(explode('tags').alias('tag'))\
                          .groupby('tag')\
                          .count()\
                          .sort(desc('count'))

scala_udf_timer = Timer(lambda: popular_tags_scala.show(10))
print('Populat tags with scala udf took: ' + str(scala_udf_timer.timeit(number=1)))


@pandas_udf('array<string>')
def split_tags_pandas(v: pd.Series) -> pd.Series:
    return v.apply(lambda tags: tags.split('|'))

popular_tags_pandas = bucketed_videos\
                           .select(split_tags_pandas('tags').alias('tags'))\
                           .select(explode('tags').alias('tag'))\
                           .groupby('tag')\
                           .count()\
                           .sort(desc('count'))

pandas_udf_timer = Timer(lambda: popular_tags_pandas.show(10))
print('Populat tags with scala udf took: ' + str(pandas_udf_timer.timeit(number=1)))

# –õ–æ–∫–∞–ª—å–Ω—ã–µ –∑–∞–ø—É—Å–∫–∏ –ø–æ–∫–∞–∑—ã–≤–∞—é—Ç –Ω–µ–∑–Ω–∞—á–∏—Ç–µ–ª—å–Ω–æ–µ –æ—Å—Ç–∞–≤–∞–Ω–∏–µ –ø–æ –≤—Ä–µ–º–µ–Ω–∏ —Ä–µ–∞–ª–∏–∑–∞—Ü–∏–∏ –Ω–∞ pandas (5-10%)
# –ù–∞—Ç–∏–≤–Ω—É—é —Å–∫–∞–ª–∞ —Ä–µ–∞–ª–∏–∑–∞—Ü–∏—é —Å–ª–æ–∂–Ω–æ –æ–±–æ–≥–Ω–∞—Ç—å, —Ç–∞–∫ –∫–∞–∫ scala udf –≤—ã–ø–æ–ª–Ω—è–µ—Ç—Å—è –≤ —Ç–æ–π –∂–µ —Å–∞–º–æ–π jvm –≤–æ—Ä–∫–µ—Ä–∞
# Pandas —Ä–µ–∞–ª–∏–∑–∞—Ü–∏—è –æ—Å–æ–±–æ –Ω–µ —É—Å—Ç—É–ø–∞–µ—Ç —Ç–∞–∫ –∫–∞–∫ –∏—Å–ø–æ–ª—å–∑—É–µ—Ç—Å—è ArrowEvalPython

+------+-----+
|   tag|count|
+------+-----+
| funny|  217|
|comedy|  163|
|[none]|  144|
|  2017|   93|
| humor|   92|
|how to|   84|
|makeup|   77|
| music|   74|
|  vlog|   73|
| video|   71|
+------+-----+
only showing top 10 rows

Populat tags with scala udf took: 2.665118793025613
+------+-----+
|   tag|count|
+------+-----+
| funny|  217|
|comedy|  163|
|[none]|  144|
|  2017|   93|
| humor|   92|
|how to|   84|
|makeup|   77|
| music|   74|
|  vlog|   73|
| video|   71|
+------+-----+
only showing top 10 rows

Populat tags with scala udf took: 2.9980885009281337


In [163]:
!pip install --no-cache-dir mmh3 bitarray

import math
import mmh3
from bitarray import bitarray

class BloomFilter(object):
  
    '''
    Class for Bloom filter, using murmur3 hash function
    '''
  
    def __init__(self, items_count, fp_prob):
        '''
        items_count : int
            Number of items expected to be stored in bloom filter
        fp_prob : float
            False Positive probability in decimal
        '''
        self.items_count = items_count
        
        # False possible probability in decimal
        self.fp_prob = fp_prob
  
        # Size of bit array to use
        self.size = self.get_size(items_count, fp_prob)
  
        # number of hash functions to use
        self.hash_count = self.get_hash_count(self.size, items_count)
  
        # Bit array of given size
        self.bit_array = bitarray(self.size)
  
        # initialize all bits as 0
        self.bit_array.setall(0)
  
    def add(self, item):
        '''
        Add an item in the filter
        '''
        digests = []
        for i in range(self.hash_count):
  
            # create digest for given item.
            # i work as seed to mmh3.hash() function
            # With different seed, digest created is different
            digest = mmh3.hash(item, i) % self.size
            digests.append(digest)
  
            # set the bit True in bit_array
            self.bit_array[digest] = True
        
    def union(self, other):
        """ Calculates the union of the two underlying bitarrays and returns
        a new bloom filter object."""
        new_bloom = self.copy()
        new_bloom.bit_array = new_bloom.bit_array | other.bit_array
        return new_bloom
  
    def check(self, item):
        '''
        Check for existence of an item in filter
        '''
        for i in range(self.hash_count):
            digest = mmh3.hash(item, i) % self.size
            if self.bit_array[digest] == False:
  
                # if any of bit is False then,its not present
                # in filter
                # else there is probability that it exist
                return False
        return True
    
    def copy(self):
        """Return a copy of this bloom filter.
        """
        new_filter = BloomFilter(self.items_count, self.fp_prob)
        new_filter.bit_array = self.bit_array.copy()
        return new_filter
    
    def set_bit_array(self, bit_array):
        self.bit_array = bit_array
  
    @classmethod
    def get_size(self, n, p):
        '''
        Return the size of bit array(m) to used using
        following formula
        m = -(n * lg(p)) / (lg(2)^2)
        n : int
            number of items expected to be stored in filter
        p : float
            False Positive probability in decimal
        '''
        m = -(n * math.log(p))/(math.log(2)**2)
        return int(m)
  
    @classmethod
    def get_hash_count(self, m, n):
        '''
        Return the hash function(k) to be used using
        following formula
        k = (m/n) * lg(2)
  
        m : int
            size of bit array
        n : int
            number of items expected to be stored in filter
        '''
        k = (m/n) * math.log(2)
        return int(k)
        



In [None]:
# –ò –ª–∏—á–Ω–∞—è –ø—Ä–æ—Å—å–±–∞ –æ—Ç –ú–∞—Ä–∫–∞: –æ–Ω –ª—é–±–∏—Ç –∫–æ—Ç–æ–≤ (–∞ –∫—Ç–æ –Ω–µ –∏—Ö –Ω–µ –ª—é–±–∏—Ç!)
# –∏ —Ö–æ—á–µ—Ç –Ω–∞–π—Ç–∏ —Å–∞–º—ã–µ –∏–Ω—Ç–µ—Ä–µ—Å–Ω—ã–µ –∫–æ–º–º–µ–Ω—Ç–∞—Ä–∏–∏ (—Ç–æ–ø-5) –∫ –≤–∏–¥–µ–æ –ø—Ä–æ –∫–æ—Ç–æ–≤.
# ‚Äú–í–∏–¥–µ–æ –ø—Ä–æ –∫–æ—Ç–æ–≤‚Äù - –≤–∏–¥–µ–æ, —É –∫–æ—Ç–æ—Ä–æ–≥–æ –µ—Å—Ç—å —Ç—ç–≥ ‚Äúcat

videos_with_cats = bucketed_videos.where(col('tags').contains('Cat'))

# –±—É–¥–µ–º –∏—Å–ø–æ–ª—å–∑–æ–≤–∞—Ç—å —Ñ–∏–ª—å—Ç—Ä –ë–ª—É–º–∞ –¥–ª—è —Ç–æ–≥–æ —á—Ç–æ –æ—Ç—Ñ–∏–ª—å—Ç—Ä–æ–≤–∞—Ç—å –∫–æ–º–º–µ–Ω—Ç–∞—Ä–∏–∏ –ø–æ video_id –ø–µ—Ä–µ–¥ —Ç–µ–º –∫–∞–∫ join–∏—Ç—å
filterSize = 1000
prob = 0.1

def fill_bloom_filter(bf, items) -> BloomFilter:
    for i in items:
        bf.add(str(i['video_id']))
    return bf

bloom_filter = BloomFilter(filterSize, prob)

# –Ω–∞–ø–æ–ª–Ω—è–µ–º —Ñ–∏–ª—å—Ç—Ä
general_bit_array = videos_with_cats.select('video_id').rdd\
    .mapPartitions(lambda part: [fill_bloom_filter(BloomFilter(filterSize, prob), part).bit_array]) \
    .reduce(lambda a, b: a | b)

bloom_filter.set_bit_array(general_bit_array)

# —Å–æ–∑–¥–∞–µ–º udf –Ω–∞ –æ—Å–Ω–æ–≤–µ —Ñ–∏–ª—å—Ç—Ä–∞
maybe_in_bf = udf(lambda video_id: bloom_filter.check(str(video_id)))

# join–∏–º
bucketed_comments\
    .where(maybe_in_bf(col('video_id')) == True) \
    .join(videos_with_cats, 'video_id') \
    .sort(desc(bucketed_comments.likes))\
    .limit(5)\
    .select(bucketed_videos.title, bucketed_comments.likes, bucketed_videos.views, bucketed_comments.comment_text)\
    .show(truncate=False)