# In [None]:
import warnings
warnings.filterwarnings("ignore")

import os, re, json, hashlib, time
from pathlib import Path
from typing import Dict, Any, List, Optional, Callable

import numpy as np
import pandas as pd
from dateutil.relativedelta import relativedelta

# ------------------------------------------------------------
# 0. CONFIG (match what you showed in the screenshot)
# ------------------------------------------------------------
# Run parameters. `issuer` selects the loader in CARRIERS and the template
# file names; `issuer`, `paycode` and `trandate` are also stamped onto every
# output row by the pipeline at the bottom of this file.
issuer            = "Manhattan Life"
paycode           = "Default"
trandate          = "2025-11-12"
# NOTE(review): the two IDs below are not referenced anywhere in the visible
# code -- presumably consumed by a later step; confirm.
load_task_id      = "11497"
company_issuer_id = "3205"

# Input file that is probed for headers and then loaded, and the directory
# holding the per-issuer prompt / rules / compiled-rules files.
csv_path     = r".\inbound\raw - 9836c995-9e25-45ae-af39-3b4cc8ac1bbd.csv"
template_dir = r".\carrier_prompts"

# NOTE(review): database target; not used in the visible code -- confirm
# where the connection is opened.
server_name   = "QWSD8SQLB401.nguottit.com"   # or "NGCS" etc.
database_name = "NGCS"

# ------------------------------------------------------------
# 1. CORE CONSTANTS (only change: PlanCode added to FINAL_COLUMNS)
# ------------------------------------------------------------

# Per-issuer settings. "loader" picks how the input file's header is read:
# "csv" uses the first row as the header, anything else is handled as a
# two-row header that gets merged into single column names. Issuers missing
# from this table fall back to "csv" in the pipeline.
CARRIERS = {
    "Molina": {"loader": "csv"},
    "Ameritas": {"loader": "csv"},
    "Manhattan Life": {"loader": "two_header"},
}

# Output schema, in order. apply_rules emits exactly these columns, and
# canonicalize_spec adds a blank rule for any of them missing from a spec
# (except PTD when a PID rule is present).
FINAL_COLUMNS = [
    "PolicyNO","PHFirst","PHLast","Status","Issuer","State",
    "ProductType","PlanName","PlanCode",
    "SubmittedDate","EffectiveDate","TermDate","Paysched",
    "PayCode","WritingAgentID","Premium","CommPrem",
    "TranDate","CommReceived","PTD","NoPayMon","Membercount"
]

# Rule operations accepted by bind_sources_to_headers and apply_rules; a rule
# whose op is not listed here is treated as blank.
ALLOWED_OPS = [
    "copy","const","date_mmddyyyy","date_plus_1m_mmddyyyy",
    "name_first_from_full","name_last_from_full",
    "money","membercount_from_commission","blank"
]

# ------------------------------------------------------------
# 2. SMALL UTILS (same as processor.py)
# ------------------------------------------------------------
def _norm_key(s: str) -> str:
    """Return *s* as text, lower-cased, with everything but a-z and 0-9 removed."""
    lowered = str(s).lower()
    non_key_chars = re.compile(r"[^a-z0-9]")
    return non_key_chars.sub("", lowered)

def _sig_from_cols(cols: List[str]) -> str:
    """Return a 12-hex-character SHA-1 fingerprint of the column list.

    The columns are stringified and joined with "||" before hashing, so both
    the names and their order affect the result.
    """
    hasher = hashlib.sha1()
    hasher.update("||".join(str(col) for col in cols).encode("utf-8"))
    return hasher.hexdigest()[:12]

def _build_header_index(cols: List[str]) -> Dict[str, str]:
    """Map each header's normalised key to the header as it appears in *cols*.

    If several headers normalise to the same key, the last one wins.
    """
    index: Dict[str, str] = {}
    for header in cols:
        index[_norm_key(header)] = header
    return index

