#### 1) Setup, Configuration, and Database Engine


In [6]:
### 1. Install Libraries, Configuration, and Database Engine ###

#!pip install sqlalchemy pandas psycopg2-binary

import json
import os
from datetime import datetime
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine, text

# --- DATABASE CONFIGURATION ---
# Every setting can be overridden with an environment variable of the same
# name, so real credentials and machine-specific paths do not have to be
# edited into (or committed with) the notebook. The fallback values reproduce
# the original local-development setup and should not be used anywhere else.

DB_USER = os.environ.get("DB_USER", "postgres")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "123")
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_NAME = os.environ.get("DB_NAME", "C4_Project")

# Percent-encode the user and password: characters such as '@', ':' or '/'
# would otherwise be read as URL delimiters and corrupt the connection string.
DATABASE_URL = (
    f"postgresql://{quote_plus(DB_USER)}:{quote_plus(DB_PASSWORD)}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

# 2. File Directory
# Folder containing one "<table_name>.csv" per table to load.
OUTPUT_DIR = os.environ.get(
    "OUTPUT_DIR",
    '/Users/mayapatel/Desktop/SQL/Project/Check Point 4/Simulated Data Files',
)

# --- SQLAlchemy Engine ---
try:
    # Create the connection engine. create_engine() does not open a connection
    # by itself; the SELECT below is what actually verifies that the server is
    # reachable and the credentials are accepted.
    engine = create_engine(DATABASE_URL)

    # Test connection
    with engine.connect() as conn:
        result = conn.execute(text("SELECT current_database();")).scalar()
        print(f"Connection successful to database: {result}")

except Exception as e:
    print(f"FATAL CONNECTION ERROR: Check if your PostgreSQL server is running (via PgAdmin) and confirm the password/port: {e}")
    # Re-raise so "Run All" stops here. Without this, later cells would run
    # against an unusable (or undefined) `engine` and report a misleading
    # per-table failure for every CSV.
    raise


# --- Utility Function for Loading ---
def load_data_from_csv(table_name):
    """
    Reads a CSV, applies type conversion, and loads data into the specified table
    in the 'realestate' schema using pandas.to_sql.

    Args:
        table_name: Name of the target table. The CSV is expected at
            ``<OUTPUT_DIR>/<table_name>.csv``.

    Returns:
        None. Progress is reported with print(). A missing CSV is reported as
        [SKIP] and any other exception as [FATAL ERROR]; neither is re-raised,
        so the caller can continue with the remaining tables.
    """
    csv_file = os.path.join(OUTPUT_DIR, f"{table_name}.csv")
    print(f"-> Processing {table_name}...")

    try:
        # Read the CSV.
        df = pd.read_csv(csv_file)

        # --- Data Type Coercion ---
        date_cols = ['hire_date', 'listed_date', 'closed_date', 'offer_date', 'start_date', 'end_date', 'close_date']
        datetime_cols = ['created_at', 'appointment_datetime']

        # errors='coerce' turns unparseable values into missing values instead
        # of aborting the whole table.
        for col in date_cols:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce', format='%Y-%m-%d').dt.date

        for col in datetime_cols:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce')

        # to_sql is called without a dtype mapping, so parsed dict objects
        # would be handed to the driver as-is. Round-trip through the JSON
        # parser (to validate) and store JSON *text*, matching the JSONB
        # handling in the notebook's later loading cell.
        if 'demographics' in df.columns:
            df['demographics'] = df['demographics'].apply(
                lambda x: json.dumps(json.loads(x)) if pd.notna(x) else None
            )

        # Keep commission_amount equal to close_price * commission_pct, rounded
        # to two decimals, as the later loading cell does for the database
        # trigger check. NOTE(review): assumes commission_pct is stored as a
        # rate rather than a percentage - confirm against the schema.
        if 'close_price' in df.columns and 'commission_pct' in df.columns and 'commission_amount' in df.columns:
            df['commission_amount'] = (df['close_price'] * df['commission_pct']).round(2)

        # --- Execute Bulk Insert ---
        rows_before = len(df)
        df.to_sql(
            table_name,
            con=engine,
            schema='realestate',
            if_exists='append',  # add to the existing table; never drop/recreate it
            index=False          # the DataFrame index is not a table column
        )
        print(f"   [SUCCESS] Loaded {rows_before} rows into realestate.{table_name}.")

    except FileNotFoundError:
        # Skip temporary ID files
        print(f"   [SKIP] CSV file not found (or intentionally skipped): {table_name}.csv.")
    except Exception as e:
        print(f"   [FATAL ERROR] Failed to load {table_name}. Check constraints: {e}")

Connection successful to database: C4_Project


#### 2) Dependency Order and Execution

This cell runs the bulk insert, loading the tables in strict foreign-key dependency order.

In [9]:
### 2. Execute Bulk Data Load in Dependency Order (FIXED) ###

# Load plan: dependency level -> tables to load at that level. Levels are
# processed in insertion order, so a table must be listed after every table
# its foreign keys point to.
TABLE_LOAD_ORDER = {
    # The remaining lookup tables (states, property_types, etc.) are not
    # listed because their rows already exist from the schema INSERTs.
    "Level 1 (Foundation)": ["marketing_channels", "addresses"],

    # Organization & users.
    "Level 2 (Organization)": ["users", "offices"],

    # Core personnel. Clients go before agents so that properties and
    # preferences can reference them.
    "Level 3 (Personnel)": ["clients", "agents"],

    # Inventory prep.
    "Level 4 (Inventory Prep)": ["client_preferences", "properties"],

    # Listings.
    "Level 5 (Listings)": ["listings"],

    # Activities & transactions.
    "Level 6 (Activities)": ["appointments", "offers", "transactions", "marketing_campaigns"],
}

def load_data_from_csv(table_name):
    """
    Append the contents of ``<table_name>.csv`` to ``realestate.<table_name>``.

    The CSV is looked up in OUTPUT_DIR. Before the insert, known date and
    timestamp columns are parsed, the ``demographics`` column is normalized to
    JSON text, and ``commission_amount`` is recomputed from ``close_price`` and
    ``commission_pct`` when all three columns are present.

    Returns None. Failures inside the load are reported with print() rather
    than raised: a missing file prints a [SKIP] line and any other exception
    prints a [FATAL ERROR] line.
    """
    source_path = os.path.join(OUTPUT_DIR, f"{table_name}.csv")
    print(f"-> Processing {table_name}...")

    def _as_json_text(raw):
        # Missing cells stay NULL; everything else is parsed and re-serialized
        # so the value sent for the JSONB column is JSON text, not a dict.
        if pd.isna(raw):
            return None
        return json.dumps(json.loads(raw))

    try:
        frame = pd.read_csv(source_path)

        # Calendar dates: parse strictly as YYYY-MM-DD, then keep only the
        # date part. Unparseable values are coerced to missing.
        for name in ('hire_date', 'listed_date', 'closed_date', 'offer_date',
                     'start_date', 'end_date', 'close_date'):
            if name not in frame.columns:
                continue
            parsed = pd.to_datetime(frame[name], errors='coerce', format='%Y-%m-%d')
            frame[name] = parsed.dt.date

        # Full timestamps: format is inferred, bad values become missing.
        for name in ('created_at', 'appointment_datetime'):
            if name not in frame.columns:
                continue
            frame[name] = pd.to_datetime(frame[name], errors='coerce')

        if 'demographics' in frame.columns:
            frame['demographics'] = frame['demographics'].apply(_as_json_text)

        # Recompute the commission (rounded to two decimals) so it lines up
        # with price * rate, which the original comment says a trigger checks.
        commission_inputs = ('close_price', 'commission_pct', 'commission_amount')
        if all(name in frame.columns for name in commission_inputs):
            recomputed = frame['close_price'] * frame['commission_pct']
            frame['commission_amount'] = recomputed.round(2)

        # Bulk insert: append to the existing table, without the index column.
        row_count = frame.shape[0]
        frame.to_sql(name=table_name, con=engine, schema='realestate',
                     if_exists='append', index=False)
        print(f"   [SUCCESS] Loaded {row_count} rows into realestate.{table_name}.")

    except FileNotFoundError:
        print(f"   [SKIP] CSV file not found (or intentionally skipped): {table_name}.csv.")
    except Exception as e:
        print(f"   [FATAL ERROR] Failed to load {table_name}. Check constraints: {e}")


# --- EXECUTION ---

# Opening banner (three lines: rule, title, rule).
print("-" * 60, f"STARTING BULK DATA LOAD INTO DATABASE: {DB_NAME} (Fixed)", "-" * 60, sep="\n")

# Walk the dependency levels in their declared order; within a level, load
# each table in the order listed.
for level, table_list in TABLE_LOAD_ORDER.items():
    print(f"\n--- {level} ---")
    for table_name in table_list:
        load_data_from_csv(table_name)

# Closing banner. It is printed unconditionally; per-table results are the
# [SUCCESS] / [SKIP] / [FATAL ERROR] lines above it.
print("-" * 60, "DATA LOADING COMPLETE (Attempt 6).", "-" * 60, sep="\n")

------------------------------------------------------------
STARTING BULK DATA LOAD INTO DATABASE: C4_Project (Fixed)
------------------------------------------------------------

--- Level 1 (Foundation) ---
-> Processing marketing_channels...
   [SKIP] CSV file not found (or intentionally skipped): marketing_channels.csv.
-> Processing addresses...
   [SUCCESS] Loaded 150 rows into realestate.addresses.

--- Level 2 (Organization) ---
-> Processing users...
   [SUCCESS] Loaded 100 rows into realestate.users.
-> Processing offices...
   [SUCCESS] Loaded 10 rows into realestate.offices.

--- Level 3 (Personnel) ---
-> Processing clients...
   [SUCCESS] Loaded 100 rows into realestate.clients.
-> Processing agents...
   [SUCCESS] Loaded 100 rows into realestate.agents.

--- Level 4 (Inventory Prep) ---
-> Processing client_preferences...
   [SUCCESS] Loaded 80 rows into realestate.client_preferences.
-> Processing properties...
   [SUCCESS] Loaded 100 rows into realestate.properties.

