In [6]:
#!/usr/bin/env python3
"""
Initialize database tables for dbt project.

This script creates initial tables in PostgreSQL that correspond to the dbt seeds.
It can be run before dbt to ensure the database schema is ready.
"""

import os
import time
from pathlib import Path

import pandas as pd
from sqlalchemy import create_engine, text

# Database connection parameters. Each value can be overridden through the
# environment variable of the same name; the literals below are local
# development defaults only and should not be relied on in production.
DB_USER = os.environ.get("DB_USER", "admin")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "password")
DB_HOST = os.environ.get("DB_HOST", "localhost")  # Use localhost when connecting from host machine
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_NAME = os.environ.get("DB_NAME", "bankloan")

def create_engine_with_retries(max_retries=5, retry_delay=2.0):
    """Create a SQLAlchemy engine, retrying until a test query succeeds.

    Each attempt builds an engine from the module-level ``DB_*`` settings and
    runs ``SELECT 1`` on a fresh connection to prove the database is reachable.

    Args:
        max_retries: Maximum number of connection attempts; must be >= 1.
        retry_delay: Seconds to wait after a failed attempt before the next one.

    Returns:
        The engine from the first attempt whose test query succeeded.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: Whatever the final attempt raised, re-raised unchanged.
    """
    # Reject a non-positive retry count up front instead of silently
    # returning None, which would only fail later in the callers.
    if max_retries < 1:
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    connection_string = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

    for attempt in range(1, max_retries + 1):
        engine = None
        try:
            engine = create_engine(connection_string)
            # Test connection
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
        except Exception as e:
            print(f"Attempt {attempt} failed: {e}")
            # Release the pool of the engine that could not connect.
            if engine is not None:
                engine.dispose()
            if attempt == max_retries:
                raise
            # Give the database time to come up before trying again.
            time.sleep(retry_delay)
        else:
            print(f"Successfully connected to database on attempt {attempt}")
            return engine

def create_tables(engine):
    """Make sure the ``client_loan`` table is present in the database.

    Runs one ``CREATE TABLE IF NOT EXISTS`` statement inside an
    ``engine.begin()`` block and reports progress on stdout.

    Args:
        engine: SQLAlchemy engine connected to the target database.
    """
    client_loan_ddl = """
    CREATE TABLE IF NOT EXISTS client_loan (
    account_id       INTEGER,
    district_id      INTEGER,
    statement_freq   VARCHAR(50),
    client_id        INTEGER,
    gender           VARCHAR(10),
    DateOfBirth      DATE,
    loan_id          VARCHAR(50),
    loan_date        DATE,
    amount           REAL,
    duration         INTEGER,
    payments         REAL,
    contract_status  VARCHAR(20),
    isdebt           VARCHAR(10),
    dis_name         VARCHAR(50),
    Region           VARCHAR(50),
    Avg_Salary       REAL,
    UnempRate95      REAL,
    UnempRate96      REAL
    );
    """

    # (progress message, DDL) pairs, executed in order within one transaction block
    ddl_steps = (
        ("Creating client_loan table...", client_loan_ddl),
    )

    with engine.begin() as connection:
        for progress_message, ddl in ddl_steps:
            print(progress_message)
            connection.execute(text(ddl))

    print("Tables created successfully!")

def load_seed_data(engine, project_path):
    """Copy the ``bank_loan.csv`` seed file into the ``client_loan`` table.

    Args:
        engine: Database connectable handed to ``DataFrame.to_sql``.
        project_path: Directory (``pathlib.Path``) expected to contain
            ``bank_loan.csv``.

    If the seed file is missing, a warning is printed and nothing is written.
    """
    seed_csv = project_path / 'bank_loan.csv'

    # Nothing to load: warn and leave the database untouched
    if not seed_csv.exists():
        print(f"Warning: Could not find seed file {seed_csv}")
        return

    print(f"Loading data from {seed_csv}...")
    loan_frame = pd.read_csv(seed_csv)

    # NOTE(review): pandas' 'replace' mode drops an existing client_loan table
    # before inserting, so the stored column types come from the DataFrame
    # rather than from any earlier CREATE TABLE -- confirm this is intended.
    write_options = {
        'if_exists': 'replace',
        'index': False,
        'method': 'multi',   # multi-row INSERT statements
        'chunksize': 1000,   # rows per batch
    }
    loan_frame.to_sql('client_loan', engine, **write_options)
    print("Customer data loaded successfully!")
    

def check_tables(engine):
    """Run the sanity-check queries and print every row they return.

    Args:
        engine: SQLAlchemy engine connected to the target database.
    """
    # Maps a human-readable label to the SQL that produces the figure
    sanity_checks = {
        "Count loan": "SELECT COUNT(*) FROM client_loan",
    }

    with engine.connect() as connection:
        for label, sql in sanity_checks.items():
            print(f"\n{label}:")
            for row in connection.execute(text(sql)).fetchall():
                print(row)

def main():
    """Run the whole initialization: connect, create tables, load seeds, verify.

    Seed files are read from the ``data`` directory relative to the current
    working directory. Any exception raised along the way is reported on
    stdout instead of being propagated.
    """
    print("Initializing database for dbt project...")

    # Seed directory, relative to the current working directory
    project_path = Path('data')
    print(f"Using project path: {project_path}, {project_path.resolve()}")

    engine = None
    try:
        # Create database connection
        engine = create_engine_with_retries()

        # Create tables
        create_tables(engine)

        # Load seed data
        load_seed_data(engine, project_path)

        # Verify tables and data
        check_tables(engine)

        print("\nDatabase initialization complete! You can now run dbt commands.")

    except Exception as e:
        print(f"Error initializing database: {e}")
    finally:
        # Release pooled connections whether or not initialization succeeded
        if engine is not None:
            engine.dispose()


# Run only when executed directly (script or notebook cell), not when imported
if __name__ == "__main__":
    main()

Initializing database for dbt project...
Using project path: data, /mnt/c/Users/Sugandh/code/bank_loans/notebooks/data
Successfully connected to database on attempt 1
Creating client_loan table...
Tables created successfully!
Loading data from data/bank_loan.csv...
Customer data loaded successfully!

Count loan:
(99240,)

Database initialization complete! You can now run dbt commands.
