# Insurance Knowledge Graph (Pure-Python, Online-Runnable)

This notebook is **self-contained**—no external installs required.
It seeds data, builds a tiny in-memory KG, runs TF–IDF retrieval, and answers with citations.


In [None]:
import os, csv, re, math
from collections import defaultdict, Counter

# Directory (relative to the current working directory) that holds the demo files.
DATA_DIR = "insurance_kg_demo"
os.makedirs(DATA_DIR, exist_ok=True)

# Sample dataset, keyed by file name. Each value is the full text of that file;
# the first line of every value is the header row.
seed = {
    # One row per policy.
    "policies.csv": """policy_number,effective_date,expiry_date,product,jurisdiction
P123,2024-01-01,2024-12-31,Homeowners,CA
P456,2024-06-01,2025-05-31,CommercialProperty,NY
""",
    # Coverage lines, linked to a policy by policy_number.
    "coverages.csv": """policy_number,coverage_type,limit,deductible
P123,Property,500000,1000
P123,Liability,300000,0
P456,Property,2000000,5000
""",
    # Exclusions per policy; the free-text column is quoted because it contains commas/periods.
    "exclusions.csv": """policy_number,exclusion_name,clause_ref,text
P123,Flood,Clause 4.2,"Flood losses are excluded unless a flood rider is attached."
P123,Earth Movement,Clause 4.3,"Earthquake and earth movement are excluded."
P456,War,Clause 5.1,"Losses caused by war or warlike operations are excluded."
""",
    # Riders attached to a policy.
    "riders.csv": """policy_number,rider_name,clause_ref,text
P123,R-FLD,Clause 7.1,"Adds flood coverage limit 100000 with $5000 deductible."
""",
    # Claims filed against a policy.
    "claims.csv": """claim_id,policy_number,date_of_loss,cause,amount,status
CLM-987,P123,2019-11-12,Flood,18000,Closed
CLM-654,P123,2023-02-10,Wind,3500,Closed
CLM-222,P456,2024-12-01,Fire,120000,Open
""",
    # Risk scores per policy.
    "risks.csv": """risk_id,type,score,model_version,policy_number
RISK-1,Flood,0.72,rmv2,P123
RISK-2,Earthquake,0.41,rmv2,P123
RISK-3,Fire,0.65,rmv3,P456
""",
    # Regulation excerpts keyed by jurisdiction / citation / section.
    "regulations.csv": """jurisdiction,citation,section,text
CA,CCR-Title10,§2695.4,"Fair claims settlement practices definitions include flood handling..."
NY,NYCRR-Insurance,§216.0,"Unfair claims settlement standards for property claims..."
""",
    # TSV: these are ordinary (non-raw) strings, so each `\t` escape below becomes a
    # real tab character in the written file. Two consecutive `\t` mean an empty field
    # (the claim chunk has no clause).
    "document_chunks.tsv": """doc_id\tchunk_id\tnode_type\tnode_key\tsource_uri\tpage\tclause\ttext
POL-P123\tc1\tPolicy\tP123\ts3://bucket/POL-P123.pdf\t12\tClause 4.2\tFlood losses are excluded unless a flood rider is attached.
RID-R-FLD\tc2\tRider\tR-FLD\ts3://bucket/RID-R-FLD.pdf\t2\tClause 7.1\tAdds flood coverage limit 100000 with $5000 deductible.
CLAIM-987\tc3\tClaim\tCLM-987\ts3://bucket/CLAIM-987.txt\t3\t\t2019 basement flood claim paid 18000 after assessment.
REG-CA\tc4\tRegulation\tCCR-Title10\ts3://regs/CCR-Title10.html\t18\t§2695.4\tFair claims settlement practices on flood handling in California.
POL-P123\tc5\tPolicy\tP123\ts3://bucket/POL-P123.pdf\t13\tClause 4.3\tEarth movement and earthquake are excluded.
POL-P456\tc6\tPolicy\tP456\ts3://bucket/POL-P456.pdf\t20\tClause 5.1\tLosses caused by war or warlike operations are excluded.
""",
}

