In [3]:
import sys
from pathlib import Path
import pandas as pd
import numpy as np
import logging
import re
from datetime import datetime
from collections import defaultdict
import argparse
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

# Jupyter protection: when this runs inside a notebook kernel, sys.argv carries
# the kernel's own "-f <connection file>" arguments, which main()'s argparse
# parser does not define. Keep only the program name in that case.
if sys.argv[1:2] == ["-f"]:
    sys.argv = sys.argv[:1]
    
@dataclass
class Thresholds:
    """Cut-off values used by ``AadhaarProcessor.classify``.

    The two ``load_*`` values are compared against a row's ``load_score``;
    ``shortfall_severe`` is a fraction compared against the row's
    ``shortfall_pctile`` divided by 100.
    """

    # load_score above this (but not above load_high) is reported as "Moderate".
    load_warning: float = 1.2
    # load_score above this is reported as "Overloaded" (or "Critical").
    load_high: float = 1.8
    # Percentile-rank fraction above which a shortfall counts as severe.
    shortfall_severe: float = 0.60

@dataclass
class Config:
    """Paths and parsing settings for the Aadhaar health analysis.

    Instantiating the class creates ``data_dir``, ``output_dir`` and
    ``log_dir`` (including missing parents) if they do not exist yet.
    """

    # Directory that is globbed for the input CSV files.
    data_dir: Path = Path("../data")
    # Directory the result file is written to.
    output_dir: Path = Path("../outputs")
    # Directory that receives the dated log file.
    log_dir: Path = Path("../logs")
    # Classification cut-offs; a fresh Thresholds() per Config instance.
    thresholds: Thresholds = field(default_factory=Thresholds)

    # Two-letter state code -> display name; unmapped codes are kept as-is
    # by the code that consumes this mapping.
    state_map: Dict[str, str] = field(default_factory=lambda: dict(
        TN="Tamil Nadu",
        AP="Andhra Pradesh",
        KA="Karnataka",
        MH="Maharashtra",
        DL="Delhi",
        UP="Uttar Pradesh",
    ))

    # Enrolment columns that are summed when all of them are present.
    age_cols: List[str] = field(
        default_factory=lambda: ["age_0_5", "age_5_17", "age_18_greater"]
    )

    # Applied to a file's stem: group 1 is the state code, group 2 the
    # district; an optional trailing "aadhaar..." suffix is ignored.
    filename_regex: str = r"^([A-Z]{2}),\s*(.+?)(?:\s*aadhaar.*)?$"

    def __post_init__(self):
        """Create the three working directories if they are missing."""
        for directory in (self.data_dir, self.output_dir, self.log_dir):
            directory.mkdir(parents=True, exist_ok=True)

def setup_logging(config: Config) -> logging.Logger:
    """Configure root logging to stdout and a dated file, return the app logger.

    The log file is ``aadhaar_health_<YYYYMMDD>.log`` inside ``config.log_dir``.
    Note that ``logging.basicConfig`` leaves an already-configured root logger
    untouched, so a second call in the same process adds no handlers.

    Args:
        config: Settings object; only ``log_dir`` is read here.

    Returns:
        The logger named ``"AadhaarHealth"``.
    """
    console_handler = logging.StreamHandler(sys.stdout)
    log_path = config.log_dir / f"aadhaar_health_{datetime.now():%Y%m%d}.log"
    file_handler = logging.FileHandler(log_path, encoding="utf-8")

    logging.basicConfig(
        format="%(asctime)s | %(levelname)-8s | %(message)s",
        level=logging.INFO,
        handlers=[console_handler, file_handler],
    )
    return logging.getLogger("AadhaarHealth")

