Import libraries & start Spark

In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Obtain the Spark session. Databricks already provides one, in which case
# getOrCreate() hands it back; on a local run a new session is built.
spark = (
    SparkSession.builder
    .appName("YouTubeAnalytics")
    .getOrCreate()
)

Load curated data

In [None]:
# Load transformed parquet data from S3
df = spark.read.format("parquet").load(
    "s3a://my-youtube-bucket/clean/youtube_videos"
)

# Print the column names and types Spark read from the parquet files
df.printSchema()

# Peek at the first five rows without truncating long cell values
df.show(n=5, truncate=False)


Basic analysis

In [None]:
# Videos per channel, channels with the most videos first
(
    df.groupBy("channel_title")
    .agg(F.count("*").alias("video_count"))
    .orderBy(F.col("video_count").desc())
    .show()
)

# The five most recently published videos
df.orderBy(F.desc("published_at")).show(5)


Add partitioning column

In [None]:
# Derive a "year" column from the publish timestamp; it is used later as the
# partition key when the data is written out.
df_with_date = df.withColumn("year", F.year(F.col("published_at")))

# Spot-check the derived column on a few rows
df_with_date.select("video_id", "title", "year").show(n=5, truncate=False)


Save partitioned dataset

In [None]:
# Write the dataset as parquet, partitioned by year for faster queries.
# "overwrite" mode replaces whatever already exists at the target path.
(
    df_with_date.write
    .partitionBy("year")
    .mode("overwrite")
    .parquet("s3a://my-youtube-bucket/partitioned/youtube_videos")
)

print(" Data saved partitioned by year")


Caching for performance

In [None]:
# Mark df for caching so repeated queries can reuse it.
# cache() is lazy: nothing is stored until an action runs.
df.cache()

# count() is the first action after cache(), so it populates the cache
print("Cached count:", df.count())

# Per-channel video counts again, this time served from the cached data
(
    df.groupBy("channel_title")
    .agg(F.count("*").alias("video_count"))
    .show()
)


Bucketing Example

In [None]:
# Persist df as a managed table bucketed on channel_title (helps joins on that
# column): 4 buckets, rows sorted by published_at inside each bucket.
# Bucketing requires saveAsTable(); "overwrite" replaces an existing table.
(
    df.write
    .bucketBy(4, "channel_title")
    .sortBy("published_at")
    .mode("overwrite")
    .saveAsTable("youtube_bucketed")
)

print("✅ Saved as bucketed table")


Delta Lake storage

In [None]:
# Write the year-partitioned data in Delta format (transactional storage with
# time-travel support). "overwrite" replaces the current table contents.
(
    df_with_date.write
    .format("delta")
    .partitionBy("year")
    .mode("overwrite")
    .save("s3a://my-youtube-bucket/delta/youtube_videos")
)

print("✅ Delta table created")


Delta Lake optimization (Z-Order) (optional: run only if optimization is required)
Note: OPTIMIZE is a costly operation

In [None]:
# Compact small files into fewer large files and Z-Order by channel_title.
# The table is addressed by its storage path (delta.`<path>`), not by a
# catalog table name.
# NOTE(review): OPTIMIZE ... ZORDER BY needs a runtime with Delta Lake SQL
# support (e.g. Databricks) - confirm before running elsewhere.
spark.sql("""
OPTIMIZE delta.`s3a://my-youtube-bucket/delta/youtube_videos`
ZORDER BY (channel_title)
""")

print("✅ Delta table optimized with Z-Ordering")


Analysis of the data

In [None]:
# Window functions: pick the N most recent videos for every channel.
# The window groups rows by channel and orders them newest-first.
w = Window.partitionBy("channel_title").orderBy(F.desc("published_at"))

# Number the rows inside each channel: 1 = most recently published
df_ranked = df.withColumn("row_num", F.row_number().over(w))

# Keep only the first three rows (latest three videos) of each channel
df_ranked.where(F.col("row_num") <= 3).show(truncate=False)
