In [0]:

from pyspark.sql import functions as F
from pyspark.sql import types as T
import time

# Fully qualified name of the Delta table that the audit_* helpers append to.
audit_tbl = "edl_hc_mart.audit.audit_ingestion"
# NOTE(review): this is a lazy Column expression, not a captured timestamp value,
# and the audit helpers visible in this notebook do not reference it — confirm
# whether anything else still needs it.
run_start = F.current_timestamp()

def audit_start(r, attempt=1, watermark_col=None, last_success_wm=None, checkpoint_path=None):
    """Append a "Started" audit row for run ``r`` to ``audit_tbl``.

    Args:
        r: Run descriptor; the attributes read from it are pipeline_name,
            source_type, source_name, bronze_table, batch_id, run_id,
            schema_version, producer_system, ingestion_user and notes.
        attempt: Attempt number stored in the ``attempt`` column.
        watermark_col: Name of the watermark column, stored as-is.
        last_success_wm: Stored in ``last_success_watermark_value``.
        checkpoint_path: Stored in ``file_checkpoint_path``.

    Returns:
        The single-row DataFrame that was appended.
        NOTE(review): its timestamp columns are ``current_timestamp()``
        expressions, so re-evaluating the returned DataFrame may not
        reproduce the exact values that were stored — confirm before
        relying on them.
    """
    audit_schema_ddl = """
        pipeline_name string, source_type string, source_name string, bronze_table string,
        batch_id string, run_id string, trigger_type string, attempt int,
        run_start_ts timestamp, run_end_ts timestamp, duration_ms long,
        last_status string, records_read long, records_written long, error_count long, error_message string,
        watermark_col string, last_success_watermark_value string, current_run_high_watermark_value string,
        file_checkpoint_path string, schema_version_applied string, producer_system string,
        ingestion_user string, notes string, last_load_date date, created_at timestamp, updated_at timestamp
    """

    started_row = dict(
        # Identity of the run.
        pipeline_name=r.pipeline_name,
        source_type=r.source_type,
        source_name=r.source_name,
        bronze_table=r.bronze_table,
        batch_id=r.batch_id,
        run_id=r.run_id,
        trigger_type="schedule",
        attempt=attempt,
        # Timing: run_start_ts is stamped by Spark below; the rest is unknown at start.
        run_start_ts=None,
        run_end_ts=None,
        duration_ms=None,
        # Outcome: nothing has been read or written yet.
        last_status="Started",
        records_read=None,
        records_written=None,
        error_count=None,
        error_message=None,
        # Watermark / checkpoint bookkeeping supplied by the caller.
        watermark_col=watermark_col,
        last_success_watermark_value=last_success_wm,
        current_run_high_watermark_value=None,
        file_checkpoint_path=checkpoint_path,
        # Provenance.
        schema_version_applied=r.schema_version,
        producer_system=r.producer_system,
        ingestion_user=r.ingestion_user,
        notes=r.notes,
        last_load_date=None,
        # Stamped by Spark below.
        created_at=None,
        updated_at=None,
    )

    started_df = spark.createDataFrame([started_row], schema=audit_schema_ddl)
    # Overwrite the placeholder timestamp columns with the write-time timestamp.
    for ts_field in ("run_start_ts", "created_at", "updated_at"):
        started_df = started_df.withColumn(ts_field, F.current_timestamp())

    started_df.write.format("delta").mode("append").saveAsTable(audit_tbl)
    return started_df


In [0]:

def audit_success(r, records_read, records_written, high_watermark_value,
                  attempt=1, watermark_col="ingestion_ts"):
    """Append a "Success" audit row for run ``r`` to ``audit_tbl``.

    The audit table is treated as an immutable log: the "Started" row is not
    updated, a new row is appended instead.

    Args:
        r: Run descriptor; the attributes read from it are pipeline_name,
            source_type, source_name, bronze_table, batch_id, run_id,
            schema_version, producer_system, ingestion_user and notes.
        records_read: Stored in ``records_read``.
        records_written: Stored in ``records_written``.
        high_watermark_value: Stored in both ``last_success_watermark_value``
            and ``current_run_high_watermark_value``.
        attempt: Attempt number stored in the ``attempt`` column (default 1,
            the value that was previously hard-coded).
        watermark_col: Name of the watermark column (default "ingestion_ts",
            the value that was previously hard-coded).

    Returns:
        The single-row DataFrame that was appended.
    """
    df = (spark.createDataFrame([{
        "pipeline_name": r.pipeline_name,
        "source_type": r.source_type,
        "source_name": r.source_name,
        "bronze_table": r.bronze_table,
        "batch_id": r.batch_id,
        "run_id": r.run_id,
        "trigger_type": "schedule",
        "attempt": attempt,
        "run_start_ts": None,
        "run_end_ts": None,
        "duration_ms": None,
        "last_status": "Success",
        "records_read": records_read,
        "records_written": records_written,
        "error_count": 0,
        "error_message": None,
        "watermark_col": watermark_col,
        "last_success_watermark_value": high_watermark_value,
        "current_run_high_watermark_value": high_watermark_value,
        "file_checkpoint_path": None,
        "schema_version_applied": r.schema_version,
        "producer_system": r.producer_system,
        "ingestion_user": r.ingestion_user,
        "notes": r.notes,
        # Row values must be plain Python objects; a Column expression here is
        # rejected by createDataFrame, so the date is filled via withColumn below.
        "last_load_date": None,
        "created_at": None,
        "updated_at": None
    }], schema=spark.table(audit_tbl).schema)
         # The real start time is not passed in, so run_start_ts is stamped with
         # the write time and duration_ms is a 0 placeholder.
         .withColumn("run_start_ts", F.current_timestamp())
         .withColumn("run_end_ts", F.current_timestamp())
         # Cast so the literal matches the long-typed duration_ms column.
         .withColumn("duration_ms", F.lit(0).cast("long"))
         .withColumn("last_load_date", F.to_date(F.current_timestamp()))
         .withColumn("created_at", F.current_timestamp())
         .withColumn("updated_at", F.current_timestamp())
    )
    df.write.format("delta").mode("append").saveAsTable(audit_tbl)
    return df

