In [4]:
# This is the code that will run the agreement test.
# Please note that this is not only a Cohen's Kappa test, as kappa is only ideal for categorical fields.
# Mean Absolute Error (MAE) is used for numerical fields.

# --------------------------------------------------------------------------------------------------------

# This code is best run on Google Colab. It expects two things:
#              1. 2 folders named ann_A and ann_B, each with the annotator's corresponding JSONs
#              2. JSON files named in the format "invoice_T{i}_gen{j}.json", where the names match in both folders

# If either of these doesn't apply, or you end up running the code somewhere else, feel free to adapt the code to fit your situation.

In [5]:
import os, re

# Folders holding each annotator's JSON files, and the folder the reports are written to.
ANN_A_DIR = "/content/data/ann_A"
ANN_B_DIR = "/content/data/ann_B"
OUTPUT_DIR = "/content/output"
# Create the output folder up front; exist_ok=True keeps re-running this cell harmless.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Accepted file names look like invoice_T<i>_gen<j>.json; group 1 is the name without ".json".
pat = re.compile(r"^(invoice_T\d+_gen\d+)\.json$", re.IGNORECASE)

def list_ids(folder):
    """Scan `folder` and sort its ``.json`` files by whether their name fits `pat`.

    Files whose name does not end in ".json" are ignored entirely.

    Returns:
        (ids, bad): `ids` is the set of matched names with the extension
        stripped; `bad` lists the ``.json`` file names that did not match.
    """
    matched_ids = set()
    unmatched_names = []
    json_names = (name for name in os.listdir(folder) if name.endswith(".json"))
    for name in json_names:
        hit = pat.match(name)
        if hit is None:
            unmatched_names.append(name)
            continue
        matched_ids.add(hit.group(1))
    return matched_ids, unmatched_names

# Discover the IDs present on each side, then split them into shared and one-sided lists.
ids_a, bad_a = list_ids(ANN_A_DIR)
ids_b, bad_b = list_ids(ANN_B_DIR)

overlap      = sorted(ids_a.intersection(ids_b))
missing_in_b = sorted(ids_a.difference(ids_b))
missing_in_a = sorted(ids_b.difference(ids_a))

print(f"A count: {len(ids_a)} | B count: {len(ids_b)} | Overlap pairs: {len(overlap)}")

# Each warning is printed only when there is something to report.
for warning, payload in (
    ("[WARN] A has files not matching pattern:", bad_a),
    ("[WARN] B has files not matching pattern:", bad_b),
    ("[WARN] Present in A but missing in B:", missing_in_b),
    ("[WARN] Present in B but missing in A:", missing_in_a),
):
    if payload:
        print(warning, payload)

# Preview a few
print("Sample overlap IDs:", overlap[:5])

FileNotFoundError: [Errno 2] No such file or directory: '/content/data/ann_A'

In [None]:
import os, json, re
from datetime import datetime
from collections import Counter
import pandas as pd

# ===================== Config =====================
# Tolerances used when comparing numeric fields between the two annotators.
AMOUNT_ABS_TOL = 0.01       # $0.01 absolute tolerance
AMOUNT_REL_TOL = 0.005      # 0.5% relative tolerance
AGE_ABS_TOL    = 0          # set to 1 if you allow ±1 year
# Date layouts tried in order by parse_date; the first one that parses is used.
DATE_FORMATS   = ["%Y-%m-%d", "%m/%d/%Y", "%Y/%m/%d", "%m-%d-%Y", "%d-%b-%Y", "%b %d, %Y"]

# File names accepted as annotation files (case-insensitive); group 1 is the
# name without ".json" and is used as the key that pairs A's file with B's.
FNAME_PAT = re.compile(r"^(invoice_T\d+_gen\d+)\.json$", re.IGNORECASE)

# Scalar (non line-item) fields compared invoice by invoice in the disagreement report.
FIELDS = [
    "invoice_number","patient_id","invoice_date","due_date","patient_name",
    "patient_age","patient_address","patient_phone","patient_email",
    "admission_date","discharge_date","subtotal_amount","discount_amount",
    "total_amount","provider_name","bed_id"
]

