##### Local Environment Setup

In [1]:

import os
import sys
# Set JAVA env variable
os.environ["JAVA_HOME"] = r"C:\Program Files\Eclipse Adoptium\jdk-11.0.26.4-hotspot"
# Set Hadoop environment variables 
os.environ['HADOOP_HOME'] = r'C:\hadoop'
os.environ['PATH'] = os.environ['HADOOP_HOME'] + r'\bin;' + os.environ['PATH']
# Set the Python executable path explicitly
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Get Pipeline Tools Path
sys.path.append(R'C:\GitHub\Tools\de')


##### Libraries

In [2]:
import time
import logging
from datetime import datetime

import numpy as np
import pandas as pd

from typing import Dict


from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import (StructType, StructField, StringType, 
                            DoubleType, IntegerType, TimestampType, 
                            DateType)
from delta.tables import DeltaTable


In [3]:
from de_pipeline_tools import *

##### Spark Session

In [4]:
spark = initialize_local_spark_delta_lake("Financial Data Pipeline")

2025-04-17 14:31:24,210 - INFO - ---Spark session initialized with Delta Lake support---


In [5]:
# Create a database in the Hive warehouse if doesn't exist
spark.sql("CREATE DATABASE IF NOT EXISTS de_pipelines LOCATION 'C:/hive-warehouse/de_pipelines'")

DataFrame[]


## SCD Type 2:
##### Intial Creation (Overwrite)

##### Inputs/Outputs

In [6]:
# Intial Creation
data_dir = "../../data"
file_list = [
    "financial_transactions_20250409_113413.csv",
    "financial_transactions_20250410_113413.csv"
]

abs_file_list = []
for file in file_list:
    # Join data_dir with filename
    relative_path = os.path.join(data_dir, file)
    # Convert to absolute path
    abs_path = os.path.abspath(relative_path)
    abs_file_list.append(abs_path)

bronze_table = 'de_pipelines.financial_osb_bronze_type2'
silver_table = 'de_pipelines.financial_osb_silver_type2'
gold_table   = 'de_pipelines.financial_osb_gold_type2'


In [7]:
def get_schema():
    return StructType([
    StructField("transaction_id", StringType(), False),
    StructField("timestamp", StringType(), True), #load in as string type, handle in silver step     
    StructField("customer_id", StringType(), True),      
    StructField("account_number", StringType(), True),   
    StructField("transaction_type", StringType(), True), 
    StructField("amount", DoubleType(), True),           
    StructField("currency", StringType(), True),         
    StructField("balance_after", DoubleType(), True),    
    StructField("status", StringType(), True),           
    StructField("merchant", StringType(), True),         
    StructField("category", StringType(), True),         
    StructField("location", StringType(), True)          
])

#### Define: Validation Rules, Transformations, Write Executions

In [8]:
def bronze_transform(df:DataFrame) -> DataFrame:

    # define SCD Type 2 necessary inputs
    df = df.withColumn("is_current", F.lit(1))
    df = df.withColumn("start_date", F.current_date())
    df = df.withColumn("end_date", F.lit(None).cast("timestamp"))

    return df

In [9]:
def bronze_writer(df: DataFrame, table_name: str) -> None:
    (df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(table_name)
    )

    # Get delta operation metrics
    metricsdf = spark.sql(f"DESCRIBE HISTORY {table_name} LIMIT 1")
    metrics = metricsdf.select("operationMetrics").collect()[0][0]
    
    return metrics

In [10]:
# Define bronze validation rules
bronze_validation_rules = [
    {
        "name": "has_transaction_id",
        "condition": "transaction_id IS NOT NULL",
        "description": "Transaction ID must be present"
    },
    {
        "name": "valid_amount",
        "condition": "amount IS NOT NULL AND amount > 0",
        "description": "Amount must be positive if not null"
    },
    {
        "name": "valid_timestamp",
        "condition": "timestamp IS NOT NULL AND timestamp <= current_timestamp()",
        "description": "Timestamp must not be in the future"
    }
]