# Materialise each seed file under DATA_DIR. A file that already exists is left
# untouched, so re-running the cell never overwrites data on disk.
for name, content in seed.items():
    path = os.path.join(DATA_DIR, name)
    if os.path.exists(path):
        continue
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)

print("Seeded sample data to", DATA_DIR)

In [None]:
def read_csv(path):
    """Read a comma-separated file and return its rows as a list of dicts.

    The first line of the file supplies the dictionary keys. The file is
    opened as UTF-8 with ``newline=""`` so the csv module handles line
    endings itself.
    """
    with open(path, newline="", encoding="utf-8") as handle:
        return [record for record in csv.DictReader(handle)]

def read_tsv(path):
    """Read a tab-separated file and return its rows as a list of dicts.

    Identical to a CSV read except that fields are split on tab characters;
    the header line supplies the dictionary keys.
    """
    with open(path, newline="", encoding="utf-8") as handle:
        tab_reader = csv.DictReader(handle, delimiter="\t")
        return list(tab_reader)

# Load the seven CSV tables in a fixed order; each name is bound to a list of row dicts.
policies, coverages, exclusions, riders, claims, risks, regulations = (
    read_csv(os.path.join(DATA_DIR, table + ".csv"))
    for table in (
        "policies",
        "coverages",
        "exclusions",
        "riders",
        "claims",
        "risks",
        "regulations",
    )
)
# Document chunks are tab-separated, so they go through the TSV reader instead.
doc_chunks = read_tsv(os.path.join(DATA_DIR, "document_chunks.tsv"))

print(f"Loaded: {len(policies)} policies, {len(doc_chunks)} chunks")

In [None]:
# Node-id builders. Every id is "<Type>:<key...>" so the node type can be read
# back from the prefix. Policy numbers, claim ids and risk ids are upper-cased;
# all other key parts are used exactly as given.

def nid_policy(policy_number):
    """Return the node id of a policy."""
    return "Policy:{}".format(policy_number.upper())


def nid_coverage(policy_number, coverage_type):
    """Return the node id of one coverage line on a policy."""
    return "Coverage:{}:{}".format(policy_number.upper(), coverage_type)


def nid_exclusion(policy_number, name):
    """Return the node id of a named exclusion on a policy."""
    return "Exclusion:{}:{}".format(policy_number.upper(), name)


def nid_rider(policy_number, name):
    """Return the node id of a named rider on a policy."""
    return "Rider:{}:{}".format(policy_number.upper(), name)


def nid_claim(claim_id):
    """Return the node id of a claim."""
    return "Claim:{}".format(claim_id.upper())


def nid_risk(risk_id):
    """Return the node id of a risk assessment."""
    return "Risk:{}".format(risk_id.upper())


def nid_reg(j, c, s):
    """Return the node id of a regulation from its three key parts (j, c, s).

    Callers in this notebook pass jurisdiction, citation and section.
    """
    return "Regulation:{}:{}:{}".format(j, c, s)


def nid_doc(doc_id, chunk_id):
    """Return the node id of a document chunk (doc id and chunk id joined by '|')."""
    return "DocumentChunk:{}|{}".format(doc_id, chunk_id)

# In-memory graph storage.
#   nodes:     node_id -> {"type": <str>, "props": <dict>}
#   edges_out: source node_id -> list of (relation, destination node_id)
#   edges_in:  destination node_id -> list of (relation, source node_id)
nodes = {}
edges_out = defaultdict(list)
edges_in = defaultdict(list)


def add_node(node_id, typ, **props):
    """Insert (or overwrite) a node, storing a copy of the keyword properties."""
    record = {"type": typ, "props": dict(props)}
    nodes[node_id] = record


def add_edge(src, rel, dst):
    """Record a directed edge src -[rel]-> dst in both adjacency indexes."""
    edges_out[src].append((rel, dst))
    edges_in[dst].append((rel, src))

# Populate the graph table by table. Each row becomes one node whose properties
# are the full row; rows that reference a policy are also linked to it.
for p in policies:
    add_node(nid_policy(p["policy_number"]), "Policy", **p)

# Policy -> Coverage
for c in coverages:
    add_node(nid_coverage(c["policy_number"], c["coverage_type"]), "Coverage", **c)
    add_edge(
        nid_policy(c["policy_number"]),
        "HAS_COVERAGE",
        nid_coverage(c["policy_number"], c["coverage_type"]),
    )

