In [None]:
import csv
import io
import os
import sys
import time
from pathlib import Path

import psycopg2
from psycopg2 import sql

# Configuration: connection settings for the target Postgres instance.
# Each value can be overridden with an environment variable (DWH_DB_HOST,
# DWH_DB_PORT, DWH_DB_NAME, DWH_DB_USER, DWH_DB_PASSWORD) so that real
# credentials never have to be written into this file. The literals below
# are local-development fallbacks used when the variable is not set.
DB_CONFIG = {
    "host": os.environ.get("DWH_DB_HOST", "localhost"),
    # Environment values are strings; the port is normalised to int.
    "port": int(os.environ.get("DWH_DB_PORT", "5432")),
    "dbname": os.environ.get("DWH_DB_NAME", "DataWarehouse"),
    "user": os.environ.get("DWH_DB_USER", "postgres"),
    "password": os.environ.get("DWH_DB_PASSWORD", "admin"),
}

# Root folder of the source CSV extracts, followed by the load plan:
# TABLES maps each bronze table name to the CSV file that feeds it ("path")
# and the ordered list of target columns ("columns").
BASE_DIR = Path(r"C:/Users/Ryan Gabriel Magno/Documents/sql-data-warehouse-project/datasets")
TABLES = {
    table: {"path": BASE_DIR / folder / filename, "columns": column_names}
    for table, folder, filename, column_names in (
        (
            "crm_cust_info", "source_crm", "cust_info.csv",
            ["cst_id", "cst_key", "cst_firstname", "cst_lastname", "cst_marital_status", "cst_gndr", "cst_create_date"],
        ),
        (
            "crm_prd_info", "source_crm", "prd_info.csv",
            ["prd_id", "prd_key", "prd_nm", "prd_cost", "prd_line", "prd_start_dt", "prd_end_dt"],
        ),
        (
            "crm_sales_details", "source_crm", "sales_details.csv",
            ["sls_ord_num", "sls_prd_key", "sls_cust_id", "sls_order_dt", "sls_ship_dt", "sls_due_dt", "sls_sales", "sls_quantity", "sls_price"],
        ),
        ("erp_loc_a101", "source_erp", "LOC_A101.csv", ["cid", "cntry"]),
        ("erp_cust_az12", "source_erp", "CUST_AZ12.csv", ["cid", "bdate", "gen"]),
        ("erp_px_cat_g1v2", "source_erp", "PX_CAT_G1V2.csv", ["id", "cat", "subcat", "maintenance"]),
    )
}


def load_table(conn, table_name, csv_path, columns):
    """
    Replace the contents of bronze.<table_name> with the rows of a CSV file.

    The CSV is first rewritten into an in-memory buffer: every field is
    stripped of surrounding whitespace, and fields that end up empty are
    written as empty unquoted values, which COPY's CSV format reads as NULL.
    The table is then truncated and reloaded with COPY ... FROM STDIN in a
    single transaction, so a failed load rolls back to the table's previous
    contents instead of leaving it empty.

    Args:
        conn: Open psycopg2 connection. Assumed not to be in autocommit
            mode; with autocommit on, the truncate would not be rolled back.
        table_name: Name of the target table inside the bronze schema.
        csv_path: Path of the CSV file to load; its first row is a header.
        columns: Target column names, in the order the CSV fields appear.

    Raises:
        ValueError: If the CSV file has no header row (empty file).
        Exception: Any file or database error is printed, the transaction is
            rolled back, and the original exception is re-raised.
    """
    print(f"\n>> Loading table: bronze.{table_name}")
    cur = conn.cursor()
    try:
        # Clean the CSV before touching the table, so an unreadable or empty
        # file fails without ever taking the TRUNCATE lock.
        print(f"   Preprocessing CSV {csv_path}")
        cleaned = io.StringIO()
        writer = csv.writer(cleaned)
        with open(csv_path, newline='', encoding='utf-8') as src:
            reader = csv.reader(src)
            header = next(reader, None)
            if header is None:
                raise ValueError(f"CSV file {csv_path} is empty (no header row)")
            writer.writerow(header)
            # csv.writer emits None as an empty unquoted field.
            writer.writerows(
                [(cell.strip() or None) for cell in row] for row in reader
            )
        cleaned.seek(0)

        # Truncate and reload in the same transaction: nothing is committed
        # until the COPY has succeeded.
        print(f"   Truncating bronze.{table_name}")
        cur.execute(sql.SQL("TRUNCATE TABLE bronze.{tbl}").format(tbl=sql.Identifier(table_name)))

        # Bulk copy with default CSV null behavior (empty unquoted fields => NULL)
        print(f"   Copying data into bronze.{table_name}")
        copy_sql = sql.SQL(
            "COPY bronze.{tbl} ({cols}) FROM STDIN WITH (FORMAT csv, HEADER true)"
        ).format(
            tbl=sql.Identifier(table_name),
            cols=sql.SQL(', ').join(map(sql.Identifier, columns))
        )
        cur.copy_expert(copy_sql, cleaned)
        conn.commit()
        print(f"   Completed bronze.{table_name}")
    except Exception as exc:
        conn.rollback()
        print(f"ERROR loading bronze.{table_name}:", exc)
        raise
    finally:
        cur.close()


