Environment Check and Prerequisites
==========================================
Before we start, let's understand what we're building:

1. We have some mock financial data CSV files)
2. We need to move this data into a "real" database (PostgreSQL)
3. We'll learn SQL by querying our own financial data
4. This mirrors real FinTech companies' data pipelines

What you'll learn:
- How to connect Python to PostgreSQL
- How to import CSV files into database tables
- How to validate data quality
- How to write SQL queries for financial analysis

Prerequisites Check:
- PostgreSQL installed (version 14+)
- pgAdmin installed (comes with PostgreSQL)
- Python packages: pandas, numpy, psycopg2, sqlalchemy
- Your generated data in 'mock_financial_data' folder

In [1]:
import os
import sys
import pandas as pd
import numpy as np
from datetime import datetime

# Check Python version
print(f"🐍 Python version: {sys.version}")
print(f"📊 Pandas version: {pd.__version__}")
print(f"🔢 NumPy version: {np.__version__}")

# Check if we have the synthetic data 
data_dir = 'mock_financial_data'
if os.path.exists(data_dir):
    print(f"\n✅ Found data directory: {data_dir}")
    files = os.listdir(data_dir)
    print(f"📁 Files available: {len(files)}")
    for file in sorted(files):
        if file.endswith('.csv'):
            size_mb = os.path.getsize(os.path.join(data_dir, file)) / (1024*1024)
            print(f"   - {file}: {size_mb:.2f} MB")
else:
    print(f"\n❌ Data directory not found!")
    print("Please run Week 0 data generation first")
    print("Run: generator.save_all_datasets()")

🐍 Python version: 3.13.2 (tags/v3.13.2:4f8bb39, Feb  4 2025, 15:23:48) [MSC v.1942 64 bit (AMD64)]
📊 Pandas version: 2.2.3
🔢 NumPy version: 2.2.6

❌ Data directory not found!
Please run Week 0 data generation first
Run: generator.save_all_datasets()


Installing Database Connection Libraries
===============================================
We need special Python packages to talk to PostgreSQL:

1. psycopg2: Low-level PostgreSQL adapter (like a translator)
2. sqlalchemy: High-level database toolkit (makes complex operations easier)

Think of it like:
- psycopg2 = manual transmission (more control, more complex)
- sqlalchemy = automatic transmission (easier to use, less control)

If installation fails:
- Windows: You might need Visual C++ build tools
- Mac: You might need to install PostgreSQL first
- Linux: You might need postgresql-dev package

In [2]:
import subprocess
import sys

def install_package(package_name):
    """Helper function to install packages"""
    try:
        __import__(package_name.replace('-', '_'))
        print(f"✅ {package_name} already installed")
    except ImportError:
        print(f"📦 Installing {package_name}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", package_name])
        print(f"✅ {package_name} installed successfully")

# Install required packages
packages_needed = [
    'psycopg2-binary',  # PostgreSQL adapter
    'sqlalchemy',       # SQL toolkit
    'python-dotenv'     # For secure password storage
]

print("🔧 Checking required packages...\n")
for package in packages_needed:
    install_package(package)

print("\n✅ All required packages are ready!")

🔧 Checking required packages...

📦 Installing psycopg2-binary...
✅ psycopg2-binary installed successfully
✅ sqlalchemy already installed
📦 Installing python-dotenv...
✅ python-dotenv installed successfully

✅ All required packages are ready!


Setting Up Database Connection
=====================================

IMPORTANT: Database connections are like phone calls:
1. You need the right "phone number" (host, port, database name)
2. You need to "authenticate" yourself (username, password)
3. The connection can "drop" if not used properly
4. Always "hang up" (close connection) when done

Security Note:
- NEVER hardcode passwords in your code
- NEVER commit passwords to GitHub
- Use environment variables or config files

Let's create a safe connection setup:

In [4]:
import psycopg2
from sqlalchemy import create_engine
import getpass  # For secure password input

# Database configuration class
class DatabaseConfig:
    """
    Stores database connection parameters.
    In production, these would come from environment variables.
    """
    def __init__(self):
        self.host = 'localhost'      # Your computer
        self.port = 5432            # Default PostgreSQL port
        self.database = 'fintech_db' # Database name we'll create
        self.user = 'postgres'       # Default PostgreSQL user
        
        # Secure password input
        print("🔐 PostgreSQL Connection Setup")
        print(f"Host: {self.host}")
        print(f"Port: {self.port}")
        print(f"Database: {self.database}")
        print(f"User: {self.user}")
        
        # Get password securely (won't show on screen)
        self.password = getpass.getpass("Enter PostgreSQL password: ")
        
    def get_connection_string(self):
        """Create connection string for SQLAlchemy"""
        return f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.database}"
    
    def get_connection_params(self):
        """Get parameters for psycopg2"""
        return {
            'host': self.host,
            'port': self.port,
            'database': self.database,
            'user': self.user,
            'password': self.password
        }

# Create configuration
db_config = DatabaseConfig()

# Test connection
def test_connection(config):
    """Test if we can connect to PostgreSQL"""
    try:
        # First, connect to PostgreSQL server (not specific database)
        conn_params = config.get_connection_params()
        conn_params['database'] = 'postgres'  # Default database
        
        conn = psycopg2.connect(**conn_params)
        cur = conn.cursor()
        
        # Check PostgreSQL version
        cur.execute("SELECT version();")
        version = cur.fetchone()[0]
        print(f"\n✅ Connected to PostgreSQL!")
        print(f"📊 Version: {version.split(',')[0]}")
        
        cur.close()
        conn.close()
        return True
        
    except Exception as e:
        print(f"\n❌ Connection failed: {e}")
        print("\nTroubleshooting:")
        print("1. Is PostgreSQL running? Check pgAdmin")
        print("2. Is the password correct?")
        print("3. Is the port 5432 free?")
        return False

# Test the connection
if test_connection(db_config):
    print("\n🎉 Database connection successful!")
else:
    print("\n⚠️ Please fix connection issues before proceeding")

🔐 PostgreSQL Connection Setup
Host: localhost
Port: 5432
Database: fintech_db
User: postgres


Enter PostgreSQL password:  ········



✅ Connected to PostgreSQL!
📊 Version: PostgreSQL 17.5 on x86_64-windows

🎉 Database connection successful!


Creating Our FinTech Database
====================================

Databases are like filing cabinets:
- PostgreSQL server = The entire office
- Database = One filing cabinet  
- Tables = Drawers in the cabinet
- Rows = Individual files in drawers

We'll create a dedicated database for our FinTech project.
This keeps our data organized and separate from other projects.

Important Concepts:
- CREATE DATABASE: Makes a new database
- DROP DATABASE: Deletes a database (careful!)
- IF EXISTS / IF NOT EXISTS: Prevents errors

In [4]:
def create_fintech_database(config):
    """
    Create the fintech_db database if it doesn't exist.
    This is like creating a new filing cabinet for our project.
    """
    try:
        # Connect to PostgreSQL server (not a specific database)
        conn_params = config.get_connection_params()
        conn_params['database'] = 'postgres'  # Default database
        
        # Important: autocommit=True for CREATE DATABASE
        conn = psycopg2.connect(**conn_params)
        conn.autocommit = True  # Required for CREATE DATABASE
        cur = conn.cursor()
        
        # Check if database exists
        cur.execute("""
            SELECT 1 FROM pg_database WHERE datname = %s
        """, (config.database,))
        
        exists = cur.fetchone()
        
        if exists:
            print(f"📊 Database '{config.database}' already exists")
            
            # Get database size
            cur.execute(f"""
                SELECT pg_size_pretty(pg_database_size('{config.database}'))
            """)
            size = cur.fetchone()[0]
            print(f"📏 Current size: {size}")
            
        else:
            print(f"🏗️ Creating database '{config.database}'...")
            cur.execute(f"CREATE DATABASE {config.database}")
            print(f"✅ Database '{config.database}' created successfully!")
        
        # List all databases (for learning purposes)
        print("\n📋 All databases on this server:")
        cur.execute("""
            SELECT datname, pg_size_pretty(pg_database_size(datname)) as size
            FROM pg_database 
            WHERE datistemplate = false
            ORDER BY datname;
        """)
        
        for db_name, db_size in cur.fetchall():
            print(f"   - {db_name}: {db_size}")
        
        cur.close()
        conn.close()
        return True
        
    except Exception as e:
        print(f"❌ Error creating database: {e}")
        return False

# Create our database
if create_fintech_database(db_config):
    print("\n✅ Database setup complete!")
else:
    print("\n⚠️ Database creation failed")

📊 Database 'fintech_db' already exists
📏 Current size: 27 MB

📋 All databases on this server:
   - fintech_db: 27 MB
   - postgres: 7987 kB

✅ Database setup complete!


Designing Database Tables (Schemas)
==========================================

A table schema is like a blueprint for data storage.
Just like a form has specific fields, tables have columns with specific types.

Data Types in PostgreSQL:
- INTEGER: Whole numbers (-2B to +2B)
- BIGINT: Very large whole numbers
- NUMERIC(10,2): Decimal with 10 total digits, 2 after decimal
- VARCHAR(50): Text up to 50 characters
- TEXT: Unlimited text
- DATE: Calendar date (no time)
- TIMESTAMP: Date + time
- BOOLEAN: True/False

Primary Keys:
- Unique identifier for each row
- Like a social security number for data
- Can be one column or combination of columns

Indexes:
- Like a book's index - helps find data faster
- Trade-off: Faster reads, slower writes


In [5]:
# Let's examine what tables we need
def analyze_csv_structure(data_dir='mock_financial_data'):
    """
    Analyze our CSV files to understand what tables we need.
    This helps us design appropriate database schemas.
    """
    print("📊 Analyzing CSV files to design table schemas...\n")
    
    csv_files = [f for f in os.listdir(data_dir) if f.endswith('.csv')]
    
    table_designs = {}
    
    for csv_file in csv_files:
        print(f"📄 Analyzing: {csv_file}")
        print("-" * 50)
        
        # Read first few rows to understand structure
        df = pd.read_csv(os.path.join(data_dir, csv_file), nrows=5)
        
        # Analyze columns
        print(f"Columns: {len(df.columns)}")
        print(f"Sample rows: {len(df)}")
        print("\nColumn Analysis:")
        
        table_name = csv_file.replace('.csv', '').replace('_data', 's')
        columns_info = []
        
        for col in df.columns:
            # Determine PostgreSQL data type
            dtype = str(df[col].dtype)
            sample_value = df[col].iloc[0] if len(df) > 0 else None
            
            if 'date' in col.lower() or 'time' in col.lower():
                if 'timestamp' in col.lower():
                    pg_type = 'TIMESTAMP'
                else:
                    pg_type = 'DATE'
            elif dtype == 'object':
                # Check if it's a string column
                max_len = df[col].astype(str).str.len().max()
                if max_len <= 50:
                    pg_type = f'VARCHAR({int(max_len * 1.5)})'  # Add buffer
                else:
                    pg_type = 'TEXT'
            elif 'int' in dtype:
                if df[col].max() > 2147483647:  # Max INT value
                    pg_type = 'BIGINT'
                else:
                    pg_type = 'INTEGER'
            elif 'float' in dtype:
                # Determine precision needed
                if 'price' in col.lower() or 'value' in col.lower():
                    pg_type = 'NUMERIC(20,8)'  # High precision for prices
                elif 'weight' in col.lower() or 'return' in col.lower():
                    pg_type = 'NUMERIC(7,4)'   # Percentages
                else:
                    pg_type = 'NUMERIC(20,4)'  # General decimal
            elif 'bool' in dtype:
                pg_type = 'BOOLEAN'
            else:
                pg_type = 'TEXT'  # Fallback
            
            columns_info.append({
                'name': col,
                'pandas_type': dtype,
                'postgres_type': pg_type,
                'sample': sample_value
            })
            
            print(f"  - {col:20} {dtype:10} → {pg_type:15} (sample: {sample_value})")
        
        table_designs[table_name] = columns_info
        print("\n")
    
    return table_designs

# Analyze our data
table_designs = analyze_csv_structure()
print("✅ Schema analysis complete!")

📊 Analyzing CSV files to design table schemas...

📄 Analyzing: crypto_prices.csv
--------------------------------------------------
Columns: 7
Sample rows: 5

Column Analysis:
  - Timestamp            object     → TIMESTAMP       (sample: 2020-01-01 00:00:00)
  - Symbol               object     → VARCHAR(4)      (sample: BTC)
  - Open                 float64    → NUMERIC(20,4)   (sample: 55610.37)
  - High                 float64    → NUMERIC(20,4)   (sample: 61577.1)
  - Low                  float64    → NUMERIC(20,4)   (sample: 55610.37)
  - Close                float64    → NUMERIC(20,4)   (sample: 60842.74)
  - Volume               int64      → INTEGER         (sample: 57397)


📄 Analyzing: customer_data.csv
--------------------------------------------------
Columns: 11
Sample rows: 5

Column Analysis:
  - CustomerID           object     → VARCHAR(16)     (sample: CUST_000001)
  - Age                  int64      → INTEGER         (sample: 18)
  - Income               float64    → N

Cell 6: Creating Tables in PostgreSQL
====================================

Now we'll create the actual tables in our database.
This is like setting up the drawers in our filing cabinet.

SQL Commands we'll use:
- CREATE TABLE: Makes a new table
- PRIMARY KEY: Unique identifier for rows
- NOT NULL: This column cannot be empty
- CREATE INDEX: Speed up searches

Best Practices:
1. Always include created_at timestamp
2. Use meaningful column names
3. Choose appropriate data types
4. Add indexes for frequently searched columns


In [6]:
def create_tables(config):
    """
    Create all necessary tables for our FinTech project.
    Each table is designed for specific financial data.
    """
    
    # Table definitions with explanations
    tables = {
        'stock_prices': {
            'description': 'Daily stock price data (OHLCV format)',
            'sql': """
                CREATE TABLE IF NOT EXISTS stock_prices (
                    -- Primary key columns (unique identifier)
                    date DATE NOT NULL,
                    symbol VARCHAR(10) NOT NULL,
                    
                    -- Price data (OHLCV)
                    open NUMERIC(10,2),      -- Opening price
                    high NUMERIC(10,2),      -- Highest price of day
                    low NUMERIC(10,2),       -- Lowest price of day
                    close NUMERIC(10,2),     -- Closing price
                    volume BIGINT,           -- Number of shares traded
                    
                    -- Metadata
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    
                    -- Composite primary key (date + symbol must be unique)
                    PRIMARY KEY (date, symbol)
                );
                
                -- Indexes for faster queries
                CREATE INDEX IF NOT EXISTS idx_stock_symbol ON stock_prices(symbol);
                CREATE INDEX IF NOT EXISTS idx_stock_date ON stock_prices(date);
                
                -- Add comments for documentation
                COMMENT ON TABLE stock_prices IS 'Daily stock price data in OHLCV format';
                COMMENT ON COLUMN stock_prices.symbol IS 'Stock ticker symbol (e.g., AAPL, GOOGL)';
                COMMENT ON COLUMN stock_prices.volume IS 'Number of shares traded during the day';
            """
        },
        
        'crypto_prices': {
            'description': '6-hourly cryptocurrency price data',
            'sql': """
                CREATE TABLE IF NOT EXISTS crypto_prices (
                    -- Crypto trades 24/7, so we use timestamp
                    timestamp TIMESTAMP NOT NULL,
                    symbol VARCHAR(10) NOT NULL,
                    
                    -- Price data (higher precision for crypto)
                    open NUMERIC(20,8),      -- 8 decimals for small altcoins
                    high NUMERIC(20,8),
                    low NUMERIC(20,8),
                    close NUMERIC(20,8),
                    volume BIGINT,
                    
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    PRIMARY KEY (timestamp, symbol)
                );
                
                CREATE INDEX IF NOT EXISTS idx_crypto_symbol ON crypto_prices(symbol);
                CREATE INDEX IF NOT EXISTS idx_crypto_timestamp ON crypto_prices(timestamp);
                
                COMMENT ON TABLE crypto_prices IS '6-hourly cryptocurrency price data';
                COMMENT ON COLUMN crypto_prices.timestamp IS 'UTC timestamp of the price snapshot';
            """
        },
        
        'economic_indicators': {
            'description': 'Monthly macroeconomic indicators',
            'sql': """
                CREATE TABLE IF NOT EXISTS economic_indicators (
                    date DATE NOT NULL,
                    indicator VARCHAR(50) NOT NULL,
                    value NUMERIC(20,4),
                    unit VARCHAR(20),        -- %, USD, points, etc.
                    
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    PRIMARY KEY (date, indicator)
                );
                
                CREATE INDEX IF NOT EXISTS idx_econ_indicator ON economic_indicators(indicator);
                CREATE INDEX IF NOT EXISTS idx_econ_date ON economic_indicators(date);
                
                COMMENT ON TABLE economic_indicators IS 'Monthly macroeconomic indicators';
                COMMENT ON COLUMN economic_indicators.indicator IS 'Indicator name (e.g., GDP_GROWTH, INFLATION_RATE)';
                COMMENT ON COLUMN economic_indicators.unit IS 'Unit of measurement';
            """
        },
        
        'portfolio_holdings': {
            'description': 'Monthly portfolio snapshots',
            'sql': """
                CREATE TABLE IF NOT EXISTS portfolio_holdings (
                    date DATE NOT NULL,
                    portfolio_id VARCHAR(20) NOT NULL,
                    
                    -- Portfolio characteristics
                    risk_level VARCHAR(20),   -- Conservative/Moderate/Aggressive
                    total_value NUMERIC(20,2),
                    
                    -- Asset allocation (must sum to 1.0)
                    stock_weight NUMERIC(5,4),  -- 0.0000 to 1.0000
                    bond_weight NUMERIC(5,4),
                    cash_weight NUMERIC(5,4),
                    
                    -- Performance
                    monthly_return NUMERIC(7,4),  -- -0.9999 to 9.9999
                    
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    PRIMARY KEY (date, portfolio_id)
                );
                
                CREATE INDEX IF NOT EXISTS idx_portfolio_id ON portfolio_holdings(portfolio_id);
                CREATE INDEX IF NOT EXISTS idx_portfolio_risk ON portfolio_holdings(risk_level);
                
                COMMENT ON TABLE portfolio_holdings IS 'Monthly portfolio performance and allocation data';
            """
        },
        
        'customers': {
            'description': 'Customer demographics and account information',
            'sql': """
                CREATE TABLE IF NOT EXISTS customers (
                    -- Customer identity
                    customer_id VARCHAR(20) PRIMARY KEY,
                    
                    -- Demographics
                    age INTEGER CHECK (age >= 18 AND age <= 120),
                    income NUMERIC(12,2) CHECK (income >= 0),
                    credit_score INTEGER CHECK (credit_score >= 300 AND credit_score <= 850),
                    
                    -- Account information
                    account_age_days INTEGER CHECK (account_age_days >= 0),
                    account_balance NUMERIC(20,2),
                    
                    -- Behavior metrics
                    monthly_transactions INTEGER CHECK (monthly_transactions >= 0),
                    avg_transaction_amount NUMERIC(12,2),
                    num_products INTEGER CHECK (num_products >= 0),
                    has_loan BOOLEAN,
                    
                    -- Risk classification
                    risk_segment VARCHAR(20),
                    
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
                
                CREATE INDEX IF NOT EXISTS idx_customer_risk ON customers(risk_segment);
                CREATE INDEX IF NOT EXISTS idx_customer_credit ON customers(credit_score);
                
                COMMENT ON TABLE customers IS 'Customer profiles for analytics and risk assessment';
                COMMENT ON COLUMN customers.credit_score IS 'FICO credit score (300-850 range)';
            """
        }
    }
    
    try:
        # Connect to our fintech database
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        print("🏗️ Creating database tables...\n")
        
        for table_name, table_info in tables.items():
            print(f"📊 Creating table: {table_name}")
            print(f"   Description: {table_info['description']}")
            
            # Execute the CREATE TABLE statement
            cur.execute(table_info['sql'])
            
            # Check if table was created successfully
            cur.execute("""
                SELECT COUNT(*) 
                FROM information_schema.tables 
                WHERE table_schema = 'public' 
                AND table_name = %s
            """, (table_name,))
            
            if cur.fetchone()[0] > 0:
                print(f"   ✅ Table created successfully")
                
                # Get column count
                cur.execute("""
                    SELECT COUNT(*) 
                    FROM information_schema.columns 
                    WHERE table_name = %s
                """, (table_name,))
                col_count = cur.fetchone()[0]
                print(f"   📋 Columns: {col_count}")
            else:
                print(f"   ❌ Table creation failed")
            
            print()
        
        # Commit all changes
        conn.commit()
        
        # Show summary of created tables
        print("\n📊 Database Schema Summary:")
        print("-" * 60)
        cur.execute("""
            SELECT 
                table_name,
                COUNT(*) as column_count
            FROM information_schema.columns
            WHERE table_schema = 'public'
            GROUP BY table_name
            ORDER BY table_name;
        """)
        
        for table, col_count in cur.fetchall():
            print(f"Table: {table:20} Columns: {col_count}")
        
        cur.close()
        conn.close()
        print("\n✅ All tables created successfully!")
        return True
        
    except Exception as e:
        print(f"❌ Error creating tables: {e}")
        if conn:
            conn.rollback()
        return False