# ===================== Helpers =====================
# Any run of whitespace (spaces, tabs, newlines) gets squeezed to one space.
_ws = re.compile(r"\s+")
def norm_str(s):
    """Canonicalise free text so that cosmetic differences do not count as disagreement.

    The value is stringified, trimmed, case-folded and has inner whitespace
    runs collapsed to single spaces. `None` and values that end up empty
    both come back as `None`.
    """
    if s is None:
        return None
    collapsed = _ws.sub(" ", str(s).strip().casefold())
    return collapsed if collapsed else None

def norm_phone(s):
    """Reduce a phone number to its digits only.

    Punctuation, spaces and letters are discarded. Returns `None` for `None`
    input or when no digit is left.
    """
    if s is None:
        return None
    digits = "".join(re.findall(r"\d", str(s)))
    return digits if digits else None

def parse_date(s):
    """Parse a date written in any of the layouts listed in DATE_FORMATS.

    The value is stringified and trimmed first. Each format in DATE_FORMATS is
    tried in order; if none fits, `datetime.fromisoformat` is tried as a last
    resort (which also accepts ISO strings carrying a time part).

    Returns:
        A `datetime.date`, or `None` when the input is `None`, blank, or
        cannot be parsed.
    """
    if s is None:
        return None
    s = str(s).strip()
    if not s:
        return None
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(s, fmt).date()
        except ValueError:
            # Only a format mismatch should move us on to the next layout;
            # anything else (e.g. KeyboardInterrupt) must not be swallowed.
            continue
    try:
        return datetime.fromisoformat(s).date()
    except ValueError:
        return None

def norm_date(s):
    """Return the date in `s` as an ISO ``YYYY-MM-DD`` string, or `None` if `parse_date` gives nothing."""
    parsed = parse_date(s)
    if not parsed:
        return None
    return parsed.isoformat()

def parse_float(x):
    """Convert a numeric-looking value to `float`.

    Ints and floats are converted directly. Anything else is stringified,
    trimmed, stripped of thousands separators (",") and of the currency
    symbols $, £ and €, then parsed.

    Returns:
        The parsed float, or `None` when `x` is `None` or the text is not a
        valid number.
    """
    if x is None:
        return None
    if isinstance(x, (int, float)):
        return float(x)
    s = str(x).strip().replace(",", "")
    s = re.sub(r"[\$£€]", "", s)
    try:
        return float(s)
    except ValueError:
        # float() on a str only signals bad text via ValueError; a bare
        # `except` here would also hide KeyboardInterrupt and real bugs.
        return None

def within_tol(a,b,abs_tol=AMOUNT_ABS_TOL,rel_tol=AMOUNT_REL_TOL):
    """Tell whether two numbers are close enough to count as the same value.

    They match when their absolute gap is at most `abs_tol`, or when the gap
    divided by max(|a|, |b|, 1.0) is at most `rel_tol`. A missing value
    (`None`) on either side never matches.
    """
    if a is None or b is None:
        return False
    gap = abs(a - b)
    # The 1.0 floor keeps the relative check from blowing up for values near zero.
    scale = max(abs(a), abs(b), 1.0)
    return gap <= abs_tol or gap / scale <= rel_tol

