# Complete Banking Reconciliation Setup - Phase 1
## MinIO S3-backed Iceberg Catalog

## 🎯 Learning Objectives

This notebook demonstrates how to set up Apache Iceberg with a **MinIO S3-compatible object storage catalog** for a banking reconciliation system. You will learn:

### **Apache Iceberg Fundamentals**
- **Catalog System**: Understanding how Iceberg catalogs work with S3/MinIO
- **MinIO Storage**: How Iceberg stores metadata and data in MinIO buckets
- **Table Structure**: The file organization created by Iceberg tables in S3
- **Partitioning**: How Iceberg handles partitioned tables

### **What You'll Build**
- A complete banking reconciliation system with three core tables
- S3-based storage with MinIO integration
- Partitioned tables for efficient querying
- Comprehensive audit trail and validation

### **Key Concepts Explained**

#### **1. Iceberg Catalog with MinIO (S3)**
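```python
spark.sql.catalog.lakekeeper.type = "rest"
spark.sql.catalog.lakekeeper.uri = "http://lakekeeper:8181/catalog"
spark.sql.catalog.lakekeeper.warehouse = "irisa"
```
- **Catalog**: A namespace for tables, schemas, and functions; this notebook registers it as `lakekeeper` and reaches it over the Iceberg REST protocol
- **MinIO**: S3-compatible object storage for all table data and metadata
- **Warehouse**: The named storage location (`irisa`) whose data and metadata are stored in MinIO buckets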

#### **2. File Structure Created by Iceberg in S3**

```
s3a://warehouse/banking/table_name/
├── metadata/                # Table metadata and version history
│   ├── v1.metadata.json     # Current table schema and properties
│   ├── version-hint.text    # Points to latest metadata version
│   ├── snap-*.avro          # Manifest lists, one per snapshot (used for time travel)
│   └── .*.crc               # Checksums for data integrity
└── data/                    # Actual data files (Parquet format)
    └── partition_paths/     # Partitioned data organized by partition keys
```


#### **3. Partitioning Strategy**
- **source_transactions**: Partitioned by `days(transaction_date)` and `source_system`
- **reconciliation_results**: Partitioned by `days(reconciliation_timestamp)` and `match_status`
- **reconciliation_batches**: No partitioning (small lookup table)
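
To make the `days(...)` transform concrete: the Iceberg specification defines it as the number of whole days since 1970-01-01, so every row from the same calendar day and source system lands in the same partition. The sketch below reproduces that calculation in plain Python for illustration only. `days_transform`, `partition_key`, and the `core_banking` source name are hypothetical, and naive timestamps are assumed to be UTC.

```python
from datetime import date, datetime, timezone

EPOCH = date(1970, 1, 1)

def days_transform(ts: datetime) -> int:
    """Whole days since the Unix epoch (naive timestamps are treated as UTC)."""
    if ts.tzinfo is not None:
        ts = ts.astimezone(timezone.utc)
    return (ts.date() - EPOCH).days

def partition_key(ts: datetime, source_system: str) -> tuple:
    """Mimics the (days(transaction_date), source_system) partition tuple."""
    return (days_transform(ts), source_system)

# Two transactions on the same day from the same system share a partition.
morning = datetime(2025, 7, 21, 0, 5)
evening = datetime(2025, 7, 21, 23, 55)
print(partition_key(morning, "core_banking") == partition_key(evening, "core_banking"))
```

A query that filters on a date range therefore only needs to read the partitions whose day values fall inside that range.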

## Phase 1: Foundation Setup


## Step 1: Import Required Libraries

**Purpose**: Import all necessary libraries for Spark, MinIO, and file operations.

**Key Libraries**:
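- `pyspark.sql.SparkSession`, `pyspark.conf.SparkConf`: Core Spark functionality and configuration
- `rich`: Formatted console output (its `print` replaces the built-in)
- `pandas`: DataFrame utilities
- `boto3`: AWS S3/MinIO client for object storage (installed and imported in Step 4)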

In [1]:
# Import required libraries
!pip install --root-user-action=ignore rich --quiet
from rich import print
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pandas as pd

# This CATALOG_URL works for the "docker compose" testing and development environment.
# Change 'lakekeeper' if you are not running under "docker compose" (e.g. 'localhost' if Lakekeeper is running locally).
CATALOG_URL = "http://lakekeeper:8181/catalog"
WAREHOUSE = "irisa"

SPARK_VERSION = pyspark.__version__
SPARK_MINOR_VERSION = '.'.join(SPARK_VERSION.split('.')[:2])
ICEBERG_VERSION = "1.9.2"

print(f"Spark Version: {SPARK_VERSION} - Spark Minor Version: {SPARK_MINOR_VERSION} - Iceberg Version: {ICEBERG_VERSION}")

## Step 2: Stop any existing Spark session

**Purpose**: Ensure a clean Spark environment by stopping any existing sessions.

**Why This Matters**:
- Prevents configuration conflicts
- Ensures fresh Iceberg catalog initialization
- Clears any cached metadata or connections

In [2]:
# Stop any existing Spark session
# (getOrCreate() starts a session if none exists, so there is always one to stop)
try:
    SparkSession.builder.getOrCreate().stop()
    print("✓ Stopped existing Spark session")
except Exception:
    print("ℹ No existing Spark session to stop")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/21 11:32:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Step 3: Create Spark Session with Iceberg + MinIO (S3) Configuration

**Purpose**: Initialize Spark with Apache Iceberg extensions and MinIO S3-compatible catalog configuration.

### **Configuration Breakdown**:

#### **Iceberg Extensions**
```python
spark.sql.extensions = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
```
- Enables Iceberg-specific SQL commands (`CALL` stored procedures, `ALTER TABLE ... ADD PARTITION FIELD`, `MERGE INTO`, etc.)
- Provides time travel, schema evolution, and partition evolution capabilities

#### **Catalog Configuration**
```python
spark.sql.catalog.lakekeeper = "org.apache.iceberg.spark.SparkCatalog"
spark.sql.catalog.lakekeeper.type = "rest"
spark.sql.catalog.lakekeeper.uri = "http://lakekeeper:8181/catalog"
spark.sql.catalog.lakekeeper.warehouse = "irisa"
spark.sql.catalog.lakekeeper.io-impl = "org.apache.iceberg.aws.s3.S3FileIO"
spark.sql.defaultCatalog = "lakekeeper"
```
- **lakekeeper**: Our catalog name, backed by the Lakekeeper REST catalog service
- **warehouse**: Name of the warehouse that holds table data and metadata
- **io-impl**: `S3FileIO` reads and writes table files on S3-compatible storage such as MinIO

#### **Default Catalog**
- `spark.sql.defaultCatalog` is set to `lakekeeper`, so you will use `lakekeeper` as the catalog name in all SQL operations.
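
The `spark.jars.packages` value used in the next cell is derived from the installed Spark version. As a rough illustration of that derivation, the helper below builds the two Maven coordinates. `iceberg_packages` is a hypothetical name, the Scala suffix `2.12` is copied from the notebook's configuration and is an assumption about your Spark build, and `3.5.1` is only a sample Spark version.

```python
def iceberg_packages(spark_version: str, iceberg_version: str,
                     scala_version: str = "2.12") -> str:
    """Build the comma-separated Maven coordinates for the Iceberg runtime jars."""
    # The runtime artifact is published per Spark minor version, e.g. "3.5".
    spark_minor = ".".join(spark_version.split(".")[:2])
    runtime = (
        f"org.apache.iceberg:iceberg-spark-runtime-"
        f"{spark_minor}_{scala_version}:{iceberg_version}"
    )
    aws_bundle = f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version}"
    return ",".join([runtime, aws_bundle])

print(iceberg_packages("3.5.1", "1.9.2"))
```

If the Spark minor version or Scala suffix does not match a published artifact, Spark fails while resolving packages at session start, so it can help to print the string before building the session.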

In [3]:
# Build the Spark configuration for the Lakekeeper REST catalog
config = {
    "spark.sql.catalog.lakekeeper": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.lakekeeper.type": "rest",
    "spark.sql.catalog.lakekeeper.uri": CATALOG_URL,
    "spark.sql.catalog.lakekeeper.warehouse": WAREHOUSE,
    "spark.sql.catalog.lakekeeper.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.defaultCatalog": "lakekeeper",
    "spark.jars.packages": f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_MINOR_VERSION}_2.12:{ICEBERG_VERSION},org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}",
}

spark_config = SparkConf().setMaster('spark://spark-master:7077').setAppName("Iceberg-REST-Cluster-Banking-Sample-Phase1")
for k, v in config.items():
    spark_config = spark_config.set(k, v)

spark = SparkSession.builder.config(conf=spark_config).getOrCreate()

spark.sql("USE lakekeeper")
print("✓ Spark session created successfully")
print(f"Spark version: {spark.version}")
print(f"Default catalog: {spark.conf.get('spark.sql.defaultCatalog')}")


Py4JJavaError: An error occurred while calling o157.sql.
: org.apache.iceberg.exceptions.RESTException: Unable to process: Warehouse irisa not found
	at org.apache.iceberg.rest.ErrorHandlers$DefaultErrorHandler.accept(ErrorHandlers.java:248)
	at org.apache.iceberg.rest.ErrorHandlers$DefaultErrorHandler.accept(ErrorHandlers.java:212)
	at org.apache.iceberg.rest.HTTPClient.throwFailure(HTTPClient.java:215)
	at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:299)
	at org.apache.iceberg.rest.BaseHTTPClient.get(BaseHTTPClient.java:77)
	at org.apache.iceberg.rest.RESTSessionCatalog.fetchConfig(RESTSessionCatalog.java:1021)
	at org.apache.iceberg.rest.RESTSessionCatalog.initialize(RESTSessionCatalog.java:202)
	at org.apache.iceberg.rest.RESTCatalog.initialize(RESTCatalog.java:82)
	... (remaining Spark analyzer and Py4J frames omitted)
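
The trace shows the failure inside `RESTSessionCatalog.fetchConfig`, the initial configuration request a REST catalog client makes, and the message indicates that no warehouse named `irisa` is registered with the catalog. One way to check this before starting Spark is to issue the same request directly. The sketch below assumes the catalog serves the standard Iceberg REST `v1/config` endpoint under `CATALOG_URL` and needs no authentication. `config_url` and `warehouse_is_known` are hypothetical helper names.

```python
import urllib.error
import urllib.parse
import urllib.request

def config_url(catalog_url: str, warehouse: str) -> str:
    """URL of the Iceberg REST config endpoint for a given warehouse."""
    query = urllib.parse.urlencode({"warehouse": warehouse})
    return f"{catalog_url.rstrip('/')}/v1/config?{query}"

def warehouse_is_known(catalog_url: str, warehouse: str, timeout: float = 5.0) -> bool:
    """True if the catalog answers the config request for this warehouse.

    An unreachable catalog raises urllib.error.URLError instead of returning False.
    """
    try:
        with urllib.request.urlopen(config_url(catalog_url, warehouse), timeout=timeout) as resp:
            return resp.status == 200
    except urllib.error.HTTPError:
        # The catalog is reachable but rejected the request (e.g. unknown warehouse).
        return False

print(config_url("http://lakekeeper:8181/catalog", "irisa"))
```

If `warehouse_is_known(CATALOG_URL, WAREHOUSE)` returns `False`, the warehouse needs to be created in Lakekeeper before the `USE lakekeeper` statement can succeed.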


## Step 4: Initialize MinIO Client and Check Health

**Purpose**: Set up MinIO (S3-compatible object storage) for future data operations.

### **MinIO Configuration**:
- **Endpoint**: `http://minio:9000` (Docker service name)
- **Credentials**: `minio-root-user`/`minio-root-password` (development credentials, matching the client created below)
- **Region**: `us-east-1` (standard region)
- **Signature Version**: `s3v4` (AWS Signature Version 4)

### **Health Check Strategy**:
- Retry loop with a fixed 2-second delay, up to 30 attempts
- Graceful handling of connection failures
- Detailed logging for troubleshooting
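
The health-check cell in this step polls MinIO at a fixed two-second interval. Where exponential backoff is preferred, the delay schedule can be factored into a small helper. This is a minimal sketch rather than the notebook's implementation: `retry_with_backoff`, its parameters, and the injectable `sleep` function are all illustrative.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def retry_with_backoff(operation: Callable[[], T], max_attempts: int = 5,
                       base_delay: float = 0.5, max_delay: float = 8.0,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call operation until it succeeds, doubling the wait after each failure."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Delays grow as base_delay * 1, 2, 4, ... capped at max_delay.
            sleep(min(base_delay * 2 ** (attempt - 1), max_delay))

# Demo with a stand-in operation that fails twice before succeeding.
calls = {"count": 0}

def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("not ready")
    return "ready"

delays = []  # record the delays instead of actually sleeping
print(retry_with_backoff(flaky, sleep=delays.append), delays)
```

With the MinIO client from this step, the call would look like `retry_with_backoff(s3_client.list_buckets)`.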

In [None]:
!pip3 install boto3

In [None]:

import boto3
from botocore.client import Config

# Initialize MinIO client
s3_client = boto3.client(
    's3',
    endpoint_url='http://minio:9000',
    aws_access_key_id='minio-root-user',
    aws_secret_access_key='minio-root-password',
    config=Config(signature_version='s3v4'),
    region_name='us-east-1'
)

print("✓ MinIO client initialized")
print("Endpoint: http://minio:9000")
print("Signature Version: s3v4")

In [None]:
# Check MinIO health with retries
import time

print("Checking MinIO health...")
max_attempts = 30
attempt = 1

while attempt <= max_attempts:
    try:
        # Try to list buckets to check connectivity
        buckets = s3_client.list_buckets()
        print(f"✓ MinIO is ready! Found {len(buckets['Buckets'])} existing buckets")
        break
    except Exception as e:
        print(f"Waiting for MinIO... (attempt {attempt}/{max_attempts}): {str(e)}")
        if attempt >= max_attempts:
            print("⚠ MinIO failed to start within the expected time. Continuing anyway...")
            break
        time.sleep(2)
        attempt += 1

## Step 5: Initialize MinIO Buckets

**Purpose**: Create the required S3 buckets for different stages of data processing.

### **Bucket Strategy**:

| Bucket | Purpose | Data Type |
|--------|---------|-----------|
| `warehouse` | Iceberg table data and metadata | Structured data |
| `raw-data` | Original source files | CSV, JSON, etc. |
| `stage-data` | Intermediate processing data | Transformed data |
| `reconciled-data` | Final reconciliation results | Processed results |

### **Benefits of This Structure**:
- **Data Lineage**: Clear separation of data stages
- **Cost Optimization**: Different retention policies per bucket
- **Security**: Granular access controls per bucket
- **Performance**: Optimized storage for each data type
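
The creation logic in the next cell boils down to comparing the required bucket names with the ones that already exist. The sketch below isolates that comparison so it can be checked without a running MinIO instance. `plan_buckets` is a hypothetical helper and the `examples` bucket is a made-up name used only for the demo.

```python
def plan_buckets(required, existing):
    """Split required bucket names into those to create and those already present."""
    existing_set = set(existing)
    to_create = [name for name in required if name not in existing_set]
    already_present = [name for name in required if name in existing_set]
    return to_create, already_present

required = ["warehouse", "raw-data", "stage-data", "reconciled-data"]
existing = ["warehouse", "examples"]  # made-up listing for the demo

to_create, already_present = plan_buckets(required, existing)
print("Create:", to_create)
print("Already present:", already_present)
```

Keeping unrelated buckets (such as `examples` here) out of the "already present" list makes the final summary reflect only the buckets this notebook depends on.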

In [None]:
# List existing buckets
try:
    existing_buckets = [bucket['Name'] for bucket in s3_client.list_buckets()['Buckets']]
    print(f"Existing buckets: {existing_buckets}")
except Exception as e:
    print(f"Error listing buckets: {str(e)}")
    existing_buckets = []

# Define required buckets with their purposes
bucket_purposes = {
    'warehouse': 'Iceberg table data and metadata',
    'raw-data': 'Original source files (CSV, JSON)',
    'stage-data': 'Intermediate processing data',
    'reconciled-data': 'Final reconciliation results'
}

# Create buckets if they don't exist
created_buckets = []
for bucket, purpose in bucket_purposes.items():
    if bucket not in existing_buckets:
        try:
            s3_client.create_bucket(Bucket=bucket)
            created_buckets.append(bucket)
            print(f"✓ Created bucket: {bucket} ({purpose})")
        except Exception as e:
            print(f"✗ Error creating bucket {bucket}: {str(e)}")
    else:
        print(f"ℹ Bucket already exists: {bucket} ({purpose})")

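# Count only the required buckets that were already present, not every bucket in MinIO
already_existing = [bucket for bucket in bucket_purposes if bucket in existing_buckets]
print("\n📊 Bucket Summary:")
print(f"- Total buckets: {len(existing_buckets) + len(created_buckets)}")
print(f"- Newly created: {len(created_buckets)}")
print(f"- Required buckets that already existed: {len(already_existing)}")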

## Step 6: Create Banking Namespace

**Purpose**: Create a logical namespace to organize related tables in MinIO.

### **Namespace Benefits**:
- **Organization**: Groups related tables together
- **Access Control**: Apply permissions at namespace level
- **Naming**: Avoid table name conflicts
- **Discovery**: Easier to find related tables

### **S3 Impact**:
The namespace creates a directory structure in your MinIO bucket:
```
s3a://warehouse/banking/
├── source_transactions/
├── reconciliation_results/
└── reconciliation_batches/
```


In [None]:
# Create the banking namespace
try:
    spark.sql("CREATE NAMESPACE IF NOT EXISTS lakekeeper.banking")
    print("✓ Created namespace: lakekeeper.banking")
    print(f"Namespace location: lakekeeper/banking")
except Exception as e:
    print(f"✗ Error creating namespace: {str(e)}")

## Step 6b: Verify Namespace Creation

**Purpose**: Confirm that the namespace directory structure exists in your MinIO bucket.

> Note: You can use MinIO Console (http://localhost:9001) or `mc` CLI to browse the S3 structure:
> - `s3a://warehouse/banking/`

## Step 7: Create Iceberg Tables in MinIO

**Purpose**: Create the core tables for the banking reconciliation system in the MinIO S3 bucket.

### **Table Design Strategy**:

#### **1. source_transactions**
- **Purpose**: Store all transaction data from different source systems
- **Partitioning**: By `days(transaction_date)` and `source_system`
- **Benefits**: Efficient querying by date range and source
- **Schema**: Comprehensive transaction metadata

#### **2. reconciliation_results**
- **Purpose**: Store reconciliation outcomes and discrepancies
- **Partitioning**: By `days(reconciliation_timestamp)` and `match_status`
- **Benefits**: Easy analysis of reconciliation performance
- **Schema**: Detailed discrepancy tracking

#### **3. reconciliation_batches**
- **Purpose**: Track reconciliation batch metadata
- **Partitioning**: None (small lookup table)
- **Benefits**: Batch-level monitoring and reporting
- **Schema**: Batch execution metadata

> All tables will be created under the `lakekeeper.banking` namespace and stored in your MinIO S3 bucket.

In [None]:
# Create source_transactions table
print("Creating source_transactions table...")
try:
    spark.sql("""
    CREATE TABLE IF NOT EXISTS lakekeeper.banking.source_transactions (
      transaction_id STRING,
      source_system STRING,
      transaction_date TIMESTAMP,
      amount DECIMAL(18,2),
      account_id STRING,
      transaction_type STRING,
      reference_id STRING,
      status STRING,
      payload STRING,
      created_at TIMESTAMP,
      processing_timestamp TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (days(transaction_date), source_system)
    """)
    print("✓ Created table: lakekeeper.banking.source_transactions")
    print("  - Partitioned by: days(transaction_date), source_system")
    print("  - Purpose: Store all transaction data from different sources")
except Exception as e:
    print(f"✗ Error creating source_transactions table: {str(e)}")

In [None]:
# Create reconciliation_results table
print("Creating reconciliation_results table...")
try:
    spark.sql("""
    CREATE TABLE IF NOT EXISTS lakekeeper.banking.reconciliation_results (
      reconciliation_id STRING,
      batch_id STRING,
      primary_transaction_id STRING,
      secondary_transaction_id STRING,
      match_status STRING,
      discrepancy_type STRING,
      discrepancy_amount DECIMAL(18,2),
      reconciliation_timestamp TIMESTAMP,
      notes STRING
    )
    USING iceberg
    PARTITIONED BY (days(reconciliation_timestamp), match_status)
    """)
    print("✓ Created table: lakekeeper.banking.reconciliation_results")
    print("  - Partitioned by: days(reconciliation_timestamp), match_status")
    print("  - Purpose: Store reconciliation outcomes and discrepancies")
except Exception as e:
    print(f"✗ Error creating reconciliation_results table: {str(e)}")

In [None]:
# Create reconciliation_batches table
print("Creating reconciliation_batches table...")
try:
    spark.sql("""
    CREATE TABLE IF NOT EXISTS lakekeeper.banking.reconciliation_batches (
      batch_id STRING,
      reconciliation_date TIMESTAMP,
      source_systems ARRAY<STRING>,
      start_date TIMESTAMP,
      end_date TIMESTAMP,
      status STRING,
      total_transactions BIGINT,
      matched_count BIGINT,
      unmatched_count BIGINT,
      created_at TIMESTAMP,
      completed_at TIMESTAMP
    )
    USING iceberg
    """)
    print("✓ Created table: lakekeeper.banking.reconciliation_batches")
    print("  - Partitioned by: None (small lookup table)")
    print("  - Purpose: Track reconciliation batch metadata")
except Exception as e:
    print(f"✗ Error creating reconciliation_batches table: {str(e)}")

## Step 8: Verify Tables and Audit Setup

**Purpose**: Validate that all tables were created correctly in MinIO and examine the Iceberg file structure.

### **What We'll Verify**:
1. **Table Existence**: Confirm all tables are created in the `lakekeeper.banking` namespace
2. **File Structure**: Examine Iceberg metadata and data directories in S3
3. **Schema Validation**: Check table schemas are correct
4. **Accessibility**: Test basic queries on each table
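
A count of tables can pass even when one of the expected tables is missing and an unrelated one is present, so comparing names is a stricter existence check. The following is a small sketch of that comparison. `verify_tables` is a hypothetical helper, and the expected names are the three tables created in Step 7.

```python
EXPECTED_TABLES = frozenset({
    "source_transactions",
    "reconciliation_results",
    "reconciliation_batches",
})

def verify_tables(found_names, expected=EXPECTED_TABLES):
    """Compare the table names found in a namespace against the expected set."""
    found = set(found_names)
    return {
        "missing": sorted(expected - found),
        "unexpected": sorted(found - expected),
        "passed": expected <= found,
    }

# Demo with a made-up listing in which one table is absent.
print(verify_tables(["source_transactions", "reconciliation_batches"]))
```

In the notebook, `found_names` could be taken from the `tableName` column of `SHOW TABLES IN lakekeeper.banking`.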

In [None]:
# List tables to verify
print("📋 Verifying created tables...")
tables_df = spark.sql("SHOW TABLES IN lakekeeper.banking")
tables_df.show()

# Count tables
table_count = tables_df.count()
print(f"\n📊 Table Summary:")
print(f"- Total tables in banking namespace: {table_count}")
print(f"- Expected tables: 3")
print(f"- Status: {'✓ PASS' if table_count >= 3 else '✗ FAIL'}")

In [None]:
# Test a simple query on each table
print("🔍 Testing table accessibility...")

tables_to_test = [
    'lakekeeper.banking.source_transactions',
    'lakekeeper.banking.reconciliation_results',
    'lakekeeper.banking.reconciliation_batches'
]

for table in tables_to_test:
    try:
        result = spark.sql(f"SELECT COUNT(*) as count FROM {table}")
        count = result.collect()[0]['count']
        print(f"✓ {table}: {count} rows")
    except Exception as e:
        print(f"✗ {table}: Error - {str(e)}")

## Step 9: Examine Iceberg File Structure in MinIO

**Purpose**: Understand how Iceberg organizes files and metadata in your MinIO S3 bucket.

### **Iceberg File Organization in S3**:

Each Iceberg table creates this structure in your MinIO bucket:
```
s3a://warehouse/banking/table_name/
├── metadata/                # Table metadata and version history
│   ├── v1.metadata.json     # Current table schema and properties
│   ├── version-hint.text    # Points to latest metadata version
│   ├── snap-*.avro          # Manifest lists, one per snapshot (used for time travel)
│   └── .*.crc               # Checksums for data integrity
└── data/                    # Actual data files (Parquet format)
    └── partition_paths/     # Partitioned data organized by partition keys
```
> Use the MinIO Console (http://localhost:9001) or `mc` CLI to browse and inspect these files.
> Exact metadata file names depend on the catalog implementation; a REST catalog may write names of the form `<version>-<uuid>.metadata.json` and no `version-hint.text`.

### **Key Files Explained**:
- **v1.metadata.json**: Contains table schema, partitioning, and properties
- **version-hint.text**: Points to the current metadata version
- **`snap-*.avro`**: Manifest lists, one per snapshot; they enable time travel and rollback
- **.crc files**: Checksums to ensure data integrity
- **data/**: Contains actual Parquet files with the data
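
The metadata JSON file is where the schema and partition spec described above are recorded. To show how those pieces fit together, the sketch below parses a hand-written, heavily abbreviated sample that follows the field names of the Iceberg table metadata format. It is not output from this notebook, and `summarize_metadata` is a hypothetical helper.

```python
import json

# Abbreviated, hand-written sample; real metadata files contain many more fields.
SAMPLE_METADATA = """
{
  "format-version": 2,
  "current-schema-id": 0,
  "schemas": [{"schema-id": 0, "type": "struct", "fields": [
    {"id": 1, "name": "transaction_id", "required": false, "type": "string"},
    {"id": 2, "name": "source_system", "required": false, "type": "string"},
    {"id": 3, "name": "transaction_date", "required": false, "type": "timestamptz"}
  ]}],
  "default-spec-id": 0,
  "partition-specs": [{"spec-id": 0, "fields": [
    {"name": "transaction_date_day", "transform": "day", "source-id": 3, "field-id": 1000},
    {"name": "source_system", "transform": "identity", "source-id": 2, "field-id": 1001}
  ]}]
}
"""

def summarize_metadata(text):
    """Return (columns, partition expressions) for the current schema and default spec."""
    meta = json.loads(text)
    schema = next(s for s in meta["schemas"] if s["schema-id"] == meta["current-schema-id"])
    spec = next(p for p in meta["partition-specs"] if p["spec-id"] == meta["default-spec-id"])
    # Partition fields refer to source columns by field id, so build an id -> name map.
    names_by_id = {f["id"]: f["name"] for f in schema["fields"]}
    columns = [(f["name"], f["type"]) for f in schema["fields"]]
    partitions = [f'{p["transform"]}({names_by_id[p["source-id"]]})' for p in spec["fields"]]
    return columns, partitions

columns, partitions = summarize_metadata(SAMPLE_METADATA)
print(columns)
print(partitions)
```

The same function could be applied to the text returned by reading a table's metadata file from MinIO, provided the file uses these field names.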

In [None]:
bucket_name = 'warehouse'
namespace_prefix = 'banking/'

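def list_s3_prefix(prefix, title):
    print(f"\n🔍 {title}")
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter='/')
    # With a delimiter, "subdirectories" are returned under CommonPrefixes and files under Contents
    entries = [p['Prefix'] for p in response.get('CommonPrefixes', [])]
    entries += [obj['Key'] for obj in response.get('Contents', [])]
    if entries:
        for entry in entries:
            print(entry)
    else:
        print("No objects found.")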

print("📁 Examining Iceberg file structure...")

# Banking namespace structure
list_s3_prefix(namespace_prefix, "Banking namespace structure")

# source_transactions table structure
list_s3_prefix(namespace_prefix + 'source_transactions/', "source_transactions table structure")

# source_transactions metadata
list_s3_prefix(namespace_prefix + 'source_transactions/metadata/', "source_transactions metadata")

# reconciliation_results table structure
list_s3_prefix(namespace_prefix + 'reconciliation_results/', "reconciliation_results table structure")

# reconciliation_batches table structure
list_s3_prefix(namespace_prefix + 'reconciliation_batches/', "reconciliation_batches table structure")

In [None]:
# Examine the metadata file to understand table schema and properties
bucket_name = 'warehouse'
metadata_prefix = 'banking/source_transactions/metadata/'

def read_s3_object(key, title):
    print(f"\n🔍 {title}")
    try:
        obj = s3_client.get_object(Bucket=bucket_name, Key=key)
        content = obj['Body'].read().decode('utf-8')
        print(content)
    except Exception as e:
        print(f"Error reading {key}: {e}")

print("📄 Examining table metadata...")

# Read v1.metadata.json
read_s3_object(metadata_prefix + 'v1.metadata.json', "source_transactions metadata content")

# Read version-hint.text
read_s3_object(metadata_prefix + 'version-hint.text', "version hint file")

## Step 10: Phase 1 Summary and Next Steps

**Purpose**: Provide a comprehensive summary of what was accomplished and prepare for the next phase.

### **What We've Accomplished**:

#### **✅ Infrastructure Setup**
- Spark session with Iceberg extensions
- Iceberg REST catalog (`lakekeeper`)
- MinIO object storage buckets

#### **✅ Data Architecture**
- Banking namespace for organization
- Three core tables with proper schemas
- Partitioning strategy for performance

#### **✅ File Structure**
- Iceberg metadata and data directories
- Proper table organization
- Version control and checksums

### **Next Phase Preview**:
- **Data Generation**: Create sample transaction data
- **Data Ingestion**: Load data into Iceberg tables
- **Reconciliation Logic**: Implement matching algorithms
- **Testing**: Validate the complete system

In [None]:
# Generate setup summary
print("\n📚 Learning Summary:")
print("✅ Understanding of Iceberg REST catalog configuration with Lakekeeper")
print("✅ Knowledge of Iceberg file structure and metadata organization")
print("✅ Experience with table partitioning and schema design")
print("✅ Familiarity with MinIO object storage integration")
print("\n🚀 Next: Run Phase 2 notebook for data generation and population.")

In [None]:
spark.stop()