In [0]:

# Bronze-layer input file (raw trending data) on ADLS Gen2.
# Adjacent string literals are concatenated: container/account first, then the
# partitioned object path.
source_bronzeFile = (
    "abfss://youtube-bronze@practhirusa.dfs.core.windows.net"
    "/trending/year=2025/month=09/day=15/bronzedata_20250914.csv"
)

In [0]:

# Silver-layer output location on ADLS Gen2.
# Adjacent string literals are concatenated: container/account first, then the
# partitioned object path.
Target_silverFile = (
    "abfss://youtube-silver@practhirusa.dfs.core.windows.net"
    "/trending/year=2025/month=09/day=15/silverdata_20250914.csv"
)

In [0]:
# Load the raw bronze CSV into a DataFrame.
#
# SECURITY: the SAS token is fetched from a Databricks secret scope at run time
# rather than being written into the notebook. Any token that was ever pasted
# into this notebook should be treated as leaked and revoked/rotated.
# NOTE(review): the scope and key names below are placeholders — create them
# (or point these at the existing secret) before running.
bronze_sas_token = dbutils.secrets.get(scope="youtube-pipeline", key="practhirusa-sas-token")

sourceCSV_BronzeDF = (
    spark.read.format("csv")
    .option("header", "true")        # first row holds the column names
    .option("inferSchema", "true")   # let Spark guess column types from the data
    .option("delimiter", ",")
    .option("escape", "\"")          # a quote inside a quoted field is escaped by a quote
    .option("multiLine", "true")     # quoted fields may span several lines
    .option("fs.azure.account.auth.type", "SAS")
    .option("fs.azure.sas.token.provider.type", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
    .option("fs.azure.sas.fixed.token", bronze_sas_token)
    .load(source_bronzeFile)
)

In [0]:
# Render the freshly loaded bronze DataFrame for a visual sanity check.
display(sourceCSV_BronzeDF)

In [0]:
# Give every column an explicit data type instead of relying on schema inference.
from pyspark.sql.functions import col, to_timestamp

# Plain casts, applied in this order: column name -> Spark SQL type.
bronze_casts = {
    "videoId": "string",
    "title": "string",
    "views": "bigint",
    "likes": "bigint",
    "comments": "bigint",
}

silver_df = sourceCSV_BronzeDF
for column_name, spark_type in bronze_casts.items():
    silver_df = silver_df.withColumn(column_name, col(column_name).cast(spark_type))

# publishedAt is parsed into a timestamp rather than cast.
silver_df = silver_df.withColumn("publishedAt", to_timestamp("publishedAt"))


In [0]:
# Rename columns to the silver-layer names (bronze name -> silver name).
silver_column_names = {
    "videoId": "VideoID",
    "title": "Title",
    "publishedAt": "Published_at",
    "views": "Views",
    "likes": "Likes",
    "comments": "Comments",
}

for bronze_name, silver_name in silver_column_names.items():
    silver_df = silver_df.withColumnRenamed(bronze_name, silver_name)

In [0]:
# Remove duplicates: keep a single row per video.
# The key is spelled "VideoID", exactly as the column is named after the rename
# step, so this cell does not depend on Spark's case-insensitive name
# resolution (it would fail with spark.sql.caseSensitive=true otherwise).
# NOTE: dropDuplicates gives no guarantee about which duplicate row is kept.
silver_df = silver_df.dropDuplicates(["VideoID"])
display(silver_df)

In [0]:
# Ensure no NULLs in critical fields.
# Missing metric counts are replaced with 0, and rows without a video id are
# dropped. Column names are spelled exactly as in the renamed schema
# ("Views", "Likes", "Comments", "VideoID") so the cell does not depend on
# Spark's case-insensitive name resolution.
silver_df = (
    silver_df
    .fillna({"Views": 0, "Likes": 0, "Comments": 0})
    .filter(col("VideoID").isNotNull())
)


# Spark SQL: explore the cleaned data through a temporary view

In [0]:
# Expose the current silver DataFrame to Spark SQL under the name "silver_view"
# (replaces any temp view already registered with that name).
silver_df.createOrReplaceTempView("silver_view")


In [0]:
%sql
-- Preview every column of every row in silver_view.
SELECT * FROM silver_view




In [0]:
%sql
-- Preview titles reduced to ASCII letters, digits and spaces.
-- TRIM is applied AFTER REGEXP_REPLACE: removing a leading or trailing symbol
-- can expose a space at the edge (e.g. '! hello' -> ' hello'), which trimming
-- first would leave behind.
SELECT
    VideoID,
    TRIM(REGEXP_REPLACE(Title, '[^a-zA-Z0-9 ]', '')) AS Title_Name,
    Views, Likes, Comments, Published_at
FROM silver_view;


In [0]:
%sql
-- Add the publish year/month and a likes-per-view engagement rate to each row.
-- Rows whose Views is not greater than 0 (including NULL) get a rate of 0.
SELECT
    *,
    YEAR(Published_at)              AS year,
    MONTH(Published_at)             AS month,
    IF(Views > 0, Likes / Views, 0) AS engagementRate
FROM silver_view;

In [0]:
# Load silver_view back into a DataFrame for the remaining Python steps.
silver_df = spark.table("silver_view")

In [0]:
# Show the DataFrame that was read back from silver_view.
display(silver_df)

In [0]:
from pyspark.sql.functions import regexp_replace, trim

# Strip everything except ASCII letters, digits and spaces from the title, then
# trim leading/trailing spaces.
# The target name must be "Title" in exact case: withColumn matches the existing
# column case-insensitively but labels the result with the name passed in, so
# writing "title" would silently rename the column to lowercase and break the
# capitalised naming used by the other columns and the output header.
# NOTE(review): this pattern also removes every non-ASCII character, so titles
# written in non-Latin scripts become empty — confirm that is intended.
silver_df = silver_df.withColumn("Title", trim(regexp_replace("Title", "[^a-zA-Z0-9 ]", "")))

In [0]:
# Show the DataFrame after the title clean-up.
display(silver_df)

In [0]:
from pyspark.sql.functions import to_date, year, month, dayofmonth, date_format, col

# Replace Published_at with its calendar date rendered as a "yyyy-MM-dd" string
# (date_format yields a string column, so the time-of-day part is dropped).
published_day = date_format(col("Published_at"), "yyyy-MM-dd")
silver_df = silver_df.withColumn("Published_at", published_day)


In [0]:
# Show the DataFrame after Published_at has been reformatted.
display(silver_df)

In [0]:
# Write the silver DataFrame out as CSV, replacing whatever is at the target.
#
# SECURITY: the SAS token is fetched from a Databricks secret scope at run time
# rather than being written into the notebook. Any token that was ever pasted
# into this notebook should be treated as leaked and revoked/rotated.
# NOTE(review): the scope and key names below are placeholders — create them
# (or point these at the existing secret) before running.
silver_sas_token = dbutils.secrets.get(scope="youtube-pipeline", key="practhirusa-sas-token")

# "inferSchema" is a read-side option and has no effect on a writer, so it is
# not set here.
# NOTE: Spark writes a directory of part files at the target path, not a single
# .csv file, even though the path ends in ".csv".
(
    silver_df.write.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .option("fs.azure.account.auth.type", "SAS")
    .option("fs.azure.sas.token.provider.type", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
    .option("fs.azure.sas.fixed.token", silver_sas_token)
    .mode("overwrite")
    .save(Target_silverFile)
)