# 01: Data Ingestion & Database Loading

## Purpose
This notebook loads, cleans, and ingests CSV data into PostgreSQL with **correct column mappings from the start**.

## Key Features
- Loads CSV files (with original column names like `PL_YTD`, `MV_Base`)
- Maps columns to database format during ingestion (`plytd`, `mvbase`)
- Cleans data (duplicates, missing values, date parsing)
- Loads into PostgreSQL with correct column names
- Creates indexes for performance

## Column Mapping Strategy
**CRITICAL**: During ingestion, CSV columns are mapped to database column names:
- `PL_YTD` → `plytd` (no underscore, lowercase)
- `PL_MTD` → `plmtd`
- `PL_QTD` → `plqtd`
- `PL_DTD` → `pldtd`
- `MV_Base` → `mvbase`
- `MV_Local` → `mvlocal`
- `PortfolioName` → `portfolioname`
- All other columns → lowercase

This ensures the database tables have the correct column names from the start, eliminating the need for normalization later.

In [1]:
# Cell 1: Imports and Setup
import pandas as pd
import json
import re
import numpy as np
from pathlib import Path
from datetime import datetime, timezone
from sqlalchemy import create_engine, text, inspect, pool
import os
from dotenv import load_dotenv

# Project root
project_root = Path.cwd().parent
data_dir = project_root / "data"

# Load environment variables
load_dotenv(project_root.parent / ".env")

# Database URL
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/fund_data")
if DATABASE_URL.startswith("postgresql://") and "+psycopg" not in DATABASE_URL:
    DATABASE_URL = DATABASE_URL.replace("postgresql://", "postgresql+psycopg://", 1)

print("✅ Imports loaded")
print(f"   Data directory: {data_dir}")
print(f"   Database: {DATABASE_URL.split('@')[-1] if '@' in DATABASE_URL else DATABASE_URL}")

✅ Imports loaded
   Data directory: /Users/suren/Desktop/untitled folder 2/loop-task/data
   Database: localhost:5432/funddb


In [2]:
# Cell 2: Column Mapping Function
# CRITICAL: Maps CSV column names to database column names

def map_column_to_database(csv_col: str) -> str:
    """Map CSV column name to database column name.
    
    Database uses lowercase, no underscores for P&L columns.
    """
    col_lower = csv_col.lower()
    
    # P&L columns: remove underscores
    if "pl_ytd" in col_lower or col_lower == "plytd":
        return "plytd"
    elif "pl_mtd" in col_lower or col_lower == "plmtd":
        return "plmtd"
    elif "pl_qtd" in col_lower or col_lower == "plqtd":
        return "plqtd"
    elif "pl_dtd" in col_lower or col_lower == "pldtd":
        return "pldtd"
    # Market value columns
    elif "mv_base" in col_lower or col_lower == "mvbase":
        return "mvbase"
    elif "mv_local" in col_lower or col_lower == "mvlocal":
        return "mvlocal"
    # All other columns: convert to lowercase
    else:
        return col_lower

# Test mapping
test_cols = ["PL_YTD", "PL_MTD", "MV_Base", "PortfolioName", "Quantity"]
print("Column Mapping Examples:")
for col in test_cols:
    print(f"  {col} → {map_column_to_database(col)}")

Column Mapping Examples:
  PL_YTD → plytd
  PL_MTD → plmtd
  MV_Base → mvbase
  PortfolioName → portfolioname
  Quantity → quantity


In [3]:
# Cell 3: Load and Map Holdings Data

holdings_file = data_dir / "holdings.csv"
holdings_df = pd.read_csv(holdings_file)

print(f"✅ Holdings loaded: {len(holdings_df)} rows, {len(holdings_df.columns)} columns")
print(f"   Original columns: {list(holdings_df.columns[:5])}...")

# Map columns to database names
column_mapping = {col: map_column_to_database(col) for col in holdings_df.columns}
holdings_df = holdings_df.rename(columns=column_mapping)

print(f"✅ Columns mapped to database format")
print(f"   Database columns: {list(holdings_df.columns[:5])}...")
print(f"   Key mappings: PL_YTD → plytd, MV_Base → mvbase")

# Remove duplicates
original_count = len(holdings_df)
holdings_df = holdings_df.drop_duplicates()
removed = original_count - len(holdings_df)
if removed > 0:
    print(f"⚠️  Removed {removed} duplicate rows")

# Add ingestion timestamp
holdings_df['ingested_at'] = datetime.now(timezone.utc)

print(f"✅ Holdings ready: {len(holdings_df)} rows")

✅ Holdings loaded: 1022 rows, 25 columns
   Original columns: ['AsOfDate', 'OpenDate', 'CloseDate', 'ShortName', 'PortfolioName']...
✅ Columns mapped to database format
   Database columns: ['asofdate', 'opendate', 'closedate', 'shortname', 'portfolioname']...
   Key mappings: PL_YTD → plytd, MV_Base → mvbase
⚠️  Removed 2 duplicate rows
✅ Holdings ready: 1020 rows


In [4]:
# Cell 4: Load and Map Trades Data

trades_file = data_dir / "trades.csv"
trades_df = pd.read_csv(trades_file)

