In [None]:
# Databricks notebook (script) - 02_transform_delta
from pyspark.sql import functions as F

# Storage locations: raw JSON input prefix and curated Delta output path.
raw_path = "s3a://my-youtube-bucket/raw/"
curated_path = "s3a://my-youtube-bucket/curated/youtube/"

# Glob matching every .json file exactly one directory below the raw prefix.
raw_json_glob = f"{raw_path}*/*.json"

# Load all matching files into one DataFrame (schema is inferred by Spark).
df = spark.read.json(raw_json_glob)

# Column expressions that project the nested payload onto flat columns.
# video_id takes the first non-null of the nested "id.videoId" and the
# top-level "videoId" field.
video_id_col = F.coalesce(
    F.col("id.videoId"),
    F.col("videoId"),
).alias("video_id")
title_col = F.col("snippet.title").alias("title")
channel_title_col = F.col("snippet.channelTitle").alias("channel_title")
published_ts_col = F.to_timestamp("snippet.publishedAt").alias("published_ts")

# Keep only the flattened fields, then drop rows with no video id.
flattened = df.select(video_id_col, title_col, channel_title_col, published_ts_col)
cleaned = flattened.filter(F.col("video_id").isNotNull())

# Derive calendar year and month from the publish timestamp.
cleaned = cleaned.withColumn("year", F.year("published_ts"))
cleaned = cleaned.withColumn("month", F.month("published_ts"))

# Configure a Delta writer that replaces existing data at the target path
# and lays the output out in year/month partitions.
delta_writer = cleaned.write.format("delta")
delta_writer = delta_writer.mode("overwrite")
delta_writer = delta_writer.partitionBy("year", "month")

# Execute the write to the curated location.
delta_writer.save(curated_path)

# Best-effort table maintenance: run OPTIMIZE with Z-ordering on channel_title
# (works in Databricks). Any failure is reported and the script carries on.
optimize_stmt = f"OPTIMIZE delta.`{curated_path}` ZORDER BY (channel_title)"
try:
    spark.sql(optimize_stmt)
except Exception as err:
    print('OPTIMIZE skipped (not supported in this environment):', err)

# Show the first five cleaned rows in the notebook output.
preview = cleaned.limit(5)
display(preview)
