<a href="https://colab.research.google.com/github/prince0586/AI-Powered-Marketing-Copy-Generator/blob/master/VitalFlow_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import logging
import os
import time
import argparse
import functools
from datetime import datetime, timedelta
from typing import Optional, Tuple, List, Dict, Any
from sqlalchemy import create_engine, text, exc
from contextlib import contextmanager
import sqlite3

def _resolve_log_level(requested: str, fallback: str = 'INFO') -> str:
    """Return ``requested`` as an upper-case logging level name, or ``fallback``.

    ``logging.basicConfig`` raises ``ValueError`` for unknown level names, so
    without this check a typo (or an empty value) in VF_LOG_LEVEL would abort
    the module at import time instead of simply logging at the default level.
    """
    candidate = requested.strip().upper()
    # getLevelName() returns the int value for a registered level name and a
    # "Level <name>" string for anything else.
    return candidate if isinstance(logging.getLevelName(candidate), int) else fallback


_requested_log_level = os.getenv('VF_LOG_LEVEL', 'INFO')

# Secrets should never be hardcoded. We fetch from Env with safe defaults for the demo.
CONFIG = {
    'DB_CONNECTION_STRING': os.getenv('VF_DB_CONN', 'sqlite:///icu_warehouse.db'),
    'BATCH_SIZE': int(os.getenv('VF_BATCH_SIZE', 100)),
    'RETENTION_DAYS': int(os.getenv('VF_RETENTION', 30)),
    'LOG_LEVEL': _resolve_log_level(_requested_log_level),
}

