In [1]:
# ==========================================================
# Cell 1: Environment Setup & Verification
# ==========================================================
# Purpose:
#   - Confirm that the working directory is correct
#   - Import required libraries for the ETL pipeline
#   - Verify version compatibility and environment readiness
# ==========================================================

import os
import sys
import pandas as pd
import numpy as np
import sqlalchemy
import yaml

# --- Display environment info ---
print("Python version :", sys.version.split()[0])
print("Pandas version :", pd.__version__)
print("NumPy version  :", np.__version__)
print("SQLAlchemy     :", sqlalchemy.__version__)

# --- Verify working directory ---
cwd = os.getcwd()
print("\nCurrent working directory:")
print(cwd)

expected_dirs = ["data", "scripts", "config", "logs"]
print("\nChecking for required directories...")
for d in expected_dirs:
    if not os.path.exists(d):
        os.makedirs(d, exist_ok=True)
        print(f"Created missing directory: {d}")
    else:
        print(f"✓ Found: {d}")

print("\n✅ Environment verification complete.")


Python version : 3.11.14
Pandas version : 2.2.3
NumPy version  : 2.1.2
SQLAlchemy     : 2.0.36

Current working directory:
/home/parallels/projects/insurance-data-ingestion

Checking for required directories...
Created missing directory: data
Created missing directory: scripts
Created missing directory: config
Created missing directory: logs

✅ Environment verification complete.


In [2]:
# ==========================================================
# Cell 2: Configuration and Mock Data Setup
# ==========================================================
# Purpose:
#   - Generate sample YAML mapping for column renaming and normalization
#   - Create synthetic raw insurance data to simulate a client feed
#   - Verify files are saved for downstream transformation
# ==========================================================

import pandas as pd
import numpy as np
import yaml
import os

# --- Ensure directories exist ---
os.makedirs("config", exist_ok=True)
os.makedirs("data/raw", exist_ok=True)

# --- 1. Create YAML mapping configuration ------------------
mapping = {
    "rename_columns": {
        "policy_no": "policy_number",
        "insured_nm": "insured_name",
        "eff_dt": "effective_date",
        "exp_dt": "expiration_date",
        "state_cd": "state"
    },
    "date_fields": ["effective_date", "expiration_date"],
    "normalize_fields": {
        "state": {
            "ind.": "IN",
            "ill.": "IL",
            "": "UNKNOWN"
        }
    },
    "drop_duplicates": True
}

with open("config/client_mapping.yml", "w") as f:
    yaml.dump(mapping, f, sort_keys=False)

print("✅ YAML mapping configuration created → config/client_mapping.yml")


# --- 2. Create mock insurance data feed --------------------
np.random.seed(42)
n_records = 20

data = {
    "policy_no": [f"P{1000+i}" for i in range(n_records)],
    "insured_nm": [f"Customer {i}" for i in range(1, n_records+1)],
    "eff_dt": pd.date_range("2024-01-01", periods=n_records, freq="15D"),
    "exp_dt": pd.date_range("2024-12-31", periods=n_records, freq="15D"),
    "state_cd": np.random.choice(["ind.", "ill.", ""], size=n_records)
}

df_raw = pd.DataFrame(data)
df_raw.to_csv("data/raw/client_insurance_feed.csv", index=False)

print("✅ Mock raw data created → data/raw/client_insurance_feed.csv")
print("\nSample:")
display(df_raw.head())


✅ YAML mapping configuration created → config/client_mapping.yml
✅ Mock raw data created → data/raw/client_insurance_feed.csv

Sample:


Unnamed: 0,policy_no,insured_nm,eff_dt,exp_dt,state_cd
0,P1000,Customer 1,2024-01-01,2024-12-31,
1,P1001,Customer 2,2024-01-16,2025-01-15,ind.
2,P1002,Customer 3,2024-01-31,2025-01-30,
3,P1003,Customer 4,2024-02-15,2025-02-14,
4,P1004,Customer 5,2024-03-01,2025-03-01,ind.


In [3]:
# ==========================================================
# Cell 3: Data Transformation
# ==========================================================
# Purpose:
#   - Apply ETL transformation rules using YAML configuration
#   - Standardize column names, formats, and categories
#   - Save cleaned file to data/processed/ for validation
# ==========================================================

import pandas as pd
import yaml
import os