# Create all tables
if create_tables(db_config):
    print("🎉 Database schema is ready!")
else:
    print("⚠️ Please fix table creation errors")

🏗️ Creating database tables...

📊 Creating table: stock_prices
   Description: Daily stock price data (OHLCV format)
   ✅ Table created successfully
   📋 Columns: 8

📊 Creating table: crypto_prices
   Description: 6-hourly cryptocurrency price data
   ✅ Table created successfully
   📋 Columns: 8

📊 Creating table: economic_indicators
   Description: Monthly macroeconomic indicators
   ✅ Table created successfully
   📋 Columns: 5

📊 Creating table: portfolio_holdings
   Description: Monthly portfolio snapshots
   ✅ Table created successfully
   📋 Columns: 9

📊 Creating table: customers
   Description: Customer demographics and account information
   ✅ Table created successfully
   📋 Columns: 12


📊 Database Schema Summary:
------------------------------------------------------------
Table: crypto_prices        Columns: 8
Table: customers            Columns: 12
Table: daily_returns        Columns: 6
Table: economic_cycle_indicators Columns: 6
Table: economic_indicators  Columns: 5
Table:

Understanding the Data Import Process
============================================

Before we import data, let's understand what we're doing:

CSV Files → PostgreSQL Tables

Challenges:
1. Data types might not match
2. Dates need special formatting
3. NULL values need handling
4. Large files need batch processing

Methods for importing data:
1. COPY command (fastest, but less flexible)
2. INSERT statements (slow, but most control)
3. pandas.to_sql (balanced approach)

We'll use pandas.to_sql because:
- It handles data type conversion
- It can append to existing tables
- It provides progress feedback
- It's easier to debug

In [7]:
def preview_csv_data(csv_file, data_dir='mock_financial_data', rows=5):
    """
    Preview CSV data before import to spot potential issues.
    
    """
    filepath = os.path.join(data_dir, csv_file)
    
    print(f"📄 Previewing: {csv_file}")
    print("=" * 60)
    
    # Read first few rows
    df = pd.read_csv(filepath, nrows=rows)
    
    # Basic statistics
    total_rows = sum(1 for line in open(filepath)) - 1  # Subtract header
    
    print(f"Total rows: {total_rows:,}")
    print(f"Columns: {list(df.columns)}")
    print(f"\nFirst {rows} rows:")
    print(df)
    
    # Check for potential issues
    print(f"\nData Quality Checks:")
    
    # Check for nulls
    null_counts = df.isnull().sum()
    if null_counts.any():
        print("⚠️ Found NULL values:")
        print(null_counts[null_counts > 0])
    else:
        print("✅ No NULL values in sample")
    
    # Check data types
    print(f"\nData types:")
    for col, dtype in df.dtypes.items():
        print(f"  {col}: {dtype}")
    
    # Memory usage
    memory_usage = df.memory_usage(deep=True).sum() / 1024**2
    estimated_total = memory_usage * (total_rows / rows)
    print(f"\nEstimated memory needed: {estimated_total:.2f} MB")
    
    return df

# Preview each CSV file
csv_files = [
    'stock_prices.csv',
    'crypto_prices.csv', 
    'economic_indicators.csv',
    'portfolio_data.csv',
    'customer_data.csv'
]

print("🔍 Previewing all CSV files before import...\n")

for csv_file in csv_files:
    if os.path.exists(os.path.join('mock_financial_data', csv_file)):
        preview_df = preview_csv_data(csv_file)
        print("\n" + "-"*60 + "\n")
        # Wait for user to review
        input("Press Enter to continue to next file...")

🔍 Previewing all CSV files before import...

📄 Previewing: stock_prices.csv
Total rows: 26,100
Columns: ['Date', 'Symbol', 'Open', 'High', 'Low', 'Close', 'Volume']

First 5 rows:
         Date Symbol   Open   High    Low  Close   Volume
0  2020-01-01   AAPL  24.56  24.86  24.56  24.84  1701200
1  2020-01-02   AAPL  24.93  24.93  23.82  24.11  3978279
2  2020-01-03   AAPL  24.11  24.36  24.11  24.33  7701730
3  2020-01-06   AAPL  24.20  24.35  24.18  24.19  7669587
4  2020-01-07   AAPL  24.11  24.11  23.60  23.61  1400184

Data Quality Checks:
✅ No NULL values in sample

Data types:
  Date: object
  Symbol: object
  Open: float64
  High: float64
  Low: float64
  Close: float64
  Volume: int64

Estimated memory needed: 4.44 MB

------------------------------------------------------------



Press Enter to continue to next file... 


📄 Previewing: crypto_prices.csv
Total rows: 73,050
Columns: ['Timestamp', 'Symbol', 'Open', 'High', 'Low', 'Close', 'Volume']

First 5 rows:
             Timestamp Symbol      Open      High       Low     Close  Volume
0  2020-01-01 00:00:00    BTC  55610.37  61577.10  55610.37  60842.74   57397
1  2020-01-01 06:00:00    BTC  58654.92  61403.00  58654.92  61398.09  178177
2  2020-01-01 12:00:00    BTC  60507.59  60507.59  59462.07  59701.21   59879
3  2020-01-01 18:00:00    BTC  61069.08  61069.08  57124.83  58855.20  178901
4  2020-01-02 00:00:00    BTC  57630.54  61751.48  57630.54  59970.54   67811

Data Quality Checks:
✅ No NULL values in sample

Data types:
  Timestamp: object
  Symbol: object
  Open: float64
  High: float64
  Low: float64
  Close: float64
  Volume: int64

Estimated memory needed: 12.99 MB

------------------------------------------------------------



Press Enter to continue to next file... 


📄 Previewing: economic_indicators.csv
Total rows: 600
Columns: ['Date', 'Indicator', 'Value']

First 5 rows:
         Date            Indicator   Value
0  2020-01-31           GDP_GROWTH    2.78
1  2020-01-31       INFLATION_RATE    1.75
2  2020-01-31    UNEMPLOYMENT_RATE    5.24
3  2020-01-31        INTEREST_RATE    1.66
4  2020-01-31  CONSUMER_CONFIDENCE  100.13

Data Quality Checks:
✅ No NULL values in sample

Data types:
  Date: object
  Indicator: object
  Value: float64

Estimated memory needed: 0.09 MB

------------------------------------------------------------



Press Enter to continue to next file... 


📄 Previewing: portfolio_data.csv
Total rows: 6,000
Columns: ['Date', 'PortfolioID', 'RiskLevel', 'TotalValue', 'StockWeight', 'BondWeight', 'CashWeight', 'MonthlyReturn']

First 5 rows:
         Date PortfolioID     RiskLevel  TotalValue  StockWeight  BondWeight  \
0  2020-01-31      PF_001  Conservative  1864562.35        0.337       0.599   
1  2020-02-29      PF_001  Conservative  1850585.81        0.331       0.606   
2  2020-03-31      PF_001  Conservative  1825414.18        0.348       0.589   
3  2020-04-30      PF_001  Conservative  1786687.35        0.352       0.588   
4  2020-05-31      PF_001  Conservative  1854394.63        0.355       0.583   

   CashWeight  MonthlyReturn  
0       0.064        -0.0228  
1       0.063        -0.0075  
2       0.064        -0.0136  
3       0.060        -0.0212  
4       0.062         0.0379  

Data Quality Checks:
✅ No NULL values in sample

Data types:
  Date: object
  PortfolioID: object
  RiskLevel: object
  TotalValue: float64
  Stoc

Press Enter to continue to next file... 


📄 Previewing: customer_data.csv
Total rows: 10,000
Columns: ['CustomerID', 'Age', 'Income', 'CreditScore', 'AccountAgeDays', 'AccountBalance', 'MonthlyTransactions', 'AvgTransactionAmount', 'NumProducts', 'HasLoan', 'RiskSegment']

First 5 rows:
    CustomerID  Age    Income  CreditScore  AccountAgeDays  AccountBalance  \
0  CUST_000001   18  46881.45          712            1858         6722.32   
1  CUST_000002   28  32164.17          570             473         6837.84   
2  CUST_000003   28  80756.05          822            1470         7014.52   
3  CUST_000004   18  55980.56          708            1204         3961.80   
4  CUST_000005   18  28709.23          476             902          554.23   

   MonthlyTransactions  AvgTransactionAmount  NumProducts  HasLoan RiskSegment  
0                   31                 20.54            1    False      Medium  
1                   33                358.30            1    False        High  
2                   45                 53.

Press Enter to continue to next file... 


Importing Stock Price Data
==================================

Let's start with stock prices - our most important dataset.
We'll import it step by step with proper error handling.

Key considerations:
1. Date formatting (pandas → PostgreSQL)
2. Handling duplicates
3. Transaction management
4. Progress tracking


In [8]:
from sqlalchemy import create_engine