# ------------------------------------------------------------
# 3. HEADER PROBE + CSV READER (csv vs two_header)
# ------------------------------------------------------------
def _merge_two_header_rows(top: List[Any], bottom: List[Any]) -> List[str]:
    """Combine the two header rows of a two-header file into column names.

    The top row is forward-filled (an empty cell inherits the nearest
    non-empty cell to its left) and joined with the bottom row cell by a
    space. The result is then normalised: whitespace, "+", "/" and "." become
    "_" and any remaining character outside [a-zA-Z0-9_] is dropped. A column
    whose two cells are both empty is named "unnamed".

    Both the header probe and the full reader go through this helper so the
    names they produce cannot drift apart.
    """
    # Forward-fill the top row so a label spanning several columns applies to
    # every column beneath it.
    filled: List[str] = []
    current = ""
    for cell in top:
        cell = str(cell).strip()
        if cell:
            current = cell
        filled.append(current)

    names: List[str] = []
    for upper, lower in zip(filled, bottom):
        lower = str(lower).strip()
        if upper and lower:
            name = f"{upper} {lower}"
        else:
            name = upper or lower or "unnamed"
        # NOTE: [\s+] is a character class, so every single whitespace
        # character and every literal "+" becomes its own "_". This is kept
        # as-is because the resulting names feed the header signature used
        # to locate cached compiled rules.
        name = re.sub(r"[\s+]", "_", name).replace("/", "_").replace(".", "_").strip()
        name = re.sub(r"[^a-zA-Z0-9_]", "", name)
        names.append(name)
    return names


def _fast_read_header(path: str, loader: str) -> List[str]:
    """Return the column names of the file at *path* without loading its data.

    Args:
        path: CSV file to probe.
        loader: "csv" reads the first row as the header; any other value is
            handled as a two-row header merged into single names.

    Returns:
        The column names in file order.
    """
    if loader == "csv":
        return list(pd.read_csv(path, nrows=0, dtype=str).columns)

    # two_header: only the first two rows are needed to build the names.
    probe = pd.read_csv(path, header=None, nrows=2, dtype=str).fillna("")
    return _merge_two_header_rows(probe.iloc[0].tolist(), probe.iloc[1].tolist())


def _read_csv_usecols(path: str,
                      usecols: Optional[List[str]],
                      loader: str) -> pd.DataFrame:
    """Load the file at *path* as strings, optionally restricted to *usecols*.

    Args:
        path: CSV file to load.
        usecols: Columns to keep; ``None`` or an empty list keeps everything.
        loader: "csv" or two-header handling (see ``_fast_read_header``).

    Returns:
        A DataFrame of strings with missing cells replaced by "". For the
        two-header loader the two header rows are removed from the data,
        requested columns that do not exist are skipped, and the kept columns
        follow the order of *usecols*. For the "csv" loader *usecols* is
        passed straight to ``pandas.read_csv``.
    """
    if loader == "csv":
        return pd.read_csv(path, dtype=str, usecols=usecols if usecols else None).fillna("")

    # two_header: read everything as raw rows, then promote the first two
    # rows to merged column names.
    raw = pd.read_csv(path, header=None, dtype=str).fillna("")
    cols = _merge_two_header_rows(raw.iloc[0].tolist(), raw.iloc[1].tolist())

    df = raw.iloc[2:].reset_index(drop=True)
    df.columns = cols
    if usecols:
        df = df[[c for c in usecols if c in df.columns]]
    return df.fillna("")

# ------------------------------------------------------------
# 4. RULE SPEC NORMALIZATION + BINDING  (fixes your list .get error)
# ------------------------------------------------------------
# Normalised key -> canonical spelling for every output column plus "PID".
_CANON = {_norm_key(column): column for column in [*FINAL_COLUMNS, "PID"]}