# --- Directories ---
RAW_FILE = "data/raw/client_insurance_feed.csv"
CONFIG_FILE = "config/client_mapping.yml"
PROCESSED_DIR = "data/processed"
os.makedirs(PROCESSED_DIR, exist_ok=True)

# --- Load configuration ---
with open(CONFIG_FILE, "r") as f:
    mapping = yaml.safe_load(f)

# --- Read raw file ---
df = pd.read_csv(RAW_FILE)
print(f"Loaded raw data: {df.shape[0]} records, {df.shape[1]} columns")

# --- Apply transformations ---
# 1. Rename columns
df = df.rename(columns=mapping["rename_columns"])

# 2. Convert date fields
for col in mapping.get("date_fields", []):
    if col in df.columns:
        df[col] = pd.to_datetime(df[col], errors="coerce")

# 3. Normalize categorical text
for col, rule in mapping.get("normalize_fields", {}).items():
    if col in df.columns:
        df[col] = df[col].astype(str).str.strip().str.lower().replace(rule)

# 4. Drop duplicates if specified
if mapping.get("drop_duplicates", True):
    before = len(df)
    df = df.drop_duplicates()
    after = len(df)
    print(f"Dropped {before - after} duplicate rows (if any)")

# --- Save output ---
output_path = os.path.join(PROCESSED_DIR, "client_insurance_feed_processed.csv")
df.to_csv(output_path, index=False)

print(f"✅ Transformation complete → {output_path}")
print("\nSample of transformed data:")
display(df.head())


Loaded raw data: 20 records, 5 columns
Dropped 0 duplicate rows (if any)
✅ Transformation complete → data/processed/client_insurance_feed_processed.csv

Sample of transformed data:


Unnamed: 0,policy_number,insured_name,effective_date,expiration_date,state
0,P1000,Customer 1,2024-01-01,2024-12-31,
1,P1001,Customer 2,2024-01-16,2025-01-15,IN
2,P1002,Customer 3,2024-01-31,2025-01-30,
3,P1003,Customer 4,2024-02-15,2025-02-14,
4,P1004,Customer 5,2024-03-01,2025-03-01,IN


In [4]:
# ==========================================================
# Cell 4: Validation & Quality Checks
# ==========================================================
# Purpose:
#   - Verify processed data conforms to expected schema
#   - Detect missing or invalid values
#   - Summarize validation results for downstream import
# ==========================================================

import pandas as pd
import os

# --- Paths ---
PROCESSED_FILE = "data/processed/client_insurance_feed_processed.csv"

# --- Load processed data ---
if not os.path.exists(PROCESSED_FILE):
    raise FileNotFoundError("Processed file not found. Run Cell 3 first.")

df = pd.read_csv(PROCESSED_FILE)
print(f"Loaded processed file: {df.shape[0]} rows, {df.shape[1]} columns")

# --- Define expected schema ---
expected_columns = ["policy_number", "insured_name", "effective_date", "expiration_date", "state"]

missing_cols = [c for c in expected_columns if c not in df.columns]
extra_cols = [c for c in df.columns if c not in expected_columns]

if missing_cols:
    print(f"❌ Missing expected columns: {missing_cols}")
else:
    print("✅ All expected columns present.")

if extra_cols:
    print(f"⚠️ Extra columns detected: {extra_cols}")

# --- Check for missing values ---
missing_summary = df.isna().sum()
print("\nMissing value summary:")
print(missing_summary[missing_summary > 0] if missing_summary.any() else "✅ No missing values found.")

# --- Check data types ---
print("\nData types:")
print(df.dtypes)

# --- Record count sanity check ---
if len(df) == 0:
    print("❌ Validation failed: no records present.")
else:
    print(f"✅ Record count OK: {len(df)} rows.")

# --- Summary flag ---
if not missing_cols and len(df) > 0:
    print("\n🎯 Validation PASSED — file ready for SQL import.")
else:
    print("\n⚠️ Validation FAILED — investigate missing or invalid fields.")


Loaded processed file: 20 rows, 5 columns
✅ All expected columns present.

Missing value summary:
state    9
dtype: int64

Data types:
policy_number      object
insured_name       object
effective_date     object
expiration_date    object
state              object
dtype: object
✅ Record count OK: 20 rows.

🎯 Validation PASSED — file ready for SQL import.