def import_stock_prices(config, data_dir='mock_financial_data', replace_existing=False):
    """
    Import stock price data with proper duplicate handling.
    
    Args:
        config: Database configuration object
        data_dir: Directory containing CSV files
        replace_existing: If True, clear existing data before import
    """
    csv_file = 'stock_prices.csv'
    filepath = os.path.join(data_dir, csv_file)
    
    print(f"📈 Importing Stock Price Data")
    print("=" * 60)
    
    try:
        # Step 1: Load the CSV file
        print("Step 1: Loading CSV file...")
        df = pd.read_csv(filepath)
        print(f"✅ Loaded {len(df):,} rows")
        
        # Step 2: Data preprocessing
        print("\nStep 2: Preprocessing data...")
        
        # Convert date column to datetime
        df['date'] = pd.to_datetime(df['Date'])
        df = df.drop('Date', axis=1)  # Remove original column
        
        # Ensure column names match database (lowercase)
        df.columns = [col.lower() for col in df.columns]
        
        # Check for duplicates in the CSV
        duplicates = df.duplicated(subset=['date', 'symbol'])
        if duplicates.any():
            print(f"⚠️ Found {duplicates.sum()} duplicate rows in CSV")
            df = df[~duplicates]
            print(f"✅ Removed duplicates, {len(df):,} rows remaining")
        
        # Step 3: Data validation
        print("\nStep 3: Validating data...")
        
        # Check OHLC logic (High >= Low, etc.)
        invalid_ohlc = df[
            (df['high'] < df['low']) |
            (df['high'] < df['open']) |
            (df['high'] < df['close']) |
            (df['low'] > df['open']) |
            (df['low'] > df['close'])
        ]
        
        if len(invalid_ohlc) > 0:
            print(f"⚠️ Found {len(invalid_ohlc)} rows with invalid OHLC")
            print("First few invalid rows:")
            print(invalid_ohlc.head())
        else:
            print("✅ All OHLC values are valid")
        
        # Check for negative prices
        negative_prices = df[(df[['open', 'high', 'low', 'close']] < 0).any(axis=1)]
        if len(negative_prices) > 0:
            print(f"⚠️ Found {len(negative_prices)} rows with negative prices")
        else:
            print("✅ No negative prices found")
        
        # Step 4: Handle existing data
        print(f"\nStep 4: Checking for existing data...")
        
        # Connect to database to check existing records
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        # Check if table exists and has data
        cur.execute("""
            SELECT COUNT(*) FROM stock_prices
        """)
        existing_count = cur.fetchone()[0]
        
        if existing_count > 0:
            print(f"📊 Found {existing_count:,} existing records in database")
            
            if replace_existing:
                print("🗑️ Clearing existing data (replace_existing=True)...")
                cur.execute("DELETE FROM stock_prices")
                conn.commit()
                print("✅ Existing data cleared")
            else:
                print("🔍 Checking for overlapping data...")
                
                # Get existing date/symbol combinations
                cur.execute("""
                    SELECT DISTINCT date, symbol 
                    FROM stock_prices
                """)
                existing_combinations = set(cur.fetchall())
                
                # Filter out records that already exist
                df['temp_tuple'] = list(zip(df['date'].dt.date, df['symbol']))
                df_filtered = df[~df['temp_tuple'].isin(existing_combinations)]
                df_filtered = df_filtered.drop('temp_tuple', axis=1)
                
                if len(df_filtered) < len(df):
                    skipped = len(df) - len(df_filtered)
                    print(f"⏭️ Skipping {skipped:,} records that already exist")
                    print(f"📥 Will import {len(df_filtered):,} new records")
                    df = df_filtered
                else:
                    print("✅ No overlapping data found")
        
        cur.close()
        conn.close()
        
        # Skip import if no new data
        if len(df) == 0:
            print("ℹ️ No new data to import")
            return True
        
        # Step 5: Import to database
        print(f"\nStep 5: Importing {len(df):,} records to PostgreSQL...")
        
        # Create SQLAlchemy engine
        engine = create_engine(config.get_connection_string())
        
        # Import in chunks for better performance
        chunk_size = 5000
        total_chunks = (len(df) // chunk_size) + 1
        
        for i in range(0, len(df), chunk_size):
            chunk = df.iloc[i:i+chunk_size]
            
            try:
                chunk.to_sql(
                    'stock_prices',
                    engine,
                    if_exists='append',
                    index=False,
                    method='multi'
                )
                
                # Progress update
                current_chunk = (i // chunk_size) + 1
                progress = (i + len(chunk)) / len(df) * 100
                print(f"  Chunk {current_chunk}/{total_chunks}: {progress:.1f}% complete")
                
            except Exception as chunk_error:
                print(f"❌ Error in chunk {current_chunk}: {chunk_error}")
                # Continue with next chunk or handle as needed
                continue
        
        print(f"\n✅ Successfully imported {len(df):,} stock price records")
        
        # Step 6: Verify import
        print("\nStep 6: Verifying import...")
        
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        # Count total rows
        cur.execute("SELECT COUNT(*) FROM stock_prices")
        db_count = cur.fetchone()[0]
        
        # Get date range and symbol count
        cur.execute("""
            SELECT MIN(date), MAX(date), COUNT(DISTINCT symbol)
            FROM stock_prices
        """)
        min_date, max_date, symbol_count = cur.fetchone()
        
        print(f"Database summary:")
        print(f"  Total rows: {db_count:,}")
        print(f"  Date range: {min_date} to {max_date}")
        print(f"  Unique symbols: {symbol_count}")
        
        # Sample recent data
        print("\nSample recent data from database:")
        cur.execute("""
            SELECT date, symbol, open, high, low, close, volume
            FROM stock_prices
            ORDER BY date DESC, symbol
            LIMIT 5
        """)
        
        for row in cur.fetchall():
            print(f"  {row}")
            
        # Data quality check
        print("\nData quality verification:")
        cur.execute("""
            SELECT 
                COUNT(*) as total_records,
                COUNT(CASE WHEN volume = 0 THEN 1 END) as zero_volume,
                COUNT(CASE WHEN high < low THEN 1 END) as invalid_ohlc
            FROM stock_prices
        """)
        total_records, zero_volume, invalid_ohlc = cur.fetchone()
        
        print(f"  Total records: {total_records:,}")
        print(f"  Zero volume records: {zero_volume:,}")
        print(f"  Invalid OHLC records: {invalid_ohlc:,}")
        
        cur.close()
        conn.close()
        
        return True
        
    except Exception as e:
        print(f"\n❌ Import failed: {e}")
        print("\nTroubleshooting steps:")
        print("1. Check if stock_prices table exists")
        print("2. Verify column names match database schema")
        print("3. Check date format compatibility")
        print("4. Ensure sufficient disk space")
        print("5. Try with replace_existing=True to clear existing data")
        return False

# Usage examples:

# Option 1: Skip existing records (default behavior)
print("🔄 Importing stock prices (skip existing)...")
if import_stock_prices(db_config):
    print("\n🎉 Stock price import complete!")
else:
    print("\n⚠️ Please check stock import errors")

# Option 2: Replace all existing data
print("\n🔄 Alternative: Replace existing data...")
print("Uncomment the line below if you want to replace all existing data:")
print("# import_stock_prices(db_config, replace_existing=True)")

# Quick verification query
def verify_stock_data(config):
    """Quick verification of imported stock data"""
    try:
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        print("\n🔍 Quick Data Verification:")
        print("-" * 40)
        
        # Basic stats
        cur.execute("""
            SELECT 
                COUNT(*) as records,
                COUNT(DISTINCT symbol) as symbols,
                MIN(date) as first_date,
                MAX(date) as last_date,
                ROUND(AVG(close), 2) as avg_close_price
            FROM stock_prices
        """)
        
        result = cur.fetchone()
        print(f"Records: {result[0]:,}")
        print(f"Symbols: {result[1]}")
        print(f"Date range: {result[2]} to {result[3]}")
        print(f"Average closing price: ${result[4]}")
        
        cur.close()
        conn.close()
        return True
        
    except Exception as e:
        print(f"Verification failed: {e}")
        return False

# Run verification
verify_stock_data(db_config)

🔄 Importing stock prices (skip existing)...
📈 Importing Stock Price Data
Step 1: Loading CSV file...
✅ Loaded 26,100 rows

Step 2: Preprocessing data...

Step 3: Validating data...
✅ All OHLC values are valid
✅ No negative prices found

Step 4: Checking for existing data...
📊 Found 26,100 existing records in database
🔍 Checking for overlapping data...
⏭️ Skipping 26,100 records that already exist
📥 Will import 0 new records
ℹ️ No new data to import

🎉 Stock price import complete!

🔄 Alternative: Replace existing data...
Uncomment the line below if you want to replace all existing data:
# import_stock_prices(db_config, replace_existing=True)

🔍 Quick Data Verification:
----------------------------------------
Records: 26,100
Symbols: 20
Date range: 2020-01-01 to 2024-12-31
Average closing price: $310.35


True

Importing Cryptocurrency Price Data
==========================================

Cryptocurrency data has special characteristics:
1. Timestamp instead of date (24/7 trading)
2. Higher price precision (8 decimal places)
3. More volatile (larger price swings)
4. Different symbols (BTC, ETH vs AAPL, GOOGL)

Let's handle these differences properly.

In [9]:
def import_crypto_prices(config, data_dir='mock_financial_data', replace_existing=False):
    """
    Import cryptocurrency price data with proper duplicate handling
    for timestamps and high precision values.
    
    Args:
        config: Database configuration object
        data_dir: Directory containing CSV files
        replace_existing: If True, clear existing data before import
    """
    csv_file = 'crypto_prices.csv'
    filepath = os.path.join(data_dir, csv_file)
    
    print(f"💎 Importing Cryptocurrency Price Data")
    print("=" * 60)
    
    try:
        # Step 1: Load CSV
        print("Step 1: Loading crypto data...")
        df = pd.read_csv(filepath)
        print(f"✅ Loaded {len(df):,} rows")
        
        # Show sample to understand structure
        print("\nSample data:")
        print(df.head(3))
        
        # Step 2: Preprocessing
        print("\nStep 2: Preprocessing crypto data...")
        
        # Convert timestamp and ensure lowercase columns
        df['timestamp'] = pd.to_datetime(df['Timestamp'])
        df = df.drop('Timestamp', axis=1)
        df.columns = [col.lower() for col in df.columns]
        
        # Check for duplicates in CSV
        csv_duplicates = df.duplicated(subset=['timestamp', 'symbol'])
        if csv_duplicates.any():
            print(f"⚠️ Found {csv_duplicates.sum()} duplicate rows in CSV")
            df = df[~csv_duplicates]
            print(f"✅ Removed CSV duplicates, {len(df):,} rows remaining")
        
        # Check timestamp frequency (should be 6-hourly)
        time_diffs = df.groupby('symbol')['timestamp'].diff()
        most_common_diff = time_diffs.mode()
        if len(most_common_diff) > 0:
            print(f"Most common time interval: {most_common_diff[0]}")
        
        # Step 3: Handle existing data
        print(f"\nStep 3: Checking for existing data...")
        
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        # Check if table exists and has data
        try:
            cur.execute("SELECT COUNT(*) FROM crypto_prices")
            existing_count = cur.fetchone()[0]
        except psycopg2.Error:
            existing_count = 0
            print("ℹ️ crypto_prices table doesn't exist yet")
        
        if existing_count > 0:
            print(f"📊 Found {existing_count:,} existing crypto records in database")
            
            if replace_existing:
                print("🗑️ Clearing existing crypto data (replace_existing=True)...")
                cur.execute("DELETE FROM crypto_prices")
                conn.commit()
                print("✅ Existing crypto data cleared")
            else:
                print("🔍 Checking for overlapping crypto data...")
                
                # Get existing timestamp/symbol combinations
                cur.execute("""
                    SELECT DISTINCT timestamp, symbol 
                    FROM crypto_prices
                """)
                existing_combinations = set(cur.fetchall())
                
                # Filter out records that already exist
                df['temp_tuple'] = list(zip(df['timestamp'], df['symbol']))
                df_filtered = df[~df['temp_tuple'].isin(existing_combinations)]
                df_filtered = df_filtered.drop('temp_tuple', axis=1)
                
                if len(df_filtered) < len(df):
                    skipped = len(df) - len(df_filtered)
                    print(f"⏭️ Skipping {skipped:,} crypto records that already exist")
                    print(f"📥 Will import {len(df_filtered):,} new crypto records")
                    df = df_filtered
                else:
                    print("✅ No overlapping crypto data found")
        
        cur.close()
        conn.close()
        
        # Skip import if no new data
        if len(df) == 0:
            print("ℹ️ No new crypto data to import")
            return True
        
        # Step 4: Crypto-specific validation
        print(f"\nStep 4: Crypto-specific validation on {len(df):,} records...")
        
        # Check for extreme volatility (>50% in 6 hours)
        df['price_change_pct'] = df.groupby('symbol')['close'].pct_change()
        extreme_moves = df[df['price_change_pct'].abs() > 0.5]
        
        if len(extreme_moves) > 0:
            print(f"⚠️ Found {len(extreme_moves)} extreme price moves (>50% in 6 hours)")
            # Show top 3 extreme moves
            top_moves = extreme_moves.nlargest(3, 'price_change_pct')[['timestamp', 'symbol', 'close', 'price_change_pct']]
            for _, row in top_moves.iterrows():
                print(f"   {row['symbol']} on {row['timestamp']}: {row['price_change_pct']:.1%} change")
        else:
            print("✅ No extreme price movements detected")
        
        # Remove temporary column
        df = df.drop('price_change_pct', axis=1)
        
        # Check price precision requirements
        print("\nPrice precision analysis:")
        precision_check = []
        for symbol in df['symbol'].unique()[:5]:  # Check first 5 symbols
            symbol_data = df[df['symbol'] == symbol]
            max_decimals = symbol_data['close'].apply(
                lambda x: len(str(x).split('.')[-1]) if '.' in str(x) else 0
            ).max()
            avg_price = symbol_data['close'].mean()
            precision_check.append((symbol, avg_price, max_decimals))
            print(f"  {symbol}: avg price ${avg_price:.2f}, max decimals: {max_decimals}")
        
        # Step 5: Import to database
        print(f"\nStep 5: Importing {len(df):,} records to PostgreSQL...")
        
        engine = create_engine(config.get_connection_string())
        
        # Import in chunks (crypto data can be large)
        chunk_size = 10000
        total_rows = len(df)
        total_chunks = (total_rows // chunk_size) + 1
        
        for i in range(0, total_rows, chunk_size):
            chunk = df.iloc[i:i+chunk_size]
            
            try:
                chunk.to_sql(
                    'crypto_prices',
                    engine,
                    if_exists='append',
                    index=False,
                    method='multi'
                )
                
                # Progress bar
                progress = min((i + len(chunk)) / total_rows * 100, 100)
                bar_length = 30
                filled = int(bar_length * progress / 100)
                bar = '█' * filled + '░' * (bar_length - filled)
                current_chunk = (i // chunk_size) + 1
                print(f"\r  Progress: [{bar}] {progress:.1f}% (Chunk {current_chunk}/{total_chunks})", end='')
                
            except Exception as chunk_error:
                print(f"\n❌ Error in chunk {current_chunk}: {chunk_error}")
                continue
        
        print(f"\n✅ Successfully imported {len(df):,} crypto price records")
        
        # Step 6: Verification and analysis
        print("\nStep 6: Verification and analysis...")
        
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        # Get summary statistics by symbol
        cur.execute("""
            SELECT 
                symbol,
                COUNT(*) as data_points,
                MIN(timestamp) as first_data,
                MAX(timestamp) as last_data,
                ROUND(AVG(close), 2) as avg_price,
                ROUND(STDDEV(close), 2) as price_volatility,
                MIN(close) as min_price,
                MAX(close) as max_price
            FROM crypto_prices
            GROUP BY symbol
            ORDER BY avg_price DESC
            LIMIT 10
        """)
        
        print("\nCrypto Summary by Symbol (Top 10 by Avg Price):")
        print(f"{'Symbol':>6} {'Points':>7} {'First Data':>12} {'Last Data':>12} {'Avg Price':>12} {'Volatility':>12}")
        print("-" * 85)
        
        for row in cur.fetchall():
            symbol, points, first, last, avg_price, volatility, min_price, max_price = row
            first_str = first.strftime('%Y-%m-%d') if first else 'N/A'
            last_str = last.strftime('%Y-%m-%d') if last else 'N/A'
            print(f"{symbol:>6} {points:>7,} {first_str:>12} {last_str:>12} ${avg_price:>11,.2f} ${volatility or 0:>11,.2f}")
        
        # Additional crypto-specific metrics
        print("\nCrypto Market Analysis:")
        
        # 24/7 trading verification
        cur.execute("""
            SELECT 
                COUNT(*) as total_records,
                COUNT(DISTINCT DATE(timestamp)) as unique_days,
                COUNT(DISTINCT EXTRACT(hour FROM timestamp)) as unique_hours
            FROM crypto_prices
        """)
        total_records, unique_days, unique_hours = cur.fetchone()
        
        print(f"  Total records: {total_records:,}")
        print(f"  Unique trading days: {unique_days:,}")
        print(f"  Unique hours of day: {unique_hours} (24/7 trading verification)")
        
        # Price range analysis
        cur.execute("""
            SELECT 
                symbol,
                MIN(close) as min_price,
                MAX(close) as max_price,
                ROUND((MAX(close) / MIN(close) - 1) * 100, 1) as total_return_pct
            FROM crypto_prices
            GROUP BY symbol
            ORDER BY total_return_pct DESC
            LIMIT 5
        """)
        
        print("\nTop 5 Crypto Total Returns (Min to Max Price):")
        for row in cur.fetchall():
            symbol, min_price, max_price, total_return = row
            print(f"  {symbol}: {total_return}% (${min_price:.2f} → ${max_price:.2f})")
        
        cur.close()
        conn.close()
        
        return True
        
    except Exception as e:
        print(f"\n❌ Crypto import failed: {e}")
        print("\nTroubleshooting steps:")
        print("1. Check if crypto_prices table exists with correct schema")
        print("2. Verify timestamp format compatibility")
        print("3. Check for sufficient disk space")
        print("4. Try with replace_existing=True to clear existing data")
        print("5. Verify high-precision numeric fields can handle crypto prices")
        return False

# Usage examples:

# Option 1: Skip existing records (default behavior)
print("🔄 Importing crypto prices (skip existing)...")
if import_crypto_prices(db_config):
    print("\n🎉 Crypto price import complete!")
else:
    print("\n⚠️ Please check crypto import errors")

# Option 2: Replace all existing data  
print("\n🔄 Alternative: Replace existing crypto data...")
print("Uncomment the line below if you want to replace all existing crypto data:")
print("# import_crypto_prices(db_config, replace_existing=True)")

# Quick crypto-specific verification
def verify_crypto_data(config):
    """Quick verification of imported crypto data with crypto-specific checks"""
    try:
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        print("\n🔍 Quick Crypto Data Verification:")
        print("-" * 45)
        
        # Basic crypto stats
        cur.execute("""
            SELECT 
                COUNT(*) as total_records,
                COUNT(DISTINCT symbol) as unique_symbols,
                MIN(timestamp) as first_timestamp,
                MAX(timestamp) as last_timestamp,
                COUNT(DISTINCT DATE(timestamp)) as trading_days,
                ROUND(AVG(close), 4) as avg_price_all_cryptos
            FROM crypto_prices
        """)
        
        result = cur.fetchone()
        print(f"Total Records: {result[0]:,}")
        print(f"Unique Symbols: {result[1]}")
        print(f"Time Range: {result[2]} to {result[3]}")
        print(f"Trading Days: {result[4]:,}")
        print(f"Average Price (All Cryptos): ${result[5]}")
        
        # Check 24/7 trading pattern
        cur.execute("""
            SELECT EXTRACT(hour FROM timestamp) as hour, COUNT(*) as records
            FROM crypto_prices
            GROUP BY EXTRACT(hour FROM timestamp)
            ORDER BY hour
        """)
        
        hourly_data = cur.fetchall()
        print(f"\n24/7 Trading Verification:")
        print(f"Hours with data: {len(hourly_data)}/24")
        if len(hourly_data) == 24:
            print("✅ Confirmed 24/7 trading data")
        else:
            print("⚠️ Missing some hourly data")
        
        cur.close()
        conn.close()
        return True
        
    except Exception as e:
        print(f"Crypto verification failed: {e}")
        return False

# Run crypto verification
verify_crypto_data(db_config)

🔄 Importing crypto prices (skip existing)...
💎 Importing Cryptocurrency Price Data
Step 1: Loading crypto data...
✅ Loaded 73,050 rows

Sample data:
             Timestamp Symbol      Open      High       Low     Close  Volume
0  2020-01-01 00:00:00    BTC  55610.37  61577.10  55610.37  60842.74   57397
1  2020-01-01 06:00:00    BTC  58654.92  61403.00  58654.92  61398.09  178177
2  2020-01-01 12:00:00    BTC  60507.59  60507.59  59462.07  59701.21   59879

Step 2: Preprocessing crypto data...
Most common time interval: 0 days 06:00:00

Step 3: Checking for existing data...
📊 Found 73,050 existing crypto records in database
🔍 Checking for overlapping crypto data...
⏭️ Skipping 73,050 crypto records that already exist
📥 Will import 0 new crypto records
ℹ️ No new crypto data to import

🎉 Crypto price import complete!

🔄 Alternative: Replace existing crypto data...
Uncomment the line below if you want to replace all existing crypto data:
# import_crypto_prices(db_config, replace_existing=

True

 Importing Economic Indicators
=====================================

Economic indicators are different from price data:
1. Monthly frequency (not daily)
2. Different units (%, billions, index points)
3. Some can be negative (trade balance)
4. Wide value ranges (0.5% to millions)

Understanding the indicators:
- GDP_GROWTH: Quarterly GDP growth rate (%)
- INFLATION_RATE: Monthly CPI change (%)
- UNEMPLOYMENT_RATE: Monthly unemployment (%)
- INTEREST_RATE: Federal funds rate (%)
- etc.

In [10]:
def import_economic_indicators(config, data_dir='mock_financial_data', replace_existing=False):
    """
    Import economic indicators with proper unit handling and duplicate management.
    
    Args:
        config: Database configuration object
        data_dir: Directory containing CSV files
        replace_existing: If True, clear existing data before import
    """
    csv_file = 'economic_indicators.csv'
    filepath = os.path.join(data_dir, csv_file)
    
    print(f"🏛️ Importing Economic Indicators")
    print("=" * 60)
    
    try:
        # Step 1: Load data
        print("Step 1: Loading economic data...")
        df = pd.read_csv(filepath)
        print(f"✅ Loaded {len(df):,} rows")
        
        # Understand the structure
        print("\nEconomic indicators overview:")
        indicators = df['Indicator'].unique()
        for ind in indicators:
            ind_data = df[df['Indicator'] == ind]
            print(f"  {ind}: {len(ind_data)} observations, "
                  f"range: {ind_data['Value'].min():.2f} to {ind_data['Value'].max():.2f}")
        
        # Step 2: Preprocessing
        print("\nStep 2: Preprocessing...")
        df['date'] = pd.to_datetime(df['Date'])
        df = df.drop('Date', axis=1)
        df.columns = [col.lower() for col in df.columns]
        
        # Check for CSV duplicates
        csv_duplicates = df.duplicated(subset=['date', 'indicator'])
        if csv_duplicates.any():
            print(f"⚠️ Found {csv_duplicates.sum()} duplicate rows in CSV")
            df = df[~csv_duplicates]
            print(f"✅ Removed CSV duplicates, {len(df):,} rows remaining")
        
        # Add unit information based on indicator type
        unit_mapping = {
            'GDP_GROWTH': '%',
            'INFLATION_RATE': '%', 
            'UNEMPLOYMENT_RATE': '%',
            'INTEREST_RATE': '%',
            'CONSUMER_CONFIDENCE': 'index',
            'RETAIL_SALES': '%',
            'INDUSTRIAL_PRODUCTION': '%',
            'HOUSING_STARTS': 'thousands',
            'TRADE_BALANCE': 'millions USD',
            'MONEY_SUPPLY': 'billions USD'
        }
        
        df['unit'] = df['indicator'].map(unit_mapping)
        
        # Step 3: Handle existing data
        print(f"\nStep 3: Checking for existing economic data...")
        
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        # Check if table exists and has data
        try:
            cur.execute("SELECT COUNT(*) FROM economic_indicators")
            existing_count = cur.fetchone()[0]
        except psycopg2.Error:
            existing_count = 0
            print("ℹ️ economic_indicators table doesn't exist yet")
        
        if existing_count > 0:
            print(f"📊 Found {existing_count:,} existing economic indicator records")
            
            if replace_existing:
                print("🗑️ Clearing existing economic data (replace_existing=True)...")
                cur.execute("DELETE FROM economic_indicators")
                conn.commit()
                print("✅ Existing economic data cleared")
            else:
                print("🔍 Checking for overlapping economic data...")
                
                # Get existing date/indicator combinations
                cur.execute("""
                    SELECT DISTINCT date, indicator 
                    FROM economic_indicators
                """)
                existing_combinations = set(cur.fetchall())
                
                # Filter out records that already exist
                df['temp_tuple'] = list(zip(df['date'].dt.date, df['indicator']))
                df_filtered = df[~df['temp_tuple'].isin(existing_combinations)]
                df_filtered = df_filtered.drop('temp_tuple', axis=1)
                
                if len(df_filtered) < len(df):
                    skipped = len(df) - len(df_filtered)
                    print(f"⏭️ Skipping {skipped:,} economic records that already exist")
                    print(f"📥 Will import {len(df_filtered):,} new economic records")
                    df = df_filtered
                else:
                    print("✅ No overlapping economic data found")
        
        cur.close()
        conn.close()
        
        # Skip import if no new data
        if len(df) == 0:
            print("ℹ️ No new economic data to import")
            return True
        
        # Step 4: Economic validation
        print(f"\nStep 4: Economic validation on {len(df):,} records...")
        
        # Check for unrealistic values with economic reasoning
        validations = {
            'GDP_GROWTH': (-10, 20, "GDP growth outside normal recession to boom range"),
            'INFLATION_RATE': (-5, 30, "Inflation outside deflation to hyperinflation range"), 
            'UNEMPLOYMENT_RATE': (0, 30, "Unemployment outside 0% to Great Depression levels"),
            'INTEREST_RATE': (-2, 20, "Interest rates outside negative to very high range"),
            'CONSUMER_CONFIDENCE': (0, 200, "Consumer confidence outside typical index range"),
            'RETAIL_SALES': (-20, 30, "Retail sales growth outside realistic range"),
            'INDUSTRIAL_PRODUCTION': (-30, 30, "Industrial production outside recession/boom range")
        }
        
        validation_issues = 0
        for indicator, (min_val, max_val, description) in validations.items():
            if indicator in df['indicator'].values:
                ind_data = df[df['indicator'] == indicator]
                out_of_range = ind_data[(ind_data['value'] < min_val) | (ind_data['value'] > max_val)]
                if len(out_of_range) > 0:
                    print(f"⚠️ {indicator}: {len(out_of_range)} values outside expected range [{min_val}, {max_val}]")
                    print(f"   Context: {description}")
                    validation_issues += 1
        
        if validation_issues == 0:
            print("✅ All economic indicators within expected ranges")
        
        # Economic correlation checks
        print("\nEconomic relationship validation:")
        if 'UNEMPLOYMENT_RATE' in df['indicator'].values and 'INFLATION_RATE' in df['indicator'].values:
            # Phillips Curve relationship check
            unemployment = df[df['indicator'] == 'UNEMPLOYMENT_RATE'].set_index('date')['value']
            inflation = df[df['indicator'] == 'INFLATION_RATE'].set_index('date')['value']
            
            # Align dates and calculate correlation
            common_dates = unemployment.index.intersection(inflation.index)
            if len(common_dates) > 10:
                corr = unemployment[common_dates].corr(inflation[common_dates])
                print(f"  Phillips Curve check (Unemployment vs Inflation): correlation = {corr:.3f}")
                if corr < -0.1:
                    print("  ✅ Negative correlation detected (consistent with Phillips Curve)")
                else:
                    print("  ℹ️ Correlation not strongly negative (may indicate supply shocks or stagflation)")
        
        # Step 5: Import to database
        print(f"\nStep 5: Importing {len(df):,} records to PostgreSQL...")
        
        engine = create_engine(config.get_connection_string())
        
        try:
            df.to_sql(
                'economic_indicators',
                engine,
                if_exists='append',
                index=False,
                method='multi'
            )
            
            print(f"✅ Successfully imported {len(df):,} economic indicator records")
            
        except Exception as import_error:
            print(f"❌ Import error: {import_error}")
            return False
        
        # Step 6: Verify and create analytical views
        print("\nStep 6: Creating economic summary views...")
        
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        # Create a pivot view for easier analysis
        cur.execute("""
            CREATE OR REPLACE VIEW economic_indicators_pivot AS
            SELECT 
                date,
                MAX(CASE WHEN indicator = 'GDP_GROWTH' THEN value END) as gdp_growth,
                MAX(CASE WHEN indicator = 'INFLATION_RATE' THEN value END) as inflation_rate,
                MAX(CASE WHEN indicator = 'UNEMPLOYMENT_RATE' THEN value END) as unemployment_rate,
                MAX(CASE WHEN indicator = 'INTEREST_RATE' THEN value END) as interest_rate,
                MAX(CASE WHEN indicator = 'CONSUMER_CONFIDENCE' THEN value END) as consumer_confidence,
                MAX(CASE WHEN indicator = 'RETAIL_SALES' THEN value END) as retail_sales,
                MAX(CASE WHEN indicator = 'INDUSTRIAL_PRODUCTION' THEN value END) as industrial_production
            FROM economic_indicators
            GROUP BY date
            ORDER BY date DESC;
        """)
        
        print("✅ Created pivot view for easier economic analysis")
        
        # Create economic cycle indicator view
        cur.execute("""
            CREATE OR REPLACE VIEW economic_cycle_indicators AS
            SELECT 
                date,
                gdp_growth,
                unemployment_rate,
                inflation_rate,
                CASE 
                    WHEN gdp_growth > 3 AND unemployment_rate < 5 THEN 'Expansion'
                    WHEN gdp_growth < 0 THEN 'Recession' 
                    WHEN unemployment_rate > 8 THEN 'Recovery'
                    ELSE 'Normal'
                END as economic_cycle_phase
            FROM economic_indicators_pivot
            WHERE gdp_growth IS NOT NULL AND unemployment_rate IS NOT NULL;
        """)
        
        print("✅ Created economic cycle phase indicator view")
        
        # Show recent economic snapshot
        cur.execute("""
            SELECT 
                date,
                gdp_growth,
                inflation_rate, 
                unemployment_rate,
                interest_rate,
                consumer_confidence
            FROM economic_indicators_pivot
            ORDER BY date DESC
            LIMIT 6
        """)
        
        print("\nRecent Economic Snapshot (Last 6 Months):")
        print(f"{'Date':<12} {'GDP%':<6} {'Infl%':<6} {'Unemp%':<6} {'IntR%':<6} {'ConConf':<7}")
        print("-" * 50)
        
        for row in cur.fetchall():
            date_str = str(row[0])[:10] if row[0] else "N/A"
            gdp = f"{row[1]:.1f}" if row[1] is not None else "N/A"
            inf = f"{row[2]:.1f}" if row[2] is not None else "N/A" 
            une = f"{row[3]:.1f}" if row[3] is not None else "N/A"
            int_r = f"{row[4]:.1f}" if row[4] is not None else "N/A"
            conf = f"{row[5]:.0f}" if row[5] is not None else "N/A"
            
            print(f"{date_str:<12} {gdp:<6} {inf:<6} {une:<6} {int_r:<6} {conf:<7}")
        
        # Economic summary statistics
        cur.execute("""
            SELECT 
                COUNT(DISTINCT date) as months_of_data,
                COUNT(DISTINCT indicator) as unique_indicators,
                MIN(date) as first_month,
                MAX(date) as last_month
            FROM economic_indicators
        """)
        
        months_data, unique_indicators, first_month, last_month = cur.fetchone()
        
        print(f"\nEconomic Data Summary:")
        print(f"  Months of data: {months_data}")
        print(f"  Unique indicators: {unique_indicators}")
        print(f"  Date range: {first_month} to {last_month}")
        
        cur.close()
        conn.close()
        
        return True
        
    except Exception as e:
        print(f"\n❌ Economic indicators import failed: {e}")
        print("\nTroubleshooting steps:")
        print("1. Check if economic_indicators table exists with correct schema")
        print("2. Verify date format compatibility")
        print("3. Check indicator name spelling and case sensitivity")
        print("4. Try with replace_existing=True to clear existing data")
        print("5. Verify numeric fields can handle economic indicator ranges")
        return False

# Usage examples:

# Option 1: Skip existing records (default behavior)
print("🔄 Importing economic indicators (skip existing)...")
if import_economic_indicators(db_config):
    print("\n🎉 Economic indicators import complete!")
else:
    print("\n⚠️ Please check economic import errors")

# Option 2: Replace all existing data
print("\n🔄 Alternative: Replace existing economic data...")
print("Uncomment the line below if you want to replace all existing economic data:")
print("# import_economic_indicators(db_config, replace_existing=True)")

# Economic-specific verification function
def verify_economic_data(config):
    """Verify economic data with safe view handling and macroeconomic relationship checks"""
    try:
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        print("\n🔍 Economic Data Verification:")
        print("-" * 40)
        
        # Basic economic stats
        cur.execute("""
            SELECT 
                COUNT(*) as total_records,
                COUNT(DISTINCT indicator) as unique_indicators,
                COUNT(DISTINCT date) as months_of_data,
                MIN(date) as first_date,
                MAX(date) as last_date
            FROM economic_indicators
        """)
        
        result = cur.fetchone()
        print(f"Total Records: {result[0]:,}")
        print(f"Unique Indicators: {result[1]}")
        print(f"Months of Data: {result[2]}")
        print(f"Date Range: {result[3]} to {result[4]}")
        
        # Check if pivot view exists and has data
        try:
            cur.execute("""
                SELECT COUNT(*) FROM economic_indicators_pivot 
                WHERE gdp_growth IS NOT NULL AND unemployment_rate IS NOT NULL
            """)
            pivot_count = cur.fetchone()[0]
            
            if pivot_count > 0:
                print(f"\nPivot View Status: ✅ {pivot_count} complete records")
                
                # Try to check if cycle indicators view exists
                try:
                    cur.execute("""
                        SELECT 
                            economic_cycle_phase,
                            COUNT(*) as months
                        FROM economic_cycle_indicators
                        GROUP BY economic_cycle_phase
                        ORDER BY months DESC
                    """)
                    
                    print(f"\nEconomic Cycle Distribution:")
                    cycle_data = cur.fetchall()
                    for phase, months in cycle_data:
                        print(f"  {phase}: {months} months")
                        
                except psycopg2.Error:
                    # Create the view manually if it doesn't exist
                    print(f"\n📊 Creating economic cycle indicators view...")
                    
                    cur.execute("""
                        CREATE OR REPLACE VIEW economic_cycle_indicators AS
                        SELECT 
                            date,
                            gdp_growth,
                            unemployment_rate,
                            inflation_rate,
                            CASE 
                                WHEN gdp_growth > 3 AND unemployment_rate < 5 THEN 'Expansion'
                                WHEN gdp_growth < 0 THEN 'Recession' 
                                WHEN unemployment_rate > 8 THEN 'Recovery'
                                ELSE 'Normal'
                            END as economic_cycle_phase
                        FROM economic_indicators_pivot
                        WHERE gdp_growth IS NOT NULL AND unemployment_rate IS NOT NULL;
                    """)
                    
                    conn.commit()
                    print("✅ Economic cycle view created")
                    
                    # Now get the cycle distribution
                    cur.execute("""
                        SELECT 
                            economic_cycle_phase,
                            COUNT(*) as months
                        FROM economic_cycle_indicators
                        GROUP BY economic_cycle_phase
                        ORDER BY months DESC
                    """)
                    
                    print(f"\nEconomic Cycle Distribution:")
                    cycle_data = cur.fetchall()
                    for phase, months in cycle_data:
                        print(f"  {phase}: {months} months")
                        
            else:
                print(f"\n⚠️ Pivot view exists but lacks complete GDP/unemployment data")
                
        except psycopg2.Error as view_error:
            print(f"\n⚠️ Pivot view issue: {view_error}")
            print("Creating basic economic analysis instead...")
            
            # Fallback: Direct analysis from raw table
            cur.execute("""
                SELECT 
                    indicator,
                    COUNT(*) as observations,
                    ROUND(AVG(value), 2) as avg_value,
                    ROUND(MIN(value), 2) as min_value,
                    ROUND(MAX(value), 2) as max_value
                FROM economic_indicators
                GROUP BY indicator
                ORDER BY indicator
            """)
            
            print(f"\nIndicator Summary:")
            print(f"{'Indicator':<20} {'Obs':<5} {'Avg':<8} {'Min':<8} {'Max':<8}")
            print("-" * 55)
            
            for row in cur.fetchall():
                indicator, obs, avg_val, min_val, max_val = row
                print(f"{indicator:<20} {obs:<5} {avg_val:<8} {min_val:<8} {max_val:<8}")
        
        # Additional validation checks
        print(f"\nData Quality Checks:")
        
        # Check for missing values
        cur.execute("""
            SELECT 
                indicator,
                COUNT(*) as total,
                COUNT(*) - COUNT(value) as missing_values
            FROM economic_indicators
            GROUP BY indicator
            HAVING COUNT(*) - COUNT(value) > 0
        """)
        
        missing_data = cur.fetchall()
        if missing_data:
            print("⚠️ Missing values detected:")
            for indicator, total, missing in missing_data:
                print(f"  {indicator}: {missing}/{total} missing")
        else:
            print("✅ No missing values detected")
        
        # Check data frequency
        cur.execute("""
            SELECT 
                DATE_PART('year', date) as year,
                COUNT(DISTINCT date) as months_per_year
            FROM economic_indicators
            GROUP BY DATE_PART('year', date)
            ORDER BY year
        """)
        
        print(f"\nData Frequency Check:")
        frequency_data = cur.fetchall()
        for year, months in frequency_data:
            expected = 12 if year < 2024 else 12  # Adjust based on current data
            status = "✅" if months >= 10 else "⚠️"  # Allow for some missing months
            print(f"  {int(year)}: {months}/12 months {status}")
        
        cur.close()
        conn.close()
        return True
        
    except Exception as e:
        print(f"Economic verification failed: {e}")
        return False

# Also create a function to ensure all views are properly created
def create_economic_views(config):
    """Ensure all economic analysis views are properly created"""
    try:
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        print("📊 Creating/updating economic analysis views...")
        
        # Create pivot view
        cur.execute("""
            CREATE OR REPLACE VIEW economic_indicators_pivot AS
            SELECT 
                date,
                MAX(CASE WHEN indicator = 'GDP_GROWTH' THEN value END) as gdp_growth,
                MAX(CASE WHEN indicator = 'INFLATION_RATE' THEN value END) as inflation_rate,
                MAX(CASE WHEN indicator = 'UNEMPLOYMENT_RATE' THEN value END) as unemployment_rate,
                MAX(CASE WHEN indicator = 'INTEREST_RATE' THEN value END) as interest_rate,
                MAX(CASE WHEN indicator = 'CONSUMER_CONFIDENCE' THEN value END) as consumer_confidence,
                MAX(CASE WHEN indicator = 'RETAIL_SALES' THEN value END) as retail_sales,
                MAX(CASE WHEN indicator = 'INDUSTRIAL_PRODUCTION' THEN value END) as industrial_production,
                MAX(CASE WHEN indicator = 'HOUSING_STARTS' THEN value END) as housing_starts,
                MAX(CASE WHEN indicator = 'TRADE_BALANCE' THEN value END) as trade_balance,
                MAX(CASE WHEN indicator = 'MONEY_SUPPLY' THEN value END) as money_supply
            FROM economic_indicators
            GROUP BY date
            ORDER BY date DESC;
        """)
        
        # Create economic cycle view
        cur.execute("""
            CREATE OR REPLACE VIEW economic_cycle_indicators AS
            SELECT 
                date,
                gdp_growth,
                unemployment_rate,
                inflation_rate,
                interest_rate,
                CASE 
                    WHEN gdp_growth > 3 AND unemployment_rate < 5 THEN 'Expansion'
                    WHEN gdp_growth < 0 THEN 'Recession' 
                    WHEN unemployment_rate > 8 THEN 'Recovery'
                    ELSE 'Normal'
                END as economic_cycle_phase
            FROM economic_indicators_pivot
            WHERE gdp_growth IS NOT NULL AND unemployment_rate IS NOT NULL;
        """)
        
        # Create monthly summary view
        cur.execute("""
            CREATE OR REPLACE VIEW economic_monthly_summary AS
            SELECT 
                DATE_TRUNC('month', date) as month,
                COUNT(DISTINCT indicator) as indicators_reported,
                COUNT(*) as total_data_points,
                STRING_AGG(DISTINCT indicator, ', ' ORDER BY indicator) as available_indicators
            FROM economic_indicators
            GROUP BY DATE_TRUNC('month', date)
            ORDER BY month DESC;
        """)
        
        conn.commit()
        
        # Test the views
        cur.execute("SELECT COUNT(*) FROM economic_indicators_pivot")
        pivot_count = cur.fetchone()[0]
        
        cur.execute("SELECT COUNT(*) FROM economic_cycle_indicators")
        cycle_count = cur.fetchone()[0]
        
        print(f"✅ Pivot view: {pivot_count} records")
        print(f"✅ Cycle view: {cycle_count} records") 
        print("✅ All economic views created successfully")
        
        cur.close()
        conn.close()
        return True
        
    except Exception as e:
        print(f"❌ Failed to create economic views: {e}")
        return False

# Run the corrected verification
print("🔧 Creating economic analysis views...")
if create_economic_views(db_config):
    print("\n🔄 Running corrected economic verification...")
    verify_economic_data(db_config)
else:
    print("⚠️ Could not create economic views")

🔄 Importing economic indicators (skip existing)...
🏛️ Importing Economic Indicators
Step 1: Loading economic data...
✅ Loaded 600 rows

Economic indicators overview:
  GDP_GROWTH: 60 observations, range: 1.10 to 2.99
  INFLATION_RATE: 60 observations, range: 0.80 to 2.61
  UNEMPLOYMENT_RATE: 60 observations, range: 4.49 to 6.15
  INTEREST_RATE: 60 observations, range: 0.15 to 1.66
  CONSUMER_CONFIDENCE: 60 observations, range: 96.59 to 123.80
  RETAIL_SALES: 60 observations, range: -1.39 to 1.17
  INDUSTRIAL_PRODUCTION: 60 observations, range: -1.75 to 3.06
  HOUSING_STARTS: 60 observations, range: 907856.01 to 1403145.38
  TRADE_BALANCE: 60 observations, range: -110027.96 to -32348.12
  MONEY_SUPPLY: 60 observations, range: 15369.05 to 20427.64

Step 2: Preprocessing...

Step 3: Checking for existing economic data...
📊 Found 600 existing economic indicator records
🔍 Checking for overlapping economic data...
⏭️ Skipping 600 economic records that already exist
📥 Will import 0 new econom

Importing Portfolio Holdings Data
=========================================

Portfolio data represents:
- Investment accounts with different risk profiles
- Monthly snapshots of holdings
- Asset allocation (stocks, bonds, cash)
- Performance metrics

Key validations:
- Weights must sum to 1.0 (100%)
- Returns should be realistic
- Total value should be positive

In [11]:
def import_portfolio_data(config, data_dir='mock_financial_data', replace_existing=False):
    """
    Import portfolio holdings with allocation validation and duplicate handling.
    
    Args:
        config: Database configuration object
        data_dir: Directory containing CSV files
        replace_existing: If True, clear existing data before import
    """
    csv_file = 'portfolio_data.csv'
    filepath = os.path.join(data_dir, csv_file)
    
    print(f"💼 Importing Portfolio Holdings Data")
    print("=" * 60)
    
    try:
        # Step 1: Load data
        print("Step 1: Loading portfolio data...")
        df = pd.read_csv(filepath)
        print(f"✅ Loaded {len(df):,} rows")
        
        # Step 2: Basic preprocessing
        print("\nStep 2: Preprocessing...")
        df['date'] = pd.to_datetime(df['Date'])
        df = df.drop('Date', axis=1)
        df.columns = [col.lower() for col in df.columns]
        
        # Rename columns to match database schema
        column_mapping = {
            'portfolioid': 'portfolio_id',
            'risklevel': 'risk_level',
            'totalvalue': 'total_value',
            'stockweight': 'stock_weight',
            'bondweight': 'bond_weight',
            'cashweight': 'cash_weight',
            'monthlyreturn': 'monthly_return'
        }
        df = df.rename(columns=column_mapping)
        
        # Check for CSV duplicates
        csv_duplicates = df.duplicated(subset=['date', 'portfolio_id'])
        if csv_duplicates.any():
            print(f"⚠️ Found {csv_duplicates.sum()} duplicate rows in CSV")
            df = df[~csv_duplicates]
            print(f"✅ Removed CSV duplicates, {len(df):,} rows remaining")
        
        # Step 3: Handle existing data
        print(f"\nStep 3: Checking for existing portfolio data...")
        
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        # Check if table exists and has data
        try:
            cur.execute("SELECT COUNT(*) FROM portfolio_holdings")
            existing_count = cur.fetchone()[0]
        except psycopg2.Error:
            existing_count = 0
            print("ℹ️ portfolio_holdings table doesn't exist yet")
        
        if existing_count > 0:
            print(f"📊 Found {existing_count:,} existing portfolio records")
            
            if replace_existing:
                print("🗑️ Clearing existing portfolio data (replace_existing=True)...")
                cur.execute("DELETE FROM portfolio_holdings")
                conn.commit()
                print("✅ Existing portfolio data cleared")
            else:
                print("🔍 Checking for overlapping portfolio data...")
                
                # Get existing date/portfolio_id combinations
                cur.execute("""
                    SELECT DISTINCT date, portfolio_id 
                    FROM portfolio_holdings
                """)
                existing_combinations = set(cur.fetchall())
                
                # Filter out records that already exist
                df['temp_tuple'] = list(zip(df['date'].dt.date, df['portfolio_id']))
                df_filtered = df[~df['temp_tuple'].isin(existing_combinations)]
                df_filtered = df_filtered.drop('temp_tuple', axis=1)
                
                if len(df_filtered) < len(df):
                    skipped = len(df) - len(df_filtered)
                    print(f"⏭️ Skipping {skipped:,} portfolio records that already exist")
                    print(f"📥 Will import {len(df_filtered):,} new portfolio records")
                    df = df_filtered
                else:
                    print("✅ No overlapping portfolio data found")
        
        cur.close()
        conn.close()
        
        # Skip import if no new data
        if len(df) == 0:
            print("ℹ️ No new portfolio data to import")
            return True
        
        # Step 4: Portfolio-specific validations
        print(f"\nStep 4: Portfolio validation on {len(df):,} records...")
        
        # Check if weights sum to 1.0 (portfolio allocation constraint)
        df['weight_sum'] = df['stock_weight'] + df['bond_weight'] + df['cash_weight']
        weight_errors = df[abs(df['weight_sum'] - 1.0) > 0.001]  # Allow small rounding errors
        
        if len(weight_errors) > 0:
            print(f"⚠️ Found {len(weight_errors)} rows where weights don't sum to 1.0")
            print("Sample weight errors:")
            sample_errors = weight_errors[['portfolio_id', 'date', 'weight_sum']].head()
            for _, row in sample_errors.iterrows():
                print(f"  {row['portfolio_id']} on {row['date'].date()}: sum = {row['weight_sum']:.4f}")
            
            # Fix by normalizing weights
            print("Normalizing weight allocations...")
            for idx in weight_errors.index:
                total = df.loc[idx, 'weight_sum']
                if total > 0:
                    df.loc[idx, 'stock_weight'] /= total
                    df.loc[idx, 'bond_weight'] /= total
                    df.loc[idx, 'cash_weight'] /= total
            print("✅ Weight allocations normalized")
        else:
            print("✅ All portfolio weights sum to 1.0")
        
        df = df.drop('weight_sum', axis=1)
        
        # Check for negative weights (shouldn't happen in practice)
        negative_weights = df[
            (df['stock_weight'] < 0) | 
            (df['bond_weight'] < 0) | 
            (df['cash_weight'] < 0)
        ]
        if len(negative_weights) > 0:
            print(f"⚠️ Found {len(negative_weights)} rows with negative weights")
        else:
            print("✅ No negative weights detected")
        
        # Check for unrealistic returns (>100% or <-50% monthly)
        extreme_returns = df[(df['monthly_return'] > 1.0) | (df['monthly_return'] < -0.5)]
        if len(extreme_returns) > 0:
            print(f"⚠️ Found {len(extreme_returns)} extreme monthly returns (>100% or <-50%)")
            extreme_sample = extreme_returns[['portfolio_id', 'date', 'monthly_return']].head()
            for _, row in extreme_sample.iterrows():
                print(f"  {row['portfolio_id']} on {row['date'].date()}: {row['monthly_return']:.1%}")
        else:
            print("✅ No extreme monthly returns detected")
        
        # Analyze performance by risk level
        print("\nPortfolio Performance Analysis by Risk Level:")
        risk_stats = df.groupby('risk_level').agg({
            'monthly_return': ['mean', 'std', 'min', 'max'],
            'portfolio_id': 'nunique',
            'total_value': 'mean'
        }).round(4)
        
        print("\nRisk Level Performance Summary:")
        print(f"{'Risk Level':<12} {'Count':<6} {'Avg Return':<11} {'Volatility':<10} {'Min Return':<11} {'Max Return':<11}")
        print("-" * 70)
        
        for risk_level in risk_stats.index:
            count = risk_stats.loc[risk_level, ('portfolio_id', 'nunique')]
            avg_ret = risk_stats.loc[risk_level, ('monthly_return', 'mean')]
            volatility = risk_stats.loc[risk_level, ('monthly_return', 'std')]
            min_ret = risk_stats.loc[risk_level, ('monthly_return', 'min')]
            max_ret = risk_stats.loc[risk_level, ('monthly_return', 'max')]
            
            print(f"{risk_level:<12} {count:<6} {avg_ret:<11.4f} {volatility:<10.4f} {min_ret:<11.4f} {max_ret:<11.4f}")
        
        # Risk-return validation
        risk_levels = ['Conservative', 'Moderate', 'Aggressive']
        if all(level in risk_stats.index for level in risk_levels):
            conservative_ret = risk_stats.loc['Conservative', ('monthly_return', 'mean')]
            moderate_ret = risk_stats.loc['Moderate', ('monthly_return', 'mean')]
            aggressive_ret = risk_stats.loc['Aggressive', ('monthly_return', 'mean')]
            
            if conservative_ret < moderate_ret < aggressive_ret:
                print("✅ Risk-return relationship is correct (Conservative < Moderate < Aggressive)")
            else:
                print("⚠️ Risk-return relationship may be inverted")
        
        # Step 5: Import to database
        print(f"\nStep 5: Importing {len(df):,} records to PostgreSQL...")
        
        engine = create_engine(config.get_connection_string())
        
        try:
            df.to_sql(
                'portfolio_holdings',
                engine,
                if_exists='append',
                index=False,
                method='multi',
                chunksize=5000
            )
            
            print(f"✅ Successfully imported {len(df):,} portfolio records")
            
        except Exception as import_error:
            print(f"❌ Import error: {import_error}")
            return False
        
        # Step 6: Create portfolio analysis views
        print("\nStep 6: Creating portfolio analysis views...")
        
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        # Portfolio performance summary view
        cur.execute("""
            CREATE OR REPLACE VIEW portfolio_performance_summary AS
            WITH returns_calc AS (
                SELECT 
                    portfolio_id,
                    risk_level,
                    AVG(monthly_return) * 12 as annual_return,
                    STDDEV(monthly_return) * SQRT(12) as annual_volatility,
                    MIN(total_value) as min_value,
                    MAX(total_value) as max_value,
                    (MAX(total_value) / NULLIF(MIN(total_value), 0) - 1) as total_growth,
                    COUNT(*) as months_of_data
                FROM portfolio_holdings
                GROUP BY portfolio_id, risk_level
            )
            SELECT 
                risk_level,
                COUNT(*) as portfolio_count,
                ROUND(AVG(annual_return), 4) as avg_annual_return,
                ROUND(AVG(annual_volatility), 4) as avg_annual_volatility,
                ROUND(AVG(annual_return) / NULLIF(AVG(annual_volatility), 0), 4) as avg_sharpe_ratio,
                ROUND(AVG(total_growth), 4) as avg_total_growth,
                ROUND(AVG(months_of_data), 1) as avg_months_data
            FROM returns_calc
            WHERE annual_volatility > 0
            GROUP BY risk_level
            ORDER BY 
                CASE risk_level 
                    WHEN 'Conservative' THEN 1 
                    WHEN 'Moderate' THEN 2 
                    WHEN 'Aggressive' THEN 3 
                END;
        """)
        
        # Portfolio allocation drift view
        cur.execute("""
            CREATE OR REPLACE VIEW portfolio_allocation_drift AS
            SELECT 
                portfolio_id,
                risk_level,
                ROUND(AVG(stock_weight), 3) as avg_stock_weight,
                ROUND(STDDEV(stock_weight), 3) as stock_weight_volatility,
                ROUND(AVG(bond_weight), 3) as avg_bond_weight,
                ROUND(STDDEV(bond_weight), 3) as bond_weight_volatility,
                ROUND(AVG(cash_weight), 3) as avg_cash_weight,
                ROUND(STDDEV(cash_weight), 3) as cash_weight_volatility
            FROM portfolio_holdings
            GROUP BY portfolio_id, risk_level;
        """)
        
        print("✅ Created portfolio performance summary view")
        print("✅ Created portfolio allocation drift view")
        
        # Show the performance summary
        cur.execute("SELECT * FROM portfolio_performance_summary")
        
        print("\nPortfolio Performance Summary:")
        print(f"{'Risk Level':<12} {'Count':<6} {'Ann.Return':<10} {'Ann.Vol':<8} {'Sharpe':<7} {'Growth':<8} {'Months':<7}")
        print("-" * 65)
        
        for row in cur.fetchall():
            risk_level, count, ann_ret, ann_vol, sharpe, growth, months = row
            print(f"{risk_level:<12} {count:<6} {ann_ret:<10.4f} {ann_vol:<8.4f} {sharpe or 0:<7.2f} {growth or 0:<8.2f} {months:<7.1f}")
        
        # Verification queries
        cur.execute("""
            SELECT 
                COUNT(*) as total_records,
                COUNT(DISTINCT portfolio_id) as unique_portfolios,
                COUNT(DISTINCT date) as months_of_data,
                MIN(date) as first_date,
                MAX(date) as last_date
            FROM portfolio_holdings
        """)
        
        total_records, unique_portfolios, months_data, first_date, last_date = cur.fetchone()
        
        print(f"\nPortfolio Data Summary:")
        print(f"  Total records: {total_records:,}")
        print(f"  Unique portfolios: {unique_portfolios}")
        print(f"  Months of data: {months_data}")
        print(f"  Date range: {first_date} to {last_date}")
        
        cur.close()
        conn.close()
        
        return True
        
    except Exception as e:
        print(f"\n❌ Portfolio import failed: {e}")
        print("\nTroubleshooting steps:")
        print("1. Check if portfolio_holdings table exists with correct schema")
        print("2. Verify date format compatibility")
        print("3. Check portfolio_id format and case sensitivity")
        print("4. Try with replace_existing=True to clear existing data")
        print("5. Verify numeric fields can handle portfolio values and returns")
        return False

# Usage examples:

# Option 1: Skip existing records (default behavior)
print("🔄 Importing portfolio data (skip existing)...")
if import_portfolio_data(db_config):
    print("\n🎉 Portfolio data import complete!")
else:
    print("\n⚠️ Please check portfolio import errors")

# Option 2: Replace all existing data
print("\n🔄 Alternative: Replace existing portfolio data...")
print("Uncomment the line below if you want to replace all existing portfolio data:")
print("# import_portfolio_data(db_config, replace_existing=True)")

# Portfolio-specific verification function
def verify_portfolio_data(config):
    """Verify portfolio data with Modern Portfolio Theory validation and proper PostgreSQL type casting"""
    try:
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        print("\n🔍 Portfolio Data Verification:")
        print("-" * 40)
        
        # Basic portfolio stats
        cur.execute("""
            SELECT 
                COUNT(*) as total_records,
                COUNT(DISTINCT portfolio_id) as unique_portfolios,
                COUNT(DISTINCT date) as months_of_data,
                MIN(date) as first_date,
                MAX(date) as last_date
            FROM portfolio_holdings
        """)
        
        result = cur.fetchone()
        print(f"Total Records: {result[0]:,}")
        print(f"Unique Portfolios: {result[1]}")
        print(f"Months of Data: {result[2]}")
        print(f"Date Range: {result[3]} to {result[4]}")
        
        # Risk-return verification with proper type casting
        cur.execute("""
            SELECT 
                risk_level,
                COUNT(*) as records,
                ROUND(CAST(AVG(monthly_return * 12) AS numeric), 4) as annualized_return,
                ROUND(CAST(STDDEV(monthly_return) * SQRT(12) AS numeric), 4) as annualized_volatility
            FROM portfolio_holdings
            GROUP BY risk_level
            ORDER BY 
                CASE risk_level 
                    WHEN 'Conservative' THEN 1 
                    WHEN 'Moderate' THEN 2 
                    WHEN 'Aggressive' THEN 3 
                END
        """)
        
        print(f"\nRisk-Return Verification:")
        risk_data = cur.fetchall()
        for row in risk_data:
            risk_level, records, ann_return, ann_vol = row
            if ann_return is not None and ann_vol is not None:
                print(f"  {risk_level}: {float(ann_return):.2%} return, {float(ann_vol):.2%} volatility ({records:,} records)")
            else:
                print(f"  {risk_level}: Insufficient data for calculation ({records:,} records)")
        
        # Portfolio allocation verification
        print(f"\nPortfolio Allocation Verification:")
        cur.execute("""
            SELECT 
                risk_level,
                ROUND(CAST(AVG(stock_weight) AS numeric), 3) as avg_stock_weight,
                ROUND(CAST(AVG(bond_weight) AS numeric), 3) as avg_bond_weight,
                ROUND(CAST(AVG(cash_weight) AS numeric), 3) as avg_cash_weight
            FROM portfolio_holdings
            GROUP BY risk_level
            ORDER BY 
                CASE risk_level 
                    WHEN 'Conservative' THEN 1 
                    WHEN 'Moderate' THEN 2 
                    WHEN 'Aggressive' THEN 3 
                END
        """)
        
        allocation_data = cur.fetchall()
        print(f"{'Risk Level':<12} {'Stock %':<8} {'Bond %':<8} {'Cash %':<8}")
        print("-" * 40)
        
        for row in allocation_data:
            risk_level, stock_w, bond_w, cash_w = row
            print(f"{risk_level:<12} {float(stock_w)*100:<8.1f} {float(bond_w)*100:<8.1f} {float(cash_w)*100:<8.1f}")
        
        # Weight sum validation
        cur.execute("""
            SELECT 
                COUNT(*) as total_records,
                COUNT(*) - COUNT(CASE WHEN ABS((stock_weight + bond_weight + cash_weight) - 1.0) <= 0.001 THEN 1 END) as weight_errors
            FROM portfolio_holdings
        """)
        
        total_records, weight_errors = cur.fetchone()
        print(f"\nWeight Allocation Validation:")
        if weight_errors == 0:
            print(f"✅ All {total_records:,} records have weights summing to 1.0")
        else:
            print(f"⚠️ {weight_errors:,} out of {total_records:,} records have weight sum errors")
        
        # Performance consistency check
        print(f"\nPerformance Consistency Check:")
        
        # Check if Conservative < Moderate < Aggressive returns (expected relationship)
        conservative_return = None
        moderate_return = None
        aggressive_return = None
        
        for row in risk_data:
            risk_level, _, ann_return, _ = row
            if ann_return is not None:
                if risk_level == 'Conservative':
                    conservative_return = float(ann_return)
                elif risk_level == 'Moderate':
                    moderate_return = float(ann_return)
                elif risk_level == 'Aggressive':
                    aggressive_return = float(ann_return)
        
        if all(x is not None for x in [conservative_return, moderate_return, aggressive_return]):
            if conservative_return < moderate_return < aggressive_return:
                print("✅ Risk-return relationship follows expected pattern (Conservative < Moderate < Aggressive)")
            else:
                print("⚠️ Risk-return relationship doesn't follow expected pattern")
                print(f"   Conservative: {conservative_return:.2%}")
                print(f"   Moderate: {moderate_return:.2%}")
                print(f"   Aggressive: {aggressive_return:.2%}")
        else:
            print("ℹ️ Insufficient data to verify risk-return relationship")
        
        # Data completeness check
        cur.execute("""
            SELECT 
                portfolio_id,
                COUNT(*) as months_of_data,
                MIN(date) as first_date,
                MAX(date) as last_date
            FROM portfolio_holdings
            GROUP BY portfolio_id
            HAVING COUNT(*) < 50  -- Flag portfolios with less than 50 months of data
            ORDER BY COUNT(*)
            LIMIT 5
        """)
        
        incomplete_portfolios = cur.fetchall()
        if incomplete_portfolios:
            print(f"\nData Completeness Warning:")
            print(f"Portfolios with incomplete data (< 50 months):")
            for portfolio_id, months, first_date, last_date in incomplete_portfolios:
                print(f"  {portfolio_id}: {months} months ({first_date} to {last_date})")
        else:
            print(f"\n✅ All portfolios have sufficient historical data")
        
        cur.close()
        conn.close()
        return True
        
    except Exception as e:
        print(f"Portfolio verification failed: {e}")
        print("Note: This may be due to PostgreSQL type casting requirements")
        return False

# Also create a simplified verification function that avoids complex calculations
def verify_portfolio_data_simple(config):
    """Simplified portfolio verification without complex PostgreSQL functions"""
    try:
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        print("\n🔍 Simple Portfolio Data Verification:")
        print("-" * 45)
        
        # Basic counts and structure
        cur.execute("""
            SELECT 
                risk_level,
                COUNT(*) as records,
                COUNT(DISTINCT portfolio_id) as portfolios,
                AVG(monthly_return) as avg_monthly_return,
                AVG(total_value) as avg_portfolio_value
            FROM portfolio_holdings
            GROUP BY risk_level
            ORDER BY 
                CASE risk_level 
                    WHEN 'Conservative' THEN 1 
                    WHEN 'Moderate' THEN 2 
                    WHEN 'Aggressive' THEN 3 
                END
        """)
        
        print(f"{'Risk Level':<12} {'Records':<8} {'Portfolios':<10} {'Avg Monthly Return':<16} {'Avg Value':<12}")
        print("-" * 70)
        
        for row in cur.fetchall():
            risk_level, records, portfolios, avg_return, avg_value = row
            avg_return_pct = (avg_return or 0) * 100
            avg_value_formatted = f"${(avg_value or 0):,.0f}"
            print(f"{risk_level:<12} {records:<8} {portfolios:<10} {avg_return_pct:<16.3f}% {avg_value_formatted:<12}")
        
        # Simple allocation check
        cur.execute("""
            SELECT 
                risk_level,
                AVG(stock_weight) as avg_stock,
                AVG(bond_weight) as avg_bond,
                AVG(cash_weight) as avg_cash
            FROM portfolio_holdings
            GROUP BY risk_level
        """)
        
        print(f"\nAverage Asset Allocation by Risk Level:")
        print(f"{'Risk Level':<12} {'Stock %':<8} {'Bond %':<8} {'Cash %':<8}")
        print("-" * 40)
        
        for row in cur.fetchall():
            risk_level, stock_w, bond_w, cash_w = row
            print(f"{risk_level:<12} {(stock_w or 0)*100:<8.1f} {(bond_w or 0)*100:<8.1f} {(cash_w or 0)*100:<8.1f}")
        
        cur.close()
        conn.close()
        return True
        
    except Exception as e:
        print(f"Simple portfolio verification failed: {e}")
        return False

# Try the fixed verification first, fall back to simple if needed
print("🔄 Running portfolio data verification...")
if not verify_portfolio_data(db_config):
    print("\n🔄 Falling back to simplified verification...")
    verify_portfolio_data_simple(db_config)

🔄 Importing portfolio data (skip existing)...
💼 Importing Portfolio Holdings Data
Step 1: Loading portfolio data...
✅ Loaded 6,000 rows

Step 2: Preprocessing...

Step 3: Checking for existing portfolio data...
📊 Found 6,000 existing portfolio records
🔍 Checking for overlapping portfolio data...
⏭️ Skipping 6,000 portfolio records that already exist
📥 Will import 0 new portfolio records
ℹ️ No new portfolio data to import

🎉 Portfolio data import complete!

🔄 Alternative: Replace existing portfolio data...
Uncomment the line below if you want to replace all existing portfolio data:
# import_portfolio_data(db_config, replace_existing=True)
🔄 Running portfolio data verification...

🔍 Portfolio Data Verification:
----------------------------------------
Total Records: 6,000
Unique Portfolios: 100
Months of Data: 60
Date Range: 2020-01-31 to 2024-12-31

Risk-Return Verification:
  Conservative: 5.57% return, 9.13% volatility (2,100 records)
  Moderate: 7.26% return, 13.46% volatility (2,640

Importing Customer Demographics Data
============================================

Customer data is sensitive and requires:
1. Privacy considerations (even for mock data)
2. Data type validation (age, credit scores)
3. Business rule validation
4. Risk segmentation accuracy

This data supports:
- Credit scoring models
- Customer segmentation
- Risk assessment
- Marketing analytics

In [12]:
def import_customer_data_fixed(config, data_dir='mock_financial_data', replace_existing=False):
    """
    Import customer demographics data with correct schema mapping and duplicate handling.
    
    Args:
        config: Database configuration object
        data_dir: Directory containing CSV files
        replace_existing: If True, clear existing data before import
    """
    csv_file = 'customer_data.csv'
    filepath = os.path.join(data_dir, csv_file)
    
    print(f"👥 Importing Customer Demographics Data (Schema Fixed)")
    print("=" * 60)
    
    try:
        # Step 1: Load customer data
        print("Step 1: Loading customer data...")
        df = pd.read_csv(filepath)
        print(f"✅ Loaded {len(df):,} customer records")
        
        # Step 2: Preprocessing with correct column mapping
        print("\nStep 2: Preprocessing with schema mapping...")
        
        # Ensure column names match database schema
        df.columns = [col.lower() for col in df.columns]
        
        # Map CSV column names to database column names
        column_mapping = {
            'customerid': 'customer_id',
            'creditscore': 'credit_score',
            'accountagedays': 'account_age_days',
            'accountbalance': 'account_balance',
            'monthlytransactions': 'monthly_transactions',
            'avgtransactionamount': 'avg_transaction_amount',
            'numproducts': 'num_products',
            'hasloan': 'has_loan',
            'risksegment': 'risk_segment'
            # age and income stay the same
        }
        
        df = df.rename(columns=column_mapping)
        print("✅ Column names mapped to database schema")
        
        # Check for CSV duplicates on customer_id
        csv_duplicates = df.duplicated(subset=['customer_id'])
        if csv_duplicates.any():
            print(f"⚠️ Found {csv_duplicates.sum()} duplicate customer IDs in CSV")
            df = df[~csv_duplicates]
            print(f"✅ Removed CSV duplicates, {len(df):,} customers remaining")
        
        # Step 3: Handle existing data
        print(f"\nStep 3: Checking for existing customer data...")
        
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        # Check if table exists and has data
        try:
            cur.execute("SELECT COUNT(*) FROM customers")
            existing_count = cur.fetchone()[0]
        except psycopg2.Error:
            existing_count = 0
            print("ℹ️ customers table doesn't exist yet")
        
        if existing_count > 0:
            print(f"📊 Found {existing_count:,} existing customer records")
            
            if replace_existing:
                print("🗑️ Clearing existing customer data (replace_existing=True)...")
                cur.execute("DELETE FROM customers")
                conn.commit()
                print("✅ Existing customer data cleared")
            else:
                print("🔍 Checking for overlapping customer data...")
                
                # Get existing customer IDs (using correct column name)
                cur.execute("SELECT DISTINCT customer_id FROM customers")
                existing_customer_ids = set(row[0] for row in cur.fetchall())
                
                # Filter out customers that already exist
                df_filtered = df[~df['customer_id'].isin(existing_customer_ids)]
                
                if len(df_filtered) < len(df):
                    skipped = len(df) - len(df_filtered)
                    print(f"⏭️ Skipping {skipped:,} customers that already exist")
                    print(f"📥 Will import {len(df_filtered):,} new customers")
                    df = df_filtered
                else:
                    print("✅ No overlapping customer data found")
        
        cur.close()
        conn.close()
        
        # Skip import if no new data
        if len(df) == 0:
            print("ℹ️ No new customer data to import")
            return True
        
        # Step 4: Customer data validation (using correct column names)
        print(f"\nStep 4: Customer data validation on {len(df):,} customers...")
        
        # Age validation
        age_issues = df[(df['age'] < 18) | (df['age'] > 100)]
        if len(age_issues) > 0:
            print(f"⚠️ Found {len(age_issues)} customers with unusual ages")
        else:
            print("✅ All customer ages are within reasonable range (18-100)")
        
        # Income validation
        income_issues = df[(df['income'] < 0) | (df['income'] > 1000000)]
        if len(income_issues) > 0:
            print(f"⚠️ Found {len(income_issues)} customers with unusual income levels")
        else:
            print("✅ All customer incomes are within expected range")
        
        # Credit score validation (using correct column name)
        credit_issues = df[(df['credit_score'] < 300) | (df['credit_score'] > 850)]
        if len(credit_issues) > 0:
            print(f"⚠️ Found {len(credit_issues)} customers with invalid credit scores")
        else:
            print("✅ All credit scores are within FICO range (300-850)")
        
        # Customer segmentation analysis (using correct column names)
        print("\nCustomer Segmentation Analysis:")
        segmentation = df.groupby('risk_segment').agg({
            'customer_id': 'count',
            'age': 'mean',
            'income': 'mean', 
            'credit_score': 'mean',
            'account_balance': 'mean',
            'has_loan': 'mean'
        }).round(2)
        
        segmentation.columns = ['count', 'avg_age', 'avg_income', 'avg_credit', 'avg_balance', 'loan_rate']
        print(segmentation)
        
        # Behavioral analysis (using correct column names)
        print("\nCustomer Behavioral Analysis:")
        
        transaction_analysis = df.groupby('risk_segment').agg({
            'monthly_transactions': ['mean', 'std'],
            'avg_transaction_amount': ['mean', 'std'],
            'num_products': 'mean'
        }).round(2)
        
        print("Transaction Patterns by Risk Segment:")
        for risk_segment in ['Low', 'Medium', 'High']:
            if risk_segment in transaction_analysis.index:
                avg_txns = transaction_analysis.loc[risk_segment, ('monthly_transactions', 'mean')]
                avg_amount = transaction_analysis.loc[risk_segment, ('avg_transaction_amount', 'mean')]
                avg_products = transaction_analysis.loc[risk_segment, ('num_products', 'mean')]
                print(f"  {risk_segment} Risk: {avg_txns:.1f} monthly txns, ${avg_amount:.2f} avg amount, {avg_products:.1f} products")
        
        # Data quality checks
        print("\nData Quality Checks:")
        
        # Check for duplicate customer IDs (should be none after preprocessing)
        if df['customer_id'].duplicated().any():
            print("⚠️ Duplicate customer IDs detected")
        else:
            print("✅ No duplicate customer IDs")
        
        # Income-credit score correlation (should be positive)
        income_credit_corr = df['income'].corr(df['credit_score'])
        print(f"Income-Credit Score correlation: {income_credit_corr:.3f}")
        if income_credit_corr > 0.05:
            print("✅ Positive correlation between income and credit score (expected)")
        else:
            print("⚠️ Weak correlation between income and credit score")
        
        # Step 5: Import to database
        print(f"\nStep 5: Importing {len(df):,} records to PostgreSQL...")
        
        engine = create_engine(config.get_connection_string())
        
        try:
            df.to_sql(
                'customers',
                engine,
                if_exists='append',
                index=False,
                method='multi',
                chunksize=5000
            )
            
            print(f"✅ Successfully imported {len(df):,} customer records")
            
        except Exception as import_error:
            print(f"❌ Import error: {import_error}")
            return False
        
        # Step 6: Create customer analysis views (using correct column names)
        print("\nStep 6: Creating customer analysis views...")
        
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        # Customer segmentation summary view
        cur.execute("""
            CREATE OR REPLACE VIEW customer_segmentation_summary AS
            SELECT 
                risk_segment,
                COUNT(*) as customer_count,
                ROUND(AVG(age), 1) as avg_age,
                ROUND(AVG(income), 0) as avg_income,
                ROUND(AVG(credit_score), 0) as avg_credit_score,
                ROUND(AVG(account_balance), 0) as avg_account_balance,
                ROUND(AVG(monthly_transactions), 1) as avg_monthly_transactions,
                ROUND(AVG(avg_transaction_amount), 2) as avg_transaction_amount,
                ROUND(AVG(num_products), 1) as avg_num_products,
                ROUND(AVG(CASE WHEN has_loan THEN 1 ELSE 0 END), 3) as loan_penetration_rate
            FROM customers
            GROUP BY risk_segment
            ORDER BY 
                CASE risk_segment 
                    WHEN 'Low' THEN 1 
                    WHEN 'Medium' THEN 2 
                    WHEN 'High' THEN 3 
                END;
        """)
        
        # Customer lifetime value estimation view
        cur.execute("""
            CREATE OR REPLACE VIEW customer_value_analysis AS
            SELECT 
                customer_id,
                risk_segment,
                age,
                income,
                credit_score,
                account_balance,
                monthly_transactions * avg_transaction_amount as estimated_monthly_volume,
                CASE 
                    WHEN risk_segment = 'Low' AND account_balance > 50000 THEN 'High Value'
                    WHEN risk_segment = 'Low' OR account_balance > 25000 THEN 'Medium Value'
                    ELSE 'Standard Value'
                END as value_segment
            FROM customers;
        """)
        
        print("✅ Created customer segmentation summary view")
        print("✅ Created customer value analysis view")
        
        # Show segmentation summary
        cur.execute("SELECT * FROM customer_segmentation_summary")
        
        print("\nCustomer Segmentation Summary:")
        print(f"{'Risk':<6} {'Count':<6} {'Age':<5} {'Income':<8} {'Credit':<6} {'Balance':<8} {'Txns':<5} {'TxnAmt':<7} {'Prods':<5} {'Loans':<6}")
        print("-" * 70)
        
        for row in cur.fetchall():
            risk, count, age, income, credit, balance, txns, txn_amt, prods, loans = row
            print(f"{risk:<6} {count:<6} {age:<5.1f} ${income:<7,.0f} {credit:<6.0f} ${balance:<7,.0f} {txns:<5.1f} ${txn_amt:<6.2f} {prods:<5.1f} {loans:<6.1%}")
        
        # Customer value distribution
        cur.execute("""
            SELECT 
                value_segment,
                COUNT(*) as customers,
                AVG(account_balance) as avg_balance
            FROM customer_value_analysis
            GROUP BY value_segment
            ORDER BY AVG(account_balance) DESC
        """)
        
        print(f"\nCustomer Value Distribution:")
        for row in cur.fetchall():
            value_segment, customers, avg_balance = row
            print(f"  {value_segment}: {customers:,} customers (avg balance: ${avg_balance:,.0f})")
        
        # Final verification
        cur.execute("""
            SELECT 
                COUNT(*) as total_customers,
                COUNT(DISTINCT customer_id) as unique_customer_ids,
                MIN(age) as min_age,
                MAX(age) as max_age,
                MIN(credit_score) as min_credit,
                MAX(credit_score) as max_credit
            FROM customers
        """)
        
        total, unique_ids, min_age, max_age, min_credit, max_credit = cur.fetchone()
        
        print(f"\nCustomer Data Summary:")
        print(f"  Total customers: {total:,}")
        print(f"  Unique customer IDs: {unique_ids:,}")
        print(f"  Age range: {min_age} to {max_age} years")
        print(f"  Credit score range: {min_credit} to {max_credit}")
        
        if total == unique_ids:
            print("✅ Customer ID uniqueness verified")
        else:
            print("⚠️ Customer ID uniqueness issue detected")
        
        cur.close()
        conn.close()
        
        return True
        
    except Exception as e:
        print(f"\n❌ Customer import failed: {e}")
        print("\nTroubleshooting steps:")
        print("1. Check if customers table exists with correct schema")
        print("2. Verify customer_id format matches database expectations")
        print("3. Check for data type mismatches")
        print("4. Try with replace_existing=True to clear existing data")
        print("5. Verify boolean fields (has_loan) are properly formatted")
        return False

def verify_customer_data_fixed(config):
    """Verify customer data with correct column names"""
    try:
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        print("\n🔍 Customer Data Verification (Schema Fixed):")
        print("-" * 50)
        
        # Basic customer stats (using correct column names)
        cur.execute("""
            SELECT 
                COUNT(*) as total_customers,
                AVG(age) as avg_age,
                AVG(income) as avg_income,
                AVG(credit_score) as avg_credit_score,
                AVG(account_balance) as avg_balance
            FROM customers
        """)
        
        result = cur.fetchone()
        print(f"Total Customers: {result[0]:,}")
        print(f"Average Age: {result[1]:.1f} years")
        print(f"Average Income: ${result[2]:,.0f}")
        print(f"Average Credit Score: {result[3]:.0f}")
        print(f"Average Account Balance: ${result[4]:,.0f}")
        
        # Risk distribution (using correct column name)
        cur.execute("""
            SELECT 
                risk_segment,
                COUNT(*) as customers,
                ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) as percentage
            FROM customers
            GROUP BY risk_segment
            ORDER BY COUNT(*) DESC
        """)
        
        print(f"\nRisk Segment Distribution:")
        for row in cur.fetchall():
            risk_segment, count, percentage = row
            print(f"  {risk_segment}: {count:,} customers ({percentage}%)")
        
        cur.close()
        conn.close()
        return True
        
    except Exception as e:
        print(f"Customer verification failed: {e}")
        return False

# Run the fixed customer import
print("🔄 Running FIXED customer import (skip existing)...")
if import_customer_data_fixed(db_config):
    print("\n🎉 Customer data import complete!")
    # Run verification
    verify_customer_data_fixed(db_config)
else:
    print("\n⚠️ Please check customer import errors")

# Alternative option
print("\n🔄 Alternative: Replace existing customer data...")
print("Uncomment the line below if you want to replace all existing customer data:")
print("# import_customer_data_fixed(db_config, replace_existing=True)")

🔄 Running FIXED customer import (skip existing)...
👥 Importing Customer Demographics Data (Schema Fixed)
Step 1: Loading customer data...
✅ Loaded 10,000 customer records

Step 2: Preprocessing with schema mapping...
✅ Column names mapped to database schema

Step 3: Checking for existing customer data...
📊 Found 10,000 existing customer records
🔍 Checking for overlapping customer data...
⏭️ Skipping 10,000 customers that already exist
📥 Will import 0 new customers
ℹ️ No new customer data to import

🎉 Customer data import complete!

🔍 Customer Data Verification (Schema Fixed):
--------------------------------------------------
Total Customers: 10,000
Average Age: 39.8 years
Average Income: $41,638
Average Credit Score: 690
Average Account Balance: $7,754

Risk Segment Distribution:
  Medium: 5,391 customers (53.9%)
  Low: 2,855 customers (28.6%)
  High: 1,754 customers (17.5%)

🔄 Alternative: Replace existing customer data...
Uncomment the line below if you want to replace all existing 

Comprehensive Data Validation
=====================================

Now that all data is imported, let's run comprehensive
validation queries to ensure data integrity across tables.

This includes:
1. Referential integrity checks
2. Data completeness analysis
3. Anomaly detection
4. Cross-table consistency

In [13]:
def run_data_validation(config):
    """
    Run comprehensive validation queries across all tables.
    Generate a data quality report.
    """
    print(f"🔍 Running Comprehensive Data Validation")
    print("=" * 60)
    
    try:
        conn = psycopg2.connect(**config.get_connection_params())
        cur = conn.cursor()
        
        validation_results = []
        
        # 1. Table Row Counts
        print("\n1. Table Row Counts:")
        print("-" * 40)
        
        tables = ['stock_prices', 'crypto_prices', 'economic_indicators', 
                  'portfolio_holdings', 'customers']
        
        for table in tables:
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            count = cur.fetchone()[0]
            print(f"  {table:20}: {count:>10,} rows")
            validation_results.append({
                'check': f'{table} row count',
                'result': count,
                'status': 'PASS' if count > 0 else 'FAIL'
            })
        
        # 2. Date Range Consistency
        print("\n2. Date Range Analysis:")
        print("-" * 40)
        
        # Stock prices date range
        cur.execute("""
            SELECT MIN(date) as min_date, MAX(date) as max_date,
                   COUNT(DISTINCT date) as trading_days
            FROM stock_prices
        """)
        stock_dates = cur.fetchone()
        print(f"  Stock prices: {stock_dates[0]} to {stock_dates[1]} ({stock_dates[2]} days)")
        
        # Economic indicators date range
        cur.execute("""
            SELECT MIN(date) as min_date, MAX(date) as max_date,
                   COUNT(DISTINCT date) as months
            FROM economic_indicators
        """)
        econ_dates = cur.fetchone()
        print(f"  Economic data: {econ_dates[0]} to {econ_dates[1]} ({econ_dates[2]} months)")
        
        # 3. Missing Data Analysis
        print("\n3. Missing Data Check:")
        print("-" * 40)
        
        # Check for missing trading days (excluding weekends)
        cur.execute("""
            WITH expected_days AS (
                SELECT generate_series(
                    (SELECT MIN(date) FROM stock_prices),
                    (SELECT MAX(date) FROM stock_prices),
                    '1 day'::interval
                )::date AS trading_date
            ),
            actual_days AS (
                SELECT DISTINCT date FROM stock_prices
            )
            SELECT COUNT(*) as missing_days
            FROM expected_days e
            LEFT JOIN actual_days a ON e.trading_date = a.date
            WHERE a.date IS NULL
              AND EXTRACT(DOW FROM e.trading_date) NOT IN (0, 6)
        """)
        
        missing_days = cur.fetchone()[0]
        print(f"  Missing trading days: {missing_days}")
        validation_results.append({
            'check': 'Missing trading days',
            'result': missing_days,
            'status': 'PASS' if missing_days < 10 else 'WARNING'
        })
        
        # 4. Data Integrity Checks
        print("\n4. Data Integrity Checks:")
        print("-" * 40)
        
        # Check OHLC consistency
        cur.execute("""
            SELECT COUNT(*) as invalid_ohlc
            FROM stock_prices
            WHERE high < low 
               OR high < open 
               OR high < close
               OR low > open 
               OR low > close
        """)
        invalid_ohlc = cur.fetchone()[0]
        print(f"  Invalid OHLC relationships: {invalid_ohlc}")
        validation_results.append({
            'check': 'OHLC consistency',
            'result': invalid_ohlc,
            'status': 'PASS' if invalid_ohlc == 0 else 'FAIL'
        })
        
        # Check portfolio weight sums
        cur.execute("""
            SELECT COUNT(*) as invalid_weights
            FROM portfolio_holdings
            WHERE ABS((stock_weight + bond_weight + cash_weight) - 1.0) > 0.01
        """)
        invalid_weights = cur.fetchone()[0]
        print(f"  Invalid portfolio weights: {invalid_weights}")
        validation_results.append({
            'check': 'Portfolio weight sums',
            'result': invalid_weights,
            'status': 'PASS' if invalid_weights == 0 else 'FAIL'
        })
        
        # 5. Statistical Anomaly Detection
        print("\n5. Statistical Anomaly Detection:")
        print("-" * 40)
        
        # Check for extreme returns
        cur.execute("""
            WITH daily_returns AS (
                SELECT 
                    symbol,
                    date,
                    (close / LAG(close) OVER (PARTITION BY symbol ORDER BY date) - 1) as return
                FROM stock_prices
            )
            SELECT 
                COUNT(*) as extreme_returns,
                MAX(ABS(return)) as max_return
            FROM daily_returns
            WHERE ABS(return) > 0.20  -- 20% daily move
        """)
        result = cur.fetchone()
        extreme_returns, max_return = result[0], result[1] if result[1] is not None else 0
        print(f"  Extreme daily returns (>20%): {extreme_returns}")
        print(f"  Maximum daily return: {max_return*100:.1f}%")
        
        # 6. Cross-Table Consistency
        print("\n6. Cross-Table Consistency:")
        print("-" * 40)
        
        # Check date overlap between tables
        cur.execute("""
            WITH stock_dates AS (
                SELECT MIN(date) as min_date, MAX(date) as max_date FROM stock_prices
            ),
            econ_dates AS (
                SELECT MIN(date) as min_date, MAX(date) as max_date FROM economic_indicators
            )
            SELECT 
                CASE 
                    WHEN s.min_date <= e.min_date AND s.max_date >= e.max_date THEN 'Full overlap'
                    WHEN s.max_date < e.min_date OR s.min_date > e.max_date THEN 'No overlap'
                    ELSE 'Partial overlap'
                END as date_overlap
            FROM stock_dates s, econ_dates e
        """)
        date_overlap = cur.fetchone()[0]
        print(f"  Stock/Economic date overlap: {date_overlap}")
        
        # 7. Generate Quality Score
        print("\n7. Data Quality Score:")
        print("-" * 40)
        
        total_checks = len(validation_results)
        passed_checks = sum(1 for r in validation_results if r['status'] == 'PASS')
        quality_score = (passed_checks / total_checks) * 100 if total_checks > 0 else 0
        
        print(f"  Total checks: {total_checks}")
        print(f"  Passed: {passed_checks}")
        print(f"  Failed: {total_checks - passed_checks}")
        print(f"  Quality Score: {quality_score:.1f}%")
        
        # Generate detailed report
        report_content = f"""
# Data Quality Report
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

## Summary
- Quality Score: {quality_score:.1f}%
- Total Checks: {total_checks}
- Passed: {passed_checks}
- Failed: {total_checks - passed_checks}

## Detailed Results
"""
        
        for result in validation_results:
            status_emoji = "PASS" if result['status'] == 'PASS' else "FAIL"
            report_content += f"- {result['check']}: {result['result']} [{status_emoji}]\n"
        
        # Save report with UTF-8 encoding
        with open('data_quality_report.md', 'w', encoding='utf-8') as f:
            f.write(report_content)
        
        print(f"\n📄 Detailed report saved to: data_quality_report.md")
        
        cur.close()
        conn.close()
        
        return validation_results
        
    except Exception as e:
        print(f"❌ Validation error: {e}")
        return []

# Run validation
validation_results = run_data_validation(db_config)

🔍 Running Comprehensive Data Validation

1. Table Row Counts:
----------------------------------------
  stock_prices        :     26,100 rows
  crypto_prices       :     73,050 rows
  economic_indicators :        600 rows
  portfolio_holdings  :      6,000 rows
  customers           :     10,000 rows

2. Date Range Analysis:
----------------------------------------
  Stock prices: 2020-01-01 to 2024-12-31 (1305 days)
  Economic data: 2020-01-31 to 2024-12-31 (60 months)

3. Missing Data Check:
----------------------------------------
  Missing trading days: 0

4. Data Integrity Checks:
----------------------------------------
  Invalid OHLC relationships: 0
  Invalid portfolio weights: 0

5. Statistical Anomaly Detection:
----------------------------------------
  Extreme daily returns (>20%): 0
  Maximum daily return: 0.0%

6. Cross-Table Consistency:
----------------------------------------
  Stock/Economic date overlap: Full overlap

7. Data Quality Score:
-------------------------

Creating Analysis Views and Performance Indexes
=======================================================

Views are like "saved queries" that simplify complex analysis.
Indexes make queries faster by creating lookup tables.

We'll create views for:
1. Daily returns calculation
2. Moving averages
3. Volatility metrics
4. Correlation matrices
5. Portfolio performance

In [14]:
"""
Analysis Infrastructure Setup
==================================
"""

def create_fixed_analysis_infrastructure(config):
    """
    Create analysis infrastructure with all PostgreSQL compatibility fixes.
    """
    print("🏗️ Creating Fixed Analysis Infrastructure")
    print("=" * 60)
    
    # Initialize results tracking
    results = {
        'views': {'created': [], 'failed': []},
        'indexes': {'created': [], 'failed': []},
        'correlations': {'status': 'pending', 'count': 0}
    }
    
    # PART 0: COMPREHENSIVE CLEANUP
    print("\n🧹 PART 0: Comprehensive Cleanup")
    print("-" * 40)
    
    try:
        conn = psycopg2.connect(**config.get_connection_params())
        conn.autocommit = True
        cur = conn.cursor()
        
        # Drop all problematic objects
        cleanup_commands = [
            "DROP VIEW IF EXISTS correlation_matrix CASCADE",
            "DROP MATERIALIZED VIEW IF EXISTS correlation_matrix CASCADE", 
            "DROP VIEW IF EXISTS moving_averages CASCADE",
            "DROP VIEW IF EXISTS volatility_metrics CASCADE"
        ]
        
        for cmd in cleanup_commands:
            try:
                cur.execute(cmd)
                print(f"✅ Executed: {cmd.split()[2]} {cmd.split()[3]}")
            except Exception as e:
                print(f"ℹ️ {cmd.split()[2]} {cmd.split()[3]} didn't exist")
        
        cur.close()
        conn.close()
        print("✅ Cleanup completed")
        
    except Exception as e:
        print(f"⚠️ Cleanup error: {e}")
    
    # PART 1: CREATE FIXED VIEWS
    print("\n📊 PART 1: Creating Fixed Analysis Views")
    print("-" * 40)
    
    views = {
        'daily_returns': """
            CREATE OR REPLACE VIEW daily_returns AS
            WITH price_data AS (
                SELECT 
                    date,
                    symbol,
                    close,
                    LAG(close) OVER (PARTITION BY symbol ORDER BY date) as prev_close
                FROM stock_prices
            )
            SELECT 
                date,
                symbol,
                close,
                prev_close,
                CASE 
                    WHEN prev_close IS NOT NULL AND prev_close > 0 
                    THEN (close - prev_close) / prev_close 
                    ELSE NULL 
                END as simple_return,
                CASE 
                    WHEN prev_close IS NOT NULL AND prev_close > 0 
                    THEN LN(close / prev_close) 
                    ELSE NULL 
                END as log_return
            FROM price_data
            ORDER BY symbol, date
        """,
        
        'moving_averages_fixed': """
            CREATE OR REPLACE VIEW moving_averages_fixed AS
            SELECT 
                date,
                symbol,
                close,
                volume,
                -- Simple moving averages with unique names
                AVG(close) OVER (
                    PARTITION BY symbol 
                    ORDER BY date 
                    ROWS BETWEEN 9 PRECEDING AND CURRENT ROW
                ) as sma_ten,
                AVG(close) OVER (
                    PARTITION BY symbol 
                    ORDER BY date 
                    ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
                ) as sma_twenty,
                AVG(close) OVER (
                    PARTITION BY symbol 
                    ORDER BY date 
                    ROWS BETWEEN 49 PRECEDING AND CURRENT ROW
                ) as sma_fifty,
                -- Volume weighted average price (20-day)
                CASE 
                    WHEN SUM(volume) OVER (
                        PARTITION BY symbol 
                        ORDER BY date 
                        ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
                    ) > 0 THEN
                    SUM(close * volume) OVER (
                        PARTITION BY symbol 
                        ORDER BY date 
                        ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
                    ) / SUM(volume) OVER (
                        PARTITION BY symbol 
                        ORDER BY date 
                        ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
                    )
                    ELSE NULL
                END as vwap_twenty
            FROM stock_prices
            ORDER BY symbol, date
        """,
        
        'volatility_metrics_fixed': """
            CREATE OR REPLACE VIEW volatility_metrics_fixed AS
            WITH returns AS (
                SELECT * FROM daily_returns WHERE log_return IS NOT NULL
            )
            SELECT 
                symbol,
                date,
                log_return,
                -- Rolling volatility (20-day) with proper casting
                CASE 
                    WHEN COUNT(log_return) OVER (
                        PARTITION BY symbol 
                        ORDER BY date 
                        ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
                    ) >= 20 THEN
                    STDDEV(log_return) OVER (
                        PARTITION BY symbol 
                        ORDER BY date 
                        ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
                    ) * SQRT(252)
                    ELSE NULL
                END as volatility_20d,
                -- Rolling volatility (60-day)
                CASE 
                    WHEN COUNT(log_return) OVER (
                        PARTITION BY symbol 
                        ORDER BY date 
                        ROWS BETWEEN 59 PRECEDING AND CURRENT ROW
                    ) >= 60 THEN
                    STDDEV(log_return) OVER (
                        PARTITION BY symbol 
                        ORDER BY date 
                        ROWS BETWEEN 59 PRECEDING AND CURRENT ROW
                    ) * SQRT(252)
                    ELSE NULL
                END as volatility_60d
            FROM returns
        """,
        
        'market_summary_fixed': """
            CREATE OR REPLACE VIEW market_summary_fixed AS
            WITH latest_data AS (
                SELECT DISTINCT ON (symbol) 
                    symbol,
                    date,
                    close,
                    volume
                FROM stock_prices
                ORDER BY symbol, date DESC
            ),
            period_stats AS (
                SELECT 
                    symbol,
                    AVG(close) as avg_price_30d,
                    MIN(close) as min_price_30d,
                    MAX(close) as max_price_30d,
                    SUM(volume) as total_volume_30d
                FROM stock_prices
                WHERE date >= (SELECT MAX(date) FROM stock_prices) - INTERVAL '30 days'
                GROUP BY symbol
            ),
            return_stats AS (
                SELECT 
                    symbol,
                    AVG(log_return) * 252 as annual_return,
                    STDDEV(log_return) * SQRT(252) as annual_volatility
                FROM daily_returns
                WHERE log_return IS NOT NULL
                  AND date >= (SELECT MAX(date) FROM stock_prices) - INTERVAL '252 days'
                GROUP BY symbol
            )
            SELECT 
                l.symbol,
                l.date as last_update,
                l.close as current_price,
                p.avg_price_30d,
                CASE 
                    WHEN p.avg_price_30d > 0 
                    THEN (l.close - p.avg_price_30d) / p.avg_price_30d 
                    ELSE 0 
                END as pct_from_avg_30d,
                p.min_price_30d,
                p.max_price_30d,
                r.annual_return,
                r.annual_volatility,
                CASE 
                    WHEN r.annual_volatility > 0 
                    THEN r.annual_return / r.annual_volatility 
                    ELSE 0 
                END as sharpe_ratio
            FROM latest_data l
            LEFT JOIN period_stats p ON l.symbol = p.symbol
            LEFT JOIN return_stats r ON l.symbol = r.symbol
        """
    }
    
    # Create each view with error handling
    for view_name, view_sql in views.items():
        try:
            conn = psycopg2.connect(**config.get_connection_params())
            conn.autocommit = True
            cur = conn.cursor()
            
            print(f"Creating view: {view_name}...", end='')
            cur.execute(view_sql)
            results['views']['created'].append(view_name)
            print(" ✅")
            
            cur.close()
            conn.close()
            
        except Exception as e:
            print(f" ❌ Error: {str(e)[:80]}...")
            results['views']['failed'].append((view_name, str(e)))
            if 'conn' in locals():
                conn.close()
    
    # PART 2: CREATE CORRELATION TABLE (NOT VIEW)
    print("\n📊 PART 2: Setting Up Fixed Correlation Analysis")
    print("-" * 40)
    
    try:
        conn = psycopg2.connect(**config.get_connection_params())
        conn.autocommit = True
        cur = conn.cursor()
        
        # Create correlation storage table
        print("Creating correlation storage table...", end='')
        cur.execute("""
            CREATE TABLE IF NOT EXISTS stock_correlations (
                symbol1 VARCHAR(10),
                symbol2 VARCHAR(10),
                correlation NUMERIC(10,6),
                observations INTEGER,
                period_start DATE,
                period_end DATE,
                calculated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (symbol1, symbol2)
            )
        """)
        print(" ✅")
        
        # Get list of symbols for correlation calculation
        cur.execute("SELECT DISTINCT symbol FROM stock_prices ORDER BY symbol LIMIT 10")
        symbols = [row[0] for row in cur.fetchall()]
        print(f"Calculating correlations for {len(symbols)} symbols...")
        
        # Clear old correlations
        cur.execute("TRUNCATE stock_correlations")
        
        # Calculate correlations for each pair
        correlation_count = 0
        total_pairs = len(symbols) * (len(symbols) - 1) // 2
        
        for i, symbol1 in enumerate(symbols):
            for j, symbol2 in enumerate(symbols[i+1:], i+1):
                # Progress indicator
                if correlation_count % 5 == 0:
                    progress = (correlation_count / total_pairs) * 100 if total_pairs > 0 else 0
                    print(f"\r  Progress: {progress:.1f}% ({correlation_count}/{total_pairs})", end='')
                
                # Calculate correlation with sufficient data
                cur.execute("""
                    WITH paired_returns AS (
                        SELECT 
                            r1.date,
                            r1.log_return as return1,
                            r2.log_return as return2
                        FROM daily_returns r1
                        JOIN daily_returns r2 ON r1.date = r2.date
                        WHERE r1.symbol = %s 
                          AND r2.symbol = %s
                          AND r1.log_return IS NOT NULL
                          AND r2.log_return IS NOT NULL
                    )
                    SELECT 
                        CORR(return1, return2) as correlation,
                        COUNT(*) as observations,
                        MIN(date) as period_start,
                        MAX(date) as period_end
                    FROM paired_returns
                    HAVING COUNT(*) >= 30
                """, (symbol1, symbol2))
                
                result = cur.fetchone()
                if result and result[0] is not None:
                    cur.execute("""
                        INSERT INTO stock_correlations 
                        (symbol1, symbol2, correlation, observations, period_start, period_end)
                        VALUES (%s, %s, %s, %s, %s, %s)
                        ON CONFLICT (symbol1, symbol2) DO UPDATE SET
                        correlation = EXCLUDED.correlation,
                        observations = EXCLUDED.observations,
                        calculated_at = CURRENT_TIMESTAMP
                    """, (symbol1, symbol2, result[0], result[1], result[2], result[3]))
                    correlation_count += 1
        
        print(f"\n✅ Calculated {correlation_count} correlations")
        results['correlations']['count'] = correlation_count
        results['correlations']['status'] = 'success'
        
        # Create simple correlation view
        print("Creating correlation view...", end='')
        cur.execute("""
            CREATE OR REPLACE VIEW stock_correlation_view AS
            SELECT 
                symbol1, 
                symbol2, 
                ROUND(CAST(correlation AS numeric), 4) as correlation, 
                observations
            FROM stock_correlations
            ORDER BY correlation DESC
        """)
        print(" ✅")
        
        cur.close()
        conn.close()
        
    except Exception as e:
        print(f"\n❌ Correlation calculation error: {e}")
        results['correlations']['status'] = 'failed'
        if 'conn' in locals():
            conn.close()
    
    # PART 3: TEST FIXED INFRASTRUCTURE
    print("\n🔍 PART 3: Testing Fixed Infrastructure")
    print("-" * 40)
    
    test_queries = {
        "Market Summary (Top Movers)": """
            SELECT symbol, 
                   ROUND(CAST(current_price AS numeric), 2) as current_price,
                   ROUND(CAST(pct_from_avg_30d * 100 AS numeric), 2) as pct_change_30d
            FROM market_summary_fixed
            WHERE current_price IS NOT NULL
            ORDER BY ABS(pct_from_avg_30d) DESC NULLS LAST
            LIMIT 5
        """,
        
        "Recent Volatility Leaders (Fixed)": """
            SELECT symbol, 
                   date,
                   ROUND(CAST(volatility_20d * 100 AS numeric), 1) as volatility_pct
            FROM volatility_metrics_fixed
            WHERE volatility_20d IS NOT NULL
              AND date >= (SELECT MAX(date) FROM volatility_metrics_fixed) - INTERVAL '5 days'
            ORDER BY volatility_20d DESC
            LIMIT 5
        """,
        
        "Top Correlations": """
            SELECT symbol1, symbol2, 
                   correlation as corr,
                   observations as obs
            FROM stock_correlation_view
            WHERE correlation IS NOT NULL
            ORDER BY correlation DESC
            LIMIT 5
        """,
        
        "Moving Average Signals (Fixed)": """
            SELECT symbol,
                   ROUND(CAST(close AS numeric), 2) as price,
                   ROUND(CAST(sma_twenty AS numeric), 2) as sma20,
                   CASE 
                       WHEN close > sma_twenty THEN 'Above'
                       WHEN close < sma_twenty THEN 'Below'
                       ELSE 'At'
                   END as signal
            FROM moving_averages_fixed
            WHERE date = (SELECT MAX(date) FROM moving_averages_fixed)
              AND sma_twenty IS NOT NULL
            ORDER BY symbol
            LIMIT 5
        """
    }
    
    for query_name, query_sql in test_queries.items():
        print(f"\n{query_name}:")
        try:
            conn = psycopg2.connect(**config.get_connection_params())
            cur = conn.cursor()
            
            cur.execute(query_sql)
            results_data = cur.fetchall()
            
            if results_data:
                # Get column names
                columns = [desc[0] for desc in cur.description]
                
                # Print header
                header = " | ".join(f"{col[:12]:^12}" for col in columns)
                print(f"  {header}")
                print(f"  {'-' * len(header)}")
                
                # Print data (limit to first 3 rows for readability)
                for row in results_data[:3]:
                    formatted_row = []
                    for val in row:
                        if isinstance(val, (int, float)):
                            formatted_row.append(f"{val:^12.2f}")
                        else:
                            formatted_row.append(f"{str(val)[:12]:^12}")
                    print(f"  {' | '.join(formatted_row)}")
            else:
                print("  No data available")
            
            cur.close()
            conn.close()
            
        except Exception as e:
            print(f"  Query failed: {e}")
    
    # FINAL SUMMARY
    print("\n" + "=" * 60)
    print("📊 FIXED ANALYSIS INFRASTRUCTURE SUMMARY")
    print("=" * 60)
    
    print(f"\nViews:")
    print(f"  ✅ Created: {len(results['views']['created'])}")
    print(f"  ❌ Failed: {len(results['views']['failed'])}")
    if results['views']['created']:
        print(f"  Available views: {', '.join(results['views']['created'])}")
    
    print(f"\nCorrelations:")
    print(f"  Status: {results['correlations']['status']}")
    print(f"  Calculated: {results['correlations']['count']} pairs")
    
    print("\n📚 Available Fixed Analysis Tools:")
    print("-" * 40)
    print("Views:")
    for view in results['views']['created']:
        print(f"  ✓ {view}")
    print("  ✓ stock_correlation_view")
    
    print("\n📝 Example Usage (Fixed):")
    print("-" * 40)
    print("-- Get top correlations")
    print("SELECT * FROM stock_correlation_view LIMIT 10;")
    print("")
    print("-- Find volatile stocks")
    print("SELECT symbol, volatility_20d")
    print("FROM volatility_metrics_fixed ")
    print("WHERE date = (SELECT MAX(date) FROM volatility_metrics_fixed)")
    print("  AND volatility_20d IS NOT NULL")
    print("ORDER BY volatility_20d DESC;")
    print("")
    print("-- Moving average analysis")
    print("SELECT symbol, close, sma_twenty, (close/sma_twenty - 1)*100 as pct_above_sma")
    print("FROM moving_averages_fixed ")
    print("WHERE date = (SELECT MAX(date) FROM moving_averages_fixed)")
    print("  AND sma_twenty IS NOT NULL;")
    
    print("\n✅ Fixed analysis infrastructure setup complete!")
    return results

# Run the fixed infrastructure setup
print("🚀 Starting FIXED infrastructure setup...")
fixed_results = create_fixed_analysis_infrastructure(db_config)

# Quick validation
print("\n🔍 Final Validation:")
try:
    conn = psycopg2.connect(**db_config.get_connection_params())
    cur = conn.cursor()
    
    # Test each created view
    for view_name in fixed_results['views']['created']:
        try:
            cur.execute(f"SELECT COUNT(*) FROM {view_name}")
            count = cur.fetchone()[0]
            print(f"  {view_name}: {count:,} records ✅")
        except Exception as e:
            print(f"  {view_name}: Error - {e}")
    
    # Test correlation table
    try:
        cur.execute("SELECT COUNT(*) FROM stock_correlations")
        corr_count = cur.fetchone()[0]
        print(f"  stock_correlations: {corr_count} pairs ✅")
    except Exception as e:
        print(f"  stock_correlations: Error - {e}")
    
    cur.close()
    conn.close()
    
except Exception as e:
    print(f"  Validation error: {e}")

print("\n💾 FIXED Infrastructure setup complete! All PostgreSQL compatibility issues resolved.")

🚀 Starting FIXED infrastructure setup...
🏗️ Creating Fixed Analysis Infrastructure

🧹 PART 0: Comprehensive Cleanup
----------------------------------------
✅ Executed: IF EXISTS
✅ Executed: VIEW IF
✅ Executed: IF EXISTS
✅ Executed: IF EXISTS
✅ Cleanup completed

📊 PART 1: Creating Fixed Analysis Views
----------------------------------------
Creating view: daily_returns... ✅
Creating view: moving_averages_fixed... ✅
Creating view: volatility_metrics_fixed... ✅
Creating view: market_summary_fixed... ✅

📊 PART 2: Setting Up Fixed Correlation Analysis
----------------------------------------
Creating correlation storage table... ✅
Calculating correlations for 10 symbols...
  Progress: 88.9% (40/45)
✅ Calculated 45 correlations
Creating correlation view... ✅

🔍 PART 3: Testing Fixed Infrastructure
----------------------------------------

Market Summary (Top Movers):
     symbol    | current_pric | pct_change_3
  ------------------------------------------
       KO      |    77.20     |  

SQL Practice Exercises for Financial Analysis
=====================================================

Now let's practice SQL with real financial queries.
These exercises progress from basic to advanced.

Try to solve each one before looking at the solution!


In [15]:
def generate_sql_exercises():
    """
    Generate a set of SQL exercises with solutions.
    """
    
    exercises = [
        {
            'level': 'Beginner',
            'title': 'Find Recent Prices',
            'question': 'Get the last 10 closing prices for Apple (AAPL)',
            'hint': 'Use WHERE for filtering and ORDER BY with LIMIT',
            'solution': """
SELECT date, close
FROM stock_prices
WHERE symbol = 'AAPL'
ORDER BY date DESC
LIMIT 10;
            """
        },
        {
            'level': 'Beginner',
            'title': 'Average Volume',
            'question': 'Calculate the average daily trading volume for each stock',
            'hint': 'Use GROUP BY and AVG()',
            'solution': """
SELECT 
    symbol,
    AVG(volume) as avg_daily_volume,
    MIN(volume) as min_volume,
    MAX(volume) as max_volume
FROM stock_prices
GROUP BY symbol
ORDER BY avg_daily_volume DESC;
            """
        },
        {
            'level': 'Intermediate',
            'title': 'Monthly Performance',
            'question': 'Calculate monthly returns for each stock in 2023',
            'hint': 'Use DATE_TRUNC() and window functions',
            'solution': """
WITH monthly_prices AS (
    SELECT 
        symbol,
        DATE_TRUNC('month', date) as month,
        FIRST_VALUE(open) OVER (PARTITION BY symbol, DATE_TRUNC('month', date) ORDER BY date) as month_open,
        LAST_VALUE(close) OVER (PARTITION BY symbol, DATE_TRUNC('month', date) ORDER BY date 
                                RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as month_close
    FROM stock_prices
    WHERE EXTRACT(YEAR FROM date) = 2023
)
SELECT DISTINCT
    symbol,
    month,
    month_open,
    month_close,
    (month_close - month_open) / month_open * 100 as monthly_return_pct
FROM monthly_prices
ORDER BY symbol, month;
            """
        },
        {
            'level': 'Intermediate',
            'title': 'Volatility Ranking',
            'question': 'Rank stocks by their 30-day volatility',
            'hint': 'Calculate standard deviation of returns',
            'solution': """
WITH daily_returns AS (
    SELECT 
        symbol,
        date,
        (close - LAG(close) OVER (PARTITION BY symbol ORDER BY date)) / 
         LAG(close) OVER (PARTITION BY symbol ORDER BY date) as return
    FROM stock_prices
    WHERE date >= CURRENT_DATE - INTERVAL '30 days'
)
SELECT 
    symbol,
    STDDEV(return) * SQRT(252) as annualized_volatility,
    COUNT(*) as trading_days,
    RANK() OVER (ORDER BY STDDEV(return) DESC) as volatility_rank
FROM daily_returns
WHERE return IS NOT NULL
GROUP BY symbol
HAVING COUNT(*) > 20  -- Ensure enough data points
ORDER BY annualized_volatility DESC;
            """
        },
        {
            'level': 'Advanced',
            'title': 'Portfolio Correlation',
            'question': 'Find which stocks move together (correlation > 0.7)',
            'hint': 'Self-join daily returns and use CORR() function',
            'solution': """
WITH returns AS (
    SELECT 
        date,
        symbol,
        (close - LAG(close) OVER (PARTITION BY symbol ORDER BY date)) / 
         LAG(close) OVER (PARTITION BY symbol ORDER BY date) as return
    FROM stock_prices
)
SELECT 
    r1.symbol as symbol1,
    r2.symbol as symbol2,
    CORR(r1.return, r2.return) as correlation,
    COUNT(*) as observations
FROM returns r1
JOIN returns r2 ON r1.date = r2.date
WHERE r1.symbol < r2.symbol  -- Avoid duplicates
  AND r1.return IS NOT NULL
  AND r2.return IS NOT NULL
GROUP BY r1.symbol, r2.symbol
HAVING CORR(r1.return, r2.return) > 0.7
   AND COUNT(*) > 100
ORDER BY correlation DESC;
            """
        },
        {
            'level': 'Advanced',
            'title': 'Economic Impact Analysis',
            'question': 'Analyze how stock returns correlate with GDP growth',
            'hint': 'JOIN stock and economic data, calculate correlations',
            'solution': """
WITH monthly_stock_returns AS (
    SELECT 
        DATE_TRUNC('month', date) as month,
        symbol,
        (MAX(close) - MIN(open)) / MIN(open) as monthly_return
    FROM stock_prices
    GROUP BY DATE_TRUNC('month', date), symbol
),
gdp_data AS (
    SELECT 
        date,
        value as gdp_growth
    FROM economic_indicators
    WHERE indicator = 'GDP_GROWTH'
)
SELECT 
    s.symbol,
    CORR(s.monthly_return, g.gdp_growth) as return_gdp_correlation,
    COUNT(*) as observations,
    AVG(s.monthly_return) * 12 as avg_annual_return,
    AVG(g.gdp_growth) as avg_gdp_growth
FROM monthly_stock_returns s
JOIN gdp_data g ON DATE_TRUNC('month', g.date) = s.month
GROUP BY s.symbol
HAVING COUNT(*) > 12  -- At least 1 year of data
ORDER BY return_gdp_correlation DESC;
            """
        }
    ]
    
    # Create exercise notebook
    print("📝 SQL Exercise Notebook")
    print("=" * 60)
    
    for i, exercise in enumerate(exercises, 1):
        print(f"\n{'🟢' if exercise['level'] == 'Beginner' else '🟡' if exercise['level'] == 'Intermediate' else '🔴'} Exercise {i}: {exercise['title']} ({exercise['level']})")
        print("-" * 40)
        print(f"Question: {exercise['question']}")
        print(f"Hint: {exercise['hint']}")
        print("\nTry writing your query here first:")
        print("```sql")
        print("-- Your solution\n\n")
        print("```")
        
        show_solution = input("\nPress Enter to see solution (or 's' to skip): ")
        if show_solution.lower() != 's':
            print("\nSolution:")
            print("```sql")
            print(exercise['solution'].strip())
            print("```")
    
    return exercises

# Run SQL exercises
exercises = generate_sql_exercises()

📝 SQL Exercise Notebook

🟢 Exercise 1: Find Recent Prices (Beginner)
----------------------------------------
Question: Get the last 10 closing prices for Apple (AAPL)
Hint: Use WHERE for filtering and ORDER BY with LIMIT

Try writing your query here first:
```sql
-- Your solution


```



Press Enter to see solution (or 's' to skip):  



Solution:
```sql
SELECT date, close
FROM stock_prices
WHERE symbol = 'AAPL'
ORDER BY date DESC
LIMIT 10;
```

🟢 Exercise 2: Average Volume (Beginner)
----------------------------------------
Question: Calculate the average daily trading volume for each stock
Hint: Use GROUP BY and AVG()

Try writing your query here first:
```sql
-- Your solution


```



Press Enter to see solution (or 's' to skip):  



Solution:
```sql
SELECT 
    symbol,
    AVG(volume) as avg_daily_volume,
    MIN(volume) as min_volume,
    MAX(volume) as max_volume
FROM stock_prices
GROUP BY symbol
ORDER BY avg_daily_volume DESC;
```

🟡 Exercise 3: Monthly Performance (Intermediate)
----------------------------------------
Question: Calculate monthly returns for each stock in 2023
Hint: Use DATE_TRUNC() and window functions

Try writing your query here first:
```sql
-- Your solution


```



Press Enter to see solution (or 's' to skip):  



Solution:
```sql
WITH monthly_prices AS (
    SELECT 
        symbol,
        DATE_TRUNC('month', date) as month,
        FIRST_VALUE(open) OVER (PARTITION BY symbol, DATE_TRUNC('month', date) ORDER BY date) as month_open,
        LAST_VALUE(close) OVER (PARTITION BY symbol, DATE_TRUNC('month', date) ORDER BY date 
                                RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as month_close
    FROM stock_prices
    WHERE EXTRACT(YEAR FROM date) = 2023
)
SELECT DISTINCT
    symbol,
    month,
    month_open,
    month_close,
    (month_close - month_open) / month_open * 100 as monthly_return_pct
FROM monthly_prices
ORDER BY symbol, month;
```

🟡 Exercise 4: Volatility Ranking (Intermediate)
----------------------------------------
Question: Rank stocks by their 30-day volatility
Hint: Calculate standard deviation of returns

Try writing your query here first:
```sql
-- Your solution


```



Press Enter to see solution (or 's' to skip):  



Solution:
```sql
WITH daily_returns AS (
    SELECT 
        symbol,
        date,
        (close - LAG(close) OVER (PARTITION BY symbol ORDER BY date)) / 
         LAG(close) OVER (PARTITION BY symbol ORDER BY date) as return
    FROM stock_prices
    WHERE date >= CURRENT_DATE - INTERVAL '30 days'
)
SELECT 
    symbol,
    STDDEV(return) * SQRT(252) as annualized_volatility,
    COUNT(*) as trading_days,
    RANK() OVER (ORDER BY STDDEV(return) DESC) as volatility_rank
FROM daily_returns
WHERE return IS NOT NULL
GROUP BY symbol
HAVING COUNT(*) > 20  -- Ensure enough data points
ORDER BY annualized_volatility DESC;
```

🔴 Exercise 5: Portfolio Correlation (Advanced)
----------------------------------------
Question: Find which stocks move together (correlation > 0.7)
Hint: Self-join daily returns and use CORR() function

Try writing your query here first:
```sql
-- Your solution


```



Press Enter to see solution (or 's' to skip):  



Solution:
```sql
WITH returns AS (
    SELECT 
        date,
        symbol,
        (close - LAG(close) OVER (PARTITION BY symbol ORDER BY date)) / 
         LAG(close) OVER (PARTITION BY symbol ORDER BY date) as return
    FROM stock_prices
)
SELECT 
    r1.symbol as symbol1,
    r2.symbol as symbol2,
    CORR(r1.return, r2.return) as correlation,
    COUNT(*) as observations
FROM returns r1
JOIN returns r2 ON r1.date = r2.date
WHERE r1.symbol < r2.symbol  -- Avoid duplicates
  AND r1.return IS NOT NULL
  AND r2.return IS NOT NULL
GROUP BY r1.symbol, r2.symbol
HAVING CORR(r1.return, r2.return) > 0.7
   AND COUNT(*) > 100
ORDER BY correlation DESC;
```

🔴 Exercise 6: Economic Impact Analysis (Advanced)
----------------------------------------
Question: Analyze how stock returns correlate with GDP growth
Hint: JOIN stock and economic data, calculate correlations

Try writing your query here first:
```sql
-- Your solution


```



Press Enter to see solution (or 's' to skip):  



Solution:
```sql
WITH monthly_stock_returns AS (
    SELECT 
        DATE_TRUNC('month', date) as month,
        symbol,
        (MAX(close) - MIN(open)) / MIN(open) as monthly_return
    FROM stock_prices
    GROUP BY DATE_TRUNC('month', date), symbol
),
gdp_data AS (
    SELECT 
        date,
        value as gdp_growth
    FROM economic_indicators
    WHERE indicator = 'GDP_GROWTH'
)
SELECT 
    s.symbol,
    CORR(s.monthly_return, g.gdp_growth) as return_gdp_correlation,
    COUNT(*) as observations,
    AVG(s.monthly_return) * 12 as avg_annual_return,
    AVG(g.gdp_growth) as avg_gdp_growth
FROM monthly_stock_returns s
JOIN gdp_data g ON DATE_TRUNC('month', g.date) = s.month
GROUP BY s.symbol
HAVING COUNT(*) > 12  -- At least 1 year of data
ORDER BY return_gdp_correlation DESC;
```


Week 1 Complete - Integration and Next Steps
====================================================

Congratulations! You've successfully:
✅ Set up PostgreSQL database
✅ Created proper table schemas
✅ Imported all financial datasets
✅ Created analysis views
✅ Validated data quality
✅ Practiced SQL queries

For your 10-minute presentation, prepare:
1. **Database Schema Diagram**: Show your understanding of table relationships
2. **Interesting Query**: One SQL query that reveals a pattern
3. **Data Quality Issue**: Any problem you found and how to fix it
4. **Project Vision**: How this data supports your charter goals

Let's wrap up and prepare for Week 2.

Your data pipeline is ready! Next week we'll:
1. Clean and validate data using pandas/polars
2. Handle missing values and outliers
3. Create derived features
4. Build visualizations
5. Generate insights