# --- OBSERVABILITY SETUP ---
logging.basicConfig(
    level=CONFIG['LOG_LEVEL'],
    format='%(asctime)s - [VitalFlow] - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# Surface the fallback instead of silently ignoring a misconfigured level.
if CONFIG['LOG_LEVEL'] != _requested_log_level.strip().upper():
    logger.warning(
        f"Unknown VF_LOG_LEVEL {_requested_log_level!r}; falling back to {CONFIG['LOG_LEVEL']}."
    )

# --- RESILIENCE: CUSTOM RETRY DECORATOR ---
def retry_with_backoff(retries=3, backoff_in_seconds=1, exceptions=None):
    """
    Decorator to retry transient database failures with exponential backoff.

    Crucial for production systems where network blips occur.

    Args:
        retries: Maximum number of *additional* attempts after the first
            failure. Negative values are treated as 0 (fail on the first
            error) rather than never reaching the stop condition and
            retrying forever.
        backoff_in_seconds: Base delay. Retry number ``n`` (0-based) waits
            ``backoff_in_seconds * 2 ** n`` seconds before running again.
        exceptions: Tuple of exception types treated as transient. ``None``
            (the default) means SQLAlchemy ``OperationalError`` and
            ``TimeoutError`` plus ``sqlite3.OperationalError``. Any other
            exception propagates immediately without a retry.

    Raises:
        The last transient exception once all retries are exhausted.
    """
    max_retries = max(0, retries)
    if exceptions is None:
        exceptions = (exc.OperationalError, exc.TimeoutError, sqlite3.OperationalError)
    retryable = tuple(exceptions)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except retryable as e:
                    if attempt >= max_retries:
                        logger.critical(f"Failed after {attempt} retries. Error: {e}")
                        raise
                    sleep = backoff_in_seconds * 2 ** attempt
                    logger.warning(f"Transient Error: {e}. Retrying in {sleep}s...")
                    time.sleep(sleep)
                    attempt += 1
        return wrapper
    return decorator

@contextmanager
def operation_timer(op_name: str):
    """Context manager that logs how long the enclosed pipeline stage takes.

    Uses ``time.perf_counter`` (monotonic, high resolution), so the figure is
    not distorted by system clock adjustments the way ``time.time`` can be.
    If the stage raises, a "failed after" warning with the elapsed time is
    logged and the exception is re-raised unchanged.

    Args:
        op_name: Human-readable stage name used in the log line.
    """
    start = time.perf_counter()
    try:
        yield
    except Exception:
        elapsed = time.perf_counter() - start
        logger.warning(f"⏱️  {op_name} failed after {elapsed:.4f}s")
        raise
    elapsed = time.perf_counter() - start
    logger.info(f"⏱️  {op_name} completed in {elapsed:.4f}s")

# --- CORE CLASSES ---

class IoTGenerator:
    """Stand-in for a high-velocity IoT message queue such as Kafka or Kinesis.

    Readings are synthetic: roughly 5% carry a -999 heart-rate sensor fault and
    roughly 5% are missing systolic blood pressure, giving the quality gate
    "dirty" input to catch.
    """

    @staticmethod
    def fetch_stream_data(last_timestamp: Optional[str] = None, batch_size: int = 50) -> pd.DataFrame:
        """Generate the next batch of simulated sensor readings.

        Args:
            last_timestamp: Incremental watermark. When truthy, the simulated
                clock starts one second after it; otherwise it starts one hour
                before the current local time (cold start).
            batch_size: Number of readings to produce.

        Returns:
            A DataFrame with one row per reading (columns ``event_id``,
            ``timestamp``, ``patient_id``, ``heart_rate``, ``systolic_bp``,
            ``sensor_type``); empty when ``batch_size`` is not positive.
        """
        with operation_timer("Data Extraction"):
            logger.info("Connecting to IoT Sensor Stream...")

            # Choose the starting point of the simulated clock.
            if not last_timestamp:
                clock = datetime.now() - timedelta(hours=1)
                logger.info("Cold Start: Fetching historical data from 1 hour ago")
            else:
                clock = pd.to_datetime(last_timestamp) + timedelta(seconds=1)
                logger.info(f"Incremental Load: Fetching data strictly after {clock}")

            roster = [f"P-{n:03d}" for n in range(1, 10)]

            def next_reading() -> Dict[str, Any]:
                # Random draws happen in a fixed order:
                # gap, patient, HR, HR fault, BP, BP dropout.
                nonlocal clock
                clock += timedelta(seconds=np.random.randint(1, 30))
                patient = np.random.choice(roster)

                heart_rate = int(np.random.normal(80, 20))
                if np.random.random() < 0.05:
                    heart_rate = -999  # simulated sensor failure

                systolic = int(np.random.normal(120, 15))
                if np.random.random() < 0.05:
                    systolic = None  # type: ignore  # simulated missing reading

                return {
                    "event_id": f"{patient}_{clock.timestamp()}",
                    "timestamp": clock.isoformat(),
                    "patient_id": patient,
                    "heart_rate": heart_rate,
                    "systolic_bp": systolic,
                    "sensor_type": "WIFI_V2",
                }

            readings = [next_reading() for _ in range(batch_size)]

            logger.info(f"Stream fetched {len(readings)} new records.")
            return pd.DataFrame(readings)

class QualityGate:
    """Enforces data contracts and segregates bad records into a quarantine set."""

    def validate(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Split ``df`` into ``(silver_df, quarantine_df)``.

        Rules applied:
          * ``systolic_bp`` must not be null.
          * ``heart_rate`` must be strictly between 0 and 300. This also
            rejects the -999 sensor-fault value and NaN readings, because
            NaN compares False.

        Quarantined rows gain two columns. ``error_reason`` names every rule
        the row failed, e.g. ``"Validation Failed (Null BP; Invalid HR)"``.
        ``ingested_at`` is the ISO timestamp of this call.

        Returns:
            ``(silver_df, quarantine_df)`` as copies of the matching rows. For
            empty input, returns ``df`` itself and an empty DataFrame.
        """
        if df.empty:
            return df, pd.DataFrame()

        with operation_timer("Data Validation"):
            total_rows = len(df)

            # 1. Schema Validation (implicit via DataFrame, but could be Pydantic/Pandera)

            # 2. Business Logic Validation
            has_bp = df['systolic_bp'].notnull()
            valid_hr = (df['heart_rate'] > 0) & (df['heart_rate'] < 300)
            valid_mask = has_bp & valid_hr

            silver_df = df[valid_mask].copy()
            quarantine_df = df[~valid_mask].copy()

            if not quarantine_df.empty:
                # Name the specific failed rule(s) per row so operators can triage.
                quarantine_df['error_reason'] = self._failure_reasons(
                    has_bp[~valid_mask], valid_hr[~valid_mask]
                )
                quarantine_df['ingested_at'] = datetime.now().isoformat()
                logger.warning(f"⚠️ Quarantined {len(quarantine_df)} records.")

            pass_rate = (len(silver_df) / total_rows) * 100
            logger.info(f"Quality Pass Rate: {pass_rate:.1f}%")

            return silver_df, quarantine_df

    @staticmethod
    def _failure_reasons(has_bp: pd.Series, valid_hr: pd.Series) -> List[str]:
        """Build one ``"Validation Failed (...)"`` message per row from the rule masks."""
        reasons = []
        for bp_ok, hr_ok in zip(has_bp, valid_hr):
            failed = []
            if not bp_ok:
                failed.append("Null BP")
            if not hr_ok:
                failed.append("Invalid HR")
            reasons.append(f"Validation Failed ({'; '.join(failed)})")
        return reasons

class DataWarehouse:
    """Manages database persistence with resilience patterns (retries + stage timing)."""

    # Tables whose `timestamp` column records events this pipeline has already
    # consumed; the incremental watermark must cover all of them.
    _WATERMARK_TABLES = ('silver_vitals', 'error_quarantine')

    def __init__(self, connection_string: str):
        """Create the SQLAlchemy engine for ``connection_string``."""
        self.engine = create_engine(connection_string)

    @retry_with_backoff(retries=3)
    def get_watermark(self) -> Optional[str]:
        """Return the newest event timestamp already processed, or ``None`` on a cold start.

        Both the silver table and the quarantine table are consulted. If the
        newest events of a batch were quarantined, checking silver alone would
        re-fetch and re-quarantine them on the next incremental run.

        Database errors are deliberately not caught here. Catching
        ``SQLAlchemyError`` would also swallow ``OperationalError``, which
        bypasses the retry decorator and turns a transient outage into a
        silent cold-start reload. Missing tables are handled via ``has_table``.
        """
        latest: List[Any] = []
        with self.engine.connect() as conn:
            for table in self._WATERMARK_TABLES:
                if not conn.dialect.has_table(conn, table):
                    continue
                # `table` comes from the class constant above (never from input),
                # so interpolating it into the SQL text is safe.
                value = conn.execute(text(f"SELECT MAX(timestamp) FROM {table}")).scalar()
                if value is not None:
                    latest.append(value)
        return max(latest) if latest else None

    @retry_with_backoff(retries=3)
    def load_batch(self, table_name: str, df: pd.DataFrame, if_exists: str = 'append'):
        """Write ``df`` into ``table_name`` via ``DataFrame.to_sql``.

        Does nothing for ``None`` or empty input. ``if_exists`` is passed
        through to pandas.
        """
        if df is None or df.empty:
            return
        with operation_timer(f"Load to {table_name}"):
            df.to_sql(table_name, self.engine, if_exists=if_exists, index=False)
            logger.info(f"Successfully loaded {len(df)} rows into {table_name}")

    @retry_with_backoff(retries=3)
    def refresh_gold_layer(self) -> pd.DataFrame:
        """Rebuild ``gold_patient_hourly`` from ``silver_vitals`` and return it.

        Per patient and hour, computes average/max heart rate, average
        systolic BP and the record count, then replaces the gold table.

        Returns an empty DataFrame (and writes nothing) when ``silver_vitals``
        does not exist yet, e.g. when every record of the first batch was
        quarantined. Otherwise the query would fail with "no such table" and
        be retried pointlessly.

        NOTE(review): the query uses SQLite's ``strftime``; other backends
        selected via VF_DB_CONN would need a dialect-specific rewrite.
        """
        query = """
        SELECT
            patient_id,
            strftime('%Y-%m-%d', timestamp) as date,
            strftime('%H', timestamp) as hour,
            AVG(heart_rate) as avg_hr_hourly,
            MAX(heart_rate) as max_hr_hourly,
            AVG(systolic_bp) as avg_bp_hourly,
            COUNT(*) as record_count
        FROM silver_vitals
        GROUP BY patient_id, date, hour
        ORDER BY date DESC, hour DESC
        """
        with operation_timer("Refining Gold Layer"):
            with self.engine.connect() as conn:
                if not conn.dialect.has_table(conn, "silver_vitals"):
                    logger.warning("silver_vitals does not exist yet; skipping Gold refresh.")
                    return pd.DataFrame()
                gold_df = pd.read_sql(query, conn)
            # Write only after the read connection is released, so we never hold
            # two connections to the same database at once.
            gold_df.to_sql('gold_patient_hourly', self.engine, if_exists='replace', index=False)
            return gold_df

    def reset_warehouse(self):
        """Admin utility: drop the silver, gold and quarantine tables."""
        # engine.begin() commits on success. With SQLAlchemy 2.x, a plain
        # connect() block rolls back on exit, which on backends with
        # transactional DDL (e.g. PostgreSQL) would silently undo the DROPs.
        with self.engine.begin() as conn:
            for table in ("silver_vitals", "gold_patient_hourly", "error_quarantine"):
                conn.execute(text(f"DROP TABLE IF EXISTS {table}"))
        logger.warning("♻️  Warehouse wiped clean.")

class VitalFlowPipeline:
    """Orchestrator: watermark -> extract -> validate -> load -> aggregate."""

    def __init__(self, reset_db: bool = False, connection_string: Optional[str] = None,
                 batch_size: Optional[int] = None):
        """
        Args:
            reset_db: Drop all warehouse tables before the first run.
            connection_string: SQLAlchemy URL. Defaults to
                ``CONFIG['DB_CONNECTION_STRING']``.
            batch_size: Records fetched per run. When ``None``,
                ``CONFIG['BATCH_SIZE']`` is read at each ``run()``.
        """
        self.dw = DataWarehouse(connection_string or CONFIG['DB_CONNECTION_STRING'])
        self.quality = QualityGate()
        self.generator = IoTGenerator()
        self.batch_size = batch_size

        if reset_db:
            self.dw.reset_warehouse()

    def run(self, dry_run: bool = False):
        """Execute one pipeline cycle.

        Args:
            dry_run: Extract and validate, but write nothing to the database.
        """
        logger.info(">>> 🚀 STARTING PIPELINE JOB")

        # 1. State Management
        last_ts = self.dw.get_watermark()
        logger.info(f"Watermark: {last_ts if last_ts else 'None (Cold Start)'}")

        # 2. Extract
        batch_size = CONFIG['BATCH_SIZE'] if self.batch_size is None else self.batch_size
        raw_df = self.generator.fetch_stream_data(last_ts, batch_size=batch_size)
        if raw_df.empty:
            logger.info("No new data. Sleeping...")
            return

        # 3. Transform & Validate
        silver_df, error_df = self.quality.validate(raw_df)

        if dry_run:
            logger.info("DRY RUN ENABLED: Skipping DB Load")
            logger.info(f"Would load {len(silver_df)} Silver rows and {len(error_df)} Quarantine rows.")
            return

        # 4. Load
        self.dw.load_batch('silver_vitals', silver_df)
        self.dw.load_batch('error_quarantine', error_df)

        # 5. Aggregate
        gold_preview = self.dw.refresh_gold_layer()

        logger.info(">>> ✅ JOB COMPLETE")
        # print() rather than logging so the preview renders as a plain table in notebooks.
        print("\n--- GOLD LAYER PREVIEW (BI Ready) ---")
        print(gold_preview.head(5).to_string())
        print("-------------------------------------\n")

if __name__ == "__main__":
    # CLI arguments for operational control.
    # allow_abbrev=False so that stray notebook-kernel arguments can never be
    # prefix-matched onto one of our flags.
    parser = argparse.ArgumentParser(description="VitalFlow ETL Pipeline CLI", allow_abbrev=False)
    parser.add_argument('--reset', action='store_true', help="Wipe database before running")
    parser.add_argument('--dry-run', action='store_true', help="Process data but do not commit to DB")
    parser.add_argument('--loop', action='store_true', help="Run continuously in a loop")
    parser.add_argument('--interval', type=float, default=5.0,
                        help="Seconds to wait between runs in --loop mode (default: 5)")

    # parse_known_args() honours real CLI flags when run as a script and ignores
    # the kernel-specific arguments (e.g. `-f <connection-file>`) that Jupyter/
    # Colab place in sys.argv. parse_args([]) would discard every flag instead.
    args, _unknown_args = parser.parse_known_args()

    pipeline = VitalFlowPipeline(reset_db=args.reset)

    if args.loop:
        try:
            while True:
                pipeline.run(dry_run=args.dry_run)
                # time.sleep() rejects negative values.
                time.sleep(max(0.0, args.interval))
        except KeyboardInterrupt:
            logger.info("Pipeline stopped by user.")
    else:
        pipeline.run(dry_run=args.dry_run)

# Notebook convenience cell: inspect the records held in quarantine.
import builtins

dw = DataWarehouse(CONFIG['DB_CONNECTION_STRING'])
with dw.engine.connect() as conn:
    if conn.dialect.has_table(conn, 'error_quarantine'):
        quarantined_data = pd.read_sql('SELECT * FROM error_quarantine', conn)
    else:
        # Nothing quarantined yet (or the warehouse was just reset).
        quarantined_data = pd.DataFrame()

# IPython installs `display` as a builtin; fall back to print() when this
# code runs as a plain script, where `display` would be a NameError.
_show = getattr(builtins, 'display', None)
if _show is None:
    print(quarantined_data.to_string())
else:
    _show(quarantined_data)




--- GOLD LAYER PREVIEW (BI Ready) ---
  patient_id        date hour  avg_hr_hourly  max_hr_hourly  avg_bp_hourly  record_count
0      P-001  2025-12-01   12      87.647059            129     118.705882            17
1      P-002  2025-12-01   12      76.875000            112     128.750000            16
2      P-003  2025-12-01   12      83.259259            119     120.962963            27
3      P-004  2025-12-01   12      83.739130            120     116.391304            23
4      P-005  2025-12-01   12      77.705882            114     119.294118            17
-------------------------------------



Unnamed: 0,event_id,timestamp,patient_id,heart_rate,systolic_bp,sensor_type,error_reason,ingested_at
0,P-001_1764590594.958653,2025-12-01T12:03:14.958653,P-001,69,,WIFI_V2,Validation Failed (Null BP or Invalid HR),2025-12-01T13:02:35.986409
1,P-006_1764590863.958653,2025-12-01T12:07:43.958653,P-006,77,,WIFI_V2,Validation Failed (Null BP or Invalid HR),2025-12-01T13:02:35.986409
2,P-002_1764591146.958653,2025-12-01T12:12:26.958653,P-002,69,,WIFI_V2,Validation Failed (Null BP or Invalid HR),2025-12-01T13:02:35.986409
3,P-008_1764591396.958653,2025-12-01T12:16:36.958653,P-008,104,,WIFI_V2,Validation Failed (Null BP or Invalid HR),2025-12-01T13:02:35.986409
4,P-004_1764591421.958653,2025-12-01T12:17:01.958653,P-004,62,,WIFI_V2,Validation Failed (Null BP or Invalid HR),2025-12-01T13:02:35.986409
5,P-001_1764591455.958653,2025-12-01T12:17:35.958653,P-001,51,,WIFI_V2,Validation Failed (Null BP or Invalid HR),2025-12-01T13:02:35.986409
6,P-008_1764591647.958653,2025-12-01T12:20:47.958653,P-008,-999,126.0,WIFI_V2,Validation Failed (Null BP or Invalid HR),2025-12-01T13:02:35.986409
7,P-005_1764591729.958653,2025-12-01T12:22:09.958653,P-005,-999,143.0,WIFI_V2,Validation Failed (Null BP or Invalid HR),2025-12-01T13:02:35.986409
8,P-001_1764591754.958653,2025-12-01T12:22:34.958653,P-001,-999,124.0,WIFI_V2,Validation Failed (Null BP or Invalid HR),2025-12-01T13:02:35.986409
9,P-007_1764592028.958653,2025-12-01T12:27:08.958653,P-007,-999,111.0,WIFI_V2,Validation Failed (Null BP or Invalid HR),2025-12-01T13:02:35.986409
