# End-to-End Pipeline Validation

This notebook is a live, executable demonstration of the project's data pipelines. It walks through the full process: cleaning the environment, ingesting data from the mock API, processing it through the bronze and silver layers, and validating that the resulting ledger is unchanged by a repeated run.

**Objective:** To provide a clear, step-by-step validation of the data flow and the idempotency of the pipeline architecture.

In [12]:
import os
import shutil
import pandas as pd
from deltalake import DeltaTable

# Define the paths for our data lake directories
BRONZE_PATH = 'data_lake/bronze'
SILVER_PATH = 'data_lake/silver'
LEDGER_PATH = 'data_lake/application_status_ledger'

# Helper function to read and print a Delta table
def read_delta_table(path, name):
    """Reads and prints the contents of a Delta table."""
    header = f"--- Contents of {name} table ---"
    print(header)
    try:
        df = DeltaTable(path).to_pandas()
        print(df)
        print("-" * len(header))  # footer matches the header width
        return df
    except Exception as e:
        print(f"Could not read table: {e}")
        return None

print("Setup complete. Paths and helper function are defined.")

Setup complete. Paths and helper function are defined.


## Step 1: Clean the Environment

Before we begin, we need to ensure a clean state by deleting any existing data from previous runs. This guarantees that we are testing the pipeline's initialization process from scratch.

In [2]:
# Use a shell command to remove the directories
!rm -rf {BRONZE_PATH} {SILVER_PATH} {LEDGER_PATH}

print(f"Cleaned directories: {BRONZE_PATH}, {SILVER_PATH}, {LEDGER_PATH}")

Cleaned directories: pipelines/data_lake/bronze, pipelines/data_lake/silver, pipelines/data_lake/application_status_ledger
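
The `!rm -rf` cell above depends on a Unix shell. As a portable alternative, the cleanup could be sketched with `shutil`, which the setup cell already imports. The helper name `clean_directories` is hypothetical and not part of the project.

```python
import shutil
from pathlib import Path


def clean_directories(paths):
    """Delete each directory tree that exists and return the paths removed."""
    removed = []
    for path in paths:
        if Path(path).exists():
            shutil.rmtree(path)  # removes the directory and everything under it
            removed.append(path)
    return removed
```

In this notebook it would be called as `clean_directories([BRONZE_PATH, SILVER_PATH, LEDGER_PATH])`. Paths that do not exist are skipped, so the cell is safe to re-run.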


## Step 2: Run the Ingestion Pipeline

Now, we'll execute the ingestion pipeline. This script is responsible for:
1.  Calling the mock API to fetch raw transaction data.
2.  Combining the data from all accounts into a single DataFrame.
3.  Writing the raw data to the **bronze** Delta table.

In [8]:
# Run the ingestion script as a Python module
!python -m ingest_statements

Fetching data for account number: 010-30800-0095971396 (Id: 001_statement_a)
Fetching data for account number: 010-30800-0095983938 (Id: 002_statement_b)
Writing 1217 transactions to bronze Delta table at /Users/thiago.lopes/dev/nu/tmp/keep/pipelines/data_lake/bronze...
Bronze ingestion complete.
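
The source of `ingest_statements` is not shown in this notebook, so the following is only a minimal sketch of steps 2 and 3 listed above. It assumes each API response is a dict with an `account_number` and a list of `transactions`; that payload shape, the function names, and the `overwrite` write mode are all assumptions made for illustration.

```python
import pandas as pd


def combine_statements(statements):
    """Flatten per-account payloads (assumed shape) into one DataFrame."""
    frames = []
    for statement in statements:
        frame = pd.DataFrame(statement["transactions"])
        # Tag every transaction with the account it came from.
        frame["account_number"] = statement["account_number"]
        frames.append(frame)
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)


def write_bronze(df, path):
    """Write the combined frame to a Delta table (write mode is an assumption)."""
    # Imported lazily so combine_statements can be tried without deltalake.
    from deltalake import write_deltalake

    write_deltalake(path, df, mode="overwrite")
```

Keeping the combine step separate from the write step makes the DataFrame logic easy to check in isolation before anything touches the data lake.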


## Step 3: Run the Transformation Pipeline (First Run)

This is the core of our ETL process. In this first run, the script will:
1.  Read the raw data from the bronze table.
2.  Perform data cleaning and standardization.
3.  Since no silver table or ledger exists, it will **create** them and load the cleaned data.

In [10]:
# Run the transformation script
!python -m transform_statements

Read 1217 rows from the bronze layer.
Merging 1217 cleaned rows into the silver Delta table...
Silver table not found, creating new one.
Silver layer merge complete.
Updating scoring status ledger...
Ledger not found, creating new one.
New ledger created.
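
The notebook does not say which cleaning rules `transform_statements` applies. Purely as an illustration of what "cleaning and standardization" might involve, the sketch below trims whitespace in two text columns and coerces `amount` to a numeric type. The column names come from the silver table output; the rules themselves and the function name `clean_transactions` are hypothetical.

```python
import pandas as pd


def clean_transactions(raw):
    """Return a standardized copy of the raw frame (hypothetical rules)."""
    df = raw.copy()
    # Trim stray whitespace so equal values compare as equal.
    for column in ("account_name", "account_number"):
        df[column] = df[column].astype(str).str.strip()
    # Coerce amounts to numbers; unparseable values become NaN instead of raising.
    df["amount"] = pd.to_numeric(df["amount"], errors="coerce")
    return df
```

Working on a copy leaves the bronze data untouched, which matches the usual expectation that the bronze layer keeps the raw records as ingested.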


In [13]:
# Read the contents of the newly created tables
silver_df_run1 = read_delta_table(SILVER_PATH, "Silver")
ledger_df_run1 = read_delta_table(LEDGER_PATH, "Application Status Ledger")

--- Contents of Silver table ---
     account_balance             account_name        account_number  \
0             2016.7  No Fee Chequing Account  010-30800-0095971396   
1             2016.7  No Fee Chequing Account  010-30800-0095971396   
2             2016.7  No Fee Chequing Account  010-30800-0095971396   
3             2016.7  No Fee Chequing Account  010-30800-0095971396   
4             2016.7  No Fee Chequing Account  010-30800-0095971396   
...              ...                      ...                   ...   
1212          418.52  No Fee Chequing Account  010-30800-0095983938   
1213          418.52  No Fee Chequing Account  010-30800-0095983938   
1214          418.52  No Fee Chequing Account  010-30800-0095983938   
1215          418.52  No Fee Chequing Account  010-30800-0095983938   
1216          418.52  No Fee Chequing Account  010-30800-0095983938   

     account_type                               address   amount  balance  \
0       Operation  36 HOLKHAM AVE, AN

## Step 5: Test for Idempotency (Second Run)

This is the most critical test. We will run the exact same transformation pipeline again.

A non-idempotent pipeline would either fail or create duplicate records. Our Delta Lake architecture, however, should handle this gracefully. The `MERGE` operation will check for existing records and, finding that they are already present, will not insert any new data.

**Expected Outcome:** The pipeline will run successfully, but the contents and size of the silver table and the ledger should remain identical.

In [7]:
# Run the transformation script for a second time
!python -m transform_statements

Read 1217 rows from the bronze layer.
Merging 1217 cleaned rows into the silver Delta table...
Silver layer merge complete.
Updating scoring status ledger...
Ledger merge complete. 1 records processed.
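
The insert-only merge behavior described in this step can be illustrated in plain pandas. This is an in-memory stand-in, not the project's Delta Lake code: the key column `transaction_id` and the function name are hypothetical, and the real pipeline's merge predicate is not shown in this notebook.

```python
import pandas as pd


def merge_insert_only(target, source, key):
    """Insert source rows whose key is absent from target; leave matches untouched."""
    if target is None:
        # First run: no table exists yet, so the source becomes the table.
        return source.reset_index(drop=True)
    new_rows = source[~source[key].isin(target[key])]
    if new_rows.empty:
        # Every key is already present, so the merge is a no-op.
        return target
    return pd.concat([target, new_rows], ignore_index=True)


batch = pd.DataFrame({"transaction_id": [1, 2, 3], "amount": [10.0, -4.5, 7.25]})
first = merge_insert_only(None, batch, "transaction_id")
second = merge_insert_only(first, batch, "transaction_id")
print(len(first), len(second))  # prints "3 3"
```

Applying the same batch twice leaves the table unchanged, which is the property the second run is meant to confirm. In the `deltalake` package the comparable operation is built with `DeltaTable.merge(...)` followed by `when_not_matched_insert_all()`.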


## Step 6: Final Validation

Now we read the tables again and compare them with their state after the first run. An unchanged row count shows that the second run inserted no duplicate records, which is the behavior an idempotent pipeline must exhibit.
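
A row-count comparison cannot detect a re-run that rewrites values in place. A stricter check, sketched below, compares the full contents while ignoring row order. The helper name `tables_match` is hypothetical, and the sketch assumes every column holds sortable scalar values.

```python
import pandas as pd


def tables_match(before, after):
    """True when both frames hold the same rows, ignoring row and column order."""
    if sorted(before.columns) != sorted(after.columns):
        return False
    if len(before) != len(after):
        return False
    columns = sorted(before.columns)
    # Sort by every column so that row order does not affect the comparison.
    left = before[columns].sort_values(columns).reset_index(drop=True)
    right = after[columns].sort_values(columns).reset_index(drop=True)
    return left.equals(right)
```

In this notebook the check would read `tables_match(silver_df_run1, silver_df_run2)`, and likewise for the two ledger frames.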

In [8]:
import time

print("Waiting a moment for file system to update...")
time.sleep(1) # Small delay to ensure file modification times are updated

silver_df_run2 = read_delta_table(SILVER_PATH, "Silver (after 2nd run)")
ledger_df_run2 = read_delta_table(LEDGER_PATH, "Application Status Ledger (after 2nd run)")

# Perform the validation checks
print("\n--- Validation Results ---")
if len(silver_df_run1) == len(silver_df_run2):
    print("✅ Silver Table: Row count is unchanged. Idempotency test PASSED.")
else:
    print("❌ Silver Table: Row count has changed. Idempotency test FAILED.")

if len(ledger_df_run1) == len(ledger_df_run2):
    print("✅ Ledger: Row count is unchanged. Idempotency test PASSED.")
else:
    print("❌ Ledger: Row count has changed. Idempotency test FAILED.")
print("-" * 26)

Waiting a moment for file system to update...
--- Contents of Silver (after 2nd run) table ---
     account_balance             account_name        account_number  \
0             2016.7  No Fee Chequing Account  010-30800-0095971396   
1             2016.7  No Fee Chequing Account  010-30800-0095971396   
2             2016.7  No Fee Chequing Account  010-30800-0095971396   
3             2016.7  No Fee Chequing Account  010-30800-0095971396   
4             2016.7  No Fee Chequing Account  010-30800-0095971396   
...              ...                      ...                   ...   
1212          418.52  No Fee Chequing Account  010-30800-0095983938   
1213          418.52  No Fee Chequing Account  010-30800-0095983938   
1214          418.52  No Fee Chequing Account  010-30800-0095983938   
1215          418.52  No Fee Chequing Account  010-30800-0095983938   
1216          418.52  No Fee Chequing Account  010-30800-0095983938   

     account_type                               addr