In [133]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

In [134]:
spark = SparkSession.builder.appName("loco").getOrCreate()

In [135]:
class Constants:
    LIKED_PATH = "assignment_data/reel_liked_data"
    UNLIKED_PATH = "assignment_data/reel_unliked_data"
    WATCHED_PATH = "assignment_data/reel_watched_data"
    DATA_SINK = ""

In [231]:
class TestCases:

    test_liked_only_event =  {'reel_liked_data': [{"event_name": "reel_liked", "timestamp": 1000000004, "server_timestamp": 1000000004, "user_uid": "ML0ZSX", "streamer_uid": "GV24MY5", "reel_uid":"881e4c8b-99af-4156-a325-a2068f5bf873", "category_uid": "bgmi", "origin_platform": "ios", "app_version": "5.7.9", "reel_duration": 100, "like_time": 19}
                                            ],
                        'reel_unliked_data': [{"event_name": "reel_unliked", "timestamp": 1000000003, "server_timestamp": 1000000003, "user_uid": "ML0ZSX", "streamer_uid": "GV24MY5", "reel_uid": "881e4c8b-99af-4156-a325-a2068f5bf873", "category_uid": "bgmi", "origin_platform": "ios", "app_version": "5.7.9", "reel_duration": 100, "unlike_time": 38}
],
                        'reel_watched_data': [{"event_name": "reel_watched", "timestamp": 1000000002, "server_timestamp": 1000000002, "user_uid": "ML0ZSX", "streamer_uid": "GV24MY5", "reel_uid": "881e4c8b-99af-4156-a325-a2068f5bf873", "category_uid": "bgmi", "origin_platform": "ios", "app_version": "5.7.9", "reel_watch_duration": 72, "reel_duration": 100, "replay_count": 2},
                                              {"event_name": "reel_watched", "timestamp": 1000000001, "server_timestamp": 1000000001, "user_uid": "ML0ZSX", "streamer_uid": "GV24MY5", "reel_uid": "881e4c8b-99af-4156-a325-a2068f5bf873", "category_uid": "bgmi", "origin_platform": "ios", "app_version": "5.7.9", "reel_watch_duration": 36, "reel_duration": 100, "replay_count": 1}
],
                        'expected_result': []}

    test_unliked_only_event = {'reel_liked_data': [],
                          'reel_unliked_data': [],
                          'reel_watched_data': [],
                          'expected_result': []}

    test_only_watched_event = {'reel_liked_data': [],
                               'reel_unliked_data': [],
                               'reel_watched_data': [],
                               'expected_result': []}

In [232]:
liked_df = spark.createDataFrame(TestCases.test_liked_only_event['reel_liked_data'])
unliked_df = spark.createDataFrame(TestCases.test_liked_only_event['reel_unliked_data'])
watched_df = spark.createDataFrame(TestCases.test_liked_only_event['reel_watched_data'])

In [233]:
# liked_df = spark.CreateDataFrame(Constants.LIKED_PATH)
# unliked_df = spark.read.json(Constants.UNLIKED_PATH)
# watched_df = spark.read.json(Constants.WATCHED_PATH)

In [239]:
from pyspark.sql import functions as F

class Clips():
    def __init__(self, watched_df, liked_df, unliked_df):
        self.watched_df = watched_df
        self.liked_df = liked_df
        self.unliked_df = unliked_df

    def calculate_watched_info(self):
        self.watched_df = self.watched_df.withColumn(
        "watched_ratio",
        F.when(
            ((F.col("origin_platform") == "android") &
            (F.col("app_version") >= "2.3.0")) | (F.col("origin_platform") == "ios"),
            F.col("reel_watch_duration") / F.col("reel_duration")
        ).otherwise(
            F.when(
                (F.col("origin_platform") == "android") &
                (F.col("app_version") < "2.3.0") &
                (F.col("replay_count") > 0),
                (F.col("reel_duration") * F.col("replay_count") + F.col("reel_watch_duration")) / F.col("reel_duration")
            )
        )
    )

    def calculate_liked_events(self):
        self.liked_events = self.liked_df.groupBy("user_uid", "reel_uid").agg(F.max("timestamp").alias("liked_timestamp"))

    def calculate_unliked_events(self):
        self.unliked_events = self.unliked_df.groupBy("user_uid", "reel_uid").agg(F.max("timestamp").alias("unliked_timestamp"))

    def calculate_interests(self):
        self.interests_df = self.watched_df.join(
            self.liked_events,
            (self.watched_df["user_uid"] == self.liked_events["user_uid"]) &
            (self.watched_df["reel_uid"] == self.liked_events["reel_uid"]) &
            (self.watched_df["timestamp"] <= self.liked_events["liked_timestamp"]),
            "left"
        ).join(
            self.unliked_events,
            (self.watched_df["user_uid"] == self.unliked_events["user_uid"]) &
            (self.watched_df["reel_uid"] == self.unliked_events["reel_uid"]) &
            (self.watched_df["timestamp"] <= self.unliked_events["unliked_timestamp"]),
            "left"
        ).withColumn(
            "liked",
            F.when(F.col("liked_timestamp").isNotNull() & (F.col("liked_timestamp") >= F.col("unliked_timestamp")), True).otherwise(False)
        ).withColumn(
            "unliked",
            F.when(F.col("unliked_timestamp").isNotNull() & (F.col("unliked_timestamp") >= F.col("liked_timestamp")), True).otherwise(False)
        ).withColumn(
        "is_interested",
        F.when(
            F.col("unliked"),
            False
        ).otherwise(
            F.when(
                F.col("liked"),
                True
            ).otherwise(
                F.when(
                    (~F.col("liked") & ~F.col("unliked")) &
                    (F.col("watched_ratio") > 0.6),
                    True
                ).otherwise(False)
            )
        )).select(
                self.watched_df["user_uid"],
                self.watched_df["reel_uid"],
                self.watched_df["timestamp"],
                "liked",
                "unliked",
                "watched_ratio",
                "is_interested"
            )

    def get_latest_interests(self):
        latest_status_df = self.interests_df.groupBy("user_uid", "reel_uid").agg(
                            F.max("timestamp").alias("latest_timestamp")
                        )

        self.interests_df = self.interests_df.join(
                        latest_status_df,
                        (self.interests_df["user_uid"] == latest_status_df["user_uid"]) &
                        (self.interests_df["reel_uid"] == latest_status_df["reel_uid"]) &
                        (self.interests_df["timestamp"] == latest_status_df["latest_timestamp"]),
                        "inner"
                    )

    def run(self):
        self.calculate_liked_events()
        self.calculate_unliked_events()
        self.calculate_watched_info()
        self.calculate_interests()
        self.get_latest_interests()

In [240]:
cl = Clips(watched_df, liked_df, unliked_df)

In [241]:
cl.run()

In [243]:
cl.interests_df.rdd.map(lambda x : x.asDict(True)).take(1)[0]

{'user_uid': 'ML0ZSX',
 'reel_uid': '881e4c8b-99af-4156-a325-a2068f5bf873',
 'timestamp': 1000000002,
 'liked': True,
 'unliked': False,
 'watched_ratio': 0.72,
 'is_interested': True,
 'latest_timestamp': 1000000002}

In [230]:
cl.watched_df.show(truncate=False)

+-----------+------------+------------+---------------+-------------+------------------------------------+-------------------+------------+----------------+------------+----------+--------+-------------+
|app_version|category_uid|event_name  |origin_platform|reel_duration|reel_uid                            |reel_watch_duration|replay_count|server_timestamp|streamer_uid|timestamp |user_uid|watched_ratio|
+-----------+------------+------------+---------------+-------------+------------------------------------+-------------------+------------+----------------+------------+----------+--------+-------------+
|5.7.9      |bgmi        |reel_watched|ios            |100          |881e4c8b-99af-4156-a325-a2068f5bf874|72                 |2           |1000000002      |GV24MY5     |1000000002|ML0ZSX  |0.72         |
|5.7.9      |bgmi        |reel_watched|ios            |100          |881e4c8b-99af-4156-a325-a2068f5bf874|36                 |1           |1000000001      |GV24MY5     |1000000001|ML0Z

In [114]:
interests.interests_df.withColumn("max_timestamp", F.max(F.col("timestamp").over(Window.partitionBy(F.col("user_uid"), F.col("reel_uid")))).show(truncate=False)

SyntaxError: incomplete input (821100100.py, line 1)

In [85]:
interests.interests_df.where("user_uid = 'ML0ZSX' and reel_uid = 'ea783208-7a04-4740-b6bb-0a79e16967a9'").show(truncate=False)

AnalysisException: [AMBIGUOUS_REFERENCE] Reference `user_uid` is ambiguous, could be: [`user_uid`, `user_uid`, `user_uid`].; line 1 pos 0

In [37]:
liked_df.where("user_uid = 'ML0ZSX' and reel_uid = 'ea783208-7a04-4740-b6bb-0a79e16967a9'").show(truncate=False)

+-----------+------------+----------+---------+---------------+-------------+------------------------------------+----------------+------------+----------+--------+
|app_version|category_uid|event_name|like_time|origin_platform|reel_duration|reel_uid                            |server_timestamp|streamer_uid|timestamp |user_uid|
+-----------+------------+----------+---------+---------------+-------------+------------------------------------+----------------+------------+----------+--------+
|5.7.9      |freefire    |reel_liked|41       |ios            |101          |ea783208-7a04-4740-b6bb-0a79e16967a9|1682907843      |OHK73W2     |1682907833|ML0ZSX  |
|5.7.9      |freefire    |reel_liked|94       |ios            |101          |ea783208-7a04-4740-b6bb-0a79e16967a9|1682921251      |OHK73W2     |1682921242|ML0ZSX  |
|5.7.9      |freefire    |reel_liked|33       |ios            |101          |ea783208-7a04-4740-b6bb-0a79e16967a9|1682921444      |OHK73W2     |1682921429|ML0ZSX  |
|5.7.9    

In [131]:
ws.watched_df.where("watched_ratio < 0.6").show(truncate=False)

+-----------+------------+------------+---------------+-------------+------------------------------------+-------------------+------------+----------------+------------+----------+--------+-------------------+
|app_version|category_uid|event_name  |origin_platform|reel_duration|reel_uid                            |reel_watch_duration|replay_count|server_timestamp|streamer_uid|timestamp |user_uid|watched_ratio      |
+-----------+------------+------------+---------------+-------------+------------------------------------+-------------------+------------+----------------+------------+----------+--------+-------------------+
|2.3.0      |freefire    |reel_watched|android        |101          |ea783208-7a04-4740-b6bb-0a79e16967a9|33                 |0           |1682938906      |OHK73W2     |1682938905|ABNRHP  |0.32673267326732675|
|5.7.9      |bgmi        |reel_watched|ios            |61           |881e4c8b-99af-4156-a325-a2068f5bf873|33                 |0           |1682938907      |GV24

In [41]:
ws.watched_df.where("user_uid = 'ML0ZSX' and reel_uid = 'ea783208-7a04-4740-b6bb-0a79e16967a9'").show(truncate=False)

+-----------+------------+------------+---------------+-------------+------------------------------------+-------------------+------------+----------------+------------+----------+--------+-------------------+
|app_version|category_uid|event_name  |origin_platform|reel_duration|reel_uid                            |reel_watch_duration|replay_count|server_timestamp|streamer_uid|timestamp |user_uid|watched_ratio      |
+-----------+------------+------------+---------------+-------------+------------------------------------+-------------------+------------+----------------+------------+----------+--------+-------------------+
|5.7.9      |freefire    |reel_watched|ios            |101          |ea783208-7a04-4740-b6bb-0a79e16967a9|361                |3           |1682938915      |OHK73W2     |1682938894|ML0ZSX  |3.5742574257425743 |
|5.7.9      |freefire    |reel_watched|ios            |101          |ea783208-7a04-4740-b6bb-0a79e16967a9|225                |2           |1682939642      |OHK7

In [28]:
ws.watched_df.show()

+-----------+------------+------------+---------------+-------------+--------------------+-------------------+------------+----------------+------------+----------+--------+-------------------+
|app_version|category_uid|  event_name|origin_platform|reel_duration|            reel_uid|reel_watch_duration|replay_count|server_timestamp|streamer_uid| timestamp|user_uid|      watched_ratio|
+-----------+------------+------------+---------------+-------------+--------------------+-------------------+------------+----------------+------------+----------+--------+-------------------+
|      1.7.8|    freefire|reel_watched|        android|          120|578cf68e-3b2e-492...|                 17|           1|      1682942397|     9UWROC5|1682942384|  S5128I| 1.1416666666666666|
|      1.7.8|        bgmi|reel_watched|        android|           61|881e4c8b-99af-415...|                  0|           1|      1682938874|     GV24MY5|1682938853|  S5128I|                1.0|
|      1.7.8|        bgmi|reel

In [56]:
liked_df.groupBy("user_uid", "reel_uid").agg(F.max("timestamp")).show(truncate=False)

+--------+------------------------------------+--------------+
|user_uid|reel_uid                            |max(timestamp)|
+--------+------------------------------------+--------------+
|ML0ZSX  |5033a27d-ed9e-4d9a-8bf2-6eb0b39dd90c|1682977842    |
|S5128I  |578cf68e-3b2e-492c-9d55-64209729e759|1682975139    |
|S5128I  |ea783208-7a04-4740-b6bb-0a79e16967a9|1682984388    |
|9UASVK  |b02b9538-5627-47cf-a5c8-b8dae3298304|1682977813    |
|2S89IY  |578cf68e-3b2e-492c-9d55-64209729e759|1682985385    |
|9UASVK  |881e4c8b-99af-4156-a325-a2068f5bf873|1682984855    |
|ABNRHP  |5033a27d-ed9e-4d9a-8bf2-6eb0b39dd90c|1682975511    |
|2S89IY  |ea783208-7a04-4740-b6bb-0a79e16967a9|1682984646    |
|2S89IY  |5033a27d-ed9e-4d9a-8bf2-6eb0b39dd90c|1682970898    |
|ABNRHP  |b02b9538-5627-47cf-a5c8-b8dae3298304|1682972994    |
|ABNRHP  |ea783208-7a04-4740-b6bb-0a79e16967a9|1682983349    |
|ML0ZSX  |578cf68e-3b2e-492c-9d55-64209729e759|1682971066    |
|S5128I  |881e4c8b-99af-4156-a325-a2068f5bf873|16829858

In [63]:
ls.liked_events.show()

+--------+--------------------+---------------+
|user_uid|            reel_uid|liked_timestamp|
+--------+--------------------+---------------+
|  ML0ZSX|5033a27d-ed9e-4d9...|     1682977842|
|  S5128I|578cf68e-3b2e-492...|     1682975139|
|  S5128I|ea783208-7a04-474...|     1682984388|
|  9UASVK|b02b9538-5627-47c...|     1682977813|
|  2S89IY|578cf68e-3b2e-492...|     1682985385|
|  9UASVK|881e4c8b-99af-415...|     1682984855|
|  ABNRHP|5033a27d-ed9e-4d9...|     1682975511|
|  2S89IY|ea783208-7a04-474...|     1682984646|
|  2S89IY|5033a27d-ed9e-4d9...|     1682970898|
|  ABNRHP|b02b9538-5627-47c...|     1682972994|
|  ABNRHP|ea783208-7a04-474...|     1682983349|
|  ML0ZSX|578cf68e-3b2e-492...|     1682971066|
|  S5128I|881e4c8b-99af-415...|     1682985853|
|  ML0ZSX|881e4c8b-99af-415...|     1683049057|
|  S5128I|5033a27d-ed9e-4d9...|     1682974714|
|  2S89IY|8f5f09ae-31fe-47a...|     1682985154|
|  S5128I|8f5f09ae-31fe-47a...|     1682980508|
|  ABNRHP|578cf68e-3b2e-492...|     1682

In [151]:
ws.watched_df.where("reel_uid = '881e4c8b-99af-4156-a325-a2068f5bf873' and app_version >= '2.3.0' and user_uid = 'ML0ZSX'").show(truncate=False)

+-----------+------------+------------+---------------+-------------+------------------------------------+-------------------+------------+----------------+------------+----------+--------+-------------------+
|app_version|category_uid|event_name  |origin_platform|reel_duration|reel_uid                            |reel_watch_duration|replay_count|server_timestamp|streamer_uid|timestamp |user_uid|watched_ratio      |
+-----------+------------+------------+---------------+-------------+------------------------------------+-------------------+------------+----------------+------------+----------+--------+-------------------+
|5.7.9      |bgmi        |reel_watched|ios            |61           |881e4c8b-99af-4156-a325-a2068f5bf873|33                 |0           |1682938907      |GV24MY5     |1682938887|ML0ZSX  |0.5409836065573771 |
|5.7.9      |bgmi        |reel_watched|ios            |61           |881e4c8b-99af-4156-a325-a2068f5bf873|9                  |0           |1682938954      |GV24

In [97]:
ls.liked_df.where("reel_uid = '881e4c8b-99af-4156-a325-a2068f5bf873' and app_version < '2.3.0'").show(truncate=False)

+-----------+------------+----------+---------+---------------+-------------+------------------------------------+----------------+------------+----------+--------+
|app_version|category_uid|event_name|like_time|origin_platform|reel_duration|reel_uid                            |server_timestamp|streamer_uid|timestamp |user_uid|
+-----------+------------+----------+---------+---------------+-------------+------------------------------------+----------------+------------+----------+--------+
|1.7.8      |bgmi        |reel_liked|10       |android        |61           |881e4c8b-99af-4156-a325-a2068f5bf873|1682953122      |GV24MY5     |1682953113|S5128I  |
|1.7.8      |bgmi        |reel_liked|57       |android        |61           |881e4c8b-99af-4156-a325-a2068f5bf873|1682953122      |GV24MY5     |1682953120|S5128I  |
|1.7.8      |bgmi        |reel_liked|52       |android        |61           |881e4c8b-99af-4156-a325-a2068f5bf873|1682907508      |GV24MY5     |1682907412|S5128I  |
|1.7.8    

In [157]:
liked_df = spark.createDataFrame(TestCases.test_liked_only_event['reel_liked_data']).show(truncate=False)

+-----------+------------+----------+---------+---------------+-------------+------------------------------------+----------------+------------+----------+--------+
|app_version|category_uid|event_name|like_time|origin_platform|reel_duration|reel_uid                            |server_timestamp|streamer_uid|timestamp |user_uid|
+-----------+------------+----------+---------+---------------+-------------+------------------------------------+----------------+------------+----------+--------+
|5.7.9      |bgmi        |reel_liked|19       |ios            |61           |881e4c8b-99af-4156-a325-a2068f5bf873|1682964102      |GV24MY5     |1682964255|ML0ZSX  |
+-----------+------------+----------+---------+---------------+-------------+------------------------------------+----------------+------------+----------+--------+

