## Data prep

In [None]:
from spark_session_manager import SparkSessionManager

spark = SparkSessionManager.get_spark_session()

In [5]:
# schema for csv file

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType

schema = StructType([
    StructField("video_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("publishedAt", StringType(), True),
    StructField("channelId", StringType(), True),
    StructField("channelTitle", StringType(), True),
    StructField("categoryId", StringType(), True),
    StructField("trending_date", StringType(), True),
    StructField("tags", StringType(), True),
    StructField("view_count", IntegerType(), True),
    StructField("likes", IntegerType(), True),
    StructField("dislikes", IntegerType(), True),
    StructField("comment_count", IntegerType(), True),
    StructField("thumbnail_link", StringType(), True),
    StructField("comments_disabled", BooleanType(), True),
    StructField("ratings_disabled", BooleanType(), True),
    StructField("description", StringType(), True)
])

In [6]:
file_path = 'data/US_youtube_trending_data.csv'  # Adjust the path accordingly

# Load your dataset into a Spark DataFrame
df = spark.read.csv(file_path, header=True, schema=schema, escape='"', quote='"', multiLine=True)

df.show()

                                                                                

+-----------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+------+--------+-------------+--------------------+-----------------+----------------+--------------------+
|   video_id|               title|         publishedAt|           channelId|        channelTitle|categoryId|       trending_date|                tags|view_count| likes|dislikes|comment_count|      thumbnail_link|comments_disabled|ratings_disabled|         description|
+-----------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+----------+------+--------+-------------+--------------------+-----------------+----------------+--------------------+
|3C66w5Z0ixs|I ASKED HER TO BE...|2020-08-11T19:20:14Z|UCvtRTOMP2TqYqu51...|            Brawadis|        22|2020-08-12T00:00:00Z|brawadis|prank|ba...|   1514614|156908|    5855|        35313|ht

In [7]:
# Print the schema of the DataFrame
df.printSchema()

# Show the first few rows of the DataFrame
df.show(5)


root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publishedAt: string (nullable = true)
 |-- channelId: string (nullable = true)
 |-- channelTitle: string (nullable = true)
 |-- categoryId: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: boolean (nullable = true)
 |-- ratings_disabled: boolean (nullable = true)
 |-- description: string (nullable = true)

+-----------+--------------------+--------------------+--------------------+-------------+----------+--------------------+--------------------+----------+------+--------+-------------+--------------------+-----------------+----------------+--------------------+
|   video_id|               title|         publi

### Data Type Conversion

In [8]:
from pyspark.sql.functions import col

# List of columns to convert from string to integer
columns_to_convert = ['view_count', 'likes', 'dislikes', 'comment_count']

# Convert columns
for column in columns_to_convert:
    df = df.withColumn(column, col(column).cast('int'))

#### Date Formatting

In [9]:
# from pyspark.sql.functions import to_date

# # Convert date columns to date type
# df = df.withColumn("publishedAt", to_date(col("publishedAt"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
# df = df.withColumn("trending_date", to_date(col("trending_date"), "yyyy-MM-dd"))

# # Show the first few rows of the DataFrame
# df.show(5)

#### Handling Missing Values

In [10]:
# Drop rows with any missing values
df_cleaned = df.na.drop()

# Or fill missing values with a default value
df_filled = df.na.fill({"likes": 0, "dislikes": 0, "comment_count": 0})  # Example for numeric columns


In [None]:
from chart_defaults import *

## Engagement

### Engagement metrics

a lot of values in column dislikes are '0', thus eng. metrics will not use it

- view_to_like_ratio = view_count / likes
- like_to_comment_ratio = likes / comment_count
- engagement_rate = ((likes + comment_count) / view_count) * 100
- comments_per_view = comment_count / view_count

In [None]:
# introduce engagement metrics

from pyspark.sql import functions as F


#### introduce eng. metrics

In [None]:
df = df.withColumn("view_to_like_ratio", F.col("view_count") / F.col("likes")) \
    .withColumn("like_to_comment_ratio", F.col("likes") / F.col("comment_count")) \
    .withColumn("engagement_rate", ((F.col("likes") + F.col("comment_count")) / F.col("view_count")) * 100) \
    .withColumn("comments_per_view", F.col("comment_count") / F.col("view_count")) \
    .withColumn("average_views_per_day", F.col("view_count") / F.datediff(F.col("trending_date"), F.col("publishedAt")))


#### Calculate Engagement Metrics by Category

In [None]:
# calculate avg, median, min, max, std for each metric for each category

eng_metrics_by_category = df.groupBy("categoryTitle").agg(
    F.avg("view_to_like_ratio").alias("avg_view_to_like_ratio"),
    F.avg("like_to_comment_ratio").alias("avg_like_to_comment_ratio"),
    F.avg("engagement_rate").alias("avg_engagement_rate"),
    F.avg("comments_per_view").alias("avg_comments_per_view"),
    F.avg("average_views_per_day").alias("avg_average_views_per_day"),
    F.stddev("view_to_like_ratio").alias("stddev_view_to_like_ratio"),
    F.stddev("like_to_comment_ratio").alias("stddev_like_to_comment_ratio"),
    F.stddev("engagement_rate").alias("stddev_engagement_rate"),
    F.stddev("comments_per_view").alias("stddev_comments_per_view"),
    F.stddev("average_views_per_day").alias("stddev_average_views_per_day"),
    F.min("view_to_like_ratio").alias("min_view_to_like_ratio"),
    F.min("like_to_comment_ratio").alias("min_like_to_comment_ratio"),
    F.min("engagement_rate").alias("min_engagement_rate"),
    F.min("comments_per_view").alias("min_comments_per_view"),
    F.min("average_views_per_day").alias("min_average_views_per_day"),
    F.max("view_to_like_ratio").alias("max_view_to_like_ratio"),
    F.max("like_to_comment_ratio").alias("max_like_to_comment_ratio"),
    F.max("engagement_rate").alias("max_engagement_rate"),
    F.max("comments_per_view").alias("max_comments_per_view"),
    F.max("average_views_per_day").alias("max_average_views_per_day"),
    F.expr("percentile_approx(view_to_like_ratio, 0.5)").alias("median_view_to_like_ratio"),
    F.expr("percentile_approx(like_to_comment_ratio, 0.5)").alias("median_like_to_comment_ratio"),
    F.expr("percentile_approx(engagement_rate, 0.5)").alias("median_engagement_rate"),
    F.expr("percentile_approx(comments_per_view, 0.5)").alias("median_comments_per_view"),
    F.expr("percentile_approx(average_views_per_day, 0.5)").alias("median_average_views_per_day")
)

In [None]:
# show the dataframe
eng_metrics_by_category.show()

                                                                                

+--------------------+----------------------+-------------------------+-------------------+---------------------+-------------------------+-------------------------+----------------------------+----------------------+------------------------+----------------------------+----------------------+-------------------------+-------------------+---------------------+-------------------------+----------------------+-------------------------+-------------------+---------------------+-------------------------+-------------------------+----------------------------+----------------------+------------------------+----------------------------+
|       categoryTitle|avg_view_to_like_ratio|avg_like_to_comment_ratio|avg_engagement_rate|avg_comments_per_view|avg_average_views_per_day|stddev_view_to_like_ratio|stddev_like_to_comment_ratio|stddev_engagement_rate|stddev_comments_per_view|stddev_average_views_per_day|min_view_to_like_ratio|min_like_to_comment_ratio|min_engagement_rate|min_comments_per_view|

#### charts

In [None]:
# graph one by one of the columns on bar chart

# convert the spark dataframe to pandas dataframe
eng_metrics_by_category_pd = eng_metrics_by_category.toPandas()

import plotly.graph_objects as go
import numpy as np


# Assuming eng_metrics_by_category_pd is the pandas DataFrame converted from Spark DataFrame

def plot_bar_chart(df, column_name):
    # Create a bar chart using Plotly, with colors based on the column values
    fig = go.Figure(data=[go.Bar(
        x=df['categoryTitle'],
        y=df[column_name],
        marker=dict(color=df[column_name], colorscale='Viridis')  # Apply color based on column value
    )])
    fig.update_layout(title_text=column_name, xaxis_title='Category Title', yaxis_title=column_name)
    # Hide legend (since it's not needed here)
    fig.update_layout(showlegend=False)

    # vertical text
    fig.update_layout(xaxis_tickangle=-45)

    # add mean line, full width of paper
    fig.add_shape(
        type="line",
        x0=-0.5,
        y0=df[column_name].mean(),
        x1=len(df['categoryTitle']) - 0.5,
        y1=df[column_name].mean(),
        line=dict(
            color="red",
            width=2
        )
    )

    fig.show()

for column in eng_metrics_by_category_pd.columns:
    if column != "categoryTitle":
        plot_bar_chart(eng_metrics_by_category_pd, column)


## Saving Processed Data


In [11]:
# Example: Specifying an absolute path and ensuring the directory exists
import os

output_path = "data/processed/processed_US_youtube_trending_data.parquet"
os.makedirs(os.path.dirname(output_path), exist_ok=True)

df.write.mode('overwrite').parquet(output_path)

                                                                                

24/03/29 18:29:44 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 2984655 ms exceeds timeout 120000 ms
24/03/29 18:29:44 WARN SparkContext: Killing executors is not supported by current scheduler.
24/03/29 18:29:44 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$