# This is a sample Jupyter Notebook

Below is an example of a code cell. 
Put your cursor into the cell and press Shift+Enter to execute it and select the next one, or click 'Run Cell' button.

Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.

To learn more about Jupyter Notebooks in PyCharm, see [help](https://www.jetbrains.com/help/pycharm/ipython-notebook-support.html).
For an overview of PyCharm, go to Help -> Learn IDE features or refer to [our documentation](https://www.jetbrains.com/help/pycharm/getting-started.html).

In [2]:
import csv
import logging
import pandas as pd
import os
import json


# STEP1: SCHEMA NORMALIZATION
ISSUE: Each hospital uses a different format to send their claims (some in tables, some in JSON-like forms)

Here we ensure that claims (in file format) is 'standardized'

END GOAL: Translates them into a single, consistent format so that:
   1. Clients can understand them.
   2. Easily Workable with

In [3]:
class FileLoader:
    def __init__(self, path):
        self.filename = path
        self.schemas = {}

    def load(self):
        file_extension = os.path.splitext(self.filename)[1]

        # we keep a newly encountered schema format
        schema_id = self.schemas.get(file_extension, len(self.schemas))
        self.schemas[file_extension] = schema_id

        fp = open(self.filename, 'r')
        if file_extension == '.csv':
            try:
                df = pd.read_csv(fp)
                df["source"] = file_extension
                return df
            except IOError as e:
                logger.exception(file_extension.upper()+": "+e.message)
        elif file_extension == '.json':
            try:
                df = pd.DataFrame(json.load(fp))
                df["source"] = file_extension
                return df
            except IOError as e:
                logger.exception(file_extension.upper()+": "+ e.message)

        # Must learn all data file extensions and include them here (as elif branches)

        raise ValueError("Invalid file extension") #System only works for readable text/byte files


class Normaliser:
    def __init__(self):
        self.vocabulary = {
            "none": "",
            "": "",
            "claim_id":"claim_id",
            "invoice_id":"claim_id",
            "id":"claim_id",
            "order_id":"claim_id",
            "patient_id":"patient_id",
            "member_id":"patient_id",
            "member":"patient_id",
            "client_id":"patient_id",
            "procedure_code":"procedure_code",
            "code":"procedure_code",
            "proc_code":"procedure_code",
            "denial_reason":"denial_reason",
            "result":"denial_reason",
            "error_msg":"denial_reason",
            "message":"denial_reason",
            "source":"source_system",
            "submitted_at":"submitted_at",
            "date":"submitted_at",
            "status":"status",
        }

        self.unified_schema = {}

    def reason(self, claim):
        return self.vocabulary[claim]

    def get_unified_scheme(self):
        return self.unified_schema

    def unify_schema(self, df):
        d = df.copy()

        for attribute in df.columns.tolist():
            #strictly ensuring that attributes follow the unification schema
            self.unified_schema[attribute] = self.vocabulary[attribute].lower().strip()

        d = d.rename(columns=self.unified_schema)

        # d = d[list(self.unified_schema.keys())]  # We filter the data to contain only the unified schema attributes
        return d

    def enforce_as_string(self, d, attribute):
        """
        Ensures domain integrity as Text/String
        :param d: Dataframe
        :param attribute: From the unified schema
        :return: String as text
        """
        return d[attribute].astype(str).str.strip() if attribute != "None" else ""

    def enforce_as_date(self, attribute):
        """
        Converts the 'string' date in ISO date with pandas
        :param attribute: From the unified schema
        :return: Timestamp
        """
        if pd.isna(attribute):
            return pd.NaT
        try:
            return pd.to_datetime(attribute).normalize()
        except:
            return pd.NaT


def standardise_unified_attribute_type(d, normaliser):
    """
    Applies domain integrity on attribtues
    :param d: Dataframe with no constraints
    :return: Dataframe with constraints
    """
    d["patient_id"] = normaliser.enforce_as_string(d, "patient_id")
    d.loc[d["patient_id"].isin(["", "None", "nan"]), "patient_id"] = pd.NA
    d["procedure_code"] = normaliser.enforce_as_string(d, "procedure_code")
    d["denial_reason"] = normaliser.enforce_as_string(d, "denial_reason")
    d["submitted_at"] = d["submitted_at"].apply(normaliser.enforce_as_date)
    d["status"] = normaliser.enforce_as_string(d, "status")
    d["source_system"] = normaliser.enforce_as_string(d, "source_system")
    return d




In [56]:
# Deterministic rule set catalog (each returns (eligible_bool, recommended_action, rule_id))
def rule_denied_status(row: pd.Series):
    """
    Checks whether a claim is denied or approved
    :param row: Claim
    :return: True = Denial, False = Approved
    """
    eligible = row.get("status") == "denied"
    return eligible, "Proceed to denial analysis" if eligible else "Not denied", "R1"

def rule_require_patient_id(row: pd.Series):
    """
    Checks whether a patient ID is valid
    :param row:
    :return: True = Valid, passed
    """
    has_pid = pd.notna(row.get("patient_id"))
    return has_pid, "Patient ID present" if has_pid else "Missing patient ID", "R2"

def rule_validity_by_date(row_data):
    """
    :param row_data: Claim
    :return: True = expired a week ago, False = Valid
    """
    return  ((pd.Timestamp(pd.to_datetime("2025-7-30")) - pd.to_datetime(row_data.get("submitted_at"),unit="ms")) > pd.Timedelta(days=7)), "Expired a week ago; Resubmit", "R3"

def rule_missing_modifier_for_em(row: pd.Series):
    """
    Deterministic + inferable:
    - If denial_reason == 'Missing modifier' and procedure_code difference differ insignificantly,
      we infer modifier likely required. Eligible if true.
    """
    reason = row.get("denial_reason", "")
    code = str(row.get("procedure_code", "")).strip()

    #refer to  "procedure code hospital admission 99213, 99214, 99215" on Google: outpatient codes
    em_codes = {"99213", "99214", "99215"}
    if reason == "Missing modifier" and code in em_codes:
        return True, "Add modifier and resubmit", "R4a"
    if reason == "Missing modifier":
        return True, "Add required modifier and resubmit", "R4"
    return False, "N/A", "R4"


def rule_incorrect_npi(row: pd.Series):
    """
    If denial_reason == 'Incorrect NPI', assume we can correct provider record and resubmit.
    """
    if row.get("denial_reason") == "Incorrect NPI":
        return True, "Correct provider NPI and resubmit", "R4b"
    return False, "N/A", "R4"

def rule_auth_issues(row: pd.Series):
    """
    If denial_reason in {'Prior auth required','Authorization expired'}, allow resubmission
    after obtaining/renewing authorization.
    """
    reason = row.get("denial_reason")
    if reason in {"Prior auth required", "Authorization expired"}:
        return True, "Obtain or renew authorization and resubmit", "R4c"
    return False, "N/A", "R4"

def rule_incorrect_provider_type(row: pd.Series):
    """
    If denial_reason == 'Incorrect provider type', allow resubmission after correcting taxonomy/credentialing.
    """
    if row.get("denial_reason") == "Incorrect provider type":
        return True, "Update provider taxonomy/role and resubmit", "R4d"
    return False, "N/A", "R4"

def rule_incorrect_procedure(row: pd.Series):
    """
    If denial_reason == 'Incorrect procedure', flag as *not* auto-resubmittable—needs coding review/appeal.
    """
    if row.get("denial_reason") == "Incorrect procedure":
        return False, "Manual coding review; likely appeal, not auto-resubmit", "R5"
    return False, "N/A", "R5"

RULES = [
    rule_denied_status,
    rule_require_patient_id,
    rule_validity_by_date,
    rule_missing_modifier_for_em,
    rule_incorrect_npi,
    rule_auth_issues,
    rule_incorrect_provider_type,
    rule_incorrect_procedure,
]

def evaluate_eligibility(row: pd.Series):
    """
    Execute rule chain:
    - R1 = status
    - R2 = Patient ID
    - R3 = Expiry Date
    - R4 = Clear or inferred deniable reason: a - procedure, b - npi, c- auth, d - incomplete info/manual
    - If no True rule beyond gates, the claim is not eligible.
    """
    results = []
    eligible = True
    # Apply R0 and R1 as gates
    for r in RULES[:2]:
        ok, action, rid = r(row)
        results.append({"rule_id": rid, "result": ok, "action": action})
        if not ok:
            eligible = False
    # If gates pass, try subsequent rules for resubmission action
    action = ""
    matched_rule = None
    if eligible:
        for r in RULES[2:]:
            ok, a, rid = r(row)
            results.append({"rule_id": rid, "result": ok, "action": a})
            if ok and not matched_rule:
                matched_rule = rid
                action = a
        eligible = matched_rule is not None

    return {
        "eligible_for_resubmission": bool(eligible),
        "recommended_changes": action if eligible else "",
        "rule_trace": results,
    }


In [51]:


logger = logging.getLogger("claims_pipeline")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter(
    "%(asctime)s | %(levelname)s | %(name)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)
handler.setFormatter(formatter)
# Avoid duplicate handlers if re-running
if not logger.handlers:
    logger.addHandler(handler)


def run_pipeline():
    """
    A
    :return:
    """
    metrics = {
        "ingested": 0,
        "normalized": 0,
        "denied": 0,
        "eligible": 0,
        "ineligible": 0,
        "by_source": {},
    }

    # 1) Ingest
    logger.info("Ingesting sources...")

    #File pointers
    emr_data1 = FileLoader("EMR/source/source1.csv")
    emr_data2 = FileLoader("EMR/source/source2.json")

    #Loading the data from disk into memory
    data1 = emr_data1.load()
    data2 = emr_data2.load()

    #Logging Data type and size
    metrics["ingested"] = len(data1) + len(data2)
    metrics["by_source"]["CSV CLAIMS"] = len(data1)
    metrics["by_source"]["JSON CLAIMS"] = len(data2)

    # 2) Normalize / unify schema
    logger.info("Normalizing sources...")

    normaliser = Normaliser()
    n1 = normaliser.unify_schema(data1)
    n2 = normaliser.unify_schema(data2)
    merged_records = pd.concat([n1, n2], ignore_index=True)
    metrics["normalized"] = len(merged_records)
    merged_records = standardise_unified_attribute_type(merged_records, normaliser)

    # 3) Evaluate eligibility rules
    logger.info("Evaluating eligibility...")
    evals = merged_records.apply(evaluate_eligibility, axis=1, result_type="expand")
    # evals has columns: eligible_for_resubmission, recommended_action, rule_trace
    out = pd.concat([merged_records, evals], axis=1)

    # Compute metrics
    metrics["denied"] = int((out["status"] == "denied").sum())
    metrics["eligible"] = int(out["eligible_for_resubmission"].sum())
    metrics["ineligible"] = int((~out["eligible_for_resubmission"]).sum())

    # 4) Produce clean outputs (normalized + filtered eligible)
    eligible_df = out.loc[out["eligible_for_resubmission"], [
        "claim_id", "denial_reason", "source_system", "recommended_changes"
    ]].copy()

    eligible_df = eligible_df.rename(columns={
    "denial_reason": "resubmission_reason",
    })

    # Save outputs
    normalized_path = "EMR/claims_normalized.csv"
    eligible_path = "EMR/claims_resubmit.json"
    out.to_csv(normalized_path, index=False)
    eligible_df.to_json(eligible_path, orient="records", index=False)

    logger.info("Pipeline complete.")
    logger.info("Metrics: %s", metrics)

    return out, eligible_df, metrics, normalized_path, eligible_path

# Run the pipeline
full_df, eligible_df, metrics, normalized_path, eligible_path = run_pipeline()



2025-08-22 10:19:53 | INFO | claims_pipeline | Ingesting sources...
2025-08-22 10:19:53 | INFO | claims_pipeline | Normalizing sources...
2025-08-22 10:19:53 | INFO | claims_pipeline | Evaluating eligibility...
2025-08-22 10:19:53 | INFO | claims_pipeline | Pipeline complete.
2025-08-22 10:19:53 | INFO | claims_pipeline | Metrics: {'ingested': 9, 'normalized': 9, 'denied': 7, 'eligible': 5, 'ineligible': 4, 'by_source': {'CSV CLAIMS': 5, 'JSON CLAIMS': 4}}


In [52]:
full_df

Unnamed: 0,claim_id,patient_id,procedure_code,denial_reason,submitted_at,status,source_system,eligible_for_resubmission,recommended_changes,rule_trace
0,A123,P001,99213,Missing modifier,2025-07-01,denied,.csv,True,Expired a week ago,"[{'rule_id': 'R1', 'result': True, 'action': '..."
1,A124,P002,99214,Incorrect NPI,2025-07-10,denied,.csv,True,Expired a week ago,"[{'rule_id': 'R1', 'result': True, 'action': '..."
2,A125,,99215,Authorization expired,2025-07-05,denied,.csv,False,,"[{'rule_id': 'R1', 'result': True, 'action': '..."
3,A126,P003,99381,,2025-07-15,approved,.csv,False,,"[{'rule_id': 'R1', 'result': False, 'action': ..."
4,A127,P004,99401,Prior auth required,2025-07-20,denied,.csv,True,Expired a week ago,"[{'rule_id': 'R1', 'result': True, 'action': '..."
5,B987,P010,99213,Incorrect provider type,2025-07-03,denied,.json,True,Expired a week ago,"[{'rule_id': 'R1', 'result': True, 'action': '..."
6,B988,P011,99214,Missing modifier,2025-07-09,denied,.json,True,Expired a week ago,"[{'rule_id': 'R1', 'result': True, 'action': '..."
7,B989,P012,99215,,2025-07-10,approved,.json,False,,"[{'rule_id': 'R1', 'result': False, 'action': ..."
8,B990,,99401,incorrect procedure,2025-07-01,denied,.json,False,,"[{'rule_id': 'R1', 'result': True, 'action': '..."


In [55]:
eligible_df

Unnamed: 0,claim_id,resubmission_reason,source_system,recommended_changes
0,A123,Missing modifier,.csv,Expired a week ago
1,A124,Incorrect NPI,.csv,Expired a week ago
4,A127,Prior auth required,.csv,Expired a week ago
5,B987,Incorrect provider type,.json,Expired a week ago
6,B988,Missing modifier,.json,Expired a week ago