In [5]:
# ==========================================================
# Cell 5: SQL Import Simulation
# ==========================================================
# Purpose:
#   - Simulate loading validated insurance data into a SQL table
#   - Demonstrate schema creation, data insertion, and verification
# ==========================================================

import pandas as pd
import os
from sqlalchemy import create_engine, text

# --- Paths and setup ---
os.makedirs("data/outputs", exist_ok=True)
DB_PATH = "data/outputs/insurance_data.db"
TABLE_NAME = "insurance_policies"
PROCESSED_FILE = "data/processed/client_insurance_feed_processed.csv"

# --- Load processed data ---
df = pd.read_csv(PROCESSED_FILE)

# --- Create SQLite database connection ---
engine = create_engine(f"sqlite:///{DB_PATH}", echo=False)

# --- Write data to SQL table (replace if exists) ---
df.to_sql(TABLE_NAME, con=engine, if_exists="replace", index=False)

# --- Confirm row count ---
with engine.connect() as conn:
    result = conn.execute(text(f"SELECT COUNT(*) FROM {TABLE_NAME}"))
    row_count = result.scalar_one()
    print(f"✅ Loaded {row_count} records into '{TABLE_NAME}' table.")

# --- Preview data ---
preview = pd.read_sql(f"SELECT * FROM {TABLE_NAME} LIMIT 5;", engine)
print("\nSample of data loaded into SQL table:")
display(preview)

# --- Verify schema ---
with engine.connect() as conn:
    schema_info = conn.execute(text(f"PRAGMA table_info({TABLE_NAME});")).fetchall()

print("\nTable schema:")
for col in schema_info:
    print(f" - {col[1]} ({col[2]})")

print("\n✅ SQL import simulation complete → data/outputs/insurance_data.db")


✅ Loaded 20 records into 'insurance_policies' table.

Sample of data loaded into SQL table:


Unnamed: 0,policy_number,insured_name,effective_date,expiration_date,state
0,P1000,Customer 1,2024-01-01,2024-12-31,
1,P1001,Customer 2,2024-01-16,2025-01-15,IN
2,P1002,Customer 3,2024-01-31,2025-01-30,
3,P1003,Customer 4,2024-02-15,2025-02-14,
4,P1004,Customer 5,2024-03-01,2025-03-01,IN



Table schema:
 - policy_number (TEXT)
 - insured_name (TEXT)
 - effective_date (TEXT)
 - expiration_date (TEXT)
 - state (TEXT)

✅ SQL import simulation complete → data/outputs/insurance_data.db


In [6]:
# ==========================================================
# Cell 6: Logging & Summary Report
# ==========================================================
# Purpose:
#   - Provide end-to-end summary of ETL pipeline run
#   - Write timestamped log to /logs directory
# ==========================================================

import os
import json
from datetime import datetime

# --- Paths ---
os.makedirs("logs", exist_ok=True)
LOG_PATH = f"logs/pipeline_run_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

# --- Gather stats ---
summary = {
    "timestamp": datetime.now().isoformat(),
    "raw_input": "data/raw/client_insurance_feed.csv",
    "processed_output": "data/processed/client_insurance_feed_processed.csv",
    "database_output": "data/outputs/insurance_data.db",
    "table_name": "insurance_policies",
    "records_processed": len(df),
    "status": "SUCCESS",
    "validation_passed": True,
    "notes": "All pipeline stages completed without errors."
}

# --- Write log ---
with open(LOG_PATH, "w") as f:
    json.dump(summary, f, indent=4)

# --- Display summary ---
print("📋 PIPELINE SUMMARY")
print("-" * 60)
for key, value in summary.items():
    print(f"{key:20s}: {value}")
print("-" * 60)
print(f"✅ Log written to → {LOG_PATH}")

📋 PIPELINE SUMMARY
------------------------------------------------------------
timestamp           : 2025-10-26T15:19:39.837574
raw_input           : data/raw/client_insurance_feed.csv
processed_output    : data/processed/client_insurance_feed_processed.csv
database_output     : data/outputs/insurance_data.db
table_name          : insurance_policies
records_processed   : 20
status              : SUCCESS
validation_passed   : True
notes               : All pipeline stages completed without errors.
------------------------------------------------------------
✅ Log written to → logs/pipeline_run_20251026_151939.json
