In [0]:
import time
import uuid
from datetime import datetime, timezone

from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col, lit, current_timestamp, monotonically_increasing_id,
    to_timestamp, to_date, coalesce, struct, to_json, input_file_name
)

# ------------------------------
# Config (catalog tables)
# ------------------------------
source_table = "weather_streaming_database.`22`"             # Glue source table (numeric name must be backticked)
bronze_table = "weather_catalog.raw.weather_readings"        # UC Bronze
dlq_table    = "weather_catalog.raw.weather_dlq"             # UC DLQ
log_table    = "weather_catalog.logging.weather_logging"     # UC Logging

# Explicit schema for the audit row. Schema inference cannot type a column whose
# only value is None (error_message on a SUCCESS row), so the types are spelled out.
log_schema = (
    "batch_id string, source string, target string, "
    "row_count_clean bigint, row_count_dlq bigint, status string, "
    "error_message string, start_time timestamp, end_time timestamp"
)

batch_id  = str(uuid.uuid4())
# Driver-side wall-clock value. current_timestamp() returns a Column expression,
# which cannot be placed inside the row tuples handed to spark.createDataFrame.
job_start = datetime.now(timezone.utc)


def write_audit_log(status, clean_rows=0, dlq_rows=0, error_message=None):
    """Append one audit row for the current batch to the logging table.

    Args:
        status: Outcome label stored in the ``status`` column
            ("SUCCESS" or "FAILURE" in this notebook).
        clean_rows: Number of rows written to the Bronze table.
        dlq_rows: Number of rows written to the DLQ table.
        error_message: Exception text for a failed batch, otherwise None.

    Returns:
        The single-row DataFrame that was appended to ``log_table``.
    """
    audit_df = spark.createDataFrame(
        [(
            batch_id, source_table, bronze_table,
            clean_rows, dlq_rows, status, error_message,
            job_start, datetime.now(timezone.utc),
        )],
        schema=log_schema,
    )
    audit_df.write.format("delta").mode("append").saveAsTable(log_table)
    return audit_df


# Pre-declared so the except/finally clauses can run even if an early step fails.
df_dlq_cached = None
df_clean_cached = None
row_count_dlq = 0
row_count_clean = 0
written_dlq = 0      # rows actually appended to the DLQ table
written_clean = 0    # rows actually appended to the Bronze table

try:
    # ------------------------------
    # Step 1: Read from Glue table
    # ------------------------------
    # spark.table only resolves the table here; the data itself is read when the
    # first action (count/write) runs, which is why the whole pipeline sits in
    # this try block rather than just this line.
    df_raw = spark.table(source_table)

    # ------------------------------
    # Step 2: Enrich + derive partition date
    # - Your Glue schema shows: city, date_time (string), etc.
    # - Derive a robust 'date' from date_time for partitioning.
    # ------------------------------
    df_enriched = (
        df_raw
        .withColumn("ingest_time", current_timestamp())
        .withColumn("source_table", lit(source_table))
        .withColumn("batch_id", lit(batch_id))
        # Unique within this DataFrame only; pair with batch_id for a global key.
        .withColumn("record_id", monotonically_increasing_id())
        # NOTE(review): input_file_name() may be unavailable on some Unity Catalog
        # access modes (the `_metadata.file_path` column is the documented
        # alternative) - confirm against the cluster this notebook runs on.
        .withColumn("source_file", input_file_name())
        # Try multiple timestamp patterns; fall back to null if unparsable
        .withColumn(
            "ts_parsed",
            coalesce(
                to_timestamp(col("date_time")),                                    # default Spark parse
                to_timestamp(col("date_time"), "yyyy-MM-dd HH:mm:ss"),
                to_timestamp(col("date_time"), "yyyy-MM-dd'T'HH:mm:ss"),
                to_timestamp(col("date_time"), "yyyy/MM/dd HH:mm:ss"),
                to_timestamp(col("date_time"), "dd-MM-yyyy HH:mm:ss"),
                to_timestamp(col("date_time"), "dd-MM-yyyy")
            )
        )
        .withColumn("date", to_date(col("ts_parsed")))
    )

    # ------------------------------
    # Step 3: Split clean vs DLQ
    #   Business rules for Bronze:
    #   - city NOT NULL
    #   - date_time NOT NULL
    #   - date (parsed) NOT NULL
    #   Everything else goes to DLQ with raw payload + reason
    # ------------------------------
    invalid_cond = (col("city").isNull() | col("date_time").isNull() | col("date").isNull())

    df_dlq = (
        df_enriched
        .filter(invalid_cond)
        .withColumn("error_reason", lit("Missing or invalid city/date_time/date"))
        # Serialize only the original source columns, not the enrichment columns.
        .withColumn("raw_payload", to_json(struct([col(c) for c in df_raw.columns])))
        .select(
            "record_id","batch_id","ingest_time","source_table","source_file",
            "error_reason","raw_payload"
        )
    )

    df_clean = (
        df_enriched
        .filter(~invalid_cond)
        .drop("ts_parsed")  # keep the table tidy
    )

    # Cache before counting/writing to avoid recompute
    df_dlq_cached = df_dlq.cache()
    df_clean_cached = df_clean.cache()

    row_count_dlq = df_dlq_cached.count()
    row_count_clean = df_clean_cached.count()

    # ------------------------------
    # Step 4: Write DLQ (auto-creates table if missing)
    # ------------------------------
    if row_count_dlq > 0:
        (df_dlq_cached.write.format("delta").mode("append").saveAsTable(dlq_table))
        written_dlq = row_count_dlq

    # ------------------------------
    # Step 5: Write Bronze (schema evolution + partitioning)
    #   Partition by city + date (both present after validation)
    # ------------------------------
    (df_clean_cached.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")      # ✅ schema evolution
        .partitionBy("city", "date")
        .saveAsTable(bronze_table)
    )
    written_clean = row_count_clean
except Exception as e:
    # Any failure (read, transform, count or write) is audited with the row
    # counts that were actually persisted before the error, then rethrown.
    try:
        write_audit_log("FAILURE", written_clean, written_dlq, str(e))
    except Exception as log_err:
        # Best effort: a broken logging table must not hide the original error.
        print(f"⚠️ Batch {batch_id}: could not write FAILURE audit row: {log_err}")
    raise
else:
    # ------------------------------
    # Step 6: Logging / Audit
    # ------------------------------
    log_df = write_audit_log("SUCCESS", written_clean, written_dlq)

    print(f"✅ Batch {batch_id} Completed | Clean: {row_count_clean}, DLQ: {row_count_dlq}")
finally:
    # Release cached blocks so repeated runs on a shared cluster do not pile up.
    for cached_df in (df_dlq_cached, df_clean_cached):
        if cached_df is not None:
            cached_df.unpersist()


In [0]:
%sh
# Switch to the feature branch, creating it only when it does not exist yet.
# A plain `git checkout -b` fails on every re-run of this cell because the
# branch is already there.
branch="feature/add-notebooks"
if git show-ref --verify --quiet "refs/heads/${branch}"; then
  git checkout "${branch}"
else
  git checkout -b "${branch}"
fi
