In [0]:
%run ./Shared_Functions

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

# Make sure the audit schema exists before a table is created inside it.
spark.sql("CREATE SCHEMA IF NOT EXISTS ecomm_data_project.audit")

# Column layout of the pipeline audit log (name, Spark type); every column is nullable.
_audit_columns = [
    ("run_id", StringType()),
    ("job_name", StringType()),
    ("layer", StringType()),
    ("status", StringType()),
    ("records_processed", LongType()),
    ("timestamp", TimestampType()),
    ("error_message", StringType()),
]
audit_schema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in _audit_columns]
)

# Writing an empty frame in "ignore" mode creates the managed Delta table on the
# first run and leaves an existing table untouched on later runs.
audit_table = "ecomm_data_project.audit.pipeline_logs"
empty_df = spark.createDataFrame([], audit_schema)
audit_writer = empty_df.write.format("delta").mode("ignore")
audit_writer.saveAsTable(audit_table)

display(spark.table(audit_table))

In [0]:
import pyspark.sql.functions as F

In [0]:
import pyspark.sql.functions as F

# 1. GET RUN ID
# Read the run id from the Databricks notebook context so audit rows can be
# grouped per run. When the lookup fails (presumably an interactive,
# non-job execution — TODO confirm), fall back to the sentinel "manual_run".
try:
    run_id = dbutils.notebook.entry_point.getDbutils().notebook().getContext().runId().get()
except Exception:
    # `Exception`, not a bare `except:`, so KeyboardInterrupt / SystemExit
    # (e.g. cancelling the cell) propagate instead of being masked as a manual run.
    run_id = "manual_run"
print(f"Current Run ID: {run_id}")

In [0]:
# 2. SETUP WIDGETS & PATHS
# `process_type` selects which ingestion path(s) the following cells run:
#   "users"     -> Auto Loader stream into ecomm_data_project.bronze.users
#   "reference" -> full overwrite of the buyers / sellers / countries tables
#   "all"       -> both paths
VALID_PROCESS_TYPES = ("all", "users", "reference")

dbutils.widgets.text("process_type", "all")
# Normalise so that e.g. " Users " from a job parameter is still accepted.
process_type = dbutils.widgets.get("process_type").strip().lower()

# Fail fast on an unknown mode: otherwise every path's `if` guard is skipped and
# the run finishes "successfully" without ingesting or logging anything.
if process_type not in VALID_PROCESS_TYPES:
    raise ValueError(
        f"Invalid process_type {process_type!r}; expected one of {VALID_PROCESS_TYPES}"
    )

# Source landing zone and Auto Loader checkpoint / schema-tracking root.
base_path = "/Volumes/ecomm_data_project/ecomm_raw/landing_zone"
checkpoint_base = "/Volumes/ecomm_data_project/bronze/checkpoints"

print(f"Pipeline running for mode: {process_type}")

In [0]:
# PATH A: PROCESS USER DATA (With Logging)
# ==========================================
# Incrementally ingests parquet files from the landing zone into
# ecomm_data_project.bronze.users with Auto Loader, then records the outcome via
# log_pipeline_run (presumably provided by the %run ./Shared_Functions cell).
if process_type in ["users", "all"]:
    job_name = "Bronze_Ingest_Users"
    try:
        print(f">>> [START] {job_name}...")

        # Auto Loader tracks the inferred schema under users_schema and adds new
        # source columns (addNewColumns); mergeSchema lets the Delta sink accept them.
        query = (spark.readStream
                  .format("cloudFiles")
                  .option("cloudFiles.format", "parquet")
                  .option("cloudFiles.schemaLocation", f"{checkpoint_base}/users_schema")
                  .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
                  .load(f"{base_path}/users-raw-2")
                  .writeStream
                  .option("checkpointLocation", f"{checkpoint_base}/users")
                  .option("mergeSchema", "true")
                  .trigger(availableNow=True)  # Process all pending data, then stop
                  .toTable("ecomm_data_project.bronze.users"))

        # Block until the availableNow run finishes; a failed stream raises here.
        query.awaitTermination()

        # NOTE(review): this is the table's cumulative row count, not the number
        # of rows ingested by this run.
        total_count = spark.read.table("ecomm_data_project.bronze.users").count()
    except Exception as e:
        # LOG FAILURE
        print(f"!!! Error in {job_name}: {e}")
        log_pipeline_run(run_id, job_name, "Bronze", "FAILED", 0, str(e))
        raise  # Re-raise so the Databricks Job run is marked as failed
    else:
        # LOG SUCCESS — kept outside the `try` so that a failure of the audit write
        # itself is not misreported as a FAILED ingestion (the data already landed).
        log_pipeline_run(run_id, job_name, "Bronze", "SUCCESS", total_count)

In [0]:
# PATH B: PROCESS REFERENCE DATA (With Logging)
# ==========================================
# Full-refresh load: each reference dataset is read from the landing zone and
# overwrites its bronze Delta table. The summed row count is sent to
# log_pipeline_run (presumably provided by the %run ./Shared_Functions cell).
if process_type in ["reference", "all"]:
    job_name = "Bronze_Ingest_Reference"
    try:
        print(f">>> [START] {job_name}...")

        ref_tables = ["buyers", "sellers", "countries"]
        total_rows_affected = 0

        for table in ref_tables:
            target_table = f"ecomm_data_project.bronze.{table}"
            df = spark.read.parquet(f"{base_path}/{table}-raw-2")
            df.write.format("delta").mode("overwrite").saveAsTable(target_table)
            # Count the table just written rather than re-reading the source
            # parquet files a second time; after an overwrite they hold the same rows.
            total_rows_affected += spark.read.table(target_table).count()
    except Exception as e:
        # LOG FAILURE (printed as well, matching the users path)
        print(f"!!! Error in {job_name}: {e}")
        log_pipeline_run(run_id, job_name, "Bronze", "FAILED", 0, str(e))
        raise  # Re-raise so the Databricks Job run is marked as failed
    else:
        # LOG SUCCESS — outside the `try` so an audit-write failure is not
        # misreported as a FAILED ingestion after all tables were written.
        log_pipeline_run(run_id, job_name, "Bronze", "SUCCESS", total_rows_affected)