def normalize_record(j, forced_id=None):
    """Return a shallow copy of invoice dict `j` with its known fields normalised.

    Text fields go through `norm_str`, dates through `norm_date`, the phone
    through `norm_phone` and numeric fields through `parse_float`. When
    `forced_id` is truthy it replaces the record's own invoice number.
    `line_items` is rebuilt as a list of dicts holding only the normalised
    `description`, `code` and `amount`. Keys of `j` that are not listed here
    are carried over unchanged.
    """
    # Applied in this order, so fields absent from `j` are appended to the
    # result in a fixed sequence.
    scalar_rules = (
        ("patient_id",      norm_str),
        ("invoice_date",    norm_date),
        ("due_date",        norm_date),
        ("patient_name",    norm_str),
        ("patient_age",     parse_float),
        ("patient_address", norm_str),
        ("patient_phone",   norm_phone),
        ("patient_email",   norm_str),
        ("admission_date",  norm_date),
        ("discharge_date",  norm_date),
        ("subtotal_amount", parse_float),
        ("discount_amount", parse_float),
        ("total_amount",    parse_float),
        ("provider_name",   norm_str),
        ("bed_id",          norm_str),
    )

    out = dict(j)
    raw_invoice_number = forced_id if forced_id else j.get("invoice_number")
    out["invoice_number"] = norm_str(raw_invoice_number)
    for field, normalize in scalar_rules:
        out[field] = normalize(j.get(field))

    # A missing or null "line_items" is treated as an empty list.
    out["line_items"] = [
        {
            "description": norm_str(entry.get("description")),
            "code":        norm_str(entry.get("code")),
            "amount":      parse_float(entry.get("amount")),
        }
        for entry in (j.get("line_items") or [])
    ]
    return out

def load_by_full_id(folder):
    """Load every annotation file in `folder` into a dict keyed by its pairing ID.

    Only names ending in ".json" that also match FNAME_PAT are read; every
    other entry is skipped silently. Each file is parsed as UTF-8 JSON and
    passed through `normalize_record`, with the file-name ID forced into
    `invoice_number` so both annotators' records share the same key.
    """
    records = {}
    for name in os.listdir(folder):
        hit = FNAME_PAT.match(name) if name.endswith(".json") else None
        if hit is None:
            continue
        full_id = hit.group(1)  # 'invoice_Ti_genj'
        path = os.path.join(folder, name)
        with open(path, "r", encoding="utf-8") as handle:
            payload = json.load(handle)
        records[full_id] = normalize_record(payload, forced_id=full_id)
    return records

def kappa(a,b):
    """Cohen's kappa between two equally long label sequences.

    Every value is compared by its string form, and `None` is treated as its
    own "<MISSING>" category. Returns `None` for empty input, and 1.0 when
    the chance-agreement term equals 1 (each rater used one and the same
    single label), where the usual formula would divide by zero.
    """
    def as_label(value):
        return "<MISSING>" if value is None else str(value)

    labels_a = list(map(as_label, a))
    labels_b = list(map(as_label, b))
    total = len(labels_a)
    if total == 0:
        return None

    # Observed agreement: share of positions where both raters gave the same label.
    agreements = sum(1 for x, y in zip(labels_a, labels_b) if x == y)
    observed = agreements / total

    # Chance agreement: product of the raters' marginal frequencies, summed over labels.
    freq_a, freq_b = Counter(labels_a), Counter(labels_b)
    expected = sum(
        (freq_a[label] / total) * (freq_b[label] / total)
        for label in set(freq_a) | set(freq_b)
    )
    if expected == 1:
        return 1.0
    return (observed - expected) / (1 - expected)

def exact(a,b):
    """Share of positions where `a` and `b` hold equal values; `None` when `a` is empty."""
    total = len(a)
    if total == 0:
        return None
    same = 0
    for left, right in zip(a, b):
        if left == right:
            same += 1
    return same / total

def mae(a,b):
    """Mean absolute error between paired numeric values.

    Pairs where either side is `None`, or where a value cannot be converted
    to `float`, are left out of the average.

    Returns:
        The mean of |a_i - b_i| over the usable pairs, or `None` when no pair
        is usable.
    """
    diffs = []
    for x, y in zip(a, b):
        if x is None or y is None:
            continue
        try:
            diffs.append(abs(float(x) - float(y)))
        except (TypeError, ValueError, OverflowError):
            # Skip values float() cannot convert, but let unrelated errors
            # (and KeyboardInterrupt) surface instead of hiding them.
            continue
    return sum(diffs) / len(diffs) if diffs else None