In [0]:

def audit_failure(r, error_message, high_watermark_observed=None, attempt=1,
                  watermark_col="ingestion_ts"):
    """Append a "Failure" audit row for run ``r`` to ``audit_tbl``.

    Args:
        r: Run descriptor; the attributes read from it are pipeline_name,
            source_type, source_name, bronze_table, batch_id, run_id,
            schema_version, producer_system, ingestion_user and notes.
        error_message: Error text, exception object or None. It is converted
            with ``str()`` and truncated to 1000 characters; None is stored
            as NULL.
        high_watermark_observed: Stored in ``current_run_high_watermark_value``.
            ``last_success_watermark_value`` is left NULL so a failed run never
            promotes the watermark.
        attempt: Attempt number stored in the ``attempt`` column.
        watermark_col: Name of the watermark column (default "ingestion_ts",
            the value that was previously hard-coded).

    Returns:
        The single-row DataFrame that was appended.
    """
    # Accept exceptions as well as strings, and tolerate None; truncate to
    # avoid giant rows.
    error_text = None if error_message is None else str(error_message)[:1000]

    df = (spark.createDataFrame([{
        "pipeline_name": r.pipeline_name,
        "source_type": r.source_type,
        "source_name": r.source_name,
        "bronze_table": r.bronze_table,
        "batch_id": r.batch_id,
        "run_id": r.run_id,
        "trigger_type": "schedule",
        "attempt": attempt,
        "run_start_ts": None,
        "run_end_ts": None,
        "duration_ms": None,
        "last_status": "Failure",
        "records_read": None,
        "records_written": None,
        "error_count": 1,
        "error_message": error_text,
        "watermark_col": watermark_col,
        "last_success_watermark_value": None,   # not promoting
        "current_run_high_watermark_value": high_watermark_observed,
        "file_checkpoint_path": None,
        "schema_version_applied": r.schema_version,
        "producer_system": r.producer_system,
        "ingestion_user": r.ingestion_user,
        "notes": r.notes,
        # Row values must be plain Python objects; a Column expression here is
        # rejected by createDataFrame, so the date is filled via withColumn below.
        "last_load_date": None,
        "created_at": None,
        "updated_at": None
    }], schema=spark.table(audit_tbl).schema)
         # The real start time is not passed in, so run_start_ts is stamped with
         # the write time and duration_ms is a 0 placeholder.
         .withColumn("run_start_ts", F.current_timestamp())
         .withColumn("run_end_ts", F.current_timestamp())
         # Cast so the literal matches the long-typed duration_ms column.
         .withColumn("duration_ms", F.lit(0).cast("long"))
         .withColumn("last_load_date", F.to_date(F.current_timestamp()))
         .withColumn("created_at", F.current_timestamp())
         .withColumn("updated_at", F.current_timestamp())
    )
    df.write.format("delta").mode("append").saveAsTable(audit_tbl)
    return df

In [0]:
%sql

-- Latest 'Success' audit row per (pipeline_name, source_name, bronze_table),
-- exposing the watermark column and value recorded by that run.
CREATE OR REPLACE VIEW edl_hc_mart.audit.vw_audit_ingestion_last_success AS
WITH ranked AS (
  SELECT
    pipeline_name, source_name, bronze_table,
    watermark_col,
    last_success_watermark_value,
    -- ROW_NUMBER guarantees exactly one row per key, even when several
    -- Success rows share the same created_at. run_end_ts and run_id are only
    -- tie-breakers that keep the choice deterministic.
    ROW_NUMBER() OVER (
      PARTITION BY pipeline_name, source_name, bronze_table
      ORDER BY created_at DESC, run_end_ts DESC, run_id DESC
    ) AS rn
  FROM edl_hc_mart.audit.audit_ingestion
  WHERE last_status = 'Success'
)
SELECT pipeline_name, source_name, bronze_table,
       watermark_col,
       last_success_watermark_value
FROM ranked
WHERE rn = 1;


In [0]:
%sql
-- Ad-hoc inspection of the view's current contents.
SELECT * FROM edl_hc_mart.audit.vw_audit_ingestion_last_success;