# Policy -> Exclusion
for e in exclusions:
    add_node(nid_exclusion(e["policy_number"], e["exclusion_name"]), "Exclusion", **e)
    add_edge(
        nid_policy(e["policy_number"]),
        "HAS_EXCLUSION",
        nid_exclusion(e["policy_number"], e["exclusion_name"]),
    )

# Policy -> Rider
for r in riders:
    add_node(nid_rider(r["policy_number"], r["rider_name"]), "Rider", **r)
    add_edge(
        nid_policy(r["policy_number"]),
        "HAS_RIDER",
        nid_rider(r["policy_number"], r["rider_name"]),
    )

# Claim -> Policy (the edge points from the claim to the policy it is against)
for c in claims:
    add_node(nid_claim(c["claim_id"]), "Claim", **c)
    add_edge(nid_claim(c["claim_id"]), "AGAINST_POLICY", nid_policy(c["policy_number"]))

# Risk -> Policy
for r in risks:
    add_node(nid_risk(r["risk_id"]), "Risk", **r)
    add_edge(nid_risk(r["risk_id"]), "ASSESSMENT_OF", nid_policy(r["policy_number"]))

# Regulations and document chunks are added as nodes only in this step.
for rg in regulations:
    add_node(nid_reg(rg["jurisdiction"], rg["citation"], rg["section"]), "Regulation", **rg)
for d in doc_chunks:
    add_node(nid_doc(d["doc_id"], d["chunk_id"]), "DocumentChunk", **d)

# Link each document chunk to the node(s) it evidences.
#
# The chunk's node_key only carries a rider name or a regulation citation, so the
# owning policy / regulation is resolved from the loaded tables rather than being
# hard-coded to one policy number.
_rider_policies = defaultdict(list)          # rider_name -> [policy_number, ...]
for _row in riders:
    _rider_policies[_row["rider_name"]].append(_row["policy_number"])

_regs_by_citation = defaultdict(list)        # citation -> [regulation row, ...]
for _row in regulations:
    _regs_by_citation[_row["citation"]].append(_row)

_policies_by_jurisdiction = defaultdict(list)  # jurisdiction -> [policy_number, ...]
for _row in policies:
    _policies_by_jurisdiction[_row["jurisdiction"]].append(_row["policy_number"])

for d in doc_chunks:
    nk, nt = d["node_key"], d["node_type"]
    chunk_nid = nid_doc(d["doc_id"], d["chunk_id"])
    targets = []
    if nt == "Policy":
        targets.append(nid_policy(nk))
    elif nt == "Rider":
        # A rider name may be attached to several policies; link the chunk to each.
        # A rider name that no policy carries produces no edge.
        targets.extend(nid_rider(pn, nk) for pn in _rider_policies.get(nk, []))
    elif nt == "Claim":
        targets.append(nid_claim(nk))
    elif nt == "Regulation":
        # Link to the regulation node itself and to every policy written in that
        # regulation's jurisdiction, so policy-anchored expansion still reaches it.
        for _reg in _regs_by_citation.get(nk, []):
            targets.append(nid_reg(_reg["jurisdiction"], _reg["citation"], _reg["section"]))
            targets.extend(
                nid_policy(pn) for pn in _policies_by_jurisdiction.get(_reg["jurisdiction"], [])
            )
    # dict.fromkeys drops duplicate targets while keeping their order.
    for target in dict.fromkeys(targets):
        add_edge(chunk_nid, "EVIDENCES", target)

print(f"In-memory KG ready: {len(nodes)} nodes, {sum(len(v) for v in edges_out.values())} edges")

In [None]:
# A token is a maximal run of ASCII letters, digits or '#'; everything else separates tokens.
TOKEN_RE = re.compile(r"[A-Za-z0-9#]+")


def tokenize(text):
    """Split text into lower-cased tokens; falsy input (None, "") yields an empty list."""
    if not text:
        return []
    return [token.lower() for token in TOKEN_RE.findall(text)]