class AadhaarProcessor:
    """Aggregate per-district Aadhaar CSV files into one classified table.

    Input files are discovered by glob pattern in ``config.data_dir``; the
    state code and district are taken from each file name (see
    ``parse_filename``), not from the file contents.
    """

    # Column layout of the frame returned by build_master(); also used to
    # build a well-formed empty result when there is no enrolment data.
    _MASTER_COLUMNS = (
        "state", "district", "total_enrolments",
        "demo_updates", "bio_updates", "total_updates",
        "updates_norm", "enrol_norm", "load_score",
        "shortfall_pct", "shortfall_pctile",
        "priority", "status", "action", "state_name",
    )

    def __init__(self, config: "Config", logger: logging.Logger):
        """Store the settings object and the logger used for diagnostics."""
        self.config = config
        self.logger = logger

    def parse_filename(self, filename: str) -> Tuple[str, str]:
        """Extract ``(state_code, district)`` from a data file name.

        The file's stem is matched case-insensitively against
        ``config.filename_regex``. The state code is upper-cased and the
        district title-cased. Names that do not match yield
        ``("UNK", "Unknown")``.
        """
        stem = Path(filename).stem
        match = re.match(self.config.filename_regex, stem, re.IGNORECASE)
        if match:
            return match.group(1).upper(), match.group(2).title()
        return "UNK", "Unknown"

    def read_csv(self, path: Path) -> pd.DataFrame:
        """Read one CSV; on any failure log an error and return an empty frame.

        Deliberately best-effort: one unreadable file must not abort the run.
        """
        try:
            return pd.read_csv(path, low_memory=False)
        except Exception as e:
            self.logger.error(f"Read failed: {path.name} : {e}")
            return pd.DataFrame()

    def process_enrolments(self) -> pd.DataFrame:
        """Total enrolments per (state, district).

        Every ``*aadhaar_enrolments.csv`` file is summed over
        ``config.age_cols`` when all of those columns exist, otherwise over
        all numeric columns. Files that resolve to the same (state, district)
        are added together, matching ``process_updates``.

        Returns:
            Frame with columns ``state``, ``district``, ``total_enrolments``;
            the columns are present even when no file was found.
        """
        totals = {}

        for f in self.config.data_dir.glob("*aadhaar_enrolments.csv"):
            df = self.read_csv(f)
            if df.empty:
                continue

            if all(c in df.columns for c in self.config.age_cols):
                file_total = df[self.config.age_cols].sum(axis=1).sum()
            else:
                # NOTE(review): this fallback adds up *every* numeric column,
                # which would include numeric identifier columns if the files
                # have any - confirm against the real schema.
                file_total = df.select_dtypes(include=np.number).sum(axis=1).sum()

            key = self.parse_filename(f.name)
            # Start from int 0 so integer totals keep their integer dtype.
            totals[key] = totals.get(key, 0) + file_total

        records = [
            {"state": s, "district": d, "total_enrolments": v}
            for (s, d), v in totals.items()
        ]
        # Explicit columns keep the frame mergeable when there are no records.
        return pd.DataFrame(records, columns=["state", "district", "total_enrolments"])

    def process_updates(self, pattern: str, col_name: str) -> pd.DataFrame:
        """Total update counts per (state, district) for files matching *pattern*.

        Args:
            pattern: Glob pattern evaluated inside ``config.data_dir``.
            col_name: Name of the value column in the returned frame.

        Returns:
            Frame with columns ``state``, ``district`` and *col_name* (float);
            the columns are present even when no file was found.
        """
        totals = defaultdict(float)

        for f in self.config.data_dir.glob(pattern):
            df = self.read_csv(f)
            if df.empty:
                continue

            state, district = self.parse_filename(f.name)
            # NOTE(review): sums every numeric column of the file; numeric
            # identifier columns, if present, would inflate the total.
            numeric = df.select_dtypes(include=np.number)
            totals[(state, district)] += numeric.sum().sum()

        records = [
            {"state": s, "district": d, col_name: v}
            for (s, d), v in totals.items()
        ]
        frame = pd.DataFrame(records, columns=["state", "district", col_name])
        # Totals are always floats; the cast only matters for the empty frame,
        # whose value column would otherwise be object-typed.
        return frame.astype({col_name: float})

    def calculate_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add normalised load and shortfall columns to *df* (in place).

        Requires ``total_updates`` and ``total_enrolments``. Adds:

        * ``updates_norm`` / ``enrol_norm``: value divided by the column mean.
        * ``load_score``: ``updates_norm / enrol_norm``, rounded to 3 places;
          undefined ratios (division by zero, 0/0) become 0.
        * ``shortfall_pct``: fraction by which enrolments fall below the mean,
          floored at 0 and rounded to 3 places.
        * ``shortfall_pctile``: percentile rank (0-100) of ``shortfall_pct``.

        Returns:
            The same frame object, with the new columns.
        """
        avg_updates = df["total_updates"].mean()
        avg_enrol = df["total_enrolments"].mean()

        df["updates_norm"] = df["total_updates"] / avg_updates
        df["enrol_norm"] = df["total_enrolments"] / avg_enrol

        # x/0 gives +/-inf and 0/0 gives NaN; treat both as "no load".
        df["load_score"] = (
            (df["updates_norm"] / df["enrol_norm"])
            .replace([np.inf, -np.inf], np.nan)
            .fillna(0)
            .round(3)
        )

        # NaN appears only when the mean enrolment is 0, i.e. nothing to fall
        # short of.
        df["shortfall_pct"] = (
            ((avg_enrol - df["total_enrolments"]) / avg_enrol)
            .fillna(0)
            .clip(lower=0)
            .round(3)
        )

        df["shortfall_pctile"] = (df["shortfall_pct"].rank(pct=True) * 100).round(1)

        return df

    def classify(self, row) -> Tuple[int, str, str]:
        """Map one metrics row to ``(priority, status, action)``.

        Rules, first match wins:

        5. Critical      - load above ``load_high`` and severe shortfall
        4. Overloaded    - load above ``load_high``
        3. Under-covered - severe shortfall
        2. Moderate      - load above ``load_warning``
        1. Healthy       - otherwise

        A shortfall is severe when ``shortfall_pctile / 100`` exceeds
        ``thresholds.shortfall_severe``. When the row also provides
        ``shortfall_pct`` it must be positive: the percentile rank of tied
        zero shortfalls can be high (a lone district ranks at 100%), and a
        district with no shortfall must not be flagged as under-covered.

        Args:
            row: Mapping-like row (``pandas.Series`` or ``dict``) with
                ``load_score`` and ``shortfall_pctile``; ``shortfall_pct``
                is optional.
        """
        t = self.config.thresholds

        shortfall = row.get("shortfall_pct")
        has_shortfall = shortfall is None or shortfall > 0
        severe_shortfall = (
            has_shortfall and row["shortfall_pctile"] / 100 > t.shortfall_severe
        )
        overloaded = row["load_score"] > t.load_high

        if overloaded and severe_shortfall:
            return 5, "Critical", "Emergency response"
        if overloaded:
            return 4, "Overloaded", "Increase capacity"
        if severe_shortfall:
            return 3, "Under-covered", "Outreach required"
        if row["load_score"] > t.load_warning:
            return 2, "Moderate", "Monitor"
        return 1, "Healthy", "Maintain"

    def build_master(self) -> pd.DataFrame:
        """Build the classified per-district table, highest priority first.

        Enrolments are left-joined with the summed demographic and biometric
        updates (districts without update files get 0), metrics are computed,
        each row is classified, and ``state_name`` is looked up in
        ``config.state_map`` (falling back to the raw state code).

        Returns:
            Frame with the columns listed in ``_MASTER_COLUMNS``; empty (but
            with those columns) when no enrolment data was found.
        """
        enrol = self.process_enrolments()
        if enrol.empty:
            # Nothing to analyse; return a well-formed empty table rather than
            # failing inside the merge / classification steps.
            self.logger.warning(
                "No enrolment data found in %s", self.config.data_dir
            )
            return pd.DataFrame(columns=list(self._MASTER_COLUMNS))

        demo = self.process_updates("*aadhaar_demographic_updates.csv", "demo_updates")
        bio = self.process_updates("*aadhaar_biometric_updates.csv", "bio_updates")

        updates = demo.merge(bio, on=["state", "district"], how="outer").fillna(0)
        updates["total_updates"] = updates["demo_updates"] + updates["bio_updates"]

        master = enrol.merge(updates, on=["state", "district"], how="left").fillna(0)
        master = self.calculate_metrics(master)

        master[["priority", "status", "action"]] = pd.DataFrame(
            master.apply(self.classify, axis=1).tolist(),
            index=master.index
        )

        master["state_name"] = master["state"].map(self.config.state_map).fillna(master["state"])
        return master.sort_values("priority", ascending=False).reset_index(drop=True)

def main():
    """Run the analysis end to end and write the result table.

    Command line:
        --format  Output file format, ``csv`` (default) or ``json``. Any other
                  value is reported with a warning and CSV is written instead.

    Prints the count of districts per status, saves the full table to
    ``config.output_dir`` as ``aadhaar_health_<YYYYMMDD_HHMM>.<format>`` and
    prints the ten highest-priority rows.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--format",
        default="csv",
        help="output file format: csv (default) or json",
    )
    args = parser.parse_args()

    config = Config()
    logger = setup_logging(config)

    logger.info("Starting Aadhaar Health Analysis")

    processor = AadhaarProcessor(config, logger)
    master = processor.build_master()

    print(master["status"].value_counts().to_string())

    # Unknown formats fall back to CSV instead of aborting after the analysis
    # has already been computed.
    fmt = args.format.lower()
    if fmt not in ("csv", "json"):
        logger.warning(f"Unsupported format {args.format!r}; writing csv instead")
        fmt = "csv"

    out = config.output_dir / f"aadhaar_health_{datetime.now():%Y%m%d_%H%M}.{fmt}"
    if fmt == "json":
        master.to_json(out, orient="records", indent=2)
    else:
        master.to_csv(out, index=False)
    logger.info(f"{fmt.upper()} saved to {out}")

    print(master.head(10)[
        ["state_name", "district", "status", "priority", "load_score", "shortfall_pct"]
    ].to_string(index=False))

# Run the analysis only when executed as a script (or as a notebook cell,
# where __name__ is also "__main__"), not when imported as a module.
if __name__ == "__main__":
    main()


2026-01-17 11:37:03,153 | INFO     | Starting Aadhaar Health Analysis
status
Under-covered    3
Healthy          3
2026-01-17 11:37:03,274 | INFO     | CSV saved to ..\outputs\aadhaar_health_20260117_1137.csv
    state_name      district        status  priority  load_score  shortfall_pct
Andhra Pradesh       Krishna Under-covered         3       1.611          0.050
Andhra Pradesh    Srikakulam Under-covered         3       1.618          0.398
    Tamil Nadu    Dharmapuri Under-covered         3       0.228          0.234
Andhra Pradesh Visakhapatnam       Healthy         1       1.149          0.000
    Tamil Nadu       Chennai       Healthy         1       1.068          0.000
    Tamil Nadu    Coimbatore       Healthy         1       0.478          0.000