In [11]:
def silver_transform(df:DataFrame) -> DataFrame:

    # Clean any non-timestamp characters first
    df = df.withColumn(
        "transaction_timestamp", 
        F.regexp_replace(F.col("timestamp"), "[^0-9\\-: ]", "")
    )

    # Cast to timestamp type
    df = df.withColumn("transaction_timestamp", F.col("transaction_timestamp").cast("timestamp"))

    # Remove duplicates
    df = df.dropDuplicates(subset=["transaction_id","transaction_timestamp"])

    # Standardize Data
    df = (df
            .withColumn("amount", F.abs(F.col("amount")))
            .withColumn("transaction_type", F.lower(F.col("transaction_type")))
            .withColumn("category", F.lower(F.col("category")))
            .withColumn("status", F.lower(F.col("status")))
    )

    # Filter Data
    # Address bronze layer data validation check concerns
    df = df.filter(
                    (F.col('transaction_id').isNotNull()) # transaction id must exist
                    & (F.col('account_number').isNotNull()) # account number must exist
                    & (F.col('amount') > 0) # amount must be positive
                    & ((F.col('transaction_timestamp') <= F.current_date()) # must be <= current date
                    |(F.col('transaction_timestamp').isNull()))# or must be Null, no future timestamps
    ) 
    
    # Split timestamp into date and time and year_month for paritioning
    df = (df
            .withColumn("transaction_date", F.to_date("transaction_timestamp"))
            .withColumn("transaction_time", F.date_format("transaction_timestamp", "HH:mm:ss"))
            .withColumn("year_month", F.date_format(F.col("transaction_date"), "yyyy-MM"))
    )
    
    # Add processing timestamp for bookkeeping
    df = (df
            .withColumn("processing_timestamp", F.current_timestamp())
    )

    return df

In [12]:
def silver_writer(df: DataFrame, table_name: str) -> None:
    (df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .partitionBy("year_month")
        .saveAsTable(table_name)
    )

    # Get delta operation metrics
    metricsdf = spark.sql(f"DESCRIBE HISTORY {table_name} LIMIT 1")
    metrics = metricsdf.select("operationMetrics").collect()[0][0]

    return metrics

In [13]:
# Define silver validation rules
silver_validation_rules = [
    {
        "name": "valid_transaction_type",
        "condition": "transaction_type IN ('debit', 'credit', 'transfer', 'payment', 'withdrawal', 'deposit') OR transaction_type IS NULL",
        "description": "Transaction type must be one of the valid types"
    },
    {
        "name": "valid_status",
        "condition": "status IN ('completed', 'pending', 'failed', 'cancelled', 'refunded') OR status IS NULL",
        "description": "Status must be one of the valid statuses"
    },
    {
        "name": "valid_currency",
        "condition": "currency IS NOT NULL AND length(currency) = 3",
        "description": "Currency code should be 3 characters if present"
    },
    {
        "name": "valid_transaction_timestamp",
        "condition": "transaction_timestamp IS NOT NULL AND transaction_timestamp <= current_timestamp()",
        "description": "Transaction timestamp must not be NULL or in the future"
    }
]

In [14]:
def gold_transform(df:DataFrame) -> Dict:
    gold_dfs = {}

    # Gold aggregation 1: Daily summary by category
    daily_category = (df
        .groupBy("transaction_date", "category")
        .agg(
            F.count("transaction_id").alias("transaction_count"),
            F.sum("amount").alias("total_amount"),
            F.avg("amount").alias("avg_amount"),
            F.min("amount").alias("min_amount"),
            F.max("amount").alias("max_amount"),
            F.countDistinct("customer_id").alias("unique_customers")
        )
        .withColumn("processing_timestamp", F.current_timestamp())
    )
    
    gold_dfs["daily_category"] = daily_category
    
    # Gold aggregation 2: Customer summary
    customer_summary = (df
        .groupBy("customer_id")
        .agg(
            F.count("transaction_id").alias("transaction_count"),
            F.sum("amount").alias("total_amount"),
            F.avg("amount").alias("avg_amount"),
            F.min("transaction_date").alias("first_transaction_date"),
            F.max("transaction_date").alias("last_transaction_date"),
            F.approx_count_distinct("category").alias("category_count")
        )
        .withColumn("processing_timestamp", F.current_timestamp())
        .withColumn("days_since_last_transaction", 
                    F.datediff(F.current_date(), F.col("last_transaction_date")))
    )
    
    gold_dfs["customer_summary"] = customer_summary
    
    # Gold aggregation 3: Transaction type summary
    transaction_type_summary = (df
        .groupBy("transaction_type")
        .agg(
            F.count("transaction_id").alias("transaction_count"),
            F.sum("amount").alias("total_amount"),
            F.avg("amount").alias("avg_amount")
        )
        .withColumn("processing_timestamp", F.current_timestamp())
    )
    
    gold_dfs["transaction_type_summary"] = transaction_type_summary

    return gold_dfs