def pct_tol(a,b,abs_tol,rel_tol):
    """Fraction of value pairs that `within_tol` accepts under the given tolerances.

    Pairs with a `None` on either side are excluded from both the numerator
    and the denominator. Returns `None` when no pair is left.
    """
    outcomes = [
        within_tol(x, y, abs_tol, rel_tol)
        for x, y in zip(a, b)
        if x is not None and y is not None
    ]
    if not outcomes:
        return None
    hits = sum(1 for ok in outcomes if ok)
    return hits / len(outcomes)

def index_items(items):
    """Build lookup tables for a list of line-item dicts.

    Items with a truthy `code` are keyed by that code. Items without a code
    but with a truthy `description` are keyed by description. Items with
    neither are dropped. A later item with the same key replaces an earlier one.

    Returns:
        (by_code, by_desc) dictionaries.
    """
    by_code = {}
    by_desc = {}
    for item in items:
        code = item.get("code")
        if code:
            by_code[code] = item
            continue
        desc = item.get("description")
        if desc:
            by_desc[desc] = item
    return by_code, by_desc

def li_metrics(items_a, items_b):
    """Compare the line items of one invoice as captured by annotators A and B.

    Items are paired by `code`; items that have no code are paired by
    `description` (see `index_items`).

    Returns a dict with:
        jaccard_codes          |shared codes| / |all codes| (1.0 if neither side has codes)
        desc_agree_rate        share of paired items whose descriptions are equal
        amount_within_tol_rate share of paired items whose amounts agree
        kappa_presence_codes   `kappa` over per-code presence flags (1.0 if no codes)
        codes_in_A / codes_in_B / matched_code_count   raw counts

    When nothing could be paired, both rates are 1.0 if the two item lists
    are empty and 0.0 otherwise.
    """
    by_code_a, by_desc_a = index_items(items_a)
    by_code_b, by_desc_b = index_items(items_b)

    codes_a, codes_b = set(by_code_a), set(by_code_b)
    codes = codes_a | codes_b
    shared_codes = codes_a & codes_b
    jaccard = len(shared_codes) / len(codes) if codes else 1.0

    def amounts_agree(x, y):
        # An amount left blank by BOTH annotators is an agreement, matching how
        # exact()/kappa() treat missing values; within_tol alone would say False.
        return (x is None and y is None) or within_tol(x, y)

    desc_ok, amt_ok = [], []
    for c in shared_codes:
        item_a, item_b = by_code_a[c], by_code_b[c]
        desc_ok.append(1 if item_a["description"] == item_b["description"] else 0)
        amt_ok.append(1 if amounts_agree(item_a["amount"], item_b["amount"]) else 0)

    # Code-less items: by_desc_* already holds only items without a code, so
    # they are paired on description directly (no comparison against code keys).
    for d in set(by_desc_a) & set(by_desc_b):
        item_a, item_b = by_desc_a[d], by_desc_b[d]
        desc_ok.append(1 if item_a["description"] == item_b["description"] else 0)
        amt_ok.append(1 if amounts_agree(item_a["amount"], item_b["amount"]) else 0)

    both_empty = not items_a and not items_b
    fallback = 1.0 if both_empty else 0.0
    desc_rate = sum(desc_ok) / len(desc_ok) if desc_ok else fallback
    amt_rate  = sum(amt_ok) / len(amt_ok) if amt_ok else fallback

    if codes:
        ordered = sorted(codes)
        va = ["1" if c in by_code_a else "0" for c in ordered]
        vb = ["1" if c in by_code_b else "0" for c in ordered]
        k = kappa(va, vb)
    else:
        k = 1.0

    return {
        "jaccard_codes": jaccard,
        "desc_agree_rate": desc_rate,
        "amount_within_tol_rate": amt_rate,
        "kappa_presence_codes": k,
        "codes_in_A": len(by_code_a),
        "codes_in_B": len(by_code_b),
        "matched_code_count": len(shared_codes)
    }

