In [None]:

# Install the notebook's dependencies (needed on a fresh Colab runtime; comment the next line out if they are already installed).
%pip -q install spacy rapidfuzz sentence-transformers transformers accelerate faiss-cpu PyMuPDF pandas openpyxl tqdm word2number

import sys, os, re, json, math, string, itertools, pathlib, textwrap, typing
from typing import List, Dict, Tuple, Optional


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m21.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m22.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.1/24.1 MB[0m [31m36.7 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:

import re
import json
import math
import pandas as pd
from pathlib import Path
from dataclasses import dataclass, asdict
from tqdm import tqdm

import spacy
from rapidfuzz import fuzz
import numpy as np

import fitz  

from sentence_transformers import SentenceTransformer, util as sbert_util
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
# Detect Google Colab by attempting its import; `files` is used further down
# for interactive uploads when a path list in the config cell is empty.
try:
    from google.colab import files
    ON_COLAB = True
except Exception:
    # Any failure is treated as "not on Colab"; `files` stays undefined in that case.
    ON_COLAB = False

print("ON_COLAB:", ON_COLAB)


ON_COLAB: True


In [None]:

# --- Input locations ---
# Excel workbook passed to the attribute-dictionary loader.
ATTRIBUTE_EXCEL_PATH = 'data/Attribute Dictionary.xlsx'
# Plain-text template files; load_templates turns each existing file into one TemplateClause.
TEMPLATE_FILES = ['data/TN_Standard_Template_Redacted_extracted_text.txt','data/WA_Standard_Template_Redacted_extracted_text.txt']
# Contracts to classify; PDFs go through pdf_to_text, anything else is read as UTF-8 text.
CONTRACT_FILES = ['data/TN_Contract1_Redacted.pdf','data/WA_2_Redacted.pdf']

# Candidate header names handed to autodetect_column for each workbook column.
ATTR_COL_CANDIDATES = ['Attribute']

KEYWORDS_COL_CANDIDATES = ['Keywords']

REGEX_COL_CANDIDATES = ['Regex']

# Cap on the number of distinct attributes kept by load_attribute_dictionary.
MAX_TARGET_ATTRIBUTES = 5
# NOTE(review): TARGET_ATTRIBUTES is not referenced by any code visible in this
# notebook — confirm whether it is still needed.
TARGET_ATTRIBUTES = [
    "Claims Submission & Adjudication",
    "Compensation / Fee Schedule",
    "Termination",
    "Use of Symbols and Marks",
    "Confidentiality / Provider Information"
]

# Lower-case phrases that contains_exception_tokens looks for (plain substring
# test against the lower-cased, whitespace-collapsed clause text).
EXCEPTION_TOKENS = [
    'except', 'unless', 'provided that',
    'subject to', 'however,', 'save that',
    'notwithstanding', 'only if'
]

# Regex -> canonical token substitutions, applied one after another in
# insertion order (case-insensitively) by apply_placeholders().
#
# Order matters: multi-word document names must be replaced BEFORE the
# single-word party patterns ("Plan", "Provider"), otherwise the generic
# pattern rewrites part of the phrase and the specific one can never match.
PLACEHOLDER_MAP = {
    # Percentages (XX%, 100%, 95% etc.)
    r"\[\(?\s*XX\s*%\s*\)?\]": "<PCT>",                # generic percentage placeholder
    # No trailing \b: '%' followed by a space or end of text is not a word
    # boundary, so "\b" after '%' prevented matches on ordinary "95% of ...".
    r"\b\d{1,3}\s*%": "<PCT>",                         # numeric percentages like 100%, 95%
    r"\b(one\s*hundred|ninety[-\s]*five|fifty)\s*percent\b": "<PCT>",

    # Compensation / Fee references
    r"\b(Fee\s+Schedule|Compensation\s+Schedule|Plan\s+Compensation\s+Schedule|WCS|PCS)\b": "<FEE_SCHEDULE>",
    r"\b(Rate|Eligible\s+Charge[s]?)\b": "<RATE>",

    # Documents (kept ahead of the party patterns, see note above)
    r"\b(Participation\s+Attachment[s]?)\b": "<ATTACHMENT>",
    # No trailing \b: the phrase ends in ')', which is not a word character.
    r"\b(provider\s+manual\(s\))": "<PROVIDER_MANUAL>",
    r"\b(Health\s+Benefit\s+Plan)\b": "<PLAN_DOC>",

    # Parties / Organization
    r"\b(Plan|Company|Network|Agency|Affiliate|Other\s+Payors?)\b": "<ORG>",
    r"\b(Provider|Participating\s+Provider)\b": "<PROVIDER>",

    # Members
    r"\b(Member|Enrollee|Subscriber|Insured|Beneficiary|Covered\s+(Person|Individual)|Dependent)\b": "<MEMBER>",

    # Programs
    r"\b(Government\s+Program|Medicare|Medicaid|CMS|HCA)\b": "<GOV_PROGRAM>",

    # Payments
    r"\b(Cost\s*Share[s]?|copayment[s]?|coinsurance|deductible[s]?)\b": "<COST_SHARE>",
    r"\b(Claim[s]?)\b": "<CLAIM>",

    # Legal placeholders
    r"\b(Regulatory\s+Requirements?)\b": "<REG_REQ>",
    r"\b(Effective\s+Date|MM/DD/YYYY)\b": "<DATE>",
    r"\[\s*_{2,}\s*\]": "<BLANK>",   # underscores for blanks like [_________]

    # Misc
    r"\b(Health\s+Services?|Covered\s+Services?)\b": "<SERVICE>",
    r"\b(Medically\s+Necessary|Medical\s+Necessity)\b": "<MEDICAL_NECESSITY>",
}


# Similarity thresholds consumed by classify_against_template.
FUZZY_THRESHOLD = 92              # RapidFuzz ratio at or above which a clause is labelled Standard
SBERT_THRESHOLD = 0.86            # SBERT cosine similarity at or above which a clause is labelled Standard
# SBERT cosine scores in [low, high) are labelled Ambiguous.
SBERT_AMBIG_LOW, SBERT_AMBIG_HIGH = 0.75, 0.86

# Model toggles
USE_SPACY_MODEL = "en_core_web_sm"     # or "en_core_web_trf" for larger (slower) transformer pipeline
USE_SBERT_MODEL = "sentence-transformers/all-MiniLM-L6-v2"   # name passed to SentenceTransformer
USE_DEBERTA_CROSS_ENCODER = False      # when True, SimilarityEngines also loads DEBERTA_CE_MODEL
DEBERTA_CE_MODEL = "cross-encoder/nli-deberta-v3-large"     # optional heavy model for pair scoring

# Directory for the CSV/JSON results; it is created as soon as this cell runs.
# NOTE(review): this is an absolute path, while the captured run output further
# down shows files under /content/outputs — confirm the intended location.
OUT_DIR = Path("/notebooks/outputs")
OUT_DIR.mkdir(parents=True, exist_ok=True)


In [None]:

import re
from word2number import w2n 

def normalize_whitespace(s: str) -> str:
    """Collapse each run of whitespace into one space and trim both ends.

    A falsy argument (``None`` or ``""``) yields ``""``.
    """
    text = s if s else ''
    return re.sub(r'\s+', ' ', text).strip()

def to_ascii_lower(s: str) -> str:
    """Return ``s`` with whitespace collapsed and all characters lower-cased.

    Despite the name, no ASCII transliteration is performed: non-ASCII
    characters are kept as they are.
    """
    collapsed = normalize_whitespace(s)
    return collapsed.lower()

def apply_placeholders(s: str) -> str:
    """Replace known placeholders with canonical tokens for fair comparison.

    Steps, in order:
      1. Every pattern in ``PLACEHOLDER_MAP`` is substituted (case-insensitive).
      2. "<digits> percent" is rewritten as "<digits>%".
      3. Percentages spelled out in words ("ninety five percent") are
         rewritten as "95%" using ``w2n.word_to_num``.

    Step 3 only captures a run of *number words* in front of "percent".
    Capturing any run of letters would also capture ordinary words, and
    because ``word_to_num`` ignores words it does not know, a phrase such as
    "shall pay ninety five percent" would be replaced wholesale by "95%".

    Args:
        s: Text to canonicalise.

    Returns:
        The text with placeholders and percentage spellings normalised.
    """
    out = s

    # 1. Replace known placeholders from config
    for pat, repl in PLACEHOLDER_MAP.items():
        out = re.sub(pat, repl, out, flags=re.IGNORECASE)

    # 2. Normalize percentages written as digits (e.g., "95 percent" -> "95%").
    #    The trailing \b keeps words like "percentage" intact.
    out = re.sub(r'(\d+)\s*percent\b', r'\1%', out, flags=re.IGNORECASE)

    # 3. Normalize percentages written in words (e.g., "ninety five percent" -> "95%")
    number_word = (
        r'(?:zero|one|two|three|four|five|six|seven|eight|nine|ten|eleven|twelve|'
        r'thirteen|fourteen|fifteen|sixteen|seventeen|eighteen|nineteen|twenty|'
        r'thirty|forty|fifty|sixty|seventy|eighty|ninety|hundred|thousand)'
    )
    # Number words joined by spaces/hyphens, optionally with "and" in between
    # ("one hundred and five").
    spelled_percent = re.compile(
        rf'\b({number_word}(?:(?:[\s-]+and)?[\s-]+{number_word})*)\s+percent\b',
        flags=re.IGNORECASE,
    )

    def word_percent_to_num(match):
        words = match.group(1).lower()
        try:
            num = w2n.word_to_num(words)
            return f"{num}%"
        except ValueError:
            # Not convertible after all: leave the original text untouched.
            return match.group(0)

    out = spelled_percent.sub(word_percent_to_num, out)

    return out

def normalize_for_compare(s: str) -> str:
    """Canonicalise text so two clauses can be compared for equality.

    The text goes through ``apply_placeholders``, then every ASCII punctuation
    character except ``%`` is removed, and finally whitespace is collapsed and
    the result lower-cased. Because ``<``, ``>`` and ``_`` count as
    punctuation, a token such as ``<FEE_SCHEDULE>`` ends up as ``feeschedule``.

    Args:
        s: Raw text; ``None`` is treated like an empty string, matching
           ``normalize_whitespace``.

    Returns:
        The normalised, lower-cased string.
    """
    import string  # local import keeps the helper usable without the setup cell

    s = apply_placeholders(s or '')
    # drop punctuation but keep %
    drop = string.punctuation.replace('%', '')
    s = s.translate(str.maketrans('', '', drop))
    return to_ascii_lower(s)

def contains_exception_tokens(text: str, template_has_exception: bool = False) -> bool:
    """Report whether ``text`` contains any phrase from ``EXCEPTION_TOKENS``.

    The test is a plain substring search on the lower-cased, whitespace-
    collapsed text. When ``template_has_exception`` is true the answer is
    always ``False``.
    """
    lowered = to_ascii_lower(text)
    if template_has_exception:
        return False
    for token in EXCEPTION_TOKENS:
        if token in lowered:
            return True
    return False


In [None]:

def autodetect_column(df: pd.DataFrame, candidates: list) -> Optional[str]:
    """Find the column of ``df`` that corresponds to one of ``candidates``.

    Matching is case-insensitive and ignores surrounding whitespace. An exact
    header match is preferred (candidates are tried in order); otherwise the
    first column whose header contains any candidate as a substring is used.

    Column labels are converted with ``str()`` before comparison, so frames
    with non-string labels (integers, timestamps) no longer raise
    ``AttributeError``. Blank candidates are ignored, because an empty string
    is a substring of every header.

    Args:
        df: Frame whose columns are searched.
        candidates: Acceptable header names.

    Returns:
        The matching column label exactly as it appears in ``df.columns``,
        or ``None`` when nothing matches.
    """
    wanted = [str(c).strip().lower() for c in candidates if str(c).strip()]

    by_lower = {str(col).strip().lower(): col for col in df.columns}
    for cand in wanted:
        if cand in by_lower:
            return by_lower[cand]

    # fallback: find by substring
    for col in df.columns:
        col_l = str(col).lower()
        if any(cand in col_l for cand in wanted):
            return col
    return None

from dataclasses import dataclass

@dataclass
class AttributeSpec:
    """One attribute to look for in contract clauses, with its detection hints.

    NOTE(review): a later cell defines a plain class with the same name, which
    replaces this dataclass once that cell has run.
    """
    name: str             # attribute label
    keywords: List[str]   # keyword phrases associated with the attribute
    regexes: List[str]    # regular-expression patterns associated with the attribute

def load_attribute_dictionary(xlsx_path: str) -> List[AttributeSpec]:
    """Load attribute specs from the first sheet of an Excel workbook.

    The attribute, keyword and regex columns are located with
    ``autodetect_column``. Rows without an attribute name are skipped,
    duplicate names keep their first occurrence, and reading stops once
    ``MAX_TARGET_ATTRIBUTES`` distinct attributes have been collected.

    Keyword cells are split on ``;`` and ``,``. Regex cells use the same
    separators, but only where the separator is outside ``(...)``, ``[...]``
    and ``{...}``. A plain split on every comma would cut a quantifier such
    as ``\\d{1,3}`` into two meaningless fragments.

    Args:
        xlsx_path: Path of the workbook.

    Returns:
        The de-duplicated list of ``AttributeSpec`` objects.

    Raises:
        ValueError: If no attribute column can be found.
    """
    df = pd.read_excel(xlsx_path, sheet_name=0)
    attr_col = autodetect_column(df, ATTR_COL_CANDIDATES)
    # Compare with None explicitly: a falsy label must not look like "no column".
    if attr_col is None:
        raise ValueError(f"Could not find attribute column. Checked: {ATTR_COL_CANDIDATES}. Found columns: {df.columns.tolist()}")

    kw_col = autodetect_column(df, KEYWORDS_COL_CANDIDATES)
    rx_col = autodetect_column(df, REGEX_COL_CANDIDATES)

    specs = []
    for _, row in df.iterrows():
        name = str(row[attr_col]).strip()
        if not name or name.lower() in ['nan', 'none']:
            continue
        keywords = []
        regexes = []
        if kw_col is not None and not pd.isna(row.get(kw_col, None)):
            keywords = [normalize_whitespace(x) for x in re.split(r'[;,]', str(row[kw_col])) if str(x).strip()]
        if rx_col is not None and not pd.isna(row.get(rx_col, None)):
            regexes = [normalize_whitespace(x) for x in _split_regex_cell(str(row[rx_col]))]

        specs.append(AttributeSpec(name=name, keywords=keywords, regexes=regexes))

    seen = set()
    uniq_specs = []
    for spec in specs:
        if spec.name not in seen:
            uniq_specs.append(spec)
            seen.add(spec.name)
        if len(uniq_specs) >= MAX_TARGET_ATTRIBUTES:
            break

    print("Loaded attributes:", [s.name for s in uniq_specs])
    return uniq_specs


def _split_regex_cell(cell: str) -> List[str]:
    """Split a cell of ';'/','-separated regexes without cutting inside a pattern."""
    parts = []
    buf = []
    depth = 0          # nesting level of (...) and {...}
    in_class = False   # currently inside a [...] character class
    escaped = False    # previous character was a backslash
    for ch in cell:
        if escaped:
            buf.append(ch)
            escaped = False
            continue
        if ch == '\\':
            buf.append(ch)
            escaped = True
            continue
        if in_class:
            # Inside a character class only ']' has structural meaning.
            in_class = ch != ']'
            buf.append(ch)
            continue
        if ch == '[':
            in_class = True
        elif ch in '({':
            depth += 1
        elif ch in ')}' and depth:
            depth -= 1
        elif ch in ';,' and depth == 0:
            parts.append(''.join(buf))
            buf = []
            continue
        buf.append(ch)
    parts.append(''.join(buf))
    return [p for p in parts if p.strip()]


In [None]:

from dataclasses import dataclass

@dataclass
class TemplateClause:
    """Text of one loaded template file plus the values load_templates derives from it."""
    name: str                 # template identifier; load_templates uses the file stem
    raw_text: str             # file content as read from disk
    norm_text: str            # raw_text after normalize_for_compare
    has_exception_tokens: bool  # result of contains_exception_tokens on raw_text

def read_text_file(p: Path) -> str:
    """Return the content of ``p`` decoded as UTF-8, dropping undecodable bytes."""
    with p.open('r', encoding='utf-8', errors='ignore') as handle:
        return handle.read()

def load_templates(paths: List[str]) -> List[TemplateClause]:
    """Build one ``TemplateClause`` per existing file in ``paths``.

    Missing files are reported with a warning and skipped. Each template keeps
    its raw text, the ``normalize_for_compare`` form of that text and whether
    the raw text contains exception tokens.

    Raises:
        ValueError: If none of the paths could be loaded.
    """
    loaded = []
    for p in map(Path, paths):
        if p.exists():
            raw = read_text_file(p)
            exception_flag = contains_exception_tokens(raw, template_has_exception=False)
            normalized = normalize_for_compare(raw)
            loaded.append(
                TemplateClause(
                    name=p.stem,
                    raw_text=raw,
                    norm_text=normalized,
                    has_exception_tokens=exception_flag,
                )
            )
        else:
            print(f"[WARN] Template not found: {p}")

    if len(loaded) == 0:
        raise ValueError("No templates loaded. Please upload 1–2 template .txt files.")

    print("Templates:", [t.name for t in loaded])
    return loaded


In [None]:

def pdf_to_text(path: str) -> str:
    """Extract the plain text of every page of a PDF.

    The document is opened as a context manager so the underlying file handle
    is released even if text extraction fails part-way through.

    Args:
        path: Location of the PDF file.

    Returns:
        The page texts joined by newlines, in page order.
    """
    with fitz.open(path) as doc:
        return "\n".join(page.get_text("text") for page in doc)

def is_pdf(path: str) -> bool:
    """Tell whether ``path`` ends with a ``.pdf`` extension (case-insensitive)."""
    extension = Path(path).suffix
    return extension.lower() == '.pdf'

def split_into_clauses(text: str) -> List[Dict]:
    """Break ``text`` into clause records.

    Paragraphs are separated by blank lines. Inside a paragraph a new clause
    starts after ``.`` or ``;`` when the next text begins with an upper-case
    letter, ``(`` or a digit. Fragments shorter than five characters are
    discarded.

    NOTE(review): the pattern's second alternative needs extra whitespace
    after ": ", but paragraphs have their whitespace collapsed beforehand, so
    it presumably never fires — confirm the intent.

    Returns:
        A list of dicts with ``clause_id`` (1-based, in order), ``text`` and
        ``norm`` (lower-cased text).
    """
    clauses = []
    for block in re.split(r'\n\s*\n+', text):
        para = normalize_whitespace(block)
        if not para:
            continue
        for piece in re.split(r'(?<=[.;])\s+(?=[A-Z(\d])|(?<=: )\s+', para):
            s = normalize_whitespace(piece)
            if len(s) >= 5:
                clauses.append({
                    "clause_id": len(clauses) + 1,
                    "text": s,
                    "norm": normalize_whitespace(s.lower()),
                })
    return clauses

# Smoke test: extract one contract and preview the first ten clause records.
# `text` and `clauses` are module-level names that later cells reassign.
path = 'data/TN_Contract1_Redacted.pdf'
text = pdf_to_text(path)
clauses = split_into_clauses(text)
print(len(clauses))  # number of clauses extracted
for c in clauses[:10]:
    print(c)
    print()


In [None]:

class AttributeSpec:
    """Attribute name with cleaned keyword and regex lists.

    Keywords are lower-cased and stripped, regexes are stripped, and entries
    that are blank after stripping are dropped.
    """

    def __init__(self, name: str, keywords: List[str], regexes: List[str]):
        self.name = name

        cleaned_keywords = []
        for kw in keywords:
            if kw.strip():
                cleaned_keywords.append(kw.lower().strip())
        self.keywords = cleaned_keywords

        cleaned_regexes = []
        for pattern in regexes:
            if pattern.strip():
                cleaned_regexes.append(pattern.strip())
        self.regexes = cleaned_regexes

    def __repr__(self):
        return "AttributeSpec(name={})".format(self.name)

# Hard-coded detection hints per attribute name. load_specs_from_excel keeps
# only workbook rows whose "Attribute" value is a key of this dict and takes
# the keywords/regexes from here.
ATTRIBUTE_SEEDS = {
    "Medicaid Timely Filing": {
        "keywords": ["medicaid", "timely filing", "claim submission", "days"],
        # payer name followed, later in the text, by a 1-3 digit number of days
        "regexes":  [r"\bmedicaid\b.*\b\d{1,3}\s*days?\b"]
    },
    "Medicare Timely Filing": {
        "keywords": ["medicare", "timely filing", "claim submission", "days"],
        "regexes":  [r"\bmedicare\b.*\b\d{1,3}\s*days?\b"]
    },
    "No Steerage/SOC": {
        "keywords": ["steerage", "soc", "freedom of choice"],
        "regexes":  [r"\bno\s+steerage\b", r"\bSOC\b"]
    },
    "Medicaid Fee Schedule": {
        "keywords": ["medicaid fee schedule"],
        # payer name followed, later in the text, by "fee schedule"
        "regexes":  [r"\bmedicaid\b.*\bfee schedule\b"]
    },
    "Medicare Fee Schedule": {
        "keywords": ["medicare fee schedule"],
        "regexes":  [r"\bmedicare\b.*\bfee schedule\b"]
    },
}

def load_specs_from_excel(path: str) -> List[AttributeSpec]:
    """Create specs for the workbook rows that name a seeded attribute.

    Only the "Attribute" column of the workbook is read; keywords and regexes
    come from ``ATTRIBUTE_SEEDS``. Rows whose name is blank, missing or not a
    key of ``ATTRIBUTE_SEEDS`` are ignored.
    """
    frame = pd.read_excel(path)
    result = []
    for _, record in frame.iterrows():
        label = str(record["Attribute"]).strip()
        seed = ATTRIBUTE_SEEDS.get(label)
        if label in ("nan", "", "None") or seed is None:
            continue
        result.append(AttributeSpec(label, seed["keywords"], seed["regexes"]))
    return result

def split_into_clauses(text: str) -> List[Dict]:
    """Break ``text`` into clause records.

    This definition replaces the earlier ``split_into_clauses`` once the cell
    runs, so it must produce the same record shape. Each record therefore
    carries ``norm`` as well as ``clause_id`` and ``text``:
    ``classify_against_template`` reads ``clause["norm"]`` and would raise
    ``KeyError`` without it.

    Paragraphs are separated by blank lines. Inside a paragraph a new clause
    starts after ``.`` or ``;`` when the next text begins with an upper-case
    letter, ``(`` or a digit. Fragments shorter than five characters are
    discarded.

    Returns:
        A list of dicts with ``clause_id`` (1-based, in order), ``text`` and
        ``norm`` (lower-cased text).
    """
    paras = [normalize_whitespace(p) for p in re.split(r'\n\s*\n+', text) if normalize_whitespace(p)]
    clauses = []
    clause_id = 1
    for para in paras:
        splits = re.split(r'(?<=[.;])\s+(?=[A-Z(\d])|(?<=: )\s+', para)
        for sp in splits:
            s = normalize_whitespace(sp)
            if len(s) < 5:
                continue
            clauses.append({
                "clause_id": clause_id,
                "text": s,
                "norm": normalize_whitespace(s.lower()),
            })
            clause_id += 1
    return clauses

In [None]:
# ------------------ Only regex ------------------
import re
import pandas as pd
from typing import List, Dict, Optional
import fitz 


def detect_attribute_for_clause_using_regex(clause_text: str, specs: List[AttributeSpec]) -> Optional[str]:
    """Return the name of the first spec that matches ``clause_text``.

    Specs are tried in order. The timely-filing, fee-schedule and steerage
    attributes have dedicated rules; timely-filing specs use *only* their
    dedicated rule. Every other spec (and the fee-schedule/steerage specs when
    their dedicated rule fails) falls back to its regexes and then its
    keywords.

    Keywords are matched as whole words (an optional plural "s"/"es" is
    tolerated) rather than as raw substrings, so "soc" no longer fires on
    "associated" or "social". Blank keywords are ignored, and a regex that
    does not compile is skipped instead of aborting the scan.

    Args:
        clause_text: Text of one clause.
        specs: Attribute specs to test, in priority order.

    Returns:
        The matching attribute name, or ``None``.
    """
    t = clause_text.lower()
    for spec in specs:
        if spec.name in ["Medicaid Timely Filing", "Medicare Timely Filing"]:
            payer = spec.name.split()[0].lower()
            if re.search(rf"\b{payer}\b.*\b\d{{1,3}}\s*days?\b", t) or \
               ("timely filing" in t and payer in t):
                return spec.name
            # Generic keywords such as "days" are too broad for this attribute.
            continue

        elif spec.name in ["Medicaid Fee Schedule", "Medicare Fee Schedule"]:
            payer = spec.name.split()[0].lower()
            if payer in t and re.search(r"\bfee schedule\b", t):
                return spec.name

        elif spec.name == "No Steerage/SOC":
            if re.search(r"\bsteerage\b", t) or re.search(r"\bsoc\b", t) or "freedom of choice" in t:
                return spec.name

        for rx in spec.regexes:
            try:
                if re.search(rx, clause_text, flags=re.IGNORECASE):
                    return spec.name
            except re.error:
                # Malformed pattern (e.g. from the workbook): ignore it.
                continue

        for kw in spec.keywords:
            kw_l = kw.lower().strip()
            if kw_l and re.search(rf"(?<!\w){re.escape(kw_l)}(?:s|es)?(?!\w)", t):
                return spec.name

    return None

# Demo run of the regex-only detector on a single contract.
excel_path = "data/Attribute Dictionary.xlsx"
pdf_path = "data/TN_Contract1_Redacted.pdf"

specs = load_specs_from_excel(excel_path)
text = pdf_to_text(pdf_path)
clauses = split_into_clauses(text)

# Print every clause for which an attribute was detected.
for c in clauses:
    attr = detect_attribute_for_clause_using_regex(c["text"], specs)
    if attr:
        print(f"ID {c['clause_id']} → Attribute: {attr}\nText: {c['text']}\n")


In [None]:
# ------------------ Using regix and spacy ------------------
import re
import pandas as pd
from typing import List, Dict, Optional
import fitz  
import spacy


def init_spacy(model: str = "en_core_web_sm"):
    """Load and return the spaCy pipeline called ``model``.

    Raises:
        OSError: If ``spacy.load`` fails with ``OSError``; the message explains
            how to download the model.
    """
    try:
        return spacy.load(model)
    except OSError:
        raise OSError(f"spaCy model '{model}' not found. Install via: python -m spacy download {model}")

def detect_attribute_for_clause_spacy_regex(
    clause_text: str, specs: List[AttributeSpec], nlp=None
) -> Optional[str]:
    """Detect attributes using regex first, then spaCy lemma-based keywords with strict payer context.

    For each spec, in order, its regexes are tried; if none matches and a
    spaCy pipeline was supplied, the lemma rules for the known attribute names
    are evaluated.

    The clause is parsed and its lemma set built once per call rather than
    once per spec. A regex that fails to compile is reported through
    ``warnings.warn`` and skipped instead of being swallowed silently.

    Args:
        clause_text: Text of one clause; blank text yields ``None``.
        specs: Attribute specs to test, in priority order.
        nlp: Optional spaCy pipeline. Without it only the regexes are used.

    Returns:
        ``"<name> (regex)"`` or ``"<name> (spacy)"`` for the first match,
        otherwise ``None``.
    """
    import warnings

    if not clause_text or not clause_text.strip():
        return None

    text_lower = clause_text.lower()
    doc = nlp(clause_text) if nlp else None
    # Loop-invariant: the lemma set depends only on the clause, not on the spec.
    lemmas = (
        {token.lemma_.lower() for token in doc if not token.is_stop and not token.is_punct}
        if doc else None
    )

    for spec in specs:
        payer = spec.name.split()[0].lower() if " " in spec.name else ""

        for rx in spec.regexes:
            try:
                if re.search(rx, clause_text, flags=re.IGNORECASE):
                    return f"{spec.name} (regex)"
            except re.error as err:
                warnings.warn(f"Skipping invalid regex {rx!r} for attribute {spec.name!r}: {err}")

        if lemmas is None:
            continue

        if spec.name in ["Medicaid Timely Filing", "Medicare Timely Filing"]:
            if "timely" in lemmas and "filing" in lemmas and payer in text_lower:
                return f"{spec.name} (spacy)"
            if re.search(r"\b\d{1,3}\s*days?\b", text_lower) and payer in text_lower:
                return f"{spec.name} (spacy)"

        elif spec.name in ["Medicaid Fee Schedule", "Medicare Fee Schedule"]:
            if "fee" in lemmas and "schedule" in lemmas and payer in text_lower:
                return f"{spec.name} (spacy)"

        elif spec.name == "No Steerage/SOC":
            if "steerage" in lemmas or "soc" in lemmas or "freedom of choice" in text_lower:
                return f"{spec.name} (spacy)"

    return None

# Demo run of the regex + spaCy detector on a single contract.
nlp = init_spacy()

excel_path = "data/Attribute Dictionary.xlsx"
pdf_path = "data/TN_Contract1_Redacted.pdf"

specs = load_specs_from_excel(excel_path)
text = pdf_to_text(pdf_path)
clauses = split_into_clauses(text)

# Print every clause for which an attribute was detected; the label carries a
# "(regex)" or "(spacy)" suffix naming the rule that fired.
for c in clauses:
    attr = detect_attribute_for_clause_spacy_regex(c["text"], specs, nlp)
    if attr:
        print(f"ID {c['clause_id']} → Attribute: {attr}\nText: {c['text']}\n")


In [None]:

class SimilarityEngines:
    """Holds the similarity models used to compare a clause with a template.

    An SBERT bi-encoder is always loaded. A sequence-classification
    cross-encoder and its tokenizer are loaded only when ``use_cross_encoder``
    is true.
    """

    def __init__(self, sbert_model_name: str, use_cross_encoder: bool, cross_encoder_name: str):
        self.sbert = SentenceTransformer(sbert_model_name)
        self.use_cross_encoder = use_cross_encoder
        self.cross_encoder = None
        self.ce_tokenizer = None
        if use_cross_encoder:
            self.cross_encoder = AutoModelForSequenceClassification.from_pretrained(cross_encoder_name)
            self.ce_tokenizer = AutoTokenizer.from_pretrained(cross_encoder_name)
            self.cross_encoder.eval()

    @torch.no_grad()
    def sbert_score(self, a: str, b: str) -> float:
        """Return the cosine similarity of the SBERT embeddings of ``a`` and ``b``."""
        embs = self.sbert.encode([a, b], convert_to_tensor=True, normalize_embeddings=True)
        sim = sbert_util.cos_sim(embs[0], embs[1]).item()
        return float(sim)

    def _entailment_index(self, num_labels: int) -> int:
        """Locate the "entailment" class in the cross-encoder's label mapping.

        The position of the entailment class is model specific, so it is read
        from ``config.id2label``. If the mapping is absent or has no
        entailment label, the last index is used.
        """
        config = getattr(self.cross_encoder, "config", None)
        id2label = getattr(config, "id2label", None) or {}
        for idx, label in id2label.items():
            if "entail" in str(label).lower():
                idx = int(idx)
                if 0 <= idx < num_labels:
                    return idx
        return num_labels - 1

    @torch.no_grad()
    def cross_encoder_score(self, a: str, b: str) -> Optional[float]:
        """Score the pair ``(a, b)`` with the cross-encoder.

        Returns:
            ``None`` when the cross-encoder is disabled. For a three-class
            model, the softmax probability of the entailment class. Otherwise
            the mean sigmoid of the logits.
        """
        if not self.use_cross_encoder or self.cross_encoder is None:
            return None
        inputs = self.ce_tokenizer(a, b, return_tensors="pt", truncation=True, padding=True, max_length=512)
        logits = self.cross_encoder(**inputs).logits
        if logits.shape[-1] == 3:
            probs = torch.softmax(logits, dim=-1).squeeze(0)
            entail = probs[self._entailment_index(3)].item()
            return float(entail)
        return torch.sigmoid(logits).mean().item()


In [None]:

from dataclasses import dataclass

@dataclass
class StepResult:
    """Outcome of one check performed by classify_against_template."""
    step: str                # identifier of the check, e.g. "exception_check"
    satisfied: bool          # whether the check's condition held
    score: Optional[float]   # numeric score of the check, or None when it has none
    comment: str             # human-readable description of the check

@dataclass
class ClauseDecision:
    """Final classification record for one clause, as built by classify_clauses."""
    clause_id: int                  # id assigned by split_into_clauses
    attribute: Optional[str]        # detected attribute label, None for skipped clauses
    template_used: Optional[str]    # name of the template chosen, None for skipped clauses
    label: str                # "Standard" | "Non-Standard" | "Ambiguous" | "Skip"
    score: float                    # score returned with the label (0.0 for skipped clauses)
    rule: str                       # identifier of the rule that produced the label
    steps: List[StepResult]         # checks evaluated on the way to the label
    text: str                       # clause text

def classify_against_template(clause, template: TemplateClause, engines: SimilarityEngines) -> Tuple[str, float, str, List[StepResult]]:
    """Classify one clause against one template with a cascade of checks.

    The checks run in order and the first decisive one determines the result:
    exception tokens, exact match, placeholder-normalised match, fuzzy lexical
    similarity, SBERT similarity (with an ambiguous band), optional
    cross-encoder, and finally a Non-Standard default.

    The clause is normalised here with ``normalize_for_compare``, the same
    routine used for ``template.norm_text``, so both sides are compared in the
    same form. Only ``clause["text"]`` is required; a precomputed
    ``clause["norm"]`` is not used.

    Args:
        clause: Mapping with at least a ``"text"`` entry.
        template: Template to compare against.
        engines: Similarity models.

    Returns:
        ``(label, score, rule, steps)``.
    """
    steps = []
    c_raw = clause["text"]
    c_norm = normalize_for_compare(c_raw)
    t_norm = template.norm_text

    # Step A: Exception/condition tokens (Non-Standard if present in clause, absent in template)
    has_exc = contains_exception_tokens(c_raw, template_has_exception=template.has_exception_tokens)
    steps.append(StepResult("exception_check", has_exc, None, "Detected conditional/exception tokens in clause; template lacks them."))
    if has_exc:
        return "Non-Standard", 0.90, "new_condition", steps

    # Step B: Exact match ignoring only case and whitespace.
    # Texts equal in this form are also equal once placeholders are applied, so
    # the (potentially long) template text is only re-normalised when the
    # cheaper normalised comparison already succeeded.
    norm_equal = (c_norm == t_norm)
    exact = norm_equal and to_ascii_lower(c_raw) == to_ascii_lower(template.raw_text)
    steps.append(StepResult("exact_normalized_match", exact, None, "Clause equals template after normalization."))
    if exact:
        return "Standard", 0.99, "exact_norm", steps

    # Step C: Placeholder-aware equality (texts differ only in placeholder values/punctuation)
    placeholder_like = norm_equal
    steps.append(StepResult("placeholder_substitution", placeholder_like, None, "Placeholders/value substitutions align (e.g., percent)."))
    if placeholder_like:
        return "Standard", 0.95, "placeholder_subst", steps

    # Step D: Fuzzy lexical similarity
    lex = fuzz.ratio(c_norm, t_norm)
    steps.append(StepResult("fuzzy_lexical", lex >= FUZZY_THRESHOLD, float(lex)/100.0, f"RapidFuzz ratio={lex}"))
    if lex >= FUZZY_THRESHOLD:
        return "Standard", 0.90, "lexical_high", steps

    # Step E: Semantic similarity (SBERT)
    sbert_sim = engines.sbert_score(c_raw, template.raw_text)
    steps.append(StepResult("semantic_sbert", sbert_sim >= SBERT_THRESHOLD, sbert_sim, f"SBERT cosine={sbert_sim:.3f}"))
    if sbert_sim >= SBERT_THRESHOLD:
        return "Standard", 0.85, "semantic_high", steps

    if SBERT_AMBIG_LOW <= sbert_sim < SBERT_AMBIG_HIGH:
        steps.append(StepResult("semantic_ambiguous_band", True, sbert_sim, "SBERT score in ambiguous range; needs review."))
        return "Ambiguous", sbert_sim, "semantic_ambiguous", steps

    # Step F: Optional DeBERTa v3 Large cross-encoder (if enabled)
    if engines.use_cross_encoder:
        ce = engines.cross_encoder_score(c_raw, template.raw_text)
        steps.append(StepResult("deberta_cross_encoder", ce is not None and ce >= 0.7, ce, "Cross-encoder entailment prob (>=0.7 → Standard)."))
        if ce is not None and ce >= 0.7:
            return "Standard", float(ce), "deberta_ce_high", steps

    # Step G: Default Non-Standard
    steps.append(StepResult("default_nonstandard", True, sbert_sim, "Low similarity and no earlier rule satisfied."))
    return "Non-Standard", float(sbert_sim), "low_similarity", steps

def choose_best_template(clause, templates: List[TemplateClause], engines: SimilarityEngines):
    """Classify ``clause`` against every template and pick one result.

    A Non-Standard result whose exception check fired wins immediately (the
    first such template in input order). Otherwise results are ranked by
    label (Standard > Ambiguous > Non-Standard > anything else) and then by
    score, and the top one is returned.

    Returns:
        ``(template_name, label, score, rule, steps)``.
    """
    label_rank = {"Standard": 3, "Ambiguous": 2, "Non-Standard": 1}

    candidates = [
        (tpl.name, *classify_against_template(clause, tpl, engines))
        for tpl in templates
    ]

    for candidate in candidates:
        _, label, _, _, steps = candidate
        exception_hit = any(st.step == "exception_check" and st.satisfied for st in steps)
        if label == "Non-Standard" and exception_hit:
            return candidate

    ordered = sorted(
        candidates,
        key=lambda cand: (label_rank.get(cand[1], 0), cand[2]),
        reverse=True,
    )
    return ordered[0]

def classify_clauses(clauses: List[Dict], specs: List[AttributeSpec], templates: List[TemplateClause], engines: SimilarityEngines, nlp=None):
    """Produce one ``ClauseDecision`` per clause.

    A clause with no detected attribute is recorded with label ``"Skip"``.
    Every other clause is classified against the templates via
    ``choose_best_template``.
    """
    decisions = []
    for cl in clauses:
        attr = detect_attribute_for_clause_spacy_regex(cl["text"], specs, nlp)

        if attr:
            tpl_name, label, score, rule, steps = choose_best_template(cl, templates, engines)
        else:
            # Not one of the target attributes: record a placeholder decision.
            attr, tpl_name = None, None
            label, score, rule, steps = "Skip", 0.0, "no_target_attribute", []

        decisions.append(
            ClauseDecision(
                clause_id=cl["clause_id"],
                attribute=attr,
                template_used=tpl_name,
                label=label,
                score=score,
                rule=rule,
                steps=steps,
                text=cl["text"],
            )
        )
    return decisions


In [None]:

# --- Resolve the attribute workbook ---
# Used only when the config cell left ATTRIBUTE_EXCEL_PATH empty: ask for an
# upload on Colab, otherwise fall back to the default location.
if not ATTRIBUTE_EXCEL_PATH:
    if ON_COLAB:
        print("Upload the Attribute Dictionary Excel (e.g., 'Attribute Dictionary.xlsx')")
        up = files.upload()
        ATTRIBUTE_EXCEL_PATH = list(up.keys())[0]
    else:
        default = 'data/Attribute Dictionary.xlsx'
        if Path(default).exists():
            ATTRIBUTE_EXCEL_PATH = default
        else:
            raise FileNotFoundError("Please set ATTRIBUTE_EXCEL_PATH or upload the Excel.")

# Rebinds `specs`, replacing the value left behind by the demo cells above.
specs = load_attribute_dictionary(ATTRIBUTE_EXCEL_PATH)

# --- Resolve the template files ---
if not TEMPLATE_FILES:
    if ON_COLAB:
        print("Upload 1–2 template .txt files (e.g., 'TN_template.txt', 'WA_template.txt')")
        up = files.upload()
        TEMPLATE_FILES = list(up.keys())
    else:
        # Off Colab: take every .txt file in the working directory.
        TEMPLATE_FILES = [str(p) for p in Path('.').glob('*.txt')]
        if not TEMPLATE_FILES:
            raise FileNotFoundError("Please provide template .txt files.")

templates = load_templates(TEMPLATE_FILES)

# --- Load the models ---
# spaCy is optional here: on failure `nlp` is set to None and a warning is printed.
try:
    nlp = init_spacy(USE_SPACY_MODEL)
except Exception as e:
    print("[WARN] spaCy init failed:", e)
    nlp = None

engines = SimilarityEngines(
    sbert_model_name=USE_SBERT_MODEL,
    use_cross_encoder=USE_DEBERTA_CROSS_ENCODER,
    cross_encoder_name=DEBERTA_CE_MODEL
)
print("SBERT model loaded:", USE_SBERT_MODEL)
print("DeBERTa CE enabled:", USE_DEBERTA_CROSS_ENCODER)

# --- Resolve the contract files ---
if not CONTRACT_FILES:
    if ON_COLAB:
        print("Upload a contract file (PDF preferred; .txt also works).")
        up = files.upload()
        CONTRACT_FILES = list(up.keys())
    else:
        # Off Colab: look for PDFs, then text files, in the working directory
        # and keep only the first hit; otherwise try one default PDF.
        candidates = [str(p) for p in Path('.').glob('*.pdf')] + [str(p) for p in Path('.').glob('*.txt')]
        if not candidates:
            default_pdf = 'data/Contracts AI Problem Statement - HiLabs Hackathon 2025_IITKGP HiLabs Watermark.pdf'
            if Path(default_pdf).exists():
                CONTRACT_FILES = [default_pdf]
            else:
                raise FileNotFoundError("Please upload a contract PDF or txt.")
        else:
            CONTRACT_FILES = candidates[:1]

print("Contract files:", CONTRACT_FILES)

# Classify every contract and collect the decisions as plain dicts.
all_decisions = []
for cpath in CONTRACT_FILES:
    if is_pdf(cpath):
        raw_text = pdf_to_text(cpath)
    else:
        raw_text = Path(cpath).read_text(encoding='utf-8', errors='ignore')

    clauses = split_into_clauses(raw_text)
    print(f"{Path(cpath).name}: Extracted {len(clauses)} clauses")

    # Hand over the spaCy pipeline initialised earlier in this cell. Without
    # it, classify_clauses defaults to nlp=None and the lemma-based detection
    # rules are never evaluated.
    decisions = classify_clauses(clauses, specs, templates, engines, nlp=nlp)
    all_decisions.extend([asdict(d) for d in decisions])

# --- Summarise and export ---
# NOTE(review): if all_decisions is empty the frame has no 'label' column and
# the filter below raises KeyError — confirm whether that case can occur.
df = pd.DataFrame(all_decisions)
# Count decisions per (attribute, label), leaving out skipped clauses.
summary = df[df['label'] != 'Skip'].groupby(['attribute', 'label']).size().reset_index(name='count')
print("Summary by attribute/label:")
try:
    from IPython.display import display
    display(summary)
except Exception:
    # Fall back to plain text when rich display is unavailable.
    print(summary)

# Write the full decision table as CSV and the raw decision dicts as JSON.
OUT_DIR.mkdir(parents=True, exist_ok=True)
out_csv = OUT_DIR / "clause_classification_summary.csv"
out_json = OUT_DIR / "clause_classification_details.json"
df.to_csv(out_csv, index=False)
with open(out_json, 'w', encoding='utf-8') as f:
    json.dump(all_decisions, f, ensure_ascii=False, indent=2)

print(f"Saved summary CSV: {out_csv.resolve()}")
print(f"Saved details JSON: {out_json.resolve()}")


# Keep only clauses with a definite label; "Ambiguous" and "Skip" rows are excluded.
valid_df = df[df['label'].isin(['Standard', 'Non-Standard'])]

print("Valid classified clauses:")
try:
    from IPython.display import display
    display(valid_df[['clause_id', 'attribute', 'template_used', 'label', 'score', 'rule', 'text']])
except Exception:
    print(valid_df[['clause_id', 'attribute', 'template_used', 'label', 'score', 'rule', 'text']].to_string(index=False))



Loaded attributes: ['Medicaid Timely Filing', 'Medicare Timely Filing', 'No Steerage/SOC', 'Medicaid Fee Schedule', 'Medicare Fee Schedule']
Templates: ['TN_Standard_Template_Redacted_extracted_text', 'WA_Standard_Template_Redacted_extracted_text']
SBERT model loaded: sentence-transformers/all-MiniLM-L6-v2
DeBERTa CE enabled: False
Contract files: ['/content/hilabs/TN_Contract1_Redacted.pdf', '/content/hilabs/WA_2_Redacted.pdf']
TN_Contract1_Redacted.pdf: Extracted 564 clauses
WA_2_Redacted.pdf: Extracted 886 clauses
Summary by attribute/label:


Unnamed: 0,attribute,label,count


Saved summary CSV: /content/outputs/clause_classification_summary.csv
Saved details JSON: /content/outputs/clause_classification_details.json
Valid classified clauses:


Unnamed: 0,clause_id,attribute,template_used,label,score,rule,text


In [None]:

import textwrap

def pretty_print_clause(decision_row: dict):
    """Print a readable report for one decision record.

    The report consists of a separator line, two header lines, the clause
    text wrapped at 100 columns and one line per recorded step (with its
    score when the step has one).
    """
    separator = "=" * 100
    identity = (
        f"Clause ID: {decision_row['clause_id']} | Attribute: {decision_row.get('attribute')} "
        f"| Template: {decision_row.get('template_used')}"
    )
    outcome = f"Label: {decision_row['label']} | Rule: {decision_row['rule']} | Score: {decision_row['score']:.3f}"

    print(separator)
    print(identity)
    print(outcome)
    print("- Text:")
    print(textwrap.fill(decision_row['text'], width=100))
    print("- Steps:")
    for st in decision_row['steps']:
        mark = "✅" if st['satisfied'] else "❌"
        if st['score'] is None:
            score_part = ""
        else:
            score_part = f" | score={st['score']:.3f}"
        print(f"  {mark} {st['step']}{score_part} — {st['comment']}")

# Preview the first five non-skipped decisions. Any failure (for example `df`
# not being defined yet) is reported instead of raised.
try:
    sample = df[df['label'] != 'Skip'].head(5).to_dict(orient='records')
    for row in sample:
        pretty_print_clause(row)
except Exception as e:
    print("No decisions to display yet.", e)