# One retrieval record per document chunk; "meta" keeps the whole chunk row.
corpus = [
    {"id": f"{d['doc_id']}:{d['chunk_id']}", "text": d["text"], "meta": d}
    for d in doc_chunks
]

# Document frequency: in how many chunks each token occurs (counted once per chunk).
df = Counter(
    token
    for doc in corpus
    for token in set(tokenize(doc["text"]))
)

# Corpus size, floored at 1 when there are no chunks.
N = len(corpus) or 1

# Smoothed inverse document frequency: log((N + 1) / (df + 1)) + 1.
idf = {
    t: (math.log((N + 1) / (df_t + 1)) + 1.0)
    for t, df_t in df.items()
}

def tf(text):
    """Return the relative frequency of each token in text as {token: count / total}.

    Text with no tokens yields an empty dict.
    """
    counts = Counter(tokenize(text))
    total = sum(counts.values()) or 1  # avoid dividing by zero for token-free text
    return {token: occurrences / total for token, occurrences in counts.items()}

def tfidf_vec(text):
    """Return a sparse TF-IDF vector {token: weight} for text.

    Tokens missing from the module-level ``idf`` table get weight 0.0 but stay
    in the vector.
    """
    weights = {}
    for token, frequency in tf(text).items():
        weights[token] = frequency * idf.get(token, 0.0)
    return weights

def cosine_sim(v1, v2):
    """Cosine similarity of two sparse vectors given as {key: weight} dicts.

    A vector whose norm is zero (empty or all-zero weights) uses 1e-9 as its
    norm instead, so the function returns 0.0 rather than dividing by zero.
    """
    def norm(vec):
        return math.sqrt(sum(weight * weight for weight in vec.values())) or 1e-9

    all_keys = set(v1) | set(v2)
    dot = sum(v1.get(key, 0.0) * v2.get(key, 0.0) for key in all_keys)
    return dot / (norm(v1) * norm(v2))

def top_k_chunks(query, k=6):
    """Return up to k corpus records ranked by TF-IDF cosine similarity to query.

    Records with a similarity of zero or less are dropped. Ties keep corpus order.
    """
    query_vec = tfidf_vec(query)
    hits = []
    for doc in corpus:
        score = cosine_sim(query_vec, tfidf_vec(doc["text"]))
        if score > 0:
            hits.append((score, doc))
    ranked = sorted(hits, key=lambda pair: pair[0], reverse=True)
    return [doc for _score, doc in ranked[:k]]

In [None]:
# Policy numbers look like "P123"; claim ids look like "CLM-987". Both match case-insensitively.
POLICY_RE = re.compile(r"\bP\d+\b", re.IGNORECASE)
CLAIM_RE = re.compile(r"\bCLM-\d+\b", re.IGNORECASE)


def extract_anchors(question):
    """Pull the graph anchors and intent flags out of a free-text question.

    Returns a dict with:
      "policies":   upper-cased policy numbers found in the question
      "claims":     upper-cased claim ids found in the question
      "intents":    {name: bool} flags, set when any trigger word is a token of the question
      "risk_terms": peril words present in the question, in a fixed vocabulary order
    """
    policy_hits = POLICY_RE.findall(question)
    claim_hits = CLAIM_RE.findall(question)
    keywords = set(tokenize(question))

    def mentions(*triggers):
        return any(word in keywords for word in triggers)

    intents = {
        "exclusion": mentions("exclude", "exclusion", "excluded", "exclusions"),
        "rider": mentions("rider", "riders"),
        "claim": mentions("claim", "claims", "loss", "losses"),
        "coverage": mentions("cover", "coverage", "covered", "limit", "deductible"),
        "reg": mentions("regulation", "regulations", "jurisdiction", "law", "section"),
    }
    peril_vocabulary = ("flood", "earth", "earthquake", "wind", "fire", "war")
    return {
        "policies": [hit.upper() for hit in policy_hits],
        "claims": [hit.upper() for hit in claim_hits],
        "intents": intents,
        "risk_terms": [peril for peril in peril_vocabulary if peril in keywords],
    }

