# Excel to Azure Database Writer

This notebook reads the `ocr_results.xlsx` file generated by the credit OCR pipeline and writes the data to the Azure SQL Database.

In [7]:
# Standard library imports
import os
import sys
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional

# Third-party imports
import pandas as pd
import pyodbc
from dotenv import load_dotenv

In [8]:
# Pull the database settings from azure.env into the process environment
load_dotenv("azure.env")

# Look up each credential by the name of its environment variable
server, database, username, password = (
    os.getenv(env_key)
    for env_key in ("SQL_SERVER", "DB_NAME", "DB_USER", "DB_PASSWORD")
)

# Stop right away when any of the four settings is absent or empty
if not (server and database and username and password):
    raise ValueError("Missing required database credentials in environment variables")

# Assemble the ODBC connection string from its key=value segments
conn_str = ";".join([
    'DRIVER={ODBC Driver 17 for SQL Server}',
    f'SERVER={server}',
    f'DATABASE={database}',
    f'UID={username}',
    f'PWD={password}',
])

In [None]:
# Test database connection
# Opens the `conn` / `cursor` pair that the later cells reuse and runs a
# trivial query to prove that the server accepts the connection string.
try:
    conn = pyodbc.connect(conn_str, timeout=10)
    cursor = conn.cursor()
    cursor.execute("SELECT 1")
    cursor.fetchone()  # consume the single row so no result is left pending
    print("Database connection successful")
except Exception as e:
    # Report the failure the same way the other cells do, then re-raise so
    # execution stops here rather than in a later cell that needs `cursor`.
    print(f"Database connection failed: {e}")
    raise

In [None]:
# Locate the workbook produced by the OCR pipeline
excel_file_path = Path("ocr_results.xlsx")

# Without the workbook there is nothing to load: explain why and abort
if not excel_file_path.exists():
    print(f"Excel file not found: {excel_file_path.absolute()}")
    print("Please run the credit_ocr_pipeline.ipynb first to generate the Excel file")
    raise FileNotFoundError(f"Excel file not found: {excel_file_path}")

print(f"Reading Excel file: {excel_file_path.absolute()}")

# Load the three sheets, one DataFrame per sheet
try:
    kreditantrag_df = pd.read_excel(excel_file_path, sheet_name="Kreditantrag")
    dokument_df = pd.read_excel(excel_file_path, sheet_name="Dokument")
    extrahierte_daten_df = pd.read_excel(excel_file_path, sheet_name="Extrahierte Daten")
except Exception as read_error:
    print(f"Failed to read Excel file: {read_error}")
    raise
else:
    # Report how many rows each sheet contributed
    print("Successfully read Excel file:")
    for sheet_label, sheet_df in (
        ("Kreditantrag", kreditantrag_df),
        ("Dokument", dokument_df),
        ("Extrahierte Daten", extrahierte_daten_df),
    ):
        print(f"  - {sheet_label}: {len(sheet_df)} rows")

In [None]:
# Preview the first rows and the column names of every loaded sheet.
# Each entry: (heading to print, DataFrame, message used when it is empty).
sheet_previews = (
    ("\n=== Sample Kreditantrag Data ===", kreditantrag_df, "No Kreditantrag data found"),
    ("\n=== Sample Dokument Data ===", dokument_df, "No Dokument data found"),
    ("\n=== Sample Extrahierte Daten ===", extrahierte_daten_df, "No Extrahierte Daten found"),
)

for heading, frame, empty_message in sheet_previews:
    print(heading)
    if frame.empty:
        print(empty_message)
        continue
    print(frame.head())
    print(f"\nColumns: {list(frame.columns)}")

In [None]:
# Check database schema to understand table structure
print("Checking database schema...")

# The tables this notebook writes to; the single source for both queries below
target_tables = ['Kreditantrag', 'Dokument', 'Extrahierte Daten']

# One "?" marker per table name so the names are sent as bound parameters
table_placeholders = ", ".join("?" for _ in target_tables)

# Check if tables exist
check_tables_sql = f"""
SELECT 
    TABLE_SCHEMA,
    TABLE_NAME,
    TABLE_TYPE
FROM INFORMATION_SCHEMA.TABLES 
WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_NAME IN ({table_placeholders})
ORDER BY TABLE_NAME
"""

# Column listing for one table. The table name is a bound parameter instead of
# being formatted into the SQL text, and the statement is built only once.
columns_sql = """
SELECT 
    COLUMN_NAME,
    DATA_TYPE,
    IS_NULLABLE,
    COLUMN_DEFAULT
FROM INFORMATION_SCHEMA.COLUMNS 
WHERE TABLE_NAME = ?
ORDER BY ORDINAL_POSITION
"""