In [15]:
def gold_writer(df: DataFrame, table_name: str) -> None:
    (df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(table_name)
    )

    # Get delta operation metrics
    metricsdf = spark.sql(f"DESCRIBE HISTORY {table_name} LIMIT 1")
    metrics = metricsdf.select("operationMetrics").collect()[0][0]

    return metrics

In [16]:
# Define gold validation rules
gold_validation_rules = [
    {
        "name": "positive_transaction_counts",
        "condition": "transaction_count > 0",
        "description": "Transaction counts should be positive"
    },
    {
        "name": "valid_total_amounts",
        "condition": "total_amount >= 0",
        "description": "Total amounts should not be negative"
    }
]

### Running Full Batch Pipeline

In [17]:
run_batch_de_pipeline(spark, 'csv', abs_file_list, get_schema(), 
                     bronze_table, silver_table, gold_table, 
                     bronze_transform=bronze_transform, silver_transform=silver_transform, gold_transform=gold_transform,
                     bronze_writer=bronze_writer, silver_writer=silver_writer, gold_writer=gold_writer,
                     bronze_validation_rules=bronze_validation_rules, 
                     silver_validation_rules=silver_validation_rules, 
                     gold_validation_rules=gold_validation_rules,
                     pipeline_name='Financial_DE_Pipeline')

2025-04-17 13:59:43,514 - INFO - --Starting data pipeline execution with ID: Financial_DE_Pipeline_20250417_135943--
2025-04-17 13:59:43,516 - INFO - Starting bronze layer processing
2025-04-17 13:59:44,297 - INFO - Successfully read CSV data from: 
  - c:\GitHub\DE_Pipelines\data\financial_transactions_20250409_113413.csv
  - c:\GitHub\DE_Pipelines\data\financial_transactions_20250410_113413.csv
2025-04-17 13:59:44,435 - INFO - Transformation function applied
2025-04-17 14:00:13,449 - INFO - Successfully wrote bronze table: de_pipelines.financial_osb_bronze_type2
2025-04-17 14:00:13,451 - INFO - Write Metrics: 
{
  "numOutputRows": "1000",
  "numOutputBytes": "53856",
  "numFiles": "2"
}
2025-04-17 14:00:14,130 - INFO - Running data quality checks for bronze layer
2025-04-17 14:00:19,358 - INFO - Data Quality Metrics for bronze layer:
2025-04-17 14:00:19,360 - INFO -   - Shape: [18,1000] (approx. row count)
2025-04-17 14:00:19,362 - INFO -   - Schema: 
root
 |-- transaction_id: string