# ===================== Load & Compare =====================
data_a = load_by_full_id(ANN_A_DIR)
data_b = load_by_full_id(ANN_B_DIR)

ids_a = set(data_a)
ids_b = set(data_b)
overlap = sorted(ids_a.intersection(ids_b))
print(f"Comparing {len(overlap)} overlapped invoices...")

# Per-invoice line item metrics: one row per shared invoice, tagged with its ID.
li_rows = [
    {
        **li_metrics(data_a[inv_id].get("line_items", []), data_b[inv_id].get("line_items", [])),
        "invoice_number": inv_id,
    }
    for inv_id in overlap
]

# Field-level rollups across all overlapped pairs
def collect(field):
    """Return the values of `field` from A and from B, aligned on `overlap`."""
    values_a = [data_a[inv_id].get(field) for inv_id in overlap]
    values_b = [data_b[inv_id].get(field) for inv_id in overlap]
    return values_a, values_b

categorical_fields = (
    "invoice_number", "patient_id", "invoice_date", "due_date", "patient_name",
    "patient_address", "patient_phone", "patient_email", "admission_date",
    "discharge_date", "provider_name", "bed_id",
)
numeric_fields = ("patient_age", "subtotal_amount", "discount_amount", "total_amount")

summary = []

# String/Date/IDs: exact + kappa
for f in categorical_fields:
    A, B = collect(f)
    summary.append({"field": f, "metric": "exact_match_rate", "value": exact(A, B)})
    summary.append({"field": f, "metric": "cohens_kappa",     "value": kappa(A, B)})

# Numeric: MAE + pct within tolerance (age uses its own absolute tolerance and no relative one)
for f in numeric_fields:
    A, B = collect(f)
    if f == "patient_age":
        abs_tol, rel_tol = AGE_ABS_TOL, 0.0
    else:
        abs_tol, rel_tol = AMOUNT_ABS_TOL, AMOUNT_REL_TOL
    summary.append({"field": f, "metric": "mae",            "value": mae(A, B)})
    summary.append({"field": f, "metric": "pct_within_tol", "value": pct_tol(A, B, abs_tol, rel_tol)})

df_summary = (
    pd.DataFrame(summary)
    .sort_values(["field", "metric"])
    .reset_index(drop=True)
)
df_li = pd.DataFrame(li_rows)
has_line_items = not df_li.empty

# Save (the line-item file is only written when there is at least one row)
df_summary.to_csv(os.path.join(OUTPUT_DIR, "qa_report.csv"), index=False)
if has_line_items:
    df_li.to_csv(os.path.join(OUTPUT_DIR, "line_item_details.csv"), index=False)

print("Saved to:", OUTPUT_DIR)
display(df_summary.head(30))
if has_line_items:
    display(df_li)

Comparing 8 overlapped invoices...
Saved to: /content/output


Unnamed: 0,field,metric,value
0,admission_date,cohens_kappa,1.0
1,admission_date,exact_match_rate,1.0
2,bed_id,cohens_kappa,1.0
3,bed_id,exact_match_rate,1.0
4,discharge_date,cohens_kappa,1.0
5,discharge_date,exact_match_rate,1.0
6,discount_amount,mae,0.0
7,discount_amount,pct_within_tol,1.0
8,due_date,cohens_kappa,1.0
9,due_date,exact_match_rate,1.0


Unnamed: 0,jaccard_codes,desc_agree_rate,amount_within_tol_rate,kappa_presence_codes,codes_in_A,codes_in_B,matched_code_count,invoice_number
0,1.0,1.0,1.0,1.0,3,3,3,invoice_T1_gen1
1,1.0,1.0,1.0,1.0,2,2,2,invoice_T1_gen2
2,1.0,1.0,1.0,1.0,2,2,2,invoice_T1_gen3
3,1.0,1.0,1.0,1.0,3,3,3,invoice_T2_gen1
4,1.0,1.0,1.0,1.0,2,2,2,invoice_T2_gen2
5,1.0,0.5,1.0,1.0,2,2,2,invoice_T2_gen3
6,1.0,1.0,1.0,1.0,2,2,2,invoice_T3_gen1
7,1.0,0.5,1.0,1.0,2,2,2,invoice_T3_gen2


