# Apache Iceberg Banking Reconciliation Demo

This notebook demonstrates the key features of the Apache Iceberg Banking Reconciliation System.

## 1. Setup

First, let's initialize our Spark session with Iceberg configuration.

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, when, lit
import uuid
from datetime import datetime, timedelta
import os

# Create warehouse directory if it doesn't exist
warehouse_dir = "/opt/bitnami/spark/warehouse"
os.makedirs(warehouse_dir, exist_ok=True)

# Create Spark session with Iceberg configuration
spark = SparkSession.builder \
    .appName("Banking Reconciliation Demo") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", f"file://{warehouse_dir}") \
    .config("spark.sql.defaultCatalog", "local") \
    .getOrCreate()

print("Spark session created successfully with local file system warehouse")

25/05/09 20:22:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
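
The catalog settings used in the builder above can also be collected in a plain dictionary, which makes them easier to inspect and compare against a running session. The following is a minimal sketch; the helper name `iceberg_hadoop_catalog_conf` is an assumption for illustration and is not part of the project.

```python
def iceberg_hadoop_catalog_conf(catalog_name, warehouse_dir):
    """Return Spark settings for a Hadoop-type Iceberg catalog (hypothetical helper)."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "hadoop",
        f"{prefix}.warehouse": f"file://{warehouse_dir}",
        "spark.sql.defaultCatalog": catalog_name,
    }


conf = iceberg_hadoop_catalog_conf("local", "/opt/bitnami/spark/warehouse")
for key, value in sorted(conf.items()):
    print(f"{key} = {value}")
```

Each pair could be passed to `SparkSession.builder.config(key, value)`. In a live session, `spark.conf.get(key)` returns the value that is actually in effect, which may differ from the requested one when an existing session is reused.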


## 2. Explore Iceberg Tables

Let's explore the Iceberg tables we've created.

In [8]:
# List all tables in the banking namespace
spark.sql("SHOW TABLES IN local.banking").show()

IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: s3a://warehousebanking

In [10]:
# Describe the source_transactions table
spark.sql("DESCRIBE TABLE local.banking.source_transactions").show()

AnalysisException: Table or view not found: local.banking.source_transactions; line 1 pos 15;
'DescribeRelation false, [col_name#33, data_type#34, comment#35]
+- 'UnresolvedTableOrView [local, banking, source_transactions], DESCRIBE TABLE, true


In [11]:
# Check the number of transactions by source system
spark.sql("""
    SELECT source_system, COUNT(*) as transaction_count
    FROM local.banking.source_transactions
    GROUP BY source_system
    ORDER BY source_system
""").show()

AnalysisException: Table or view not found: local.banking.source_transactions; line 3 pos 9;
'Sort ['source_system ASC NULLS FIRST], true
+- 'Aggregate ['source_system], ['source_system, count(1) AS transaction_count#36L]
   +- 'UnresolvedRelation [local, banking, source_transactions], [], false


## 3. Demonstrate Iceberg Features

### 3.1 Schema Evolution

Let's demonstrate schema evolution by adding a new column to the source_transactions table.

In [None]:
# Add a new column to the source_transactions table
spark.sql("""
    ALTER TABLE local.banking.source_transactions
    ADD COLUMN transaction_category STRING
""").show()

In [None]:
# Verify the new column was added
spark.sql("DESCRIBE TABLE local.banking.source_transactions").show()

In [None]:
# Update some records with the new column
spark.sql("""
    UPDATE local.banking.source_transactions
    SET transaction_category = 
        CASE 
            WHEN transaction_type = 'deposit' THEN 'INCOME'
            WHEN transaction_type = 'withdrawal' THEN 'EXPENSE'
            WHEN transaction_type = 'transfer' THEN 'TRANSFER'
            WHEN transaction_type = 'payment' THEN 'EXPENSE'
            WHEN transaction_type = 'refund' THEN 'INCOME'
            WHEN transaction_type = 'fee' THEN 'FEE'
            ELSE 'OTHER'
        END
    WHERE source_system = 'core_banking'
""").show()

In [None]:
# Query the data with the new column
spark.sql("""
    SELECT transaction_type, transaction_category, COUNT(*) as count
    FROM local.banking.source_transactions
    WHERE source_system = 'core_banking'
    GROUP BY transaction_type, transaction_category
    ORDER BY transaction_type
""").show()

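The `CASE` expression in the `UPDATE` above is a simple lookup with a default. As a sketch, the same mapping can be written in plain Python, which is handy for unit-testing the category rules without a Spark session. The names `CATEGORY_BY_TYPE` and `categorize` are assumptions for illustration.

```python
# Mirrors the WHEN branches of the CASE expression in the UPDATE statement
CATEGORY_BY_TYPE = {
    "deposit": "INCOME",
    "withdrawal": "EXPENSE",
    "transfer": "TRANSFER",
    "payment": "EXPENSE",
    "refund": "INCOME",
    "fee": "FEE",
}


def categorize(transaction_type):
    """Return the category for a transaction type, or 'OTHER' when unmapped."""
    return CATEGORY_BY_TYPE.get(transaction_type, "OTHER")


for tx_type in ["deposit", "fee", "chargeback"]:
    print(tx_type, "->", categorize(tx_type))
```

Like the SQL version, the lookup is case-sensitive, and a missing (`None`) transaction type falls through to `OTHER`.
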
### 3.2 Time Travel

Let's demonstrate time travel by querying the table at different points in time.

In [None]:
# Get the current snapshot information
spark.sql("""
    SELECT * FROM local.banking.source_transactions.snapshots
    ORDER BY committed_at DESC
    LIMIT 5
""").show()

In [None]:
# Store the timestamp of the snapshot before our update
snapshots = spark.sql("""
    SELECT * FROM local.banking.source_transactions.snapshots
    ORDER BY committed_at DESC
    LIMIT 2
""").collect()

# Get the timestamp of the previous snapshot
if len(snapshots) >= 2:
    previous_snapshot_timestamp = snapshots[1]["committed_at"]
    print(f"Previous snapshot timestamp: {previous_snapshot_timestamp}")

In [None]:
# Query the table as of the previous snapshot (before adding the new column)
if 'previous_snapshot_timestamp' in locals():
    spark.sql(f"""
        SELECT transaction_type, COUNT(*) as count
        FROM local.banking.source_transactions
        FOR TIMESTAMP AS OF '{previous_snapshot_timestamp}'
        WHERE source_system = 'core_banking'
        GROUP BY transaction_type
        ORDER BY transaction_type
    """).show()
    
    # This would fail because the column didn't exist in the previous snapshot
    try:
        spark.sql(f"""
            SELECT transaction_category, COUNT(*) as count
            FROM local.banking.source_transactions
            FOR TIMESTAMP AS OF '{previous_snapshot_timestamp}'
            WHERE source_system = 'core_banking'
            GROUP BY transaction_category
            ORDER BY transaction_category
        """).show()
    except Exception as e:
        print(f"Error (expected): {str(e)}")
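
A timestamp-based time travel query reads the snapshot that was current at the requested time, that is, the latest snapshot committed at or before it. The sketch below models that lookup in plain Python over a made-up snapshot history. The function name and the sample data are assumptions for illustration, not Iceberg's implementation.

```python
from bisect import bisect_right
from datetime import datetime


def snapshot_as_of(snapshots, as_of):
    """Return the ID of the latest snapshot committed at or before `as_of`.

    `snapshots` is a list of (committed_at, snapshot_id) tuples sorted by committed_at.
    """
    commit_times = [committed_at for committed_at, _ in snapshots]
    index = bisect_right(commit_times, as_of)
    if index == 0:
        raise ValueError(f"No snapshot exists at or before {as_of}")
    return snapshots[index - 1][1]


# Hypothetical snapshot history: (committed_at, snapshot_id)
history = [
    (datetime(2025, 5, 9, 10, 0), 101),
    (datetime(2025, 5, 9, 11, 0), 102),
    (datetime(2025, 5, 9, 12, 0), 103),
]
print(snapshot_as_of(history, datetime(2025, 5, 9, 11, 30)))
```

A timestamp earlier than the first commit has no snapshot to resolve to, which the sketch reports as an error.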

### 3.3 Partition Evolution

Let's demonstrate partition evolution by changing the partition spec.

In [None]:
# Inspect the current partitions (the `partitions` metadata table lists per-partition record and file counts)
spark.sql("""
    SELECT * FROM local.banking.source_transactions.partitions
""").show()

In [None]:
# Add a new partition field
spark.sql("""
    ALTER TABLE local.banking.source_transactions
    ADD PARTITION FIELD transaction_category
""").show()

In [None]:
# Inspect the partitions again; data written before the change keeps its original layout
spark.sql("""
    SELECT * FROM local.banking.source_transactions.partitions
""").show()

## 4. Run a Reconciliation Process

Let's run a reconciliation process to match transactions across systems.

In [None]:
# Import necessary modules
import sys
sys.path.append('/opt/spark')

from src.main.python.etl.extractors import TransactionExtractor
from src.main.python.etl.transformers import TransactionTransformer
from src.main.python.reconciliation.matcher import TransactionMatcher
from src.main.python.reconciliation.reporter import ReconciliationReporter
from src.main.python.etl.loaders import IcebergLoader

In [None]:
# Define reconciliation parameters
batch_id = f"DEMO-{uuid.uuid4().hex[:8]}"
source_systems = ['core_banking', 'card_processor']
end_date = datetime.now()
start_date = end_date - timedelta(days=30)

print(f"Running reconciliation for batch {batch_id}")
print(f"Source systems: {source_systems}")
print(f"Date range: {start_date} to {end_date}")

In [None]:
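# Register reconciliation batch
# completed_at is None, so its type cannot be inferred and an explicit schema is supplied.
# The column types below are assumptions; adjust them to match local.banking.reconciliation_batches.
batch_schema = (
    "batch_id STRING, reconciliation_date TIMESTAMP, source_systems ARRAY<STRING>, "
    "start_date TIMESTAMP, end_date TIMESTAMP, status STRING, "
    "total_transactions BIGINT, matched_count BIGINT, unmatched_count BIGINT, "
    "created_at TIMESTAMP, completed_at TIMESTAMP"
)

batch_df = spark.createDataFrame([{
    "batch_id": batch_id,
    "reconciliation_date": datetime.now(),
    "source_systems": source_systems,
    "start_date": start_date,
    "end_date": end_date,
    "status": "IN_PROGRESS",
    "total_transactions": 0,
    "matched_count": 0,
    "unmatched_count": 0,
    "created_at": datetime.now(),
    "completed_at": None
}], schema=batch_schema)

loader = IcebergLoader(spark)
loader.load_reconciliation_batch(batch_df)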

In [None]:
# Extract transactions
extractor = TransactionExtractor(spark)
transactions_by_source = extractor.extract_transactions_for_reconciliation(
    source_systems, start_date, end_date
)

# Print transaction counts
for source, df in transactions_by_source.items():
    print(f"{source}: {df.count()} transactions")

In [None]:
# Transform transactions
transformer = TransactionTransformer(spark)
prepared_transactions = transformer.prepare_for_reconciliation(transactions_by_source)

# Get primary and secondary DataFrames
primary_source = source_systems[0]
secondary_source = source_systems[1]
primary_df = prepared_transactions[primary_source]
secondary_df = prepared_transactions[secondary_source]

In [None]:
# Match transactions
matcher = TransactionMatcher(spark)
matched_df, unmatched_primary_df, unmatched_secondary_df = matcher.match_transactions(
    primary_df, secondary_df, match_strategy="hybrid"
)

# Print matching results
print(f"Matched: {matched_df.count()} transactions")
print(f"Unmatched in {primary_source}: {unmatched_primary_df.count()} transactions")
print(f"Unmatched in {secondary_source}: {unmatched_secondary_df.count()} transactions")
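
The internals of `match_strategy="hybrid"` are not shown in this notebook. One common reading of a hybrid strategy is an exact match on a shared key followed by a tolerance-based fallback. The plain-Python sketch below illustrates that idea only. The function `hybrid_match`, the tolerances, and the sample rows are assumptions and do not describe the project's `TransactionMatcher`.

```python
from datetime import datetime, timedelta
from decimal import Decimal


def hybrid_match(primary, secondary,
                 amount_tolerance=Decimal("0.01"),
                 date_tolerance=timedelta(days=1)):
    """Pair rows by reference_id first, then by account, amount, and date.

    Returns (matched_pairs, unmatched_primary, unmatched_secondary).
    """
    remaining = list(secondary)  # secondary rows not yet paired

    def take(predicate):
        # Remove and return the first remaining row that satisfies predicate
        for index, candidate in enumerate(remaining):
            if predicate(candidate):
                return remaining.pop(index)
        return None

    def is_close(a, b):
        return (a["account_id"] == b["account_id"]
                and abs(a["amount"] - b["amount"]) <= amount_tolerance
                and abs(a["transaction_date"] - b["transaction_date"]) <= date_tolerance)

    matched, leftovers = [], []
    # Pass 1: exact match on reference_id
    for tx in primary:
        partner = take(lambda c: c["reference_id"] == tx["reference_id"])
        if partner is None:
            leftovers.append(tx)
        else:
            matched.append((tx, partner))

    # Pass 2: tolerance-based match for rows that pass 1 left unpaired
    unmatched_primary = []
    for tx in leftovers:
        partner = take(lambda c: is_close(tx, c))
        if partner is None:
            unmatched_primary.append(tx)
        else:
            matched.append((tx, partner))

    return matched, unmatched_primary, remaining


# Hypothetical sample rows
day = datetime(2025, 5, 1, 9, 0)
primary = [
    {"reference_id": "REF-1", "account_id": "ACC1", "amount": Decimal("100.00"), "transaction_date": day},
    {"reference_id": "REF-2", "account_id": "ACC2", "amount": Decimal("50.00"), "transaction_date": day},
    {"reference_id": "REF-3", "account_id": "ACC3", "amount": Decimal("75.00"), "transaction_date": day},
]
secondary = [
    {"reference_id": "REF-1", "account_id": "ACC1", "amount": Decimal("100.00"), "transaction_date": day},
    {"reference_id": "CARD-9", "account_id": "ACC2", "amount": Decimal("50.00"),
     "transaction_date": day + timedelta(hours=2)},
    {"reference_id": "CARD-7", "account_id": "ACC9", "amount": Decimal("10.00"), "transaction_date": day},
]

matched, unmatched_primary, unmatched_secondary = hybrid_match(primary, secondary)
print(len(matched), len(unmatched_primary), len(unmatched_secondary))
```

Running the exact pass over all rows before the tolerance pass keeps a fuzzy match from claiming a row that has an exact partner elsewhere.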

In [None]:
# Create reconciliation results
results_df = matcher.create_reconciliation_results(
    batch_id,
    matched_df,
    unmatched_primary_df,
    unmatched_secondary_df,
    primary_source,
    secondary_source
)

# Save reconciliation results
loader.load_reconciliation_results(results_df)

In [None]:
# Generate reports
reporter = ReconciliationReporter(spark)
summary_report = reporter.generate_summary_report(results_df)
discrepancy_report = reporter.generate_discrepancy_report(results_df)

# Display summary report
summary_report.show()

In [None]:
# Display discrepancy report (first 10 rows)
discrepancy_report.show(10)

In [None]:
# Update batch status
# Count each DataFrame once; every count() call triggers a Spark job
matched_count = matched_df.count()
unmatched_count = unmatched_primary_df.count() + unmatched_secondary_df.count()

spark.sql(f"""
    UPDATE local.banking.reconciliation_batches
    SET
        status = 'COMPLETED',
        matched_count = {matched_count},
        unmatched_count = {unmatched_count},
        total_transactions = {matched_count + unmatched_count},
        completed_at = CURRENT_TIMESTAMP()
    WHERE batch_id = '{batch_id}'
""")

## 5. Demonstrate ACID Transactions

Let's demonstrate Iceberg's atomic commits. Every write is committed atomically as a new snapshot, so readers never see a partially applied statement. Spark SQL does not support multi-statement transactions (it rejects `START TRANSACTION`, `COMMIT`, and `ROLLBACK`), so the cell below runs two writes that each commit on their own.

In [None]:
# Each statement below commits atomically as its own Iceberg snapshot
new_batch_id = f"ACID-{uuid.uuid4().hex[:8]}"

try:
    # Update some transactions (first atomic commit)
    spark.sql("""
        UPDATE local.banking.source_transactions
        SET status = 'completed'
        WHERE status = 'pending' AND source_system = 'core_banking'
    """)

    # Insert a new reconciliation batch (second atomic commit)
    spark.sql(f"""
        INSERT INTO local.banking.reconciliation_batches VALUES (
            '{new_batch_id}',
            CURRENT_TIMESTAMP(),
            ARRAY('core_banking', 'payment_gateway'),
            TIMESTAMP('{start_date}'),
            TIMESTAMP('{end_date}'),
            'PENDING',
            0,
            0,
            0,
            CURRENT_TIMESTAMP(),
            NULL
        )
    """)

    print("Both writes committed successfully")
except Exception as e:
    # A failed statement leaves no partial snapshot; statements that already committed stay committed
    print(f"Write failed: {str(e)}")

In [None]:
# Verify the changes
spark.sql(f"""
    SELECT * FROM local.banking.reconciliation_batches
    WHERE batch_id = '{new_batch_id}'
""").show()

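Because every commit is recorded as a snapshot, a table can be returned to an earlier state with Iceberg's `rollback_to_snapshot` stored procedure, which is available when the Iceberg SQL extensions are enabled. The sketch below only builds the `CALL` statement. The helper name and the snapshot ID are assumptions for illustration.

```python
def rollback_to_snapshot_sql(catalog, table, snapshot_id):
    """Build a CALL statement for Iceberg's rollback_to_snapshot procedure."""
    # int() rejects anything that is not a numeric snapshot ID
    return f"CALL {catalog}.system.rollback_to_snapshot('{table}', {int(snapshot_id)})"


# Hypothetical snapshot ID; real IDs come from the table's `snapshots` metadata table
statement = rollback_to_snapshot_sql("local", "banking.source_transactions", 1234567890)
print(statement)
```

In a live session the statement would be executed with `spark.sql(statement)`. A rollback applies to one table at a time, so undoing writes to several tables takes one call per table.
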
## 6. Demonstrate Incremental Processing

Let's demonstrate incremental processing by processing only new transactions.

In [None]:
# Get the latest snapshot timestamp
latest_snapshot = spark.sql("""
    SELECT * FROM local.banking.source_transactions.snapshots
    ORDER BY committed_at DESC
    LIMIT 1
""").collect()[0]

latest_timestamp = latest_snapshot["committed_at"]
print(f"Latest snapshot timestamp: {latest_timestamp}")

In [None]:
# Create some new transactions
import random
from decimal import Decimal

# Generate 10 new transactions
new_transactions = []
for i in range(10):
    tx_id = f"NEW-{uuid.uuid4().hex[:8]}"
    source_system = "core_banking"
    tx_date = datetime.now()
    amount = Decimal(random.uniform(100, 1000)).quantize(Decimal('0.01'))
    account_id = f"ACC{random.randint(10000000, 99999999)}"
    tx_type = random.choice(["deposit", "withdrawal", "transfer", "payment"])
    ref_id = f"REF-{random.randint(1000000000, 9999999999)}"
    status = random.choice(["completed", "pending"])
    
    new_transactions.append({
        "transaction_id": tx_id,
        "source_system": source_system,
        "transaction_date": tx_date,
        "amount": amount,
        "account_id": account_id,
        "transaction_type": tx_type,
        "reference_id": ref_id,
        "status": status,
        "payload": "{}",
        "created_at": tx_date,
        "processing_timestamp": tx_date,
        "transaction_category": "NEW"
    })

# Create a DataFrame from the new transactions
new_tx_df = spark.createDataFrame(new_transactions)
new_tx_df.show(5)

In [None]:
# Load the new transactions incrementally
loader = IcebergLoader(spark)
loader.load_transactions_incrementally(
    new_tx_df, 
    "core_banking",
    snapshot_time=datetime.now()
)

In [None]:
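# Read only the rows appended after the snapshot captured above.
# Incremental reads are driven by snapshot IDs: start-snapshot-id is exclusive,
# and only append snapshots are supported.
spark.read.format("iceberg") \
    .option("start-snapshot-id", latest_snapshot["snapshot_id"]) \
    .load("local.banking.source_transactions") \
    .orderBy(col("transaction_date").desc()) \
    .show()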

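Iceberg's Spark reader exposes incremental reads through the `start-snapshot-id` (exclusive) and `end-snapshot-id` (inclusive) read options. As a small sketch, the options can be assembled in a helper so the snapshot range is explicit. The helper name and the sample IDs are assumptions for illustration.

```python
def incremental_read_options(start_snapshot_id, end_snapshot_id=None):
    """Build Iceberg read options for an incremental scan over appended data."""
    options = {"start-snapshot-id": str(start_snapshot_id)}
    if end_snapshot_id is not None:
        # Without an end ID the scan runs up to the table's current snapshot
        options["end-snapshot-id"] = str(end_snapshot_id)
    return options


# Hypothetical snapshot IDs
print(incremental_read_options(1001))
print(incremental_read_options(1001, 1005))
```

In a live session the result could be passed along as `spark.read.format("iceberg").options(**opts).load("local.banking.source_transactions")`.
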
## 7. Conclusion

In this notebook, we've demonstrated the key features of the Apache Iceberg Banking Reconciliation System:

1. **Schema Evolution**: Adding new columns without rebuilding tables
2. **Time Travel**: Querying historical states of the data
3. **Partition Evolution**: Changing partition specifications
4. **ACID Transactions**: Ensuring consistency during updates
5. **Incremental Processing**: Processing only new data
6. **Reconciliation Process**: Matching transactions across systems

Together, these features let a banking reconciliation pipeline evolve its schema and partitioning in place, query earlier table states for audits, and commit each write atomically.