def canonicalize_spec(spec_in: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *spec_in* keyed by canonical column names.

    Keys that match a known column after normalisation are renamed to the
    canonical spelling; other keys are kept unchanged. Every column in
    FINAL_COLUMNS that is still missing receives a blank rule, except "PTD"
    when the spec carries a "PID" rule.
    """
    canonical: Dict[str, Any] = {
        _CANON.get(_norm_key(key), key): rule
        for key, rule in spec_in.items()
    }
    for column in FINAL_COLUMNS:
        if column in canonical:
            continue
        # PTD is left unset when PID exists so PID can be promoted to it later.
        if column == "PTD" and "PID" in canonical:
            continue
        canonical[column] = {"op": "blank"}
    return canonical

def normalize_rule_spec(spec_in: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Coerce every rule in *spec_in* to dict form.

    Dict rules are passed through unchanged (same object). A string rule
    becomes a blank rule when, stripped and lower-cased, it reads "blank" or
    "tbd"; any other string becomes a ``const`` rule carrying the stripped
    text. Every other value type becomes a blank rule.
    """
    def as_rule(raw: Any) -> Dict[str, Any]:
        if isinstance(raw, dict):
            return raw
        if not isinstance(raw, str):
            return {"op": "blank"}
        text = raw.strip()
        if text.lower() in ("blank", "tbd"):
            return {"op": "blank"}
        return {"op": "const", "value": text}

    return {target: as_rule(raw) for target, raw in spec_in.items()}

def needs_source(op: str) -> bool:
    """Tell whether the rule operation *op* reads from a source column."""
    source_ops = frozenset((
        "copy",
        "date_mmddyyyy",
        "date_plus_1m_mmddyyyy",
        "name_first_from_full",
        "name_last_from_full",
        "money",
        "membercount_from_commission",
    ))
    return op in source_ops

def bind_sources_to_headers(headers: List[str],
                            rule_spec_in: Dict[str, Any]) -> Dict[str, Any]:
    """Resolve each rule's ``source`` to an actual header name.

    The spec is first normalised so every rule is a dict. Rules with an op
    outside ALLOWED_OPS are replaced by a blank rule. For ops that read a
    source column, the stripped source is matched against *headers* in this
    order: exact match, case-insensitive match, normalised-key match. When
    nothing matches, the rule's source is left as it was.

    Note: rule dicts are updated in place, so dict rules passed in by the
    caller are modified.
    """
    by_norm_key = _build_header_index(headers)
    bound: Dict[str, Any] = {}

    # Normalising first guarantees every rule is a dict, so .get() is safe.
    for target, rule in normalize_rule_spec(rule_spec_in).items():
        op = str(rule.get("op", "")).strip()
        if op not in ALLOWED_OPS:
            bound[target] = {"op": "blank"}
            continue

        bound[target] = rule
        if not needs_source(op):
            continue

        wanted = str(rule.get("source", "")).strip()
        if not wanted:
            continue

        if wanted in headers:
            rule["source"] = wanted
            continue

        wanted_lower = wanted.lower()
        same_ignoring_case = next(
            (h for h in headers if h.lower() == wanted_lower), None
        )
        if same_ignoring_case:
            rule["source"] = same_ignoring_case
            continue

        key = _norm_key(wanted)
        if key in by_norm_key:
            rule["source"] = by_norm_key[key]

    return bound

def promote_pid_to_ptd(spec: Dict[str, Any]) -> Dict[str, Any]:
    """Use the "PID" rule as the "PTD" rule when PTD is absent or blank.

    PTD counts as blank when its op, lower-cased, is "" or "blank". The spec
    is modified in place and also returned; after promotion both keys refer
    to the same rule object.
    """
    if "PID" not in spec:
        return spec
    if "PTD" in spec:
        current_op = str(spec["PTD"].get("op", "")).lower()
        if current_op not in ("", "blank"):
            return spec
    spec["PTD"] = spec["PID"]
    return spec

def collect_usecols(bound_spec: Dict[str, Any]) -> List[str]:
    """Return the sorted, de-duplicated source columns referenced by the spec.

    Only dict rules whose op reads a source column and whose ``source`` is
    non-empty contribute.
    """
    sources = {
        str(rule.get("source"))
        for rule in bound_spec.values()
        if isinstance(rule, dict)
        and needs_source(str(rule.get("op", "")).strip())
        and rule.get("source")
    }
    return sorted(sources)

# ------------------------------------------------------------
# 5. TRANSFORM FUNCTIONS + apply_rules / Ray
# ------------------------------------------------------------
def _to_mmddyyyy(s: pd.Series) -> pd.Series:
    """Format date-like values as MM/DD/YYYY; unparseable values become ""."""
    parsed = pd.to_datetime(s, errors="coerce")
    formatted = parsed.dt.strftime("%m/%d/%Y")
    return formatted.fillna("").astype("string")

def _add_one_month_mmddyyyy(s: pd.Series) -> pd.Series:
    """Return the dates in *s* shifted forward one calendar month, as MM/DD/YYYY.

    Values that cannot be parsed as dates become "". A day that does not
    exist in the following month is clipped to that month's last day
    (e.g. Jan 31 -> Feb 28/29).
    """
    dt = pd.to_datetime(s, errors="coerce")
    # Vectorised month shift: NaT stays NaT and the result keeps a datetime
    # dtype, so no per-row Python callback is needed.
    shifted = dt + pd.DateOffset(months=1)
    return shifted.dt.strftime("%m/%d/%Y").fillna("").astype("string")

def _parse_case_name_first_last(series: pd.Series):
    """Split full names into (first, last) series, each title-cased.

    The first whitespace-separated token is taken as the first name and the
    last token as the last name, so a single-word name yields the same value
    for both. Missing or empty names yield "".
    """
    cleaned = series.fillna("").astype(str).str.strip()
    tokens = cleaned.str.split()
    given = tokens.str[0].fillna("").str.title()
    family = tokens.str[-1].fillna("").str.title()
    return given.astype("string"), family.astype("string")

def apply_rules(df: pd.DataFrame, bound_spec: Dict[str, Any]) -> pd.DataFrame:
    """Build the output frame by applying one rule per FINAL_COLUMNS entry.

    Args:
        df: Source rows; columns are looked up by each rule's ``source``.
        bound_spec: Mapping of output column -> rule dict. A missing rule,
            a non-dict rule, or an op outside ALLOWED_OPS yields an empty
            column. A rule whose source column is absent from *df* is
            evaluated against an empty column.

    Returns:
        A string-typed DataFrame with exactly FINAL_COLUMNS, indexed like
        *df*, with missing values replaced by "".
    """
    def blank_column() -> pd.Series:
        return pd.Series([""] * len(df), index=df.index, dtype="string")

    def source_column(rule: Dict[str, Any]) -> pd.Series:
        return df.get(rule.get("source"), blank_column())

    def as_text(rule: Dict[str, Any]) -> pd.Series:
        return source_column(rule).astype(str)

    def constant(rule: Dict[str, Any]) -> pd.Series:
        return pd.Series(
            [str(rule.get("value", ""))] * len(df),
            index=df.index,
            dtype="string",
        )

    def member_count(rule: Dict[str, Any]) -> pd.Series:
        # "-1" when the source text contains a minus sign, otherwise "1".
        has_minus = as_text(rule).str.contains("-")
        return pd.Series(
            np.where(has_minus, "-1", "1"),
            index=df.index,
            dtype="string",
        )

    handlers = {
        "copy": as_text,
        "const": constant,
        "date_mmddyyyy": lambda rule: _to_mmddyyyy(source_column(rule)),
        "date_plus_1m_mmddyyyy": lambda rule: _add_one_month_mmddyyyy(source_column(rule)),
        "name_first_from_full": lambda rule: _parse_case_name_first_last(source_column(rule))[0],
        "name_last_from_full": lambda rule: _parse_case_name_first_last(source_column(rule))[1],
        # "money" is a plain text copy; no numeric clean-up happens here.
        "money": as_text,
        "membercount_from_commission": member_count,
    }

    columns: Dict[str, pd.Series] = {}
    for target in FINAL_COLUMNS:
        rule = bound_spec.get(target, {"op": "blank"})
        op = str(rule.get("op", "")).strip() if isinstance(rule, dict) else ""
        # Ops without a handler (e.g. "blank") and disallowed ops both fall
        # through to an empty column.
        handler = handlers.get(op) if op in ALLOWED_OPS else None
        columns[target] = handler(rule) if handler is not None else blank_column()

    return pd.DataFrame(columns, columns=FINAL_COLUMNS).fillna("").astype("string")

# Ray settings, read once from the environment when this code runs.
ENABLE_RAY          = os.environ.get("ENABLE_RAY", "auto")
RAY_PARTITIONS      = int(os.environ.get("RAY_PARTITIONS", "8"))
RAY_MIN_ROWS_TO_USE = int(os.environ.get("RAY_MIN_ROWS_TO_USE", "30000"))

def should_use_ray(n_rows: int) -> bool:
    """Decide whether to process *n_rows* rows with Ray.

    ENABLE_RAY set to "on" or "off" forces the answer; any other value
    enables Ray once *n_rows* reaches RAY_MIN_ROWS_TO_USE.
    """
    forced = {"on": True, "off": False}.get(ENABLE_RAY)
    if forced is not None:
        return forced
    return n_rows >= RAY_MIN_ROWS_TO_USE

def apply_rules_parallel(df: pd.DataFrame,
                         bound_spec: Dict[str, Any]) -> pd.DataFrame:
    """Apply the rules to *df* in row partitions using Ray.

    The frame is split into ``max(1, RAY_PARTITIONS)`` contiguous row chunks
    of near-equal size, each chunk is transformed by ``apply_rules`` in a Ray
    task, and the results are concatenated in the original row order.

    Args:
        df: Source rows.
        bound_spec: Rule spec forwarded to ``apply_rules``.

    Returns:
        The combined output frame with a fresh 0..n-1 index.
    """
    import ray
    if not ray.is_initialized():
        ray.init(ignore_reinit_error=True, include_dashboard=False, log_to_driver=False)
    # Store the spec once in the object store so all tasks share one copy.
    spec_ref = ray.put(bound_spec)

    @ray.remote
    def _worker(chunk: pd.DataFrame, spec: Dict[str, Any]):
        # Ray resolves an ObjectRef passed as a top-level task argument
        # before the task body runs, so `spec` is already the dict here;
        # calling ray.get() on it would fail.
        return apply_rules(chunk, spec)

    # Split row positions rather than the frame itself: np.array_split on a
    # DataFrame goes through the deprecated DataFrame.swapaxes.
    n_parts = max(1, RAY_PARTITIONS)
    position_groups = np.array_split(np.arange(len(df)), n_parts)
    parts = [df.iloc[positions] for positions in position_groups]

    futures = [_worker.remote(part, spec_ref) for part in parts]
    outs = ray.get(futures)
    return pd.concat(outs, ignore_index=True)

# ------------------------------------------------------------
# 6. MINI-PIPELINE (everything up to BEFORE Manhattan enrichment)
# ------------------------------------------------------------
# Pick the loader for this issuer (falls back to "csv" when the issuer is not
# listed in CARRIERS), read only the header, and fingerprint it.
loader      = CARRIERS.get(issuer, {}).get("loader", "csv")
headers     = _fast_read_header(csv_path, loader)
sig         = _sig_from_cols(headers)

# The compiled-rules file name embeds the header signature, so a file with a
# different header layout resolves to a different cache file.
prompt_path   = Path(template_dir) / f"{issuer}_prompt.txt"
rules_path    = Path(template_dir) / f"{issuer}_rules.json"
compiled_path = Path(template_dir) / f"{issuer}_compiled_rules_{sig}.json"

# --- load or build rules ---
if compiled_path.exists():
    bound_spec = json.loads(compiled_path.read_text(encoding="utf-8"))
else:
    # === SAME behavior as llm_processor_dbconn.py ===
    # NOTE(review): llm_generate_rule_spec is not defined in the visible
    # source; it must already exist in the session (e.g. an earlier notebook
    # cell) or this branch raises NameError -- confirm.
    raw_spec = llm_generate_rule_spec(headers, prompt_path, rules_path)
    raw_spec = canonicalize_spec(raw_spec)
    raw_spec = bind_sources_to_headers(headers, raw_spec)
    raw_spec = promote_pid_to_ptd(raw_spec)

    # Cache the bound spec under the signature-specific file name.
    compiled_path.write_text(
        json.dumps(raw_spec, indent=2),
        encoding="utf-8"
    )
    bound_spec = raw_spec

# Make sure spec structure is normalized like in processor.py
# (runs for both branches, so a spec loaded from disk is re-bound against the
# current headers as well).
bound_spec = canonicalize_spec(bound_spec)
bound_spec = bind_sources_to_headers(headers, bound_spec)
bound_spec = promote_pid_to_ptd(bound_spec)

# Load only the columns the rules actually read (all columns when none are
# referenced).
usecols = collect_usecols(bound_spec)
df = _read_csv_usecols(csv_path, usecols if usecols else None, loader)
print(f"Rows loaded: {len(df)}, usecols: {usecols}")

if should_use_ray(len(df)):
    out_df = apply_rules_parallel(df, bound_spec)
else:
    out_df = apply_rules(df, bound_spec)

# ---- ADD CONSTANTS + PLANCODE BEFORE MANHATTAN ENRICHMENT ----
# These assignments overwrite whatever the rules produced for the same
# columns; ProductType and PlanName are deliberately reset to empty.
out_df["TranDate"] = trandate
out_df["PayCode"]  = paycode
out_df["Issuer"]   = issuer
out_df["ProductType"] = ""
out_df["PlanName"]    = ""
# apply_rules always returns every FINAL_COLUMNS column (PlanCode included),
# so this guard only matters if that ever changes.
if "PlanCode" not in out_df.columns:
    out_df["PlanCode"] = ""      # <-- PlanCode guaranteed BEFORE #6
# Extra column that is not part of FINAL_COLUMNS.
out_df["Note"] = ""

# NOTE(review): display is not defined in the visible source -- presumably
# the notebook's built-in display helper; confirm before running as a script.
display(out_df.head())
display(out_df[["PolicyNO","PlanCode"]].head(20))
