In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, TimestampType

# Obtain the notebook's Spark session (an already-active one is reused), using the app name "extras".
spark = (
    SparkSession.builder
    .appName("extras")
    .getOrCreate()
)

In [0]:
# Create metrics table 'sales_pipeline_metrics'
# Schema: run identifiers, the row counts recorded for a run, and the run status.
# Every column is declared non-nullable.
schema = StructType(
    [
        StructField("run_id", StringType(), nullable=False),
        StructField("run_timestamp", TimestampType(), nullable=False),
        StructField("bronze_row_count", LongType(), nullable=False),
        StructField("silver_valid_count", LongType(), nullable=False),
        StructField("silver_quarantine_count", LongType(), nullable=False),
        StructField("fact_rows_inserted", LongType(), nullable=False),
        StructField("fact_rows_updated", LongType(), nullable=False),
        StructField("run_status", StringType(), nullable=False)
    ]
)

# Empty DataFrame that carries only the schema; used to materialise the table.
metrics_table = spark.createDataFrame([], schema)
metrics_table.printSchema()

# Create the table only when it is missing. Writing the empty DataFrame with
# mode("overwrite") unconditionally would erase every previously logged run
# each time this setup cell is re-run (for example on "Run All").
if not spark.catalog.tableExists("sales_pipeline_metrics"):
    metrics_table.write.format("delta").mode("overwrite").saveAsTable("sales_pipeline_metrics")

# Report whether the table is present in the catalog.
if spark.catalog.tableExists("sales_pipeline_metrics"):
    print("Table exists.")
else:
    print("Table is not created.")

In [0]:
# Bulk remove tables
# Each listed table is dropped by name; IF EXISTS makes a missing table a no-op.
tables_to_drop = ["sales_bronze", "sales_silver", "sales_quarantine", "sales_fact"]

# Build the statements up front, then run them in list order.
drop_statements = [f"DROP TABLE IF EXISTS {table_name}" for table_name in tables_to_drop]
for statement in drop_statements:
    spark.sql(statement)

In [0]:
# Create processed-files table 'sales_processed_files'
# Schema: a file name and the timestamp at which it was processed; both non-nullable.
schema = StructType(
    [
        StructField("file_name", StringType(), nullable=False),
        StructField("processed_timestamp", TimestampType(), nullable=False)
    ]
)

# Empty DataFrame that carries only the schema; used to materialise the table.
file_processed_table = spark.createDataFrame([], schema)

# Create the table only when it is missing. Writing the empty DataFrame with
# mode("overwrite") unconditionally would wipe the record of already-processed
# files each time this setup cell is re-run (for example on "Run All").
if not spark.catalog.tableExists("sales_processed_files"):
    file_processed_table.write.format("delta").mode("overwrite").saveAsTable("sales_processed_files")

# Report whether the table is present in the catalog.
if spark.catalog.tableExists("sales_processed_files"):
    print("Table exists.")
else:
    print("Table is not created.")