{'status': 'success',
 'pipeline_id': 'Financial_DE_Pipeline_20250417_135943',
 'bronze_version': 2,
 'silver_version': 1,
 'timestamp': '2025-04-17T14:02:41.416879',
 'duration_seconds': 177.90047144889832,
 'metrics': {'pipeline_id': 'Financial_DE_Pipeline_20250417_135943',
  'start_time': '2025-04-17T13:59:43.514408',
  'stages': {'bronze': {'duration_seconds': 37.139564752578735,
    'version': 2,
    'status': 'success'},
   'bronze_optimize': {'layer': 'bronze',
    'duration_seconds': 12.810115814208984,
    'status': 'success'},
   'silver': {'duration_seconds': 36.66653513908386,
    'version': 1,
    'status': 'success',
    'source_bronze_version': 2},
   'silver_optimize': {'layer': 'silver',
    'duration_seconds': 15.660342454910278,
    'status': 'success'},
   'gold': {'duration_seconds': 69.4354395866394,
    'status': 'success',
    'source_silver_version': 1,
    'tables': ['daily_category',
     'customer_summary',
     'transaction_type_summary']},
   'gold_optimiz

### Running Layers In Isolation

In [18]:
spark = initialize_local_spark_delta_lake("Financial Data Pipeline - Testing")

2025-04-17 14:02:44,171 - INFO - ---Spark session initialized with Delta Lake support---


#### Bronze

In [27]:
bronzedf, bronze_version = process_batch_bronze_layer(spark, 'csv', abs_file_list, get_schema(), bronze_table,
                               bronze_transform=bronze_transform, validation_rules=bronze_validation_rules,
                               pipeline_id='test', mode='test', bronze_writer=None)

2025-04-17 14:06:07,531 - INFO - Starting bronze layer processing
2025-04-17 14:06:07,593 - INFO - Successfully read CSV data from: 
  - c:\GitHub\DE_Pipelines\data\financial_transactions_20250414_113413.csv
  - c:\GitHub\DE_Pipelines\data\financial_transactions_20250417_113413.csv
2025-04-17 14:06:07,622 - INFO - Transformation function applied
2025-04-17 14:06:07,625 - INFO - Running data quality checks for bronze layer
2025-04-17 14:06:11,753 - INFO - Data Quality Metrics for bronze layer:
2025-04-17 14:06:11,755 - INFO -   - Shape: [18,2750] (approx. row count)
2025-04-17 14:06:11,756 - INFO -   - Schema: 
root
 |-- transaction_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- account_number: string (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- balance_after: double (nullable = true)
 |-- status: string (nullable 

In [28]:
bronzedf.show(5)

+--------------+-------------------+-----------+--------------+----------------+-------+--------+-------------+--------+--------------------+---------+---------------+--------------------+--------------------+--------+----------+--------+----------+
|transaction_id|          timestamp|customer_id|account_number|transaction_type| amount|currency|balance_after|  status|            merchant| category|       location| ingestion_timestamp|         source_file|batch_id|is_current|end_date|start_date|
+--------------+-------------------+-----------+--------------+----------------+-------+--------+-------------+--------+--------------------+---------+---------------+--------------------+--------------------+--------+----------+--------+----------+
|   TXN00000001|2023-05-05 03:42:36| CUST001179| ACCT-39450273|        interest|1378.52|     USD|      9544.56|reversed|                NULL|     NULL|Los Angeles, CA|2025-04-17 14:06:...|file:///c:/GitHub...|    test|         1|    NULL|2025-04-17|


#### Silver

In [21]:
silverdf, silver_version = process_batch_silver_layer(spark, bronze_table, bronze_version=None, 
                                                   silver_table=silver_table, 
                                                   silver_transform=silver_transform, 
                                                   validation_rules=silver_validation_rules,
                                                   pipeline_id='test', mode='test', 
                                                   silver_writer=None)

2025-04-17 14:02:50,881 - INFO - Starting silver layer processing
2025-04-17 14:02:51,532 - INFO - Successfully read bronze data version 3
2025-04-17 14:02:51,703 - INFO - Transformation function applied
2025-04-17 14:02:51,708 - INFO - Running data quality checks for silver layer
2025-04-17 14:03:03,967 - INFO - Data Quality Metrics for silver layer:
2025-04-17 14:03:03,969 - INFO -   - Shape: [23,950] (approx. row count)
2025-04-17 14:03:03,970 - INFO -   - Schema: 
root
 |-- transaction_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- account_number: string (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- balance_after: double (nullable = true)
 |-- status: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- location: string (nullable = true)
 |-- ingestion_ti

In [22]:
silverdf.show(5)

+--------------+-------------------+-----------+--------------+----------------+-------+--------+-------------+--------+--------------------+--------+---------------+--------------------+--------------------+--------------------+----------+--------+----------+---------------------+----------------+----------------+----------+--------------------+
|transaction_id|          timestamp|customer_id|account_number|transaction_type| amount|currency|balance_after|  status|            merchant|category|       location| ingestion_timestamp|         source_file|            batch_id|is_current|end_date|start_date|transaction_timestamp|transaction_date|transaction_time|year_month|processing_timestamp|
+--------------+-------------------+-----------+--------------+----------------+-------+--------+-------------+--------+--------------------+--------+---------------+--------------------+--------------------+--------------------+----------+--------+----------+---------------------+----------------+---

##### Investigating Silver Validation Check Failures:
Corrupt "status" column

In [23]:
silverdf.groupBy('status').count().show()

+---------+-----+
|   status|count|
+---------+-----+
| dqsputed|    1|
|  pgnding|    1|
|completed|  195|
|   failen|    1|
|   failed|  179|
|bompleted|    1|
|     NULL|   17|
|comcleted|    1|
| disputad|    1|
| dmsputed|    1|
| diiputed|    1|
|  pending|  164|
| disputed|  190|
| reverszd|    1|
| reversed|  196|
+---------+-----+



#### Gold

In [24]:
gold_dfs = process_batch_gold_layer(spark, silver_table, silver_version=None, 
                                 gold_table=gold_table, 
                                 gold_transform=gold_transform, 
                                 validation_rules=gold_validation_rules, 
                                 pipeline_id='test', 
                                 mode='test',
                                 gold_writer=None)

2025-04-17 14:03:18,135 - INFO - Starting gold layer processing
2025-04-17 14:03:18,781 - INFO - Successfully read silver data version 2
2025-04-17 14:03:18,979 - INFO - Transformation function applied
2025-04-17 14:03:18,981 - INFO - Running data quality checks for de_pipelines.financial_osb_gold_type2_daily_category layer
2025-04-17 14:03:31,231 - INFO - Data Quality Metrics for de_pipelines.financial_osb_gold_type2_daily_category layer:
2025-04-17 14:03:31,232 - INFO -   - Shape: [9,702] (approx. row count)
2025-04-17 14:03:31,233 - INFO -   - Schema: 
root
 |-- transaction_date: date (nullable = true)
 |-- category: string (nullable = true)
 |-- transaction_count: long (nullable = false)
 |-- total_amount: double (nullable = true)
 |-- avg_amount: double (nullable = true)
 |-- min_amount: double (nullable = true)
 |-- max_amount: double (nullable = true)
 |-- unique_customers: long (nullable = false)
 |-- processing_timestamp: timestamp (nullable = false)

2025-04-17 14:03:31,234 -

In [25]:
for table_name in gold_dfs.keys():
    gold_dfs[table_name].show(5)

+----------------+----------+-----------------+------------+----------+----------+----------+----------------+--------------------+
|transaction_date|  category|transaction_count|total_amount|avg_amount|min_amount|max_amount|unique_customers|processing_timestamp|
+----------------+----------+-----------------+------------+----------+----------+----------+----------------+--------------------+
|      2023-05-25|      fees|                1|       30.89|     30.89|     30.89|     30.89|               1|2025-04-17 14:04:...|
|      2023-09-25|    income|                1|      1384.1|    1384.1|    1384.1|    1384.1|               1|2025-04-17 14:04:...|
|      2023-06-22|      NULL|                1|       64.69|     64.69|     64.69|     64.69|               1|2025-04-17 14:04:...|
|      2023-12-18|investment|                1|      626.53|    626.53|    626.53|    626.53|               1|2025-04-17 14:04:...|
|      2023-08-28|   housing|                1|       37.49|     37.49|     


## SCD Type 2: 
##### Change Data Capture (Merge)

##### Inputs/Outputs

In [11]:
# Change Data Capture
file_list = [
    "financial_transactions_20250414_113413.csv",
    "financial_transactions_20250417_113413.csv"
]

abs_file_list = []
for file in file_list:
    # Join data_dir with filename
    relative_path = os.path.join(data_dir, file)
    # Convert to absolute path
    abs_path = os.path.abspath(relative_path)
    abs_file_list.append(abs_path)


#### Define: Validation Rules(Same), Transformations(Same), Write Executions (Merge)

In [22]:
def bronze_writer(df: DataFrame, table_name: str) -> None:

    # prepare the data - mark latest records as current, others as not current
    df = df.withColumn("row_num", F.row_number().over(
        Window.partitionBy("transaction_id", "account_number")
        .orderBy(F.col("timestamp").desc())
    ))

    df = df.withColumn("is_current", F.when(F.col("row_num") == 1, 1).otherwise(0))
    df = df.withColumn("end_date", F.when(F.col("is_current") == 0, F.current_date()).otherwise(None))
    df = df.drop("row_num")

    # Merge the source and target DataFrame
    df.createOrReplaceTempView("source")

    spark.sql(f"""
        MERGE INTO {table_name} t
        USING source s
        ON t.transaction_id = s.transaction_id
        AND t.account_number = s.account_number
        AND t.is_current = 1
        AND s.is_current = 1
        AND (t.amount != s.amount OR t.balance_after != s.balance_after)
        WHEN MATCHED THEN
        UPDATE SET
            t.is_current = 0,
            t.end_date = CURRENT_DATE()

    """)

    # Execute merge
    spark.sql(f"""
        MERGE INTO {table_name} t
        USING source s
        ON s.transaction_id = t.transaction_id AND s.account_number = t.account_number AND t.is_current=1
        WHEN NOT MATCHED THEN
            INSERT *
    """)

    # Get most recent two MERGE operations
    merge_metrics = (
        spark.sql(f"DESCRIBE HISTORY {table_name}")
        .filter("operation = 'MERGE'")
        .orderBy("timestamp", ascending=False)
        .limit(2)
        .select("operationMetrics")
        .collect()
    )

    # Extract operationMetrics as list of dicts
    metrics = [row["operationMetrics"] for row in merge_metrics]

    return metrics

In [23]:
#### Bronze
bronzedf, bronze_version = process_batch_bronze_layer(spark, 'csv', abs_file_list, get_schema(), bronze_table,
                               bronze_transform=bronze_transform, validation_rules=bronze_validation_rules,
                               pipeline_id='test', mode='write', bronze_writer=bronze_writer)

2025-04-17 14:47:34,115 - INFO - Starting bronze layer processing
2025-04-17 14:47:34,194 - INFO - Successfully read CSV data from: 
  - c:\GitHub\DE_Pipelines\data\financial_transactions_20250414_113413.csv
  - c:\GitHub\DE_Pipelines\data\financial_transactions_20250417_113413.csv
2025-04-17 14:47:34,228 - INFO - Transformation function applied
2025-04-17 14:47:44,905 - INFO - Successfully wrote bronze table: de_pipelines.financial_osb_bronze_type2
2025-04-17 14:47:44,907 - INFO - Write Metrics: 
[
  {
    "numOutputRows": "109",
    "numTargetBytesAdded": "9995",
    "numTargetRowsInserted": "109",
    "numTargetFilesAdded": "1",
    "materializeSourceTimeMs": "8",
    "numTargetRowsMatchedDeleted": "0",
    "numTargetFilesRemoved": "0",
    "numTargetRowsMatchedUpdated": "0",
    "executionTimeMs": "1797",
    "numTargetDeletionVectorsUpdated": "0",
    "numTargetRowsCopied": "0",
    "rewriteTimeMs": "1772",
    "numTargetRowsUpdated": "0",
    "numTargetDeletionVectorsRemoved": "0

In [19]:
bronzedf.count()

2750

In [None]:
def silver_writer(df: DataFrame, table_name: str) -> None:

    # Merge the source and target DataFrame
    df.createOrReplaceTempView("source")

    spark.sql(f"""
        MERGE INTO {table_name} t
        USING source s
        ON t.transaction_id = s.transaction_id
        AND t.account_number = s.account_number
        AND t.is_current = 1
        AND s.is_current = 1
        AND (t.amount != s.amount OR t.balance_after != s.balance_after) --actual change in the record
        WHEN MATCHED THEN
        UPDATE SET
            t.is_current = 0,
            t.end_date = CURRENT_DATE()

    """)

    # Execute merge
    spark.sql(f"""
        MERGE INTO {table_name} t
        USING source s
        ON s.transaction_id = t.transaction_id AND s.account_number = t.account_number AND t.is_current=1
        WHEN NOT MATCHED THEN
            INSERT *
    """)

    # Get delta operation metrics
    metricsdf = spark.sql(f"DESCRIBE HISTORY {table_name} LIMIT 1")
    metrics = metricsdf.select("operationMetrics").collect()[0][0]

    return metrics

#### Running CDC Full Batch Pipeline (Bronze, Silver)

In [None]:
run_batch_de_pipeline(spark, 'csv', abs_file_list, get_schema(), 
                     bronze_table=bronze_table, silver_table=silver_table, gold_table=None, 
                     bronze_transform=None, silver_transform=silver_transform, gold_transform=None,
                     bronze_writer=bronze_writer, silver_writer=silver_writer, gold_writer=None,
                     bronze_validation_rules=bronze_validation_rules, 
                     silver_validation_rules=silver_validation_rules, 
                     gold_validation_rules=None,
                     pipeline_name='Financial_DE_Pipeline')

2025-04-17 13:09:59,808 - INFO - --Starting data pipeline execution with ID: Financial_DE_Pipeline_20250417_130959--
2025-04-17 13:09:59,810 - INFO - Starting bronze layer processing for: 
['c:\\GitHub\\DE_Pipelines\\data\\financial_transactions_20250414_113413.csv', 'c:\\GitHub\\DE_Pipelines\\data\\financial_transactions_20250417_113413.csv']
2025-04-17 13:10:00,236 - INFO - Successfully read CSV data from: 
['c:\\GitHub\\DE_Pipelines\\data\\financial_transactions_20250414_113413.csv', 'c:\\GitHub\\DE_Pipelines\\data\\financial_transactions_20250417_113413.csv']
2025-04-17 13:10:00,237 - INFO - No transformation function defined
2025-04-17 13:10:28,564 - INFO - Successfully wrote bronze table: de_pipelines.financial_osb_bronze_type1
2025-04-17 13:10:28,566 - INFO - Write Metrics: 
{
  "numOutputRows": "1601",
  "numTargetBytesAdded": "62413",
  "numTargetRowsInserted": "61",
  "numTargetFilesAdded": "1",
  "materializeSourceTimeMs": "4365",
  "numTargetRowsMatchedDeleted": "0",
  "num

{'status': 'success',
 'pipeline_id': 'Financial_DE_Pipeline_20250417_130959',
 'bronze_version': 17,
 'silver_version': 10,
 'timestamp': '2025-04-17T13:11:33.327288',
 'duration_seconds': 93.51692771911621,
 'metrics': {'pipeline_id': 'Financial_DE_Pipeline_20250417_130959',
  'start_time': '2025-04-17T13:09:59.808359',
  'stages': {'bronze': {'duration_seconds': 35.74067544937134,
    'version': 17,
    'status': 'success'},
   'bronze_optimize': {'layer': 'bronze',
    'duration_seconds': 4.173312187194824,
    'status': 'success'},
   'silver': {'duration_seconds': 38.407339096069336,
    'version': 10,
    'status': 'success',
    'source_bronze_version': 17},
   'silver_optimize': {'layer': 'silver',
    'duration_seconds': 15.187612771987915,
    'status': 'success'}},
  'status': 'success',
  'end_time': '2025-04-17T13:11:33.325287',
  'total_duration_seconds': 93.51692771911621}}