print(f"✅ Trades loaded: {len(trades_df)} rows, {len(trades_df.columns)} columns")
print(f"   Original columns: {list(trades_df.columns[:5])}...")

# Map columns to database names
column_mapping = {col: map_column_to_database(col) for col in trades_df.columns}
trades_df = trades_df.rename(columns=column_mapping)

print(f"✅ Columns mapped to database format")
print(f"   Database columns: {list(trades_df.columns[:5])}...")

# Remove duplicates
original_count = len(trades_df)
trades_df = trades_df.drop_duplicates()
removed = original_count - len(trades_df)
if removed > 0:
    print(f"⚠️  Removed {removed} duplicate rows")

# Add ingestion timestamp
trades_df['ingested_at'] = datetime.now(timezone.utc)

print(f"✅ Trades ready: {len(trades_df)} rows")

✅ Trades loaded: 649 rows, 31 columns
   Original columns: ['id', 'RevisionId', 'AllocationId', 'TradeTypeName', 'SecurityId']...
✅ Columns mapped to database format
   Database columns: ['id', 'revisionid', 'allocationid', 'tradetypename', 'securityid']...
✅ Trades ready: 649 rows


In [5]:
# Cell 5: Clean Data (Date Parsing, Type Conversion)

def clean_dataframe(df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Clean dataframe: parse dates, convert types."""
    df = df.copy()
    
    # Parse date columns
    date_cols = [col for col in df.columns if 'date' in col.lower()]
    for col in date_cols:
        if col in df.columns:
            try:
                # Try common date formats
                df[col] = pd.to_datetime(df[col], format='%m/%d/%y', errors='coerce')
                if df[col].isna().all():
                    df[col] = pd.to_datetime(df[col], format='%m/%d/%Y', errors='coerce')
            except:
                df[col] = pd.to_datetime(df[col], errors='coerce')
    
    # Clean string columns
    string_cols = df.select_dtypes(include=['object']).columns
    for col in string_cols:
        if col != 'ingested_at':
            df[col] = df[col].astype(str).str.strip()
            df[col] = df[col].replace(['', 'nan', 'None', 'null', 'NULL'], np.nan)
    
    return df

# Clean both dataframes
holdings_df = clean_dataframe(holdings_df, "Holdings")
trades_df = clean_dataframe(trades_df, "Trades")

print("✅ Data cleaned")
print(f"   Holdings: {len(holdings_df)} rows")
print(f"   Trades: {len(trades_df)} rows")

✅ Data cleaned
   Holdings: 1020 rows
   Trades: 649 rows


In [6]:
# Cell 6: Connect to Database and Load Data

# Create database engine
engine = create_engine(
    DATABASE_URL,
    poolclass=pool.QueuePool,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
    echo=False
)

# Test connection
with engine.connect() as conn:
    conn.execute(text("SELECT 1"))
print("✅ Database connected")

# Load holdings
holdings_df.to_sql(
    "fund_holdings",
    engine,
    if_exists="replace",
    index=False,
    chunksize=1000
)
print(f"✅ Holdings loaded to database: {len(holdings_df)} rows")

# Load trades
trades_df.to_sql(
    "fund_trades",
    engine,
    if_exists="replace",
    index=False,
    chunksize=1000
)
print(f"✅ Trades loaded to database: {len(trades_df)} rows")

✅ Database connected
✅ Holdings loaded to database: 1020 rows
✅ Trades loaded to database: 649 rows


In [7]:
# Cell 7: Create Indexes and Verify

# Create indexes for performance
with engine.connect() as conn:
    # Holdings indexes
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_holdings_portfolio ON fund_holdings(portfolioname)"))
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_holdings_date ON fund_holdings(asofdate)"))
    conn.commit()
    print("✅ Holdings indexes created")
    
    # Trades indexes
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_trades_portfolio ON fund_trades(portfolioname)"))
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_trades_date ON fund_trades(tradedate)"))
    conn.commit()
    print("✅ Trades indexes created")

# Verify table structure
inspector = inspect(engine)
holdings_cols = [col['name'] for col in inspector.get_columns('fund_holdings')]
trades_cols = [col['name'] for col in inspector.get_columns('fund_trades')]

print(f"\n✅ Database tables verified")
print(f"   Holdings columns: {len(holdings_cols)}")
print(f"   Key columns: {[c for c in holdings_cols if 'pl' in c.lower() or 'mv' in c.lower()][:5]}")
print(f"   Trades columns: {len(trades_cols)}")

# Verify row counts
with engine.connect() as conn:
    holdings_count = conn.execute(text("SELECT COUNT(*) FROM fund_holdings")).scalar()
    trades_count = conn.execute(text("SELECT COUNT(*) FROM fund_trades")).scalar()
    
print(f"\n✅ Row counts verified")
print(f"   Holdings: {holdings_count} rows")
print(f"   Trades: {trades_count} rows")

✅ Holdings indexes created
✅ Trades indexes created

✅ Database tables verified
   Holdings columns: 26
   Key columns: ['mvlocal', 'mvbase', 'pldtd', 'plqtd', 'plmtd']
   Trades columns: 32

✅ Row counts verified
   Holdings: 1020 rows
   Trades: 649 rows