def expand_graph_from_policy(policy_number, max_hops=2):
    """Collect the neighbourhood of a policy and the document chunks that evidence it.

    Performs a breadth-first walk from the policy node, following edges in both
    directions, for at most ``max_hops`` hops.

    Args:
        policy_number: policy number to start from (upper-cased by nid_policy).
        max_hops: maximum number of hops to expand; 0 returns only the start node.

    Returns:
        (node_ids, evidence) where ``node_ids`` is the set of reached node ids
        (always including the start id, even if no such node was registered) and
        ``evidence`` is a list of DocumentChunk property dicts that have an
        EVIDENCES edge into any reached node. Evidence is listed in discovery
        order (start node first, nearer nodes before farther ones) with each
        chunk appearing once, so the result is the same on every run.
    """
    start = nid_policy(policy_number)
    visited = {start}
    discovered = [start]  # same members as `visited`, kept in discovery order
    frontier = [start]
    hops = 0
    while frontier and hops < max_hops:
        next_frontier = []
        for u in frontier:
            # Outgoing edges first, then incoming ones; the relation label is irrelevant here.
            for _rel, v in edges_out.get(u, []) + edges_in.get(u, []):
                if v not in visited:
                    visited.add(v)
                    discovered.append(v)
                    next_frontier.append(v)
        frontier = next_frontier
        hops += 1

    # Iterate the ordered list rather than the set: set order of strings changes
    # between interpreter runs, which would reshuffle the evidence.
    evidence = []
    seen_chunks = set()
    for nid in discovered:
        for rel, src in edges_in.get(nid, []):
            if rel != "EVIDENCES" or src in seen_chunks:
                continue
            if nodes.get(src, {}).get("type") == "DocumentChunk":
                seen_chunks.add(src)
                evidence.append(nodes[src]["props"])
    return visited, evidence

def retrieve(question, k=6, max_chunks=10):
    """Hybrid retrieval: graph expansion from mentioned policies plus TF-IDF text search.

    Args:
        question: free-text question.
        k: number of TF-IDF hits to request from top_k_chunks (default 6).
        max_chunks: cap on the number of chunks returned (default 10).

    Returns:
        (anchors, node_ids, chunks) where ``anchors`` is the extract_anchors result,
        ``node_ids`` is the set of graph nodes reached from every mentioned policy,
        and ``chunks`` is a list of chunk dicts: text hits first, then graph
        evidence, de-duplicated on (doc_id, chunk_id) and truncated to ``max_chunks``.
    """
    anchors = extract_anchors(question)

    node_ids = set()
    graph_evidence = []
    for policy_number in anchors["policies"]:
        reached, found = expand_graph_from_policy(policy_number)
        node_ids |= reached
        graph_evidence.extend(found)

    text_hits = [doc["meta"] for doc in top_k_chunks(question, k=k)]

    # A single first-occurrence-wins pass over the concatenation de-duplicates both
    # the graph evidence itself and its overlap with the text hits.
    merged = []
    seen = set()
    for chunk in text_hits + graph_evidence:
        key = (chunk.get("doc_id"), chunk.get("chunk_id"))
        if key not in seen:
            seen.add(key)
            merged.append(chunk)
    return anchors, node_ids, merged[:max_chunks]

def format_citation(d):
    """Format a chunk dict as "<doc_id>:<clause> p<page>".

    Parts that are missing, None or blank are omitted together with their
    separator, e.g. a chunk without a clause renders as "<doc_id> p<page>".
    """
    def clean(value):
        return "" if value is None else str(value).strip()

    doc_id = clean(d.get("doc_id"))
    clause = clean(d.get("clause"))
    page = clean(d.get("page"))
    head = f"{doc_id}:{clause}" if doc_id and clause else (doc_id or clause)
    tail = f"p{page}" if page else ""
    return " ".join(piece for piece in (head, tail) if piece)