In [0]:
# Parked copy of the pipeline cell (CSV -> bronze -> silver/quarantine -> merge into
# fact -> metrics logging), kept here temporarily.
# The whole block is wrapped in a triple-quoted string, so none of it executes.
# NOTE(review): the parked code references uuid, datetime, col, re and DeltaTable,
# none of which are imported by the import cell visible in this notebook; add those
# imports before re-enabling it.
'''# Initialize variables

SOURCE_PATH = "/Volumes/workspace/default/my_datas/daily_sales/sales_1000.csv"
BRONZE_TABLE = "sales_bronze"
SILVER_TABLE = "sales_silver"
QUARANTINE_TABLE = "sales_quarantine"
FACT_TABLE = "sales_fact"

run_metadata = {
    "run_id": str(uuid.uuid4()),
    "run_timestamp": datetime.datetime.now(),
    "bronze_row_count": 0,
    "silver_valid_count": 0,
    "silver_quarantine_count": 0,
    "fact_rows_inserted": 0,
    "fact_rows_updated": 0,
    "run_status": "success"
}

def write_pipeline_metrics(get_data):
    
    target_schema = spark.table('sales_pipeline_metrics').schema
    metrics_df = spark.createDataFrame([get_data], schema=target_schema)
    metrics_df.write.format('delta').mode("append").saveAsTable('sales_pipeline_metrics')
    print("Metrics appended successfully")

try:
    # Read CSV
    bronze_df = spark.read.format("csv").option("header", "true").load(SOURCE_PATH)

    # clean column names
    bronze_df = bronze_df.select([col(c).alias(c.replace(" ", "_")) for c in bronze_df.columns])

    # write to delta
    bronze_df.write.format('delta').mode("overwrite").saveAsTable(BRONZE_TABLE)

    # Phase 2 â€” Basic Data Quality (Silver + Quarantine)

    # Read from sales_bronze table
    bronze_temp_df = spark.read.table(BRONZE_TABLE)
    # Get bronze table count
    run_metadata["bronze_row_count"] = spark.table(BRONZE_TABLE).count()

    # clean column names - Advanced - using Regex
    bronze_temp_df = bronze_temp_df.select([
        col(c)
        .alias(re.sub(r'[^a-zA-Z0-9]+', '_', c)
            .lower()
            .strip('_')) 
        for c in bronze_temp_df.columns
    ])

    # Casting critical columns
    bronze_temp_df = (
        bronze_temp_df
        .withColumns(
            {
                "order_id": col('order_id').cast('long'),
                "units_sold": col('units_sold').cast('int'),
                "unit_price": col('unit_price').cast('double') 
            }
        )
    )

    # Business conditions to filter rows
    invalid_condition = (
        (col('order_id').isNull()) | 
        (col('units_sold') <= 0) | 
        (col('unit_price') <= 0)
    )

    # Retrieve valid & invalid rows based on Business conditions
    bronze_valid_df = bronze_temp_df.filter(~invalid_condition)
    bronze_invalid_df = bronze_temp_df.filter(invalid_condition)

    # Write Valid & Invlaid rows to delta
    bronze_valid_df.write.format('delta').mode("overwrite").saveAsTable(SILVER_TABLE)
    bronze_invalid_df.write.format('delta').mode("overwrite").saveAsTable(QUARANTINE_TABLE)

    # Get Silver_valid and Silver_invalid counts
    run_metadata["silver_valid_count"] = spark.table(SILVER_TABLE).count()
    run_metadata["silver_quarantine_count"] = spark.table(QUARANTINE_TABLE).count()

    # Checking counts
    print("Total_rows: ", run_metadata["bronze_row_count"])
    print("Valid_rows: ", run_metadata["silver_valid_count"])
    print("Invalid_rows: ", run_metadata["silver_quarantine_count"])

    # Phase 3 - Incremental Loads

    # Read from silver valid table
    silver_valid_temp = spark.read.table(SILVER_TABLE)

    # condtion for incremental load
    change_condition = """
        NOT (t.units_sold <=> s.units_sold) OR
        NOT (t.unit_price <=> s.unit_price)
    """

    # Condition for merge
    merge_condition = "t.order_id = s.order_id"

    # Check if table exists an create one and overwrite with silver valid data
    if not spark.catalog.tableExists(FACT_TABLE):
        print("Table does not exist. Creating one..")
        silver_valid_temp.write.format('delta').mode("overwrite").saveAsTable(FACT_TABLE)
        print("Table created.")
        fact_table_count = spark.table(FACT_TABLE).count()
        print(f"Total No of rows: {fact_table_count}")

    # If table exists, merge the upsert data
    else:
        print("Table exists. Upserting rows..")

        # Create delta table object for sales_fact
        delta_sales_fact_temp_df = DeltaTable.forName(spark, FACT_TABLE)
        delta_sales_fact_temp_df.alias("t").merge(
            silver_valid_temp.alias("s"), 
            merge_condition
            ).whenMatchedUpdateAll(
                condition=change_condition
                ).whenNotMatchedInsertAll().execute()
            
        latest_history = delta_sales_fact_temp_df.history(1).collect()[0]
        metrics = latest_history["operationMetrics"]
        run_metadata["fact_rows_inserted"] = int(metrics.get("numTargetRowsInserted", 0))
        run_metadata["fact_rows_updated"] = int(metrics.get("numTargetRowsUpdated", 0))

        print("new_rows: ", run_metadata["fact_rows_inserted"])
        print("rows_updated: ", run_metadata["fact_rows_updated"])
       

except Exception as e:
    run_metadata["run_status"] = "failure"
    raise e

finally:
    try:
        print("Attempting to log pipeline metrics...")
        write_pipeline_metrics(run_metadata)
    except Exception as e:
        print(f"CRITICAL WARNING: Metrics logging failed! Error: {e}")
'''

In [0]:
%sql
-- Show all logged pipeline runs, most recent run_timestamp first.
SELECT *
FROM sales_pipeline_metrics
ORDER BY run_timestamp DESC;

-- Ad-hoc maintenance statements, left disabled; uncomment one at a time when needed.
--TRUNCATE TABLE sales_pipeline_metrics;
--select * from sales_processed_files order by processed_timestamp desc;
--DELETE FROM sales_pipeline_metrics where silver_quarantine_count = '1008';
--DELETE FROM sales_processed_files where file_name = 'sales_1000';
--show tables;



In [0]:
%sql
-- List every row of the processed-files table.
SELECT *
FROM sales_processed_files;