In [None]:
def disagreement_report(overlap_ids, data_a, data_b, out_csv):
    """List every field on which annotators A and B disagree.

    For each invoice ID in `overlap_ids` the scalar fields in FIELDS are
    compared: amount fields use `within_tol`, `patient_age` uses AGE_ABS_TOL
    when that is positive, everything else uses plain equality. Line items
    that carry a code are then compared code by code (presence, description,
    amount). A value missing on both sides is an agreement, never a row.

    Args:
        overlap_ids: invoice IDs present in both `data_a` and `data_b`.
        data_a, data_b: dicts mapping invoice ID -> normalised record.
        out_csv: path the rows are written to; nothing is written when there
            are no disagreements.

    Returns:
        A DataFrame with one row per disagreement (columns invoice_number,
        field, A, B); empty when the annotators agree everywhere.
    """
    amount_fields = {"subtotal_amount", "discount_amount", "total_amount"}

    def values_agree(x, y, **tol):
        # Equal values (including None on both sides) agree outright;
        # within_tol alone returns False whenever a side is None, which used
        # to flag fields that both annotators left blank.
        return x == y or within_tol(x, y, **tol)

    rows = []
    for inv in overlap_ids:
        na, nb = data_a[inv], data_b[inv]
        # Scalars
        for f in FIELDS:
            va, vb = na.get(f), nb.get(f)
            if f in amount_fields:
                agree = values_agree(va, vb)
            elif f == "patient_age" and AGE_ABS_TOL > 0:
                agree = values_agree(va, vb, abs_tol=AGE_ABS_TOL, rel_tol=0.0)
            else:
                agree = (va == vb)
            if not agree:
                rows.append({"invoice_number": inv, "field": f, "A": va, "B": vb})

        # Line-items by code
        by_code_a, _ = index_items(na.get("line_items", []))
        by_code_b, _ = index_items(nb.get("line_items", []))
        codes = sorted(set(by_code_a) | set(by_code_b))
        for c in codes:
            a_it, b_it = by_code_a.get(c), by_code_b.get(c)
            if a_it and not b_it:
                rows.append({"invoice_number": inv, "field": f"line_items[{c}]", "A": "present", "B": "missing"})
            elif b_it and not a_it:
                rows.append({"invoice_number": inv, "field": f"line_items[{c}]", "A": "missing", "B": "present"})
            elif a_it and b_it:
                if a_it.get("description") != b_it.get("description"):
                    rows.append({"invoice_number": inv, "field": f"line_items[{c}].description",
                                 "A": a_it.get("description"), "B": b_it.get("description")})
                if not values_agree(a_it.get("amount"), b_it.get("amount")):
                    rows.append({"invoice_number": inv, "field": f"line_items[{c}].amount",
                                 "A": a_it.get("amount"), "B": b_it.get("amount")})

    df = pd.DataFrame(rows)
    if not df.empty:
        df.to_csv(out_csv, index=False)
    return df

# Build the disagreement listing for the overlapping invoices and preview its first rows.
# disagreement_report only writes the CSV when at least one disagreement was found.
diff_path = os.path.join(OUTPUT_DIR, "disagreements.csv")
df_diff = disagreement_report(overlap, data_a, data_b, diff_path)
print("Disagreements rows:", len(df_diff))
display(df_diff.head(25))

Disagreements rows: 4


Unnamed: 0,invoice_number,field,A,B
0,invoice_T2_gen3,line_items[cpt500].description,ct scan head,ct head scan
1,invoice_T3_gen1,patient_age,40.0,41.0
2,invoice_T3_gen1,provider_name,metro health,
3,invoice_T3_gen2,line_items[dme100].description,crutches,crutch