def answer(question, anchors, node_ids, chunks):
    """Compose a markdown answer with citations from retrieved nodes and chunks.

    Args:
        question: the original question (unused; kept so existing callers keep working).
        anchors: dict from extract_anchors; "intents" and "risk_terms" are read.
        node_ids: iterable of graph node ids; non-string entries are ignored.
        chunks: list of chunk dicts (doc_id, clause, page, text, node_type, node_key).

    Returns:
        (text, citations): the answer lines joined by newlines, and the citations
        used, de-duplicated in order of first use.

    Riders and claims are matched to chunks through the chunks' node_type/node_key
    metadata, because chunk text normally does not spell out the entity id; a
    substring search of the text is only the fallback. Node ids are visited in
    sorted order so the output does not depend on set iteration order.
    """
    parts = []
    citations = []

    def chunk_text(d):
        return (d.get("text") or "").lower()

    def find(term):
        # Chunks whose text contains `term` (already lower-cased by the caller).
        return [d for d in chunks if term in chunk_text(d)]

    def attached_to(node_type, key):
        # Chunks whose metadata says they belong to the given node.
        wanted = key.upper()
        return [
            d for d in chunks
            if d.get("node_type") == node_type and (d.get("node_key") or "").upper() == wanted
        ]

    def ids_with_prefix(prefix):
        return sorted(n for n in node_ids if isinstance(n, str) and n.startswith(prefix))

    intents = anchors["intents"]

    if intents["exclusion"]:
        parts.append("**Exclusions**:")
        for nid in ids_with_prefix("Exclusion:"):
            fields = nid.split(":")  # "Exclusion:<policy>:<name>"
            exname = fields[-1]
            policy_number = fields[1] if len(fields) > 2 else ""
            needle = exname.lower()
            # Prefer the owning policy's own clause over any other chunk that merely
            # mentions the same word (e.g. a rider or a claim about "flood").
            own = [d for d in attached_to("Policy", policy_number) if needle in chunk_text(d)]
            hits = own or find(needle)
            if hits:
                cite = format_citation(hits[0])
                parts.append(f"- {exname} — see {cite}")
                citations.append(cite)

    if intents["rider"] or intents["coverage"]:
        parts.append("**Riders & Coverage Notes**:")
        for nid in ids_with_prefix("Rider:"):
            rname = nid.split(":")[-1]
            hits = attached_to("Rider", rname) or find(rname.lower())
            if hits:
                cite = format_citation(hits[0])
                parts.append(f"- {rname}: {hits[0].get('text', '')} ({cite})")
                citations.append(cite)

    if intents["claim"]:
        parts.append("**Claims**:")
        for nid in ids_with_prefix("Claim:"):
            clid = nid.split(":")[-1]
            hits = attached_to("Claim", clid) or find(clid.lower())
            if hits:
                cite = format_citation(hits[0])
                parts.append(f"- {clid}: see {cite}")
                citations.append(cite)

    if anchors["risk_terms"]:
        parts.append("**Risk-specific note**:")
        for term in anchors["risk_terms"]:
            hits = find(term)
            if hits:
                cite = format_citation(hits[0])
                parts.append(f"- Evidence for '{term}': {cite}")
                citations.append(cite)

    if not parts:
        # No intent or risk term was recognised: show the top chunks verbatim.
        parts.append("**Top Evidence Excerpts** (fallback):")
        for d in chunks[:5]:
            cite = format_citation(d)
            parts.append(f"- {d['text']} ({cite})")
            citations.append(cite)

    # dict.fromkeys keeps the first occurrence of each citation, in order.
    return "\n".join(parts), list(dict.fromkeys(citations))

In [None]:
# Demo questions, each naming policy P123 so graph expansion has an anchor.
examples = [
    "List all exclusions for policy P123 and indicate if any riders restore coverage.",
    "After adding R-FLD, does P123 cover flood and what deductible applies?",
    "Show claims on P123 related to flood and the relevant clauses.",
    "Are there any California regulations affecting flood claim handling for P123?",
]

# Run every question through retrieval and answer composition, printing the
# answer followed by at most five citations and a separator line.
for q in examples:
    anchors, node_ids, chunks = retrieve(q)
    ans, cites = answer(q, anchors, node_ids, chunks)
    print(f"Q: {q}")
    print(ans)
    print(f"Citations: {cites[:5]} ...")
    print("-" * 80)

# Smoke test: the answer to the rider/deductible question (the second example)
# must mention the 5000 deductible.
q = examples[1]
anchors, node_ids, chunks = retrieve(q)
ans, cites = answer(q, anchors, node_ids, chunks)
assert "5000" in ans, "Expected deductible '5000' not found in answer."
print("Smoke test passed.")