#  Analysis on Singapore Data Set

## Read from Delta table

In [0]:
from pyspark.sql.functions import col

# Read the raw Singapore trending table from the mongo_youtube_trends schema.
# NOTE(review): `spark` is not created in this notebook; presumably the
# runtime provides the session object - confirm.
source_table = "mongo_youtube_trends.singapore_trending"
df = spark.table(source_table)

# Print the column layout so the structure of the raw table is visible.
df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- _fivetran_synced: timestamp (nullable = true)
 |-- data: string (nullable = true)
 |-- _fivetran_deleted: boolean (nullable = true)



In [0]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, BooleanType, TimestampType
from pyspark.sql.types import *

# Schema used to parse the JSON text of each record (defined based on the
# DBFS JSON). Every field is nullable.


def _string_fields(*names):
    """Return nullable string StructFields for the given names, in order."""
    return [StructField(name, StringType(), True) for name in names]


def _thumbnail_struct():
    """Return the struct shared by every thumbnail size: height, url, width."""
    return StructType([
        StructField("height", LongType(), True),
        StructField("url", StringType(), True),
        StructField("width", LongType(), True),
    ])


content_details_type = StructType(
    _string_fields("caption", "definition", "dimension", "duration")
    + [StructField("licensedContent", BooleanType(), True)]
    + _string_fields("projection")
    + [
        StructField("regionRestriction", StructType([
            StructField("allowed", ArrayType(StringType()), True),
        ]), True),
    ]
)

# One entry per thumbnail size, all with the same inner layout.
thumbnails_type = StructType([
    StructField(size, _thumbnail_struct(), True)
    for size in ("default", "high", "maxres", "medium", "standard")
])

snippet_type = StructType(
    _string_fields(
        "categoryId",
        "channelId",
        "channelTitle",
        "defaultAudioLanguage",
        "defaultLanguage",
        "description",
        "liveBroadcastContent",
    )
    + [StructField("localized", StructType(_string_fields("description", "title")), True)]
    + _string_fields("publishedAt")
    + [
        StructField("tags", ArrayType(StringType()), True),
        StructField("thumbnails", thumbnails_type, True),
    ]
    + _string_fields("title")
)

# The statistics counters are declared as strings here; they are cast to
# numbers in a later cell.
statistics_type = StructType(
    _string_fields("commentCount", "favoriteCount", "likeCount", "viewCount")
)

schema = StructType([
    StructField("contentDetails", content_details_type, True),
    StructField("etag", StringType(), True),
    StructField("id", StringType(), True),
    StructField("kind", StringType(), True),
    StructField("snippet", snippet_type, True),
    StructField("statistics", statistics_type, True),
])



In [0]:
from pyspark.sql.functions import from_json, col

# Decode the JSON text held in `data` using `schema` and keep the result next
# to the original columns as a struct column named `json`.
json_struct = from_json(col("data"), schema)
parsed_df = df.withColumn("json", json_struct)

# Show the resulting nested layout.
parsed_df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- _fivetran_synced: timestamp (nullable = true)
 |-- data: string (nullable = true)
 |-- _fivetran_deleted: boolean (nullable = true)
 |-- json: struct (nullable = true)
 |    |-- contentDetails: struct (nullable = true)
 |    |    |-- caption: string (nullable = true)
 |    |    |-- definition: string (nullable = true)
 |    |    |-- dimension: string (nullable = true)
 |    |    |-- duration: string (nullable = true)
 |    |    |-- licensedContent: boolean (nullable = true)
 |    |    |-- projection: string (nullable = true)
 |    |    |-- regionRestriction: struct (nullable = true)
 |    |    |    |-- allowed: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |-- etag: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- kind: string (nullable = true)
 |    |-- snippet: struct (nullable = true)
 |    |    |-- categoryId: string (nullable = true)
 |    |    |-- channelId: string

In [0]:
from pyspark.sql.functions import col

