In [0]:
# Databricks notebook source

from pyspark.sql.functions import *
from datetime import datetime
import traceback, uuid


# PARAMETERS (Widgets)
# Text widget holding an optional Slack webhook URL (empty by default).
_slack_widget_name = "SLACK_WEBHOOK"
dbutils.widgets.text(_slack_widget_name, "")
SLACK_WEBHOOK = dbutils.widgets.get(_slack_widget_name)


# LOG TABLE
# Delta table that receives one row per logging call; created on first run.
LOG_TABLE = "nyc_yellow_taxi_trip.log_yellow_taxi.pipeline_logs"

_log_table_ddl = f"""
CREATE TABLE IF NOT EXISTS {LOG_TABLE} (
  RunId STRING,
  Timestamp TIMESTAMP,
  Layer STRING,
  Status STRING,
  ProcessedRecordCount LONG,
  ErrorMessages STRING,
  FileName STRING
) USING delta
"""
spark.sql(_log_table_ddl)


# LOGGING FUNCTION

# Identifier shared by every log row written during this notebook execution,
# so all entries of one orchestration run can be grouped by RunId.
RUN_ID = str(uuid.uuid4())

# Explicit schema matching the pipeline_logs DDL. Inferring types from a single
# row fails when any field is None (e.g. a null file name) and may drift from
# the table's column types.
_LOG_SCHEMA = (
    "RunId STRING, `Timestamp` TIMESTAMP, Layer STRING, Status STRING, "
    "ProcessedRecordCount BIGINT, ErrorMessages STRING, FileName STRING"
)


def write_log(layer, status, processed=0, error_msg="", file_name="", run_id=None):
    """Append one status row to LOG_TABLE and echo a summary line to stdout.

    Args:
        layer: Stage label, e.g. "BRONZE" or "ORCHESTRATION_START".
        status: Outcome label, e.g. "STARTED", "SUCCESS" or "FAILED".
        processed: Record count stored in ProcessedRecordCount.
        error_msg: Error text (callers pass a formatted traceback); "" if none.
        file_name: Source file the count refers to; "" when not file-specific.
        run_id: Optional override for the RunId column; defaults to RUN_ID.
    """
    row = (
        RUN_ID if run_id is None else run_id,
        datetime.now(),
        layer,
        status,
        processed,
        error_msg,
        file_name,
    )
    df = spark.createDataFrame([row], schema=_LOG_SCHEMA)
    df.write.format("delta").mode("append").saveAsTable(LOG_TABLE)
    print(f"[LOG] {layer} | {status} | {processed} | {file_name}")


# SLACK ALERT FUNCTION (optional)

def send_alert(message):
    """Post *message* to the configured Slack webhook, best effort.

    A blank SLACK_WEBHOOK disables alerting. Any failure (missing ``requests``
    package, network error, non-2xx HTTP response) is printed, not raised, so
    alerting can never break the pipeline itself.
    """
    webhook = SLACK_WEBHOOK.strip()
    if not webhook:
        return
    try:
        import requests  # only needed when alerting is enabled

        # Without a timeout a stalled connection would block the job forever.
        response = requests.post(webhook, json={"text": message}, timeout=10)
        # requests does not raise on HTTP error statuses by itself.
        response.raise_for_status()
    except Exception as exc:  # narrower than bare except: let KeyboardInterrupt through
        print(f"Slack alert failed: {exc}")


# VALIDATION FUNCTION (Per-file row count)

def validate_layer(layer, table_name):
    """Log row counts of *table_name* under the stage label *layer*.

    If the table has a ``_source_file`` column, one SUCCESS log row is written
    per distinct source file (or a single zero-count row if the table is empty)
    and the collected list of ``Row(_source_file, rows)`` is returned.
    Otherwise one SUCCESS row with the total count is written and that count
    is returned.

    On any error a FAILED row carrying the traceback is logged, a Slack alert
    is sent, and the original exception is re-raised.
    """
    try:
        df = spark.table(table_name)

        if "_source_file" not in df.columns:
            # Fallback for tables with no source file column.
            total = df.count()
            write_log(layer, "SUCCESS", processed=total, file_name="")
            return total

        # ---- group by source file (Bronze/Silver/Gold) ----
        file_counts = (
            df.groupBy("_source_file")
              .agg(count("*").alias("rows"))
              .collect()
        )

        if not file_counts:
            # Empty table: still record the layer so it is not silently absent.
            write_log(layer, "SUCCESS", processed=0, file_name="")

        for row in file_counts:
            write_log(
                layer,
                "SUCCESS",
                processed=row["rows"],
                # Null source files form their own group; log them under "".
                file_name=row["_source_file"] or "",
            )

        return file_counts

    except Exception:
        errmsg = traceback.format_exc()
        write_log(layer, "FAILED", error_msg=errmsg)
        send_alert(f"❌ {layer} FAILED\n\n{errmsg}")
        raise


# MAIN EXECUTION

try:
    print(" ORCHESTRATION STARTED")
    write_log("ORCHESTRATION_START", "STARTED")

    # Validate Bronze, Silver, Gold after DLT task finishes.
    # A failing layer logs FAILED and alerts inside validate_layer, then
    # re-raises into the handler below, which adds an orchestration-level entry.
    bronze = validate_layer("BRONZE", "nyc_yellow_taxi_trip.taxi_bronze.taxi_bronze")
    silver = validate_layer("SILVER", "nyc_yellow_taxi_trip.taxi_silver.taxi_silver")
    gold   = validate_layer("GOLD",   "nyc_yellow_taxi_trip.taxi_gold.taxi_gold")

    # Built from separate literals so this block's source indentation does not
    # leak into the Slack message text.
    send_alert(
        "PIPELINE SUCCESS\n\n"
        f"BRONZE = {bronze}\n"
        f"SILVER = {silver}\n"
        f"GOLD   = {gold}"
    )

    write_log("ORCHESTRATION_END", "SUCCESS")

    print(" ORCHESTRATION COMPLETED SUCCESSFULLY")

except Exception:
    errmsg = traceback.format_exc()
    write_log("ORCHESTRATION_FAILED", "FAILED", error_msg=errmsg)
    send_alert(f" PIPELINE FAILED\n\n{errmsg}")
    raise