try:
    cursor.execute(check_tables_sql, target_tables)
    tables = cursor.fetchall()
    
    print(f"Found {len(tables)} relevant tables:")
    for schema, name, table_type in tables:
        print(f"  - {schema}.{name}")
        
    # Get column information for each table
    for table_name in target_tables:
        print(f"\n=== {table_name} Table Structure ===")
        
        cursor.execute(columns_sql, (table_name,))
        columns = cursor.fetchall()
        
        if columns:
            for column_name, data_type, nullable, default_val in columns:
                print(f"  {column_name}: {data_type} (nullable: {nullable}, default: {default_val})")
        else:
            # No column rows came back for this table name
            print(f"  Table {table_name} not found")
            
except Exception as e:
    print(f"Failed to check database schema: {e}")
    raise

In [None]:
# Insert data into database tables
print("\n=== Inserting Data into Database ===")

# Track inserted IDs for foreign key relationships
inserted_ids = {
    'kreditantrag_ids': [],
    'dokument_ids': []
}

# Target columns in INSERT order; the sheet is read using the same names
kreditantrag_columns = [
    'Antragsstatus',
    'Texterkennungsstatus',
    'Kreditbetrag',
    'Laufzeit',
    'Zinsart',
    'Zinssatz',
    'Sondertilgung',
    'Verwendungszweck',
    'WeitereInformationen',
    'Status',
    'CustomerName',
    'RequestType',
    'Purpose',
]

# Fallback values, used when a column is missing from the sheet OR the cell is
# empty (pandas reports empty Excel cells as NaN). Columns without a fallback
# are written as NULL.
kreditantrag_defaults = {
    'Antragsstatus': 'In Bearbeitung',
    'Texterkennungsstatus': 'Erkannt',
    'Zinsart': 'Festzins',
    'Status': 'In Bearbeitung',
}

# 1. Insert Kreditantrag data
if not kreditantrag_df.empty:
    print("\n1. Inserting Kreditantrag data...")

    column_list = ", ".join(kreditantrag_columns)
    marker_list = ", ".join("?" for _ in kreditantrag_columns)

    # The identity value is read in the SAME batch as the INSERT: a separate
    # "SELECT SCOPE_IDENTITY()" execute runs in another scope and may return
    # NULL. SET NOCOUNT ON suppresses the row-count result so the SELECT is
    # the first result set that fetchone() sees.
    insert_sql = f"""
    SET NOCOUNT ON;
    INSERT INTO Kreditantrag ({column_list})
    VALUES ({marker_list});
    SELECT SCOPE_IDENTITY();
    """
    
    for index, row in kreditantrag_df.iterrows():
        try:
            # Extract values from DataFrame row, replacing NaN/None by the
            # column fallback (or NULL) so the driver never receives NaN
            values = []
            for column in kreditantrag_columns:
                value = row.get(column)
                if pd.isna(value):
                    value = kreditantrag_defaults.get(column)
                values.append(value)
            
            cursor.execute(insert_sql, tuple(values))
            
            # Get the inserted ID
            identity_row = cursor.fetchone()
            if identity_row is None or identity_row[0] is None:
                raise RuntimeError("INSERT did not return an identity value")
            inserted_id = int(identity_row[0])
            
            # Commit per row: a later rollback must only undo the failing row,
            # never rows whose IDs were already recorded below
            conn.commit()
            inserted_ids['kreditantrag_ids'].append(inserted_id)
            
            print(f"Inserted Kreditantrag with ID: {inserted_id}")
            
        except Exception as e:
            print(f"Failed to insert Kreditantrag row {index}: {e}")
            conn.rollback()
            continue
    
    print(f"Committed {len(inserted_ids['kreditantrag_ids'])} Kreditantrag records")
else:
    print("No Kreditantrag data to insert")

In [None]:
# 2. Insert Dokument data
if not dokument_df.empty and inserted_ids['kreditantrag_ids']:
    print("\n2. Inserting Dokument data...")
    
    # Use the first Kreditantrag ID for foreign key relationship (looked up
    # once; it does not change between rows)
    kreditantrag_id = inserted_ids['kreditantrag_ids'][0]
    if len(inserted_ids['kreditantrag_ids']) > 1:
        print(
            "Warning: several Kreditantrag records exist; "
            f"all Dokument rows are linked to ID {kreditantrag_id}"
        )
    
    # The identity value is read in the SAME batch as the INSERT: a separate
    # "SELECT SCOPE_IDENTITY()" execute runs in another scope and may return
    # NULL. SET NOCOUNT ON suppresses the row-count result so the SELECT is
    # the first result set that fetchone() sees.
    insert_sql = """
    SET NOCOUNT ON;
    INSERT INTO Dokument (
        Dokumententyp,
        Pfad_DMS,
        FK_Kreditantrag
    ) VALUES (?, ?, ?);
    SELECT SCOPE_IDENTITY();
    """
    
    for index, row in dokument_df.iterrows():
        try:
            # Sheet column 'Pfad DMS' feeds table column Pfad_DMS. Empty Excel
            # cells arrive as NaN and are written as NULL instead.
            sheet_values = (row.get('Dokumententyp'), row.get('Pfad DMS'))
            values = tuple(
                None if pd.isna(value) else value for value in sheet_values
            ) + (kreditantrag_id,)
            
            cursor.execute(insert_sql, values)
            
            # Get the inserted ID
            identity_row = cursor.fetchone()
            if identity_row is None or identity_row[0] is None:
                raise RuntimeError("INSERT did not return an identity value")
            inserted_id = int(identity_row[0])
            
            # Commit per row: a later rollback must only undo the failing row,
            # never rows whose IDs were already recorded below
            conn.commit()
            inserted_ids['dokument_ids'].append(inserted_id)
            
            print(f"Inserted Dokument with ID: {inserted_id}")
            
        except Exception as e:
            print(f"Failed to insert Dokument row {index}: {e}")
            conn.rollback()
            continue
    
    print(f"Committed {len(inserted_ids['dokument_ids'])} Dokument records")
else:
    print("No Dokument data to insert or no Kreditantrag ID available")

In [None]:
# 3. Insert Extrahierte Daten
if not extrahierte_daten_df.empty and inserted_ids['dokument_ids']:
    print("\n3. Inserting Extrahierte Daten...")
    
    inserted_count = 0
    
    # Use the first Dokument ID for foreign key relationship (looked up once;
    # it does not change between rows)
    dokument_id = inserted_ids['dokument_ids'][0]
    if len(inserted_ids['dokument_ids']) > 1:
        print(
            "Warning: several Dokument records exist; "
            f"all Extrahierte Daten rows are linked to ID {dokument_id}"
        )
    
    insert_sql = """
    INSERT INTO [Extrahierte Daten] (
        Feldname,
        Wert,
        [Position im Dokument],
        Konfidenzscore,
        FK_Dokument
    ) VALUES (?, ?, ?, ?, ?)
    """
    
    # Sheet columns, in the order of the first four INSERT columns
    sheet_columns = ('Feldname', 'Wert', 'Position im Dokument', 'Konfidenzscore')
    
    for index, row in extrahierte_daten_df.iterrows():
        try:
            # Empty Excel cells arrive as NaN; write them as NULL instead
            values = tuple(
                None if pd.isna(row.get(column)) else row.get(column)
                for column in sheet_columns
            ) + (dokument_id,)
            
            cursor.execute(insert_sql, values)
            
            # Commit per row: the rollback in the handler below must only undo
            # the failing row, otherwise rows already counted would be lost
            conn.commit()
            inserted_count += 1
            
        except Exception as e:
            print(f"  ✗ Failed to insert Extrahierte Daten row {index}: {e}")
            conn.rollback()
            continue
    
    print(f"Committed {inserted_count} Extrahierte Daten records")
else:
    print("No Extrahierte Daten to insert or no Dokument ID available")

In [None]:
# Summarize what this run recorded and spot-check it against the database
print("\n=== Database Insertion Summary ===")
print(f"Kreditantrag records inserted: {len(inserted_ids['kreditantrag_ids'])}")
print(f"Dokument records inserted: {len(inserted_ids['dokument_ids'])}")

# Count the extracted-data rows that reference the first recorded Dokument
if inserted_ids['dokument_ids']:
    first_dokument_id = inserted_ids['dokument_ids'][0]
    try:
        count_sql = "SELECT COUNT(*) FROM [Extrahierte Daten] WHERE FK_Dokument = ?"
        cursor.execute(count_sql, (first_dokument_id,))
        extrahierte_count = cursor.fetchone()[0]
        print(f"Extrahierte Daten records inserted: {extrahierte_count}")
    except Exception as count_error:
        print(f"Could not count Extrahierte Daten: {count_error}")

# Read back the first recorded Kreditantrag as a sample
if inserted_ids['kreditantrag_ids']:
    print("\n=== Sample Inserted Data ===")
    first_kreditantrag_id = inserted_ids['kreditantrag_ids'][0]

    try:
        sample_sql = """
        SELECT TOP 1 Antragsstatus, Kreditbetrag, Laufzeit, CustomerName
        FROM Kreditantrag WHERE AntragID = ?
        """
        cursor.execute(sample_sql, (first_kreditantrag_id,))
        sample_row = cursor.fetchone()
        if sample_row:
            print(f"Kreditantrag: Status={sample_row[0]}, Betrag={sample_row[1]}, Laufzeit={sample_row[2]}, Customer={sample_row[3]}")
    except Exception as sample_error:
        print(f"Could not retrieve Kreditantrag sample: {sample_error}")

print("\n=== Database Insertion Complete ===")

# Release the connection; a failure to close is reported, not raised
try:
    conn.close()
    print("Database connection closed")
except Exception as close_error:
    print(f"Error closing database connection: {close_error}")