# Flatten the fields needed for the analysis out of the nested `json` struct.
# Each entry is (path inside the struct, name of the output column).
selected_fields = [
    ("json.id", "id"),
    ("json.snippet.publishedAt", "publishedAt"),
    ("json.snippet.channelId", "channelId"),
    ("json.snippet.channelTitle", "channelTitle"),
    ("json.snippet.title", "title"),
    ("json.snippet.description", "description"),
    ("json.snippet.categoryId", "categoryId"),
    ("json.snippet.tags", "tags"),
    ("json.snippet.defaultLanguage", "defaultLanguage"),
    ("json.snippet.defaultAudioLanguage", "defaultAudioLanguage"),
    ("json.statistics.viewCount", "viewCount"),
    ("json.statistics.likeCount", "likeCount"),
    ("json.statistics.commentCount", "commentCount"),
    ("json.contentDetails.duration", "duration"),
    ("json.contentDetails.caption", "caption"),
]

selected_df = parsed_df.select(
    *[col(path).alias(name) for path, name in selected_fields]
)

# Display the flattened rows without truncating long values.
selected_df.show(truncate=False)


+-----------+--------------------+------------------------+-------------------+----------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Print the flattened schema of selected_df to check column names and types.
selected_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- publishedAt: string (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- categoryId: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- defaultLanguage: string (nullable = true)
 |-- defaultAudioLanguage: string (nullable = true)
 |-- viewCount: string (nullable = true)
 |-- likeCount: string (nullable = true)
 |-- commentCount: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- caption: string (nullable = true)



In [0]:
from pyspark.sql.functions import col, when

# Turn the three count columns from strings into integers. A value is cast
# only when it is not the empty string; otherwise the column is set to null.
casted_df1 = selected_df
for count_column in ("viewCount", "likeCount", "commentCount"):
    casted_df1 = casted_df1.withColumn(
        count_column,
        when(col(count_column) != "", col(count_column).cast("int")).otherwise(None),
    )




In [0]:
# Print the schema of casted_df1 to check the types of the cast count columns.
casted_df1.printSchema()

root
 |-- id: string (nullable = true)
 |-- publishedAt: string (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- categoryId: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- defaultLanguage: string (nullable = true)
 |-- defaultAudioLanguage: string (nullable = true)
 |-- viewCount: integer (nullable = true)
 |-- likeCount: integer (nullable = true)
 |-- commentCount: integer (nullable = true)
 |-- duration: string (nullable = true)
 |-- caption: string (nullable = true)



In [0]:
# Preview the first three records, one field per line, without truncation.
preview_columns = [
    "viewCount", "likeCount", "commentCount",
    "duration", "publishedAt", "channelTitle",
    "title", "categoryId", "description",
]
casted_df1.select(*preview_columns).show(n=3, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Look at the raw duration text next to each title (first 10 rows).
title_and_duration = casted_df1.select("title", "duration")
title_and_duration.show(n=10, truncate=False)

+----------------------------------------------------------------------------------------------------+-----------+
|title                                                                                               |duration   |
+----------------------------------------------------------------------------------------------------+-----------+
|We Found Mikey TALKING To a GHOST!                                                                  |PT19M32S   |
|The Conjuring: Last Rites | Official Trailer                                                        |PT2M26S    |
|Captain Prabhakaran New 4K Trailer | Vijayakanth | RK Selvamani | Ilaiyaraja | Sparrow Cinemas      |PT2M39S    |
|Pottala Muttaye | Thalaivan Thalaivii | Vijay Sethupathi,Nithya Menen |Pandiraaj |Santhosh Narayanan|PT4M17S    |
|Janaab-e-Aali Song Teaser | WAR 2 | Hrithik Roshan, NTR | Pritam, Sachet Tandon, Saaj Bhatt, Amitabh|PT35S      |
|BLAST Premier Bounty S2, Round 2 Day 1 - NAVI vs Astralis, FaZe vs VP, Spirit v

# Convert duration (ISO 8601 format) to Total Seconds

In [0]:
from pyspark.sql.functions import regexp_extract, col, when

# Convert the ISO 8601 duration text (e.g. "PT19M32S") into total seconds.
#
# Each component is optional in the text, so a component that is absent
# counts as 0. The day component ("P1DT2H3M4S") is handled as well: the
# earlier hour pattern "PT(\d+)H" could not match when a day part sat between
# "P" and "T", so the hours of such values were counted as 0 and the days were
# ignored. Minutes are matched only inside the time part (after "T").
#
# NOTE(review): a null `duration` appears to fall through to 0 too, because
# the `!= ""` test is not true for null - confirm whether null should be kept.
def _duration_part(pattern):
    """Return an int column holding group 1 of `pattern` in `duration`, or 0 if it is absent."""
    extracted = regexp_extract(col("duration"), pattern, 1)
    return when(extracted != "", extracted.cast("int")).otherwise(0)


# Calculate the total number of seconds and overwrite `duration` with it.
# No temporary columns are created, so there is nothing to drop afterwards.
casted_df1 = casted_df1.withColumn(
    "duration",
    _duration_part("P(\\d+)D") * 86400
    + _duration_part("T(\\d+)H") * 3600
    + _duration_part("T(?:\\d+H)?(\\d+)M") * 60
    + _duration_part("(\\d+)S"),
)


In [0]:
# Print the schema of casted_df1 again to check the type of the rewritten duration column.
casted_df1.printSchema()

root
 |-- id: string (nullable = true)
 |-- publishedAt: string (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- categoryId: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- defaultLanguage: string (nullable = true)
 |-- defaultAudioLanguage: string (nullable = true)
 |-- viewCount: integer (nullable = true)
 |-- likeCount: integer (nullable = true)
 |-- commentCount: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- caption: string (nullable = true)



In [0]:
# Show the converted duration (total seconds) next to each title (first 20 rows).
title_and_seconds = casted_df1.select("title", "duration")
title_and_seconds.show(n=20, truncate=False)

+----------------------------------------------------------------------------------------------------+--------+
|title                                                                                               |duration|
+----------------------------------------------------------------------------------------------------+--------+
|We Found Mikey TALKING To a GHOST!                                                                  |1172    |
|The Conjuring: Last Rites | Official Trailer                                                        |146     |
|Captain Prabhakaran New 4K Trailer | Vijayakanth | RK Selvamani | Ilaiyaraja | Sparrow Cinemas      |159     |
|Pottala Muttaye | Thalaivan Thalaivii | Vijay Sethupathi,Nithya Menen |Pandiraaj |Santhosh Narayanan|257     |
|Janaab-e-Aali Song Teaser | WAR 2 | Hrithik Roshan, NTR | Pritam, Sachet Tandon, Saaj Bhatt, Amitabh|35      |
|BLAST Premier Bounty S2, Round 2 Day 1 - NAVI vs Astralis, FaZe vs VP, Spirit vs G2, FURIA vs Liquid|42

# Distinct category IDs available in our data

In [0]:
# Number of distinct category IDs present in the parsed data.
(
    parsed_df
    .select("json.snippet.categoryId")
    .distinct()
    .count()
)

6

In [0]:
# List the distinct category IDs themselves.
distinct_category_ids = parsed_df.select("json.snippet.categoryId").distinct()
distinct_category_ids.show(truncate=False)

+----------+
|categoryId|
+----------+
|22        |
|10        |
|24        |
|1         |
|17        |
|20        |
+----------+



# Mapping distinct category IDs to category names

In [0]:
from pyspark.sql.functions import when, col

# Display names for the category IDs handled by this notebook.
category_names = {
    "1": "Film & Animation",
    "10": "Music",
    "17": "Sports",
    "20": "Gaming",
    "22": "People & Blogs",
    "24": "Entertainment",
}

# Build a single when(...).when(...) chain from the mapping, in mapping order.
category_name_expr = None
for category_id, category_name in category_names.items():
    matches_id = col("categoryId") == category_id
    if category_name_expr is None:
        category_name_expr = when(matches_id, category_name)
    else:
        category_name_expr = category_name_expr.when(matches_id, category_name)

# Any ID outside the mapping is labelled "Unknown".
casted_df1 = casted_df1.withColumn(
    "categoryName", category_name_expr.otherwise("Unknown")
)


In [0]:
# Number of videos in each category.
videos_per_category = casted_df1.groupBy("categoryName").count()
videos_per_category.show(truncate=False)

+----------------+-----+
|categoryName    |count|
+----------------+-----+
|People & Blogs  |4    |
|Film & Animation|6    |
|Gaming          |15   |
|Sports          |1    |
|Entertainment   |7    |
|Music           |17   |
+----------------+-----+



# Convert the publishedAt column to a timestamp

In [0]:
from pyspark.sql.functions import col, to_timestamp, to_date, year, month

# Derive publishedDate, year and month from the publishedAt text.
#
# Step 1: parse publishedAt into a timestamp. No explicit format is passed:
# the earlier pattern quoted the trailing 'Z' as literal text, so the UTC
# designator was ignored and the value was read in the session time zone, and
# the fixed pattern had no place for fractional seconds. Without a format,
# to_timestamp follows the string-to-timestamp cast rules, which accept
# ISO 8601 text including the "Z" zone suffix.
# Step 2: keep only the date part.
# Step 3: extract year and month from publishedDate.
# Step 4: drop the original publishedAt text and the intermediate timestamp.
casted_df1 = (
    casted_df1
    .withColumn("publishedAt_ts", to_timestamp(col("publishedAt")))
    .withColumn("publishedDate", to_date(col("publishedAt_ts")))
    .withColumn("year", year("publishedDate"))
    .withColumn("month", month("publishedDate"))
    .drop("publishedAt", "publishedAt_ts")
)


In [0]:
# Preview the first three records with the derived date and category name,
# one field per line, without truncation.
preview_columns = [
    "viewCount", "likeCount", "commentCount",
    "duration", "publishedDate", "channelTitle",
    "title", "categoryId", "description", "categoryName",
]
casted_df1.select(*preview_columns).show(n=3, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql.functions import col, count, when

# Count the null values in every column the analysis relies on.
# Each output column is named "null_<column>".
checked_columns = [
    "viewCount",
    "likeCount",
    "commentCount",
    "duration",
    "caption",
    "categoryName",
    "publishedDate",
    "year",
    "month",
]

null_counts = [
    count(when(col(name).isNull(), 1)).alias("null_" + name)
    for name in checked_columns
]
casted_df1.select(null_counts).show()

+--------------+--------------+-----------------+-------------+------------+-----------------+------------------+---------+----------+
|null_viewCount|null_likeCount|null_commentCount|null_duration|null_caption|null_categoryName|null_publishedDate|null_year|null_month|
+--------------+--------------+-----------------+-------------+------------+-----------------+------------------+---------+----------+
|             0|             0|                0|            0|           0|                0|                 0|        0|         0|
+--------------+--------------+-----------------+-------------+------------+-----------------+------------------+---------+----------+



# Engagement Analysis

# Top Channels by Total Views

In [0]:
# Total views per channel, largest total first.
views_by_channel = casted_df1.groupBy("channelTitle").sum("viewCount")
top_channels = (
    views_by_channel
    .withColumnRenamed("sum(viewCount)", "totalViews")
    .orderBy("totalViews", ascending=False)
)

# Show the 20 channels with the most views.
top_channels.show(n=20, truncate=False)


+-------------------+----------+
|channelTitle       |totalViews|
+-------------------+----------+
|Excel Movies       |25329114  |
|Sun TV             |21923784  |
|Avatar             |20183637  |
|Warner Bros.       |15850611  |
|Sony Music South   |13191803  |
|Brawl Stars        |12932859  |
|Junglee Music Tamil|11421274  |
|YRF                |9209754   |
|BLACKPINK          |8496334   |
|Universal Pictures |8435444   |
|A24                |7633998   |
|ChappellRoanVEVO   |7495005   |
|FORMULA 1          |5712448   |
|Think Music India  |3498143   |
|Ed Sheeran         |3428391   |
|MrBeast Gaming     |2807057   |
|CaylusBlox         |2492431   |
|KreekCraft         |2473405   |
|KATSEYE            |2235365   |
|Fortnite           |2149617   |
+-------------------+----------+
only showing top 20 rows


# Top channels by total likes

In [0]:
# Total likes per channel, largest total first.
likes_by_channel = casted_df1.groupBy("channelTitle").sum("likeCount")
top_liked_channels = (
    likes_by_channel
    .withColumnRenamed("sum(likeCount)", "totalLikes")
    .orderBy("totalLikes", ascending=False)
)

# Show the 10 channels with the most likes.
top_liked_channels.show(n=10, truncate=False)

+-------------------+----------+
|channelTitle       |totalLikes|
+-------------------+----------+
|Sony Music South   |947792    |
|Sun TV             |790111    |
|Avatar             |523179    |
|ChappellRoanVEVO   |436476    |
|BLACKPINK          |428653    |
|Junglee Music Tamil|190641    |
|Brawl Stars        |169518    |
|YRF                |155697    |
|KATSEYE            |139553    |
|Warner Bros.       |139418    |
+-------------------+----------+
only showing top 10 rows


# Most Liked Videos

In [0]:
# Here we find the top 5 most-liked videos, along with each video's title and channelTitle.

In [0]:
# Order all videos by like count, highest first, and show the top five.
most_liked = casted_df1.orderBy("likeCount", ascending=False)
(
    most_liked
    .select("title", "channelTitle", "likeCount")
    .show(n=5, truncate=False)
)

+------------------------------------------------------------------------------------------+----------------+---------+
|title                                                                                     |channelTitle    |likeCount|
+------------------------------------------------------------------------------------------+----------------+---------+
|They Call Him OG - Firestorm Lyric Video | Pawan Kalyan | Sujeeth | Thaman S | DVV Danayya|Sony Music South|947792   |
|Coolie - Official Trailer | Superstar Rajinikanth | Sun Pictures | Lokesh | Anirudh       |Sun TV          |790111   |
|Avatar: Fire and Ash | Official Trailer                                                   |Avatar          |523179   |
|Chappell Roan - The Subway (Official Music Video)                                         |ChappellRoanVEVO|436476   |
|BLACKPINK - '뛰어(JUMP)' Live at WORLD TOUR [DEADLINE] IN GOYANG                          |BLACKPINK       |428653   |
+-----------------------------------------

# Top videos by comment count

In [0]:
# Order all videos by comment count, highest first, and show the top five.
most_commented = casted_df1.orderBy("commentCount", ascending=False)
(
    most_commented
    .select("title", "channelTitle", "commentCount")
    .show(n=5, truncate=False)
)


+------------------------------------------------------------------------------------------+----------------+------------+
|title                                                                                     |channelTitle    |commentCount|
+------------------------------------------------------------------------------------------+----------------+------------+
|They Call Him OG - Firestorm Lyric Video | Pawan Kalyan | Sujeeth | Thaman S | DVV Danayya|Sony Music South|39250       |
|Coolie - Official Trailer | Superstar Rajinikanth | Sun Pictures | Lokesh | Anirudh       |Sun TV          |32939       |
|Avatar: Fire and Ash | Official Trailer                                                   |Avatar          |29236       |
|Chappell Roan - The Subway (Official Music Video)                                         |ChappellRoanVEVO|17392       |
|BLACKPINK - '뛰어(JUMP)' Live at WORLD TOUR [DEADLINE] IN GOYANG                          |BLACKPINK       |11235       |
+-----------------

# Like to View ratio (engagement rate)

In [0]:
from pyspark.sql.functions import expr

# Likes divided by views for every video.
like_view_ratio = col("likeCount") / col("viewCount")
engagement_df = casted_df1.withColumn("likeViewRatio", like_view_ratio)

# Show the ten videos with the highest ratio.
(
    engagement_df
    .orderBy("likeViewRatio", ascending=False)
    .select("title", "channelTitle", "likeViewRatio")
    .show(n=10, truncate=False)
)


+------------------------------------------------------------------------------------------+----------------+--------------------+
|title                                                                                     |channelTitle    |likeViewRatio       |
+------------------------------------------------------------------------------------------+----------------+--------------------+
|우기 (YUQI) 'What It Sounds Like / KPop Demon Hunters OST' (Cover)                        |i-dle (아이들)  |0.1445722133637496  |
|DIOR大穎 - Proud Of Myself - (Official MV）                                               |DIOR大穎        |0.11276653571516813 |
|IVE 아이브 ‘Secret, Cupid’ Trailer                                                        |IVE             |0.09776322543722918 |
|[Official Trailer] REVAMP THE UNDEAD STORY                                                |GMMTV OFFICIAL​​|0.08948919865180112 |
|They Call Him OG - Firestorm Lyric Video | Pawan Kalyan | Sujeeth | Thaman S | DVV Danayya|Sony

# Analysis based on categories

# Top Categories by Likes

In [0]:
from pyspark.sql.functions import sum

# Total likes for each (category, channel) pair, largest total first.
# The grouping includes channelTitle, so every row is one channel within a category.
likes_by_category_channel = casted_df1.groupBy("categoryName", "channelTitle").agg(
    sum("likeCount").alias("totalLikes")
)
likes_by_category_channel.orderBy(col("totalLikes").desc()).show()

+----------------+--------------------+----------+
|    categoryName|        channelTitle|totalLikes|
+----------------+--------------------+----------+
|           Music|    Sony Music South|    947792|
|   Entertainment|              Sun TV|    790111|
|   Entertainment|              Avatar|    523179|
|           Music|    ChappellRoanVEVO|    436476|
|           Music|           BLACKPINK|    428653|
|           Music| Junglee Music Tamil|    190641|
|          Gaming|         Brawl Stars|    169518|
|           Music|                 YRF|    155697|
|           Music|             KATSEYE|    139553|
|Film & Animation|        Warner Bros.|    139418|
|           Music|          Ed Sheeran|    130486|
|Film & Animation|        Excel Movies|    121167|
|          Gaming|      MrBeast Gaming|    120301|
|          Sports|           FORMULA 1|    105317|
|  People & Blogs|            Fortnite|     94253|
|          Gaming|   Honkai: Star Rail|     84855|
|   Entertainment|             

# Top Categories by Comments

In [0]:
# Total comments for each (category, channel) pair, largest total first.
# The grouping includes channelTitle, so every row is one channel within a category.
comments_by_category_channel = casted_df1.groupBy("categoryName", "channelTitle").agg(
    sum("commentCount").alias("totalComments")
)
comments_by_category_channel.orderBy(col("totalComments").desc()).show()

+----------------+-------------------+-------------+
|    categoryName|       channelTitle|totalComments|
+----------------+-------------------+-------------+
|           Music|   Sony Music South|        39250|
|   Entertainment|             Sun TV|        32939|
|   Entertainment|             Avatar|        29236|
|           Music|   ChappellRoanVEVO|        17392|
|           Music|          BLACKPINK|        11235|
|Film & Animation|       Excel Movies|         9088|
|           Music|                YRF|         8095|
|           Music|         Ed Sheeran|         7873|
|  People & Blogs|           Fortnite|         7605|
|           Music|Junglee Music Tamil|         6129|
|          Gaming|        Brawl Stars|         5856|
|           Music|            KATSEYE|         5366|
|          Sports|          FORMULA 1|         4694|
|   Entertainment|                IVE|         4633|
|          Gaming|            rekrap2|         4582|
|          Gaming|  Honkai: Star Rail|        

# Average Engagement per Category

In [0]:
# Create the engagement column by adding likeCount and commentCount.

In [0]:
# Engagement of a video = its likes plus its comments.
likes_plus_comments = col("likeCount") + col("commentCount")
casted_df1 = casted_df1.withColumn("engagement", likes_plus_comments)

In [0]:
from pyspark.sql.functions import avg

# Average engagement of the videos in each category, highest average first.
avg_engagement_by_category = casted_df1.groupBy("categoryName").agg(
    avg("engagement").alias("avgEngagement")
)
avg_engagement_by_category.orderBy(col("avgEngagement").desc()).show(truncate=False)

+----------------+------------------+
|categoryName    |avgEngagement     |
+----------------+------------------+
|Entertainment   |232500.0          |
|Music           |164803.11764705883|
|Sports          |110011.0          |
|Film & Animation|82786.66666666667 |
|People & Blogs  |52808.25          |
|Gaming          |43187.13333333333 |
+----------------+------------------+



# Average engagement by video duration

In [0]:
# This shows whether people engage more with short, medium, or long videos.

In [0]:
# Step 1: Categorize durations
from pyspark.sql.functions import when

# Bucket thresholds, in seconds of video length.
SHORT_MAX_SECONDS = 300     # up to 5 minutes
MEDIUM_MAX_SECONDS = 1200   # up to 20 minutes

video_length = col("duration")
is_short = video_length <= SHORT_MAX_SECONDS
is_medium = (video_length > SHORT_MAX_SECONDS) & (video_length <= MEDIUM_MAX_SECONDS)

# Anything that is neither short nor medium is labelled "Long" (> 20 minutes).
casted_df1 = casted_df1.withColumn(
    "durationCategory",
    when(is_short, "Short").when(is_medium, "Medium").otherwise("Long"),
)

# Step 2: average engagement per duration bucket, highest average first.
avg_engagement_by_length = casted_df1.groupBy("durationCategory").agg(
    avg("engagement").alias("avgEngagement")
)
avg_engagement_by_length.orderBy(col("avgEngagement").desc()).show()

+----------------+------------------+
|durationCategory|     avgEngagement|
+----------------+------------------+
|           Short|164712.93548387097|
|          Medium|           58205.4|
|            Long|22974.333333333332|
+----------------+------------------+



# Top performing categories within duration groups

In [0]:
# Average engagement for each (duration bucket, category) pair.
# Both sort keys are descending: bucket name first, then average engagement.
avg_engagement_by_bucket = casted_df1.groupBy("durationCategory", "categoryName").agg(
    avg("engagement").alias("avgEngagement")
)
avg_engagement_by_bucket.orderBy(
    col("durationCategory").desc(),
    col("avgEngagement").desc(),
).show()

+----------------+----------------+------------------+
|durationCategory|    categoryName|     avgEngagement|
+----------------+----------------+------------------+
|           Short|   Entertainment|          232500.0|
|           Short|           Music| 186831.2857142857|
|           Short|          Gaming|106542.66666666667|
|           Short|Film & Animation|           87688.6|
|           Short|  People & Blogs|           52446.0|
|          Medium|          Sports|          110011.0|
|          Medium|           Music|           62005.0|
|          Medium|Film & Animation|           58277.0|
|          Medium|          Gaming|           45550.2|
|            Long|  People & Blogs|           53170.5|
|            Long|          Gaming|14346.857142857143|
+----------------+----------------+------------------+



# Total Watch Time per Category (durationCategory)

In [0]:
# Sum of the `duration` column (video length in seconds) per duration bucket,
# largest total first. Note that this adds up video lengths; the data used
# here has no per-viewer watch-time column.
duration_by_bucket = casted_df1.groupBy("durationCategory").agg(
    sum("duration").alias("totalWatchTime_seconds")
)
duration_by_bucket.orderBy(col("totalWatchTime_seconds").desc()).show()

+----------------+----------------------+
|durationCategory|totalWatchTime_seconds|
+----------------+----------------------+
|            Long|                139004|
|          Medium|                  6517|
|           Short|                  5274|
+----------------+----------------------+



# Total Watch Time per Category and Duration Bucket

In [0]:
from pyspark.sql.functions import sum

# Sum of the `duration` column (video length in seconds) for each
# (category, duration bucket) pair, sorted by category and then bucket.
duration_by_category_bucket = casted_df1.groupBy("categoryName", "durationCategory").agg(
    sum("duration").alias("totalWatchTime_seconds")
)
duration_by_category_bucket.orderBy(
    col("categoryName").asc(),
    col("durationCategory").asc(),
).show(truncate=False)

+----------------+----------------+----------------------+
|categoryName    |durationCategory|totalWatchTime_seconds|
+----------------+----------------+----------------------+
|Entertainment   |Short           |963                   |
|Film & Animation|Medium          |409                   |
|Film & Animation|Short           |703                   |
|Gaming          |Long            |130636                |
|Gaming          |Medium          |4690                  |
|Gaming          |Short           |416                   |
|Music           |Medium          |922                   |
|Music           |Short           |2837                  |
|People & Blogs  |Long            |8368                  |
|People & Blogs  |Short           |355                   |
|Sports          |Medium          |496                   |
+----------------+----------------+----------------------+



In [0]:
# The channelTitle is also included here, which gives an idea of:
# 1) Which channel dominates which category.
# 2) Which channel people watch for a longer time.
# 3) Which of a channel's videos (short, medium, or long) are watched more.

In [0]:
from pyspark.sql.functions import sum

# Sum of the `duration` column (video length in seconds) for each
# (channel, category, duration bucket) combination.
# All three sort keys are descending.
duration_by_channel = casted_df1.groupBy(
    "channelTitle", "categoryName", "durationCategory"
).agg(sum("duration").alias("totalWatchTime_seconds"))

duration_by_channel.orderBy(
    col("categoryName").desc(),
    col("durationCategory").desc(),
    col("totalWatchTime_seconds").desc(),
).show(truncate=False)

+-------------------+--------------+----------------+----------------------+
|channelTitle       |categoryName  |durationCategory|totalWatchTime_seconds|
+-------------------+--------------+----------------+----------------------+
|FORMULA 1          |Sports        |Medium          |496                   |
|Bert & Lulu        |People & Blogs|Short           |275                   |
|Fortnite           |People & Blogs|Short           |80                    |
|MoreSidemen        |People & Blogs|Long            |5564                  |
|NDL                |People & Blogs|Long            |2804                  |
|Ed Sheeran         |Music         |Short           |277                   |
|ChappellRoanVEVO   |Music         |Short           |272                   |
|DIOR大穎           |Music         |Short           |258                   |
|Think Music India  |Music         |Short           |257                   |
|Sony Music South   |Music         |Short           |246                   |
|

# Top 5 Longest Videos with High Engagement

In [0]:
# Sort by video length (longest first), breaking ties by engagement
# (highest first), and show the top five.
longest_first = casted_df1.orderBy(
    col("duration").desc(),
    col("engagement").desc(),
)
longest_first.select("title", "duration", "engagement").show(n=5, truncate=False)


+----------------------------------------------------------------------------------------------------+--------+----------+
|title                                                                                               |duration|engagement|
+----------------------------------------------------------------------------------------------------+--------+----------+
|BLAST Premier Bounty S2, Round 2 Day 1 - NAVI vs Astralis, FaZe vs VP, Spirit vs G2, FURIA vs Liquid|42899   |12231     |
|DRX vs DNF - GEN vs T1 | 2025 LCK                                                                   |23160   |8301      |
|TLN vs PRX - VCT Pacific - Stage 2 -  Group Stage - Day 13                                          |20865   |4153      |
|Brawl Stars Championship 2025 - August Monthly Finals - East Asia                                   |20438   |5101      |
|grow a garden admin abuse (w/flamingo)                                                              |19510   |43303     |
+---------------

In [0]:
# Save the processed DataFrame as a table, using overwrite mode.
(
    casted_df1.write
    .mode("overwrite")
    .saveAsTable("processed_data.singapore_trending_processed")
)