def main():
    """
    Load every table listed in TABLES into the bronze schema, in order.

    Prints a banner, the duration of each table load, and the total
    duration. The run stops at the first table that fails to load.

    Exits the process with status 1 when the database connection cannot be
    opened or when any table fails to load, so schedulers and shells can
    detect the failure. The connection is closed in every case.
    """
    # Connect to Postgres
    try:
        conn = psycopg2.connect(**DB_CONFIG)
    except Exception as e:
        print("Unable to connect to database:", e)
        sys.exit(1)

    batch_start = time.time()
    print("========================================")
    print("Starting Bronze layer load...")
    print("========================================")

    try:
        for tbl, meta in TABLES.items():
            start = time.time()
            load_table(conn, tbl, str(meta["path"]), meta["columns"])
            duration = time.time() - start
            print(f">> Duration for {tbl}: {duration:.2f} seconds")

        total_duration = time.time() - batch_start
        print("========================================")
        print(f"Finished Bronze layer load in {total_duration:.2f} seconds")
        print("========================================")
    except Exception:
        # load_table has already rolled back and printed the cause; report
        # the abort and signal failure instead of exiting with status 0.
        print("Loading aborted due to error.")
        sys.exit(1)
    finally:
        conn.close()


if __name__ == "__main__":
    main()


Starting Bronze layer load...

>> Loading table: bronze.crm_cust_info
   Truncating bronze.crm_cust_info
   Preprocessing CSV C:\Users\Ryan Gabriel Magno\Documents\sql-data-warehouse-project\datasets\source_crm\cust_info.csv
   Copying data into bronze.crm_cust_info
   Completed bronze.crm_cust_info
>> Duration for crm_cust_info: 0.28 seconds

>> Loading table: bronze.crm_prd_info
   Truncating bronze.crm_prd_info
   Preprocessing CSV C:\Users\Ryan Gabriel Magno\Documents\sql-data-warehouse-project\datasets\source_crm\prd_info.csv
   Copying data into bronze.crm_prd_info
   Completed bronze.crm_prd_info
>> Duration for crm_prd_info: 0.01 seconds

>> Loading table: bronze.crm_sales_details
   Truncating bronze.crm_sales_details
   Preprocessing CSV C:\Users\Ryan Gabriel Magno\Documents\sql-data-warehouse-project\datasets\source_crm\sales_details.csv
   Copying data into bronze.crm_sales_details
   Completed bronze.crm_sales_details
>> Duration for crm_sales_details: 0.30 seconds

>> Loa