In [None]:
-- Rebuild the assignment database and make it the session default.
-- CREATE OR REPLACE discards any existing database with this name.
CREATE OR REPLACE DATABASE CCDA_FINAL_ASSIGNMENT;
USE DATABASE CCDA_FINAL_ASSIGNMENT;


In [None]:
-- Rebuild the working schema and select it for the rest of the notebook.
CREATE OR REPLACE SCHEMA CCDA_FINAL_ASSIGNMENT.CCDA;
USE SCHEMA CCDA_FINAL_ASSIGNMENT.CCDA;

In [None]:
-- Show the properties of the storage integration referenced by the stage below.
DESC integration S3_STORAGE_INTEGRATION;

-- Values copied from an earlier DESC run.
-- NOTE(review): these look like account-specific identifiers; confirm they are safe to keep in a shared notebook.
--STORAGE_AWS_IAM_USER_ARN arn:aws:iam::053442322070:user/riee1000-s
--STORAGE_AWS_EXTERNAL_ID  XXC67424_SFCRole=6_sBbq3/6qfjlLqEzGx3tBav9VsRU=


In [None]:
-- External stage over the S3 prefix that holds the C-CDA XML files.
CREATE OR REPLACE STAGE CCDA_FINAL_ASSIGNMENT.CCDA.CCDA_XML_STAGE
  URL = 's3://da-batch2-group1-capstone/CCDA_FILES/'
  STORAGE_INTEGRATION = S3_STORAGE_INTEGRATION;

In [None]:
-- List the files currently visible through the stage.
LIST @CCDA_FINAL_ASSIGNMENT.CCDA.CCDA_XML_STAGE;

In [None]:

-- Named XML file format used when reading files from the stage.
CREATE OR REPLACE FILE FORMAT CCDA_FINAL_ASSIGNMENT.CCDA.FF_XML
  TYPE = XML
  DISABLE_AUTO_CONVERT = TRUE;


In [None]:

-- Notification integration bound to an AWS SNS topic via the given IAM role.
-- NOTE(review): DIRECTION is 'OUTBOUND' while the name suggests S3 event intake;
-- confirm the direction matches how this integration is meant to be used.
CREATE OR REPLACE NOTIFICATION INTEGRATION S3_CCDA_EVENT
  TYPE = QUEUE
  DIRECTION = 'OUTBOUND'
  ENABLED = TRUE
  NOTIFICATION_PROVIDER = 'AWS_SNS'
  AWS_SNS_TOPIC_ARN = 'arn:aws:sns:us-east-1:518729167346:SANKET_BAGUL_CCDA_S3_SNS'
  AWS_SNS_ROLE_ARN = 'arn:aws:iam::518729167346:role/da-batch2-group1-role';
  
-- DESC S3_CCDA_EVENT;


In [None]:
-- Show the properties of the notification integration created in the previous cell.
DESC INTEGRATION S3_CCDA_EVENT;

In [None]:

-- Bronze landing table for parsed C-CDA documents, with optional patient/document
-- context columns.
-- Created in CCDA_FINAL_ASSIGNMENT (the database this notebook creates and uses);
-- the previous FINAL_ASSIGNMENT prefix pointed at a database not created here.
CREATE OR REPLACE TABLE CCDA_FINAL_ASSIGNMENT.CCDA.CCDA_BRONZE (
  FILE_NAME        STRING,
  DOC              VARIANT,              -- parsed XML as VARIANT
  LOAD_TS          TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),

  -- Parser-enriched patient/document context
  PATIENT_ID       STRING,
  DOCUMENT_ID      STRING,
  PATIENT_IDS_ALL  VARIANT,              -- JSON array of header IDs

  -- Optional lineage
  SOURCE_SYSTEM    STRING,               -- e.g., 'CCDA'
  RECORD_TYPE      STRING                -- e.g., 'medications', 'allergies', etc. (if you fan-out directly)
);


In [None]:
-- Native Bronze table for CCDA XML: one row per staged file.
-- CREATE OR REPLACE drops any existing CCDA_BRONZE in this schema, including
-- columns that are not listed here.
CREATE OR REPLACE TABLE CCDA_FINAL_ASSIGNMENT.CCDA.CCDA_BRONZE (
  FILE_NAME STRING,
  DOC       VARIANT,            -- parsed XML as VARIANT
  LOAD_TS   TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);


-- One-off bulk load of every staged .xml file into Bronze.
-- To turn this into an auto-ingest Snowpipe, uncomment the three lines below.
-- CREATE OR REPLACE PIPE CCDA_FINAL_ASSIGNMENT.CCDA.CCDA_PIPE
--   AUTO_INGEST = TRUE
-- AS
COPY INTO CCDA_FINAL_ASSIGNMENT.CCDA.CCDA_BRONZE (FILE_NAME, DOC)
FROM (
  SELECT METADATA$FILENAME, PARSE_XML($1)
  FROM @CCDA_FINAL_ASSIGNMENT.CCDA.CCDA_XML_STAGE
  -- [.] matches a literal dot. In a single-quoted Snowflake string the
  -- backslash in '\.' is consumed by string escaping, leaving a wildcard dot.
  ( FILE_FORMAT => CCDA_FINAL_ASSIGNMENT.CCDA.FF_XML, PATTERN => '.*[.]xml' )
)
ON_ERROR = 'CONTINUE';  -- skip rows that fail to load instead of aborting the statement


In [None]:
-- Show the pipe definition and its notification channel.
-- NOTE(review): the CREATE PIPE statement in the previous cell is commented out,
-- so this presumably relies on a pipe created elsewhere; confirm it exists.
DESC PIPE CCDA_FINAL_ASSIGNMENT.ccda.CCDA_PIPE; 

-- notification channel: arn:aws:sqs:us-east-1:846206542736:sf-snowpipe-AIDA4KBOXEOICMEARAYA4-PvSmKWm7xdr_3SGlGj-zsg



In [None]:
-- USE SCHEMA   FINAL_ASSIGNMENT.PUBLIC;
-- Check pipes
-- SHOW PIPES;


-- -- View recent load history
-- SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
--   table_name => 'CCDA_BRONZE',
--   start_time => DATEADD('hour', -24, CURRENT_TIMESTAMP())
-- ));

-- Check notification/ingestion status of the pipe.
-- Fully qualified (same name as the DESC PIPE cell) so the result does not
-- depend on the session's current database and schema.
SELECT SYSTEM$PIPE_STATUS('CCDA_FINAL_ASSIGNMENT.CCDA.CCDA_PIPE');

-- -- Manually force a re-check (rarely needed)
-- ALTER PIPE FINAL_ASSIGNMENT.PUBLIC.CCDA_PIPE REFRESH;


-- SELECT * FROM TABLE(INFORMATION_SCHEMA.PIPE_USAGE_HISTORY(
--     PIPE_NAME => 'FINAL_ASSIGNMENT.PUBLIC.CCDA_PIPE',
--     START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())
-- ));


In [None]:
-- Ad-hoc inspection of everything currently loaded into Bronze.
SELECT *
FROM CCDA_FINAL_ASSIGNMENT.CCDA.CCDA_BRONZE;
-- 1) Do we have any rows at all?
-- SELECT COUNT(*) AS bronze_rows FROM FINAL_ASSIGNMENT.CCDA.CCDA_BRONZE;
-- 
-- -- 2) What does FILE_NAME look like in Bronze?
-- SELECT DISTINCT FILE_NAME FROM FINAL_ASSIGNMENT.CCDA.CCDA_BRONZE ORDER BY 1 LIMIT 50;

-- -- 3) Does our hardcoded file exist (try fuzzy)?
-- SELECT FILE_NAME
-- FROM CCDA_BRONZE
-- -- WHERE FILE_NAME ILIKE '%McGlynn426%'  -- adjust token
-- LIMIT 20;


In [None]:

-- Append-only stream over the Bronze table.
-- Both names use CCDA_FINAL_ASSIGNMENT, the database this notebook creates and
-- loads; the previous FINAL_ASSIGNMENT prefix referenced a different database.
CREATE OR REPLACE STREAM CCDA_FINAL_ASSIGNMENT.CCDA.CCDA_BRONZE_STREAM
  ON TABLE CCDA_FINAL_ASSIGNMENT.CCDA.CCDA_BRONZE
  APPEND_ONLY = TRUE;


In [None]:
-- Silver tables: the statements below create them in the existing
-- CCDA_FINAL_ASSIGNMENT.CCDA schema (no separate Silver schema is created here).

USE DATABASE CCDA_FINAL_ASSIGNMENT;
USE SCHEMA CCDA_FINAL_ASSIGNMENT.CCDA;


-- Medication rows. The first eight columns after FILE_NAME's peers match the keys
-- emitted by extract_medications in the parser cell; the trailing five are
-- patient/document lineage columns.
-- NOTE(review): the loader is not shown here; confirm the column mapping.
CREATE OR REPLACE TABLE CCDA_MEDICATIONS (
  FILE_NAME    STRING,
  START_RAW    STRING,
  STOP_RAW     STRING,
  START_ISO    TIMESTAMP_NTZ,
  STOP_ISO     TIMESTAMP_NTZ,
  DESCRIPTION  STRING,
  CODE_SYSTEM  STRING,
  CODE         STRING,

  PATIENT_ID       STRING,
  DOCUMENT_ID      STRING,
  PATIENT_IDS_ALL  VARIANT,
  SOURCE_SYSTEM    STRING,
  RECORD_TYPE      STRING
);


-- Vital-sign rows: start time only (no stop), plus VALUE/UNIT.
-- Column names match the keys emitted by extract_vitals in the parser cell,
-- followed by patient/document lineage columns.
CREATE OR REPLACE TABLE CCDA_VITALS (
  FILE_NAME    STRING,
  START_RAW    STRING,
  START_ISO    TIMESTAMP_NTZ,
  DESCRIPTION  STRING,
  CODE_SYSTEM  STRING,
  CODE         STRING,
  VALUE        STRING,
  UNIT         STRING,

  PATIENT_ID       STRING,
  DOCUMENT_ID      STRING,
  PATIENT_IDS_ALL  VARIANT,
  SOURCE_SYSTEM    STRING,
  RECORD_TYPE      STRING
);


-- Diagnostic-result rows: start time only (no stop), plus VALUE/UNIT.
-- Column names match the keys emitted by extract_results in the parser cell,
-- followed by patient/document lineage columns.
CREATE OR REPLACE TABLE CCDA_RESULTS (
  FILE_NAME    STRING,
  START_RAW    STRING,
  START_ISO    TIMESTAMP_NTZ,
  DESCRIPTION  STRING,
  CODE_SYSTEM  STRING,
  CODE         STRING,
  VALUE        STRING,
  UNIT         STRING,

  PATIENT_ID       STRING,
  DOCUMENT_ID      STRING,
  PATIENT_IDS_ALL  VARIANT,
  SOURCE_SYSTEM    STRING,
  RECORD_TYPE      STRING
);


-- Problem rows with start/stop times in raw and timestamp form.
-- Column names match the keys emitted by extract_problems in the parser cell,
-- followed by patient/document lineage columns.
CREATE OR REPLACE TABLE CCDA_PROBLEMS (
  FILE_NAME    STRING,
  START_RAW    STRING,
  STOP_RAW     STRING,
  START_ISO    TIMESTAMP_NTZ,
  STOP_ISO     TIMESTAMP_NTZ,
  DESCRIPTION  STRING,
  CODE_SYSTEM  STRING,
  CODE         STRING,

  PATIENT_ID       STRING,
  DOCUMENT_ID      STRING,
  PATIENT_IDS_ALL  VARIANT,
  SOURCE_SYSTEM    STRING,
  RECORD_TYPE      STRING
);


-- Procedure rows with start/stop times in raw and timestamp form.
-- Column names match the keys emitted by extract_procedures in the parser cell,
-- followed by patient/document lineage columns.
CREATE OR REPLACE TABLE CCDA_PROCEDURES (
  FILE_NAME    STRING,
  START_RAW    STRING,
  STOP_RAW     STRING,
  START_ISO    TIMESTAMP_NTZ,
  STOP_ISO     TIMESTAMP_NTZ,
  DESCRIPTION  STRING,
  CODE_SYSTEM  STRING,
  CODE         STRING,

  PATIENT_ID       STRING,
  DOCUMENT_ID      STRING,
  PATIENT_IDS_ALL  VARIANT,
  SOURCE_SYSTEM    STRING,
  RECORD_TYPE      STRING
);


-- Encounter rows with start/stop times in raw and timestamp form.
-- Column names match the keys emitted by extract_encounters in the parser cell,
-- followed by patient/document lineage columns.
CREATE OR REPLACE TABLE CCDA_ENCOUNTERS (
  FILE_NAME    STRING,
  START_RAW    STRING,
  STOP_RAW     STRING,
  START_ISO    TIMESTAMP_NTZ,
  STOP_ISO     TIMESTAMP_NTZ,
  DESCRIPTION  STRING,
  CODE_SYSTEM  STRING,
  CODE         STRING,

  PATIENT_ID       STRING,
  DOCUMENT_ID      STRING,
  PATIENT_IDS_ALL  VARIANT,
  SOURCE_SYSTEM    STRING,
  RECORD_TYPE      STRING
);



-- Immunization rows: start time only, no VALUE/UNIT.
-- Column names match the keys emitted by extract_immunizations in the parser
-- cell, followed by patient/document lineage columns.
CREATE OR REPLACE TABLE CCDA_IMMUNIZATIONS (
  FILE_NAME    STRING,
  START_RAW    STRING,
  START_ISO    TIMESTAMP_NTZ,
  DESCRIPTION  STRING,
  CODE_SYSTEM  STRING,
  CODE         STRING,

  PATIENT_ID       STRING,
  DOCUMENT_ID      STRING,
  PATIENT_IDS_ALL  VARIANT,
  SOURCE_SYSTEM    STRING,
  RECORD_TYPE      STRING
);


-- Functional-status rows: start time only (no stop), plus VALUE/UNIT.
-- Column names match the keys emitted by extract_functional_status in the
-- parser cell, followed by patient/document lineage columns.
CREATE OR REPLACE TABLE CCDA_FUNCTIONAL_STATUS (
  FILE_NAME    STRING,
  START_RAW    STRING,
  START_ISO    TIMESTAMP_NTZ,
  DESCRIPTION  STRING,
  CODE_SYSTEM  STRING,
  CODE         STRING,
  VALUE        STRING,
  UNIT         STRING,

  PATIENT_ID       STRING,
  DOCUMENT_ID      STRING,
  PATIENT_IDS_ALL  VARIANT,
  SOURCE_SYSTEM    STRING,
  RECORD_TYPE      STRING
);

-- Allergy rows: substance and reaction are coded separately, plus SEVERITY.
-- Column names match the keys emitted by extract_allergies in the parser cell,
-- followed by patient/document lineage columns.
CREATE OR REPLACE TABLE CCDA_ALLERGIES (
  FILE_NAME              STRING,
  START_RAW              STRING,
  STOP_RAW               STRING,
  START_ISO              TIMESTAMP_NTZ,
  STOP_ISO               TIMESTAMP_NTZ,
  SUBSTANCE_DESC         STRING,
  SUBSTANCE_CODE_SYSTEM  STRING,
  SUBSTANCE_CODE         STRING,
  REACTION_DESC          STRING,
  REACTION_CODE_SYSTEM   STRING,
  REACTION_CODE          STRING,
  SEVERITY               STRING,

  PATIENT_ID       STRING,
  DOCUMENT_ID      STRING,
  PATIENT_IDS_ALL  VARIANT,
  SOURCE_SYSTEM    STRING,
  RECORD_TYPE      STRING
);


-- Plan-of-care rows with start/stop times in raw and timestamp form.
-- Column names match the keys emitted by extract_plan_of_care in the parser
-- cell, followed by patient/document lineage columns.
CREATE OR REPLACE TABLE CCDA_PLAN_OF_CARE (
  FILE_NAME    STRING,
  START_RAW    STRING,
  STOP_RAW     STRING,
  START_ISO    TIMESTAMP_NTZ,
  STOP_ISO     TIMESTAMP_NTZ,
  DESCRIPTION  STRING,
  CODE_SYSTEM  STRING,
  CODE         STRING,

  PATIENT_ID       STRING,
  DOCUMENT_ID      STRING,
  PATIENT_IDS_ALL  VARIANT,
  SOURCE_SYSTEM    STRING,
  RECORD_TYPE      STRING
);


-- Social-history rows: start/stop times plus VALUE/UNIT, followed by
-- patient/document lineage columns.
-- NOTE(review): the extractor that feeds this table is not visible in this part
-- of the notebook; confirm the column mapping against it.
CREATE OR REPLACE TABLE CCDA_SOCIAL_HISTORY (
  FILE_NAME    STRING,
  START_RAW    STRING,
  STOP_RAW     STRING,
  START_ISO    TIMESTAMP_NTZ,
  STOP_ISO     TIMESTAMP_NTZ,
  DESCRIPTION  STRING,
  CODE_SYSTEM  STRING,
  CODE         STRING,
  VALUE        STRING,
  UNIT         STRING,

  PATIENT_ID       STRING,
  DOCUMENT_ID      STRING,
  PATIENT_IDS_ALL  VARIANT,
  SOURCE_SYSTEM    STRING,
  RECORD_TYPE      STRING
);



-- Per-file ingest manifest: a status and reason, one COUNT_* column for each of
-- the eleven section tables above, and a processing timestamp that defaults to
-- the insert time.
CREATE OR REPLACE TABLE CCDA_INGEST_MANIFEST (
  FILE_NAME                 STRING,
  STATUS                    STRING,
  REASON                    STRING,
  COUNT_MEDICATIONS         NUMBER,
  COUNT_VITALS              NUMBER,
  COUNT_RESULTS             NUMBER,
  COUNT_PROBLEMS            NUMBER,
  COUNT_PROCEDURES          NUMBER,
  COUNT_ENCOUNTERS          NUMBER,
  COUNT_IMMUNIZATIONS       NUMBER,
  COUNT_FUNCTIONAL_STATUS   NUMBER,
  COUNT_ALLERGIES           NUMBER,
  COUNT_PLAN_OF_CARE        NUMBER,
  COUNT_SOCIAL_HISTORY      NUMBER,
  PROCESSED_AT              TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),

  -- Optional if each CCDA file maps to one patient (common):
  PATIENT_ID                STRING,
  DOCUMENT_ID               STRING
);

In [None]:
-- Confirm which tables now exist in the working schema.
SHOW TABLES IN CCDA_FINAL_ASSIGNMENT.CCDA;

In [None]:

# Step 5 (Complete): Create a reusable CCDA parser package in the Notebook filesystem

from pathlib import Path

# Package directory in the notebook's working directory; creating it again is a no-op.
pkg_dir = Path("ccda_parser")
pkg_dir.mkdir(exist_ok=True)

# --------------------------
# __init__.py
# --------------------------
# Written as UTF-8 text: just the package docstring and a version marker.
pkg_dir.joinpath("__init__.py").write_text(
    '''"""
Reusable C-CDA Parser

Parses CCDA XML (urn:hl7-org:v3) into tidy pandas DataFrames for Medications,
Diagnostic Results, Problems, Procedures, Encounters, Vital Signs, Immunizations,
and Functional Status.
"""
__version__ = "0.1.0"
''',
    encoding="utf-8",
)

# --------------------------
# parser.py — core utilities + extractors
# --------------------------
parser_py = r'''
import os
import xml.etree.ElementTree as ET
from datetime import datetime, timezone, timedelta
import pandas as pd


# HL7 namespaces
ns = {"cda": "urn:hl7-org:v3", "sdtc": "urn:hl7-org:sdtc"}


# Configurable default timezone offset (e.g., +0530)
DEFAULT_TZ = os.getenv("CCDA_DEFAULT_TZ", "+0530")


# -------- Utilities --------

def parse_hl7_ts(ts: str):
    """
    Convert an HL7 TS string to a datetime.

    Accepted layout (any precision from year down to second):
        YYYY[MM[DD[HH[MM[SS[.fraction]]]]]][Z | +/-HH[MM]]

    - Missing month/day default to 1; missing hour/minute/second default to 0.
    - Fractional seconds are accepted but discarded.
    - 'Z' or a numeric offset yields a tz-aware datetime; otherwise the result
      is naive.
    - An offset directly after a bare year is not accepted, so an ISO value
      such as '2020-01' is not mistaken for "year 2020, offset -01".

    Returns None for empty input, for strings that are not HL7 TS (for example
    ISO-8601 text with dashes and colons), and for impossible dates.
    """
    import re  # local import: only this helper needs it

    if not ts:
        return None
    text = ts.strip()

    match = re.fullmatch(
        r"(\d{4})(?:(\d{2})(?:(\d{2})(?:(\d{2})(?:(\d{2})(?:(\d{2})(?:\.\d+)?)?)?)?)?)?"
        r"(Z|z|[+-]\d{2}(?:\d{2})?)?",
        text,
    )
    if match is None or (match.group(7) and match.group(2) is None):
        # Not a clean HL7 TS. Keep the earlier lenient behaviour: if the first
        # 14 characters are a full timestamp, return it naive and ignore the tail.
        try:
            return datetime.strptime(ts[:14], "%Y%m%d%H%M%S")
        except Exception:
            return None

    # Fill absent components with defaults. Present-but-invalid values such as
    # month "00" are passed through so datetime() rejects them.
    defaults = (1, 1, 1, 0, 0, 0)
    parts = [
        int(group) if group is not None else default
        for group, default in zip(match.groups()[:6], defaults)
    ]
    try:
        dt = datetime(*parts)
    except ValueError:
        return None

    offset = match.group(7)
    if not offset:
        # No embedded offset: return naive
        return dt
    if offset in ("Z", "z"):
        return dt.replace(tzinfo=timezone.utc)

    delta = timedelta(hours=int(offset[1:3]), minutes=int(offset[3:5] or 0))
    if offset[0] == "-":
        delta = -delta
    try:
        return dt.replace(tzinfo=timezone(delta))
    except ValueError:
        # Offset outside what timezone() allows: keep the naive value
        return dt


def to_iso(ts: str, tz_offset: str = DEFAULT_TZ) -> str:
    """
    Render an HL7 TS as an ISO-8601 string, or '' when it cannot be parsed.

    A timestamp that carries its own offset keeps it. A naive timestamp gets
    `tz_offset` (format '+HHMM' / '-HHMM') attached; a malformed `tz_offset`
    is ignored and the value stays naive.

    NOTE: this module defines `to_iso` a second time further down with a
    single-argument signature. That later definition rebinds the name, so it
    is the one callers actually get.
    """
    parsed = parse_hl7_ts(ts) if ts else None
    if not parsed:
        return ""

    if parsed.tzinfo is None and tz_offset:
        try:
            shift = timedelta(
                hours=int(tz_offset[1:3]),
                minutes=int(tz_offset[3:5]),
            )
            if tz_offset.startswith("-"):
                shift = -shift
            parsed = parsed.replace(tzinfo=timezone(shift))
        except Exception:
            # Unusable default offset: leave the datetime naive
            pass

    return parsed.isoformat()


def _gettext(el):
    return (el.text or "").strip() if el is not None else ""


def validate_ccda_xml(text: str):
    """
    Check that `text` is well-formed XML whose root tag ends with 'ClinicalDocument'.

    Returns a (ok, message) tuple: (True, "OK") on success, otherwise False
    with the parse error or a note that the root element is wrong.
    """
    try:
        document = ET.fromstring(text)
    except Exception as exc:
        return False, f"XML parse failed: {exc}"

    if document.tag.endswith("ClinicalDocument"):
        return True, "OK"
    return False, "Root element is not ClinicalDocument."


def discover_sections(root):
    """
    List the sections found at structuredBody/component/section under `root`.

    Each item is a dict with the section's stripped title, the `code` and
    `codeSystem` attributes of its <code> child (None when there is no
    <code>), and the section element itself under "el".
    """
    found = []
    for section in root.findall(".//cda:structuredBody/cda:component/cda:section", ns):
        code_el = section.find("cda:code", ns)
        if code_el is None:
            code_value = None
            code_system = None
        else:
            code_value = code_el.get("code")
            code_system = code_el.get("codeSystem")
        found.append({
            "title": _gettext(section.find("cda:title", ns)),
            "code": code_value,
            "codeSystem": code_system,
            "el": section,
        })
    return found


# ==== Helpers for narrative parsing (put near other utilities) ====

def _all_text(el):
    if el is None:
        return ""
    return "".join(el.itertext()).strip()

def _split_code_and_system(text):
    t = (text or "").strip()
    if not t:
        return "", ""
    parts = t.split()
    if len(parts) >= 2 and parts[0].startswith("http"):
        return parts[0], parts[-1]
    return "", t


def _parse_table_rows_from_section(sec):
    """
    Parse the first XHTML <table> under <cda:text>. Returns:
    (headers_lc: list[str], rows: list[list[str]]).
    Handles both namespaced and un-namespaced narrative tables.
    """
    text_el = sec.find("./cda:text", ns)
    if text_el is None:
        return [], []

    # Try namespaced <table> first, then non-namespaced fallback
    tables = text_el.findall(".//cda:table", ns)
    if not tables:
        tables = text_el.findall(".//table")

    if not tables:
        return [], []

    table = tables[0]

    # --- Headers ---
    headers = []
    thead = table.find("./cda:thead", ns)
    if thead is None:
        thead = table.find("./thead")
    if thead is not None:
        tr = thead.find("./cda:tr", ns) or thead.find("./tr")
        if tr is not None:
            ths = tr.findall("./cda:th", ns)
            if not ths:
                ths = tr.findall("./th")
            headers = ["".join(th.itertext()).strip().lower() for th in ths]

    # --- Body rows ---
    body_rows = []
    tbody = table.find("./cda:tbody", ns) or table.find("./tbody")
    if tbody is not None:
        trs = tbody.findall("./cda:tr", ns)
        if not trs:
            trs = tbody.findall("./tr")
        for tr in trs:
            tds = tr.findall("./cda:td", ns)
            if not tds:
                tds = tr.findall("./td")
            cells = ["".join(td.itertext()).strip() for td in tds]
            if cells and any(c.strip() for c in cells):
                body_rows.append(cells)
    else:
        # No <tbody>, read all <tr> directly
        trs = table.findall("./cda:tr", ns)
        if not trs:
            trs = table.findall("./tr")
        # If no explicit headers, use first row as header
        if not headers and trs:
            first_tds = trs[0].findall("./cda:td", ns) or trs[0].findall("./td")
            headers = ["".join(td.itertext()).strip().lower() for td in first_tds]
            trs = trs[1:]
        for tr in trs:
            tds = tr.findall("./cda:td", ns) or tr.findall("./td")
            cells = ["".join(td.itertext()).strip() for td in tds]
            if cells and any(c.strip() for c in cells):
                body_rows.append(cells)

    return headers, body_rows


def _col_index(headers, name_variants):
    """
    Find column index by matching any variant (case-insensitive).
    name_variants: iterable of possible header names like ["start", "start date"].
    """
    hlc = [h.strip().lower() for h in headers]
    for nv in name_variants:
        nv_l = nv.strip().lower()
        if nv_l in hlc:
            return hlc.index(nv_l)
    return None\


def get_start_stop(eff_el):
    """
    Pull (start_raw, stop_raw) out of an <effectiveTime> element.

    Start is the element's own @value when present, otherwise <low>/@value.
    Stop is <high>/@value. Anything missing comes back as "". A None element
    gives ("", "").
    """
    if eff_el is None:
        return ("", "")

    low_el = eff_el.find("cda:low", ns)
    high_el = eff_el.find("cda:high", ns)

    begin = eff_el.attrib.get("value", "")
    if not begin and low_el is not None:
        begin = low_el.attrib.get("value", "")

    end = "" if high_el is None else high_el.attrib.get("value", "")
    return (begin, end)


# to_iso variant that also accepts ISO strings with a timezone (as seen in narrative tables)
def to_iso(ts: str) -> str:
    """
    Return an ISO 8601 string for `ts`, or '' when it cannot be interpreted.

    HL7 compact timestamps are tried first and rendered as parsed (any offset
    is kept). Otherwise `ts` is read with datetime.fromisoformat and its
    timezone is dropped before rendering.
    """
    if not ts:
        return ""

    # HL7 compact TS takes precedence
    hl7_dt = parse_hl7_ts(ts)
    if hl7_dt:
        return hl7_dt.isoformat()

    # Fall back to Python's ISO parser (supports +/-HH:MM)
    try:
        iso_dt = datetime.fromisoformat(ts)
        # Drop the timezone for Silver NTZ consistency
        return iso_dt.replace(tzinfo=None).isoformat()
    except Exception:
        return ""


# -------- Extractors --------


# --- Patient / Document header helpers (add to parser.py utilities) ---
import json
import hashlib

def extract_patient_identifiers(root: ET.Element) -> dict:
    """
    Collect patient identifiers from recordTarget/patientRole/id in the header.

    Returns {'patient_id': <str>, 'patient_ids_all': <json-string>} where
    patient_ids_all lists every <id> as root/extension/assigningAuthorityName.

    patient_id is chosen in this order:
      1. the @extension of the first <id> that has both @extension and
         @assigningAuthorityName;
      2. the @extension of the first <id> that has one;
      3. the @root of the first <id>;
      4. an MD5 hex digest of the first 1024 bytes of the serialized document.
    """
    namespaces = {"cda": "urn:hl7-org:v3"}
    id_nodes = root.findall('.//cda:recordTarget/cda:patientRole/cda:id', namespaces)

    all_ids = [
        {
            "root": node.attrib.get("root"),
            "extension": node.attrib.get("extension"),
            "assigningAuthorityName": node.attrib.get("assigningAuthorityName"),
        }
        for node in id_nodes
    ]

    # Preferred: extension backed by an assigning authority; then any extension.
    with_authority = (
        item["extension"]
        for item in all_ids
        if item.get("extension") and item.get("assigningAuthorityName")
    )
    with_extension = (item["extension"] for item in all_ids if item.get("extension"))
    chosen = next(with_authority, None) or next(with_extension, None)

    if not chosen and all_ids:
        # No extension anywhere: fall back to the first id's root
        chosen = all_ids[0].get("root")
    if not chosen:
        # Nothing usable in the header: surrogate from the leading document bytes
        head = ET.tostring(root, encoding="utf-8", method="xml")[:1024]
        chosen = hashlib.md5(head).hexdigest()

    return {
        "patient_id": chosen,
        "patient_ids_all": json.dumps(all_ids, ensure_ascii=False),
    }

def extract_document_id(root: ET.Element) -> str:
    """
    Return an identifier for the document taken from the root's direct <id> child.

    Uses @extension when non-empty, else @root when non-empty. If neither is
    available (or there is no <id>), returns an MD5 hex digest of the first
    1024 bytes of the serialized document.
    """
    namespaces = {"cda": "urn:hl7-org:v3"}
    id_el = root.find('./cda:id', namespaces)

    if id_el is not None:
        for attr_name in ("extension", "root"):
            candidate = id_el.attrib.get(attr_name)
            if candidate:
                return candidate

    head = ET.tostring(root, encoding="utf-8", method="xml")[:1024]
    return hashlib.md5(head).hexdigest()


def extract_medications(sections):
    """
    Build a DataFrame of medications from sections whose title starts with 'medication'.

    One row per <entry> that contains a <substanceAdministration>, with columns:
    start_raw, stop_raw, start_iso, stop_iso, description, code_system, code.
    (file_name is expected to be added upstream.)

    Start/stop come from the first <low>/<high> found anywhere under the
    substanceAdministration. Description and codes come from the manufactured
    material's <code>; when there is no <code>, the material's <name> text is
    used as the description and the code fields stay empty.
    """
    material = ".//cda:consumable//cda:manufacturedProduct//cda:manufacturedMaterial"
    records = []

    for section in sections:
        if not section["title"].lower().startswith("medication"):
            continue

        for entry in section["el"].findall(".//cda:entry", ns):
            admin = entry.find(".//cda:substanceAdministration", ns)
            if admin is None:
                continue

            low_el = admin.find(".//cda:low", ns)
            high_el = admin.find(".//cda:high", ns)
            begin = "" if low_el is None else low_el.get("value", "")
            end = "" if high_el is None else high_el.get("value", "")

            code_el = admin.find(material + "//cda:code", ns)
            if code_el is None:
                # No coded product: fall back to the material's name
                label = _gettext(admin.find(material + "//cda:name", ns))
                system = ""
                code_value = ""
            else:
                label = code_el.get("displayName", "") or _gettext(code_el.find("cda:originalText", ns))
                system = code_el.get("codeSystemName", "") or code_el.get("codeSystem", "")
                code_value = code_el.get("code", "")

            records.append({
                "start_raw": begin,
                "stop_raw": end,
                "start_iso": to_iso(begin),
                "stop_iso": to_iso(end),
                "description": label,
                "code_system": system,
                "code": code_value,
            })

    return pd.DataFrame(records)


def extract_results(sections):
    """
    Build a DataFrame of diagnostic results from sections whose title contains 'result'.

    One row per <observation> with columns: start_raw, start_iso, description,
    code_system, code, value, unit. code_system is always the literal "LOINC".
    """
    def attr(el, name):
        # Attribute value, or "" when the element itself is absent
        return "" if el is None else el.get(name, "")

    records = []
    for section in sections:
        if "result" not in (section["title"] or "").lower():
            continue

        for obs in section["el"].findall(".//cda:observation", ns):
            code_el = obs.find("cda:code", ns)
            value_el = obs.find("cda:value", ns)
            begin, _unused_stop = get_start_stop(obs.find("cda:effectiveTime", ns))
            records.append({
                "start_raw": begin,
                "start_iso": to_iso(begin),
                "description": attr(code_el, "displayName"),
                "code_system": "LOINC",
                "code": attr(code_el, "code"),
                "value": attr(value_el, "value"),
                "unit": attr(value_el, "unit"),
            })

    return pd.DataFrame(records)

def extract_problems(sections):
    """
    Build a DataFrame of problems from sections whose title contains 'problem'.

    One row per <observation> with columns: start_raw, stop_raw, start_iso,
    stop_iso, description, code_system, code. Times come from the
    observation's effectiveTime <low>/<high>; description and codes come from
    the observation's <value> element.
    """
    records = []
    for section in sections:
        if "problem" not in section["title"].lower():
            continue

        for obs in section["el"].findall(".//cda:observation", ns):
            begin, end = "", ""
            eff = obs.find("cda:effectiveTime", ns)
            if eff is not None:
                low_el = eff.find("cda:low", ns)
                high_el = eff.find("cda:high", ns)
                if low_el is not None:
                    begin = low_el.get("value", "")
                if high_el is not None:
                    end = high_el.get("value", "")

            value_el = obs.find("cda:value", ns)
            if value_el is None:
                label = code_value = system = ""
            else:
                label = value_el.get("displayName", "")
                code_value = value_el.get("code", "")
                system = value_el.get("codeSystemName", "")

            records.append({
                "start_raw": begin,
                "stop_raw": end,
                "start_iso": to_iso(begin),
                "stop_iso": to_iso(end),
                "description": label,
                "code_system": system,
                "code": code_value,
            })

    return pd.DataFrame(records)


def extract_procedures(sections):
    """
    Build a DataFrame of procedures from sections whose title contains
    'procedure' or 'surger'.

    One row per <procedure> with columns: start_raw, stop_raw, start_iso,
    stop_iso, description, code_system, code. The description is the code's
    displayName, falling back to its <originalText>.
    """
    records = []
    for section in sections:
        heading = (section.get("title") or "").lower()
        if "procedure" not in heading and "surger" not in heading:
            continue

        for proc in section["el"].findall(".//cda:procedure", ns):
            # effectiveTime may carry @value or low/high children
            begin, end = get_start_stop(proc.find("cda:effectiveTime", ns))

            code_el = proc.find("cda:code", ns)
            if code_el is None:
                label = code_value = system = ""
            else:
                label = code_el.get("displayName", "")
                if not label:
                    label = _gettext(code_el.find("cda:originalText", ns))
                code_value = code_el.get("code", "")
                system = code_el.get("codeSystemName", "")

            records.append({
                "start_raw": begin,
                "stop_raw": end,
                "start_iso": to_iso(begin),
                "stop_iso": to_iso(end),
                "description": label,
                "code_system": system,
                "code": code_value,
            })

    return pd.DataFrame(records)

def extract_encounters(sections):
    """
    Build a DataFrame of encounters from sections whose title contains 'encounter'.

    One row per <encounter> with columns: start_raw, stop_raw, start_iso,
    stop_iso, description, code_system, code. Times come from the encounter's
    effectiveTime <low>/<high>.
    """
    records = []
    for section in sections:
        if "encounter" not in section["title"].lower():
            continue

        for enc in section["el"].findall(".//cda:encounter", ns):
            begin, end = "", ""
            eff = enc.find("cda:effectiveTime", ns)
            if eff is not None:
                low_el = eff.find("cda:low", ns)
                high_el = eff.find("cda:high", ns)
                if low_el is not None:
                    begin = low_el.get("value", "")
                if high_el is not None:
                    end = high_el.get("value", "")

            code_el = enc.find("cda:code", ns)
            if code_el is None:
                label = code_value = system = ""
            else:
                label = code_el.get("displayName", "")
                code_value = code_el.get("code", "")
                system = code_el.get("codeSystemName", "")

            records.append({
                "start_raw": begin,
                "stop_raw": end,
                "start_iso": to_iso(begin),
                "stop_iso": to_iso(end),
                "description": label,
                "code_system": system,
                "code": code_value,
            })

    return pd.DataFrame(records)


def extract_vitals(sections):
    """
    Build a DataFrame of vital signs from sections whose title contains 'vital'.

    One row per <observation> with columns: start_raw, start_iso, description,
    code_system, code, value, unit. code_system is always the literal "LOINC".
    """
    def attr(el, name):
        # Attribute value, or "" when the element itself is absent
        return "" if el is None else el.get(name, "")

    records = []
    for section in sections:
        if "vital" not in (section["title"] or "").lower():
            continue

        for obs in section["el"].findall(".//cda:observation", ns):
            code_el = obs.find("cda:code", ns)
            value_el = obs.find("cda:value", ns)
            begin, _unused_stop = get_start_stop(obs.find("cda:effectiveTime", ns))
            records.append({
                "start_raw": begin,
                "start_iso": to_iso(begin),
                "description": attr(code_el, "displayName"),
                "code_system": "LOINC",
                "code": attr(code_el, "code"),
                "value": attr(value_el, "value"),
                "unit": attr(value_el, "unit"),
            })

    return pd.DataFrame(records)


def extract_immunizations(sections):
    """
    Build a DataFrame of immunizations from sections whose title contains 'immunization'.

    One row per <substanceAdministration> with columns: start_raw, start_iso,
    description, code_system, code. Codes come from the manufactured
    material's <code> element.
    """
    material_code = ".//cda:consumable//cda:manufacturedProduct//cda:manufacturedMaterial//cda:code"

    records = []
    for section in sections:
        if "immunization" not in (section["title"] or "").lower():
            continue

        for admin in section["el"].findall(".//cda:substanceAdministration", ns):
            begin, _unused_stop = get_start_stop(admin.find("cda:effectiveTime", ns))

            code_el = admin.find(material_code, ns)
            if code_el is None:
                label = code_value = system = ""
            else:
                label = code_el.get("displayName", "")
                code_value = code_el.get("code", "")
                system = code_el.get("codeSystemName", "")

            records.append({
                "start_raw": begin,
                "start_iso": to_iso(begin),
                "description": label,
                "code_system": system,
                "code": code_value,
            })

    return pd.DataFrame(records)


def extract_functional_status(sections):
    """
    Build a DataFrame of functional-status observations from sections whose
    title contains 'functional'.

    One row per <observation> with columns: start_raw, start_iso, description,
    code_system, code, value, unit. code_system is always the literal "LOINC".
    """
    def attr(el, name):
        # Attribute value, or "" when the element itself is absent
        return "" if el is None else el.get(name, "")

    records = []
    for section in sections:
        if "functional" not in (section["title"] or "").lower():
            continue

        for obs in section["el"].findall(".//cda:observation", ns):
            code_el = obs.find("cda:code", ns)
            value_el = obs.find("cda:value", ns)
            begin, _unused_stop = get_start_stop(obs.find("cda:effectiveTime", ns))
            records.append({
                "start_raw": begin,
                "start_iso": to_iso(begin),
                "description": attr(code_el, "displayName"),
                "code_system": "LOINC",
                "code": attr(code_el, "code"),
                "value": attr(value_el, "value"),
                "unit": attr(value_el, "unit"),
            })

    return pd.DataFrame(records)



# --- Allergies / Adverse Events ---

def extract_allergies(sections):
    """
    Allergies & Adverse Reactions.

    A section is used when its title contains 'allerg' or its section code is
    48765-2 or 29299-5. For each such section, structured <entry> elements
    (an <act>, else an <observation>) are read first; if a section yields no
    structured rows, its first narrative table is parsed instead.

    Returns a DataFrame with columns:
    start_raw, stop_raw, start_iso, stop_iso,
    substance_desc, substance_code_system, substance_code,
    reaction_desc, reaction_code_system, reaction_code, severity

    The reaction_* and severity columns are always emitted as "" by this
    function; only the substance is extracted.
    """
    import pandas as pd
    rows = []

    def _emit_row(start, stop, desc, code_text):
        cs, cd = _split_code_and_system(code_text)
        rows.append({
            "start_raw": start or "",
            "stop_raw":  stop or "",
            "start_iso": to_iso(start or ""),
            "stop_iso":  to_iso(stop or ""),
            "substance_desc": desc or "",
            "substance_code_system": cs or "",
            "substance_code": cd or "",
            # Reaction/severity are not extracted here
            "reaction_desc": "",
            "reaction_code_system": "",
            "reaction_code": "",
            "severity": ""
        })

    def _cell(cells, idx):
        # Cell text at idx, or "" when the column is unknown or the row is short
        return cells[idx] if idx is not None and idx < len(cells) else ""

    for s in sections:
        title = (s.get("title") or "").lower()
        code  = (s.get("code") or "").strip()
        # match by title or LOINC section code
        if not (("allerg" in title) or (code in {"48765-2", "29299-5"})):
            continue

        sec = s["el"]
        rows_before = len(rows)

        # Structured entries (preferred)
        for entry in sec.findall("./cda:entry", ns):
            # Explicit None checks: an Element without children is falsy, so
            # `find(act) or find(observation)` could discard a real <act>.
            node = entry.find("./cda:act", ns)
            if node is None:
                node = entry.find("./cda:observation", ns)
            if node is None:
                continue

            # effectiveTime: <low> wins over @value for the start
            start = stop = ""
            eff = node.find("./cda:effectiveTime", ns)
            if eff is not None:
                low = eff.find("cda:low", ns)
                high = eff.find("cda:high", ns)
                start = low.get("value", "") if low is not None else (eff.get("value", "") or "")
                stop  = high.get("value", "") if high is not None else ""

            # substance from participant
            desc = code_text = ""
            pe = node.find(".//cda:participant/cda:participantRole/cda:playingEntity", ns)
            if pe is not None:
                pcode = pe.find("./cda:code", ns)
                if pcode is not None:
                    desc = pcode.get("displayName", "") or _gettext(pcode.find("cda:originalText", ns))
                    code_text = pcode.get("code", "")
                if not desc:
                    desc = _gettext(pe.find("./cda:name", ns))
            _emit_row(start, stop, desc, code_text)

        # Narrative fallback, decided per section: compare against the row
        # count before this section so rows from an earlier section do not
        # suppress the fallback for this one.
        if len(rows) == rows_before:
            headers, table_rows = _parse_table_rows_from_section(sec)
            if headers and table_rows:
                i_start = _col_index(headers, ["start", "start date", "start time"])
                i_stop  = _col_index(headers, ["stop", "stop date", "stop time", "end"])
                i_desc  = _col_index(headers, ["description", "substance", "allergen"])
                i_code  = _col_index(headers, ["code", "code / system", "code system"])
                for cells in table_rows:
                    # Code cell may look like 'http://snomed.info/sct 419199007'
                    _emit_row(
                        _cell(cells, i_start),
                        _cell(cells, i_stop),
                        _cell(cells, i_desc),
                        _cell(cells, i_code),
                    )

    return pd.DataFrame(rows)


# --- Plan of Care ---

def extract_plan_of_care(sections):
    """
    Extract Plan of Care / Plan of Treatment items from discovered CCDA sections.

    Args:
        sections: iterable of dict-like section records. Each record is read
            through "title" (str or None) and "el" (the <section> Element).

    Returns:
        pandas.DataFrame with one row per plan item and the columns
        start_raw, stop_raw, start_iso, stop_iso, description, code_system,
        code. The frame is empty when no section matched or nothing was found.

    For every matching section the structured <entry><act|observation> nodes
    are read first. Only when that particular section yields no structured rows
    is its narrative <text> used: the first table if one is parsed, otherwise
    the list <item> elements.
    """
    import pandas as pd
    rows = []

    def _emit_row(start, stop, desc, code_text):
        cs, cd = _split_code_and_system(code_text)
        rows.append({
            "start_raw": start or "",
            "stop_raw":  stop or "",
            "start_iso": to_iso(start or ""),
            "stop_iso":  to_iso(stop or ""),
            "description": desc or "",
            "code_system": cs or "",
            "code": cd or ""
        })

    def _cell(cells, idx):
        # A missing column or a short row yields an empty string.
        return cells[idx] if idx is not None and idx < len(cells) else ""

    def _first_present(parent, *paths):
        # Compare with None explicitly: an Element without children is falsy,
        # so chaining find() calls with "or" can discard a real match.
        for path in paths:
            found = parent.find(path, ns)
            if found is not None:
                return found
        return None

    # "plan of treatment" is the section title used by newer C-CDA releases.
    title_markers = ("plan of care", "plan of treatment", "care plan")

    for s in sections:
        title = (s.get("title") or "").lower()
        if not (any(marker in title for marker in title_markers) or title.strip() == "plan"):
            continue

        sec = s["el"]
        rows_before = len(rows)  # lets the fallback decision be made per section

        # Structured entries
        for entry in sec.findall("./cda:entry", ns):
            node = _first_present(entry, "./cda:act", "./cda:observation")
            if node is None:
                continue

            # Effective time: support @value and low/high
            eff = node.find("./cda:effectiveTime", ns)
            start_raw, stop_raw = get_start_stop(eff)

            code_el = node.find("./cda:code", ns)
            desc = code_text = ""
            if code_el is not None:
                desc = code_el.get("displayName", "") or _gettext(code_el.find("cda:originalText", ns))
                code_text = code_el.get("code", "")

            _emit_row(start_raw, stop_raw, desc, code_text)

        if len(rows) > rows_before:
            continue  # this section had structured data; skip its narrative

        # Narrative fallback (table or list)
        headers, table_rows = _parse_table_rows_from_section(sec)
        if headers and table_rows:
            i_start = _col_index(headers, ["start", "start date", "start time"])
            i_stop  = _col_index(headers, ["stop", "stop date", "stop time", "end"])
            i_desc  = _col_index(headers, ["description", "plan", "care plan", "activity"])
            i_code  = _col_index(headers, ["code", "code / system", "code system"])
            for cells in table_rows:
                _emit_row(_cell(cells, i_start), _cell(cells, i_stop),
                          _cell(cells, i_desc), _cell(cells, i_code))
        else:
            text_el = sec.find("./cda:text", ns)
            if text_el is not None:
                # Narrative items normally live in the CDA namespace; keep the
                # un-namespaced lookup as a fallback for non-conformant files.
                items = text_el.findall(".//cda:item", ns) or text_el.findall(".//item")
                for item in items:
                    _emit_row("", "", _all_text(item), "")

    return pd.DataFrame(rows)

# --- Social History ---

def extract_social_history(sections):
    """
    Extract Social History observations from discovered CCDA sections.

    Args:
        sections: iterable of dict-like section records. Each record is read
            through "title" (str or None) and "el" (the <section> Element).
            A section is used when its lower-cased title contains "social".

    Returns:
        pandas.DataFrame with one row per observation and the columns
        start_raw, stop_raw, start_iso, stop_iso, description, code_system,
        code, value, unit. The frame is empty when nothing was found.

    For every matching section the structured <entry><observation> nodes are
    read first. Only when that particular section yields no structured rows is
    its narrative <text> used: the first table if one is parsed, otherwise the
    list <item> elements.
    """
    import pandas as pd
    rows = []

    def _emit_row(start, stop, desc, code_text, value, unit):
        cs, cd = _split_code_and_system(code_text)
        rows.append({
            "start_raw": start or "",
            "stop_raw":  stop or "",
            "start_iso": to_iso(start or ""),
            "stop_iso":  to_iso(stop or ""),
            "description": desc or "",
            "code_system": cs or "",
            "code": cd or "",
            "value": value or "",
            "unit": unit or ""
        })

    def _cell(cells, idx):
        # A missing column or a short row yields an empty string.
        return cells[idx] if idx is not None and idx < len(cells) else ""

    for s in sections:
        title = (s.get("title") or "").lower()
        if "social" not in title:
            continue

        sec = s["el"]
        rows_before = len(rows)  # lets the fallback decision be made per section

        # Structured entries
        for entry in sec.findall("./cda:entry", ns):
            obs = entry.find("./cda:observation", ns)
            if obs is None:
                continue

            # Effective time: support @value and low/high
            eff = obs.find("./cda:effectiveTime", ns)
            start_raw, stop_raw = get_start_stop(eff)

            code_el = obs.find("./cda:code", ns)
            desc = code_text = ""
            if code_el is not None:
                desc = code_el.get("displayName", "") or _gettext(code_el.find("cda:originalText", ns))
                code_text = code_el.get("code", "")

            val_el = obs.find("./cda:value", ns)
            value = unit = ""
            if val_el is not None:
                # Coded values carry displayName/code; quantities carry the
                # number in the @value attribute; plain strings use the text.
                value = (
                    val_el.get("displayName", "")
                    or val_el.get("code", "")
                    or val_el.get("value", "")
                    or (val_el.text or "").strip()
                )
                unit = val_el.get("unit", "") or ""

            _emit_row(start_raw, stop_raw, desc, code_text, value, unit)

        if len(rows) > rows_before:
            continue  # this section had structured data; skip its narrative

        # Narrative fallback (table or list)
        headers, table_rows = _parse_table_rows_from_section(sec)
        if headers and table_rows:
            i_start = _col_index(headers, ["start", "start date", "start time"])
            i_stop  = _col_index(headers, ["stop", "stop date", "stop time", "end"])
            i_desc  = _col_index(headers, ["description", "item", "topic"])
            i_code  = _col_index(headers, ["code", "code / system", "code system"])
            i_value = _col_index(headers, ["value", "result", "answer"])
            i_unit  = _col_index(headers, ["unit"])
            for cells in table_rows:
                _emit_row(_cell(cells, i_start), _cell(cells, i_stop),
                          _cell(cells, i_desc), _cell(cells, i_code),
                          _cell(cells, i_value), _cell(cells, i_unit))
        else:
            # Narrative list fallback
            text_el = sec.find("./cda:text", ns)
            if text_el is not None:
                # Narrative items normally live in the CDA namespace; keep the
                # un-namespaced lookup as a fallback for non-conformant files.
                items = text_el.findall(".//cda:item", ns) or text_el.findall(".//item")
                for item in items:
                    _emit_row("", "", _all_text(item), "", "", "")

    return pd.DataFrame(rows)

'''
# Write the parser source held in parser_py into the package directory as parser.py (UTF-8).
(pkg_dir / "parser.py").write_text(parser_py, encoding="utf-8")

# --------------------------
# parse_text.py — entrypoint to parse raw XML text
# --------------------------
parse_text_py = r'''
import xml.etree.ElementTree as ET
import pandas as pd
from .parser import (
    validate_ccda_xml, discover_sections,
    extract_medications, extract_results, extract_problems,
    extract_procedures, extract_encounters, extract_vitals,
    extract_immunizations, extract_functional_status,
    extract_allergies, extract_plan_of_care, extract_social_history,
    extract_patient_identifiers, extract_document_id
)



def parse_ccda_text(xml_text: str):
    """
    Entry point used by the pipeline:
      - Validates the CCDA XML text
      - Discovers sections
      - Runs extractors to build pandas DataFrames per domain
      - Returns (dfs_dict, meta_dict)

    Args:
        xml_text: the raw CCDA document as a string.

    Returns:
        dfs: dict[str, pd.DataFrame] keyed by domain name; empty dict when the
             input is rejected.
        meta: dict with keys:
              - status: "parsed" | "rejected"
              - reason: str ("OK" when parsed)
              - counts: dict[str, int] rows per domain ({} when rejected)
              - patient_id: str            (parsed only)
              - patient_ids_all: json string (parsed only)
              - document_id: str           (parsed only)
    """
    def _rejected(reason):
        # Uniform shape for every rejection path.
        return ({}, {"status": "rejected", "reason": reason, "counts": {}})

    if not isinstance(xml_text, str) or not xml_text.strip():
        return _rejected("empty input")

    ok, reason = validate_ccda_xml(xml_text)
    if not ok:
        return _rejected(reason)

    # Parse XML and discover sections. Validation should already have caught
    # malformed XML; the guard turns any remaining parse error into a
    # rejection instead of an exception.
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError as exc:
        return _rejected(f"XML parse failed: {exc}")
    sections = discover_sections(root)

    # Run extractors (each returns a pandas DataFrame)
    dfs = {
        "medications":        extract_medications(sections),
        "vitals":             extract_vitals(sections),
        "results":            extract_results(sections),
        "problems":           extract_problems(sections),
        "procedures":         extract_procedures(sections),
        "encounters":         extract_encounters(sections),
        "immunizations":      extract_immunizations(sections),
        "functional_status":  extract_functional_status(sections),
        "allergies":          extract_allergies(sections),
        "plan_of_care":       extract_plan_of_care(sections),
        "social_history":     extract_social_history(sections),
    }

    # Patient/document metadata for lineage
    pi    = extract_patient_identifiers(root) or {}   # {'patient_id', 'patient_ids_all'}
    docid = extract_document_id(root)

    meta = {
        "status": "parsed",
        "reason": "OK",
        # Per-domain row counts, so callers can report them without re-measuring.
        "counts": {name: len(df) for name, df in dfs.items()},
        "patient_id":      pi.get("patient_id"),
        "patient_ids_all": pi.get("patient_ids_all"),
        "document_id":     docid,
    }

    return dfs, meta



'''
# Write the entry-point source held in parse_text_py into the package directory (UTF-8).
(pkg_dir / "parse_text.py").write_text(parse_text_py, encoding="utf-8")

# Confirmation message; the leading character was mis-encoded and is restored here.
print("✅ Parser package created at:", pkg_dir.resolve())

from ccda_parser import __version__

In [None]:

# # plan_of_care_parser.py
# import xml.etree.ElementTree as ET
# from datetime import datetime, timezone, timedelta
# import pandas as pd

# # HL7 namespaces
# ns = {"cda": "urn:hl7-org:v3", "sdtc": "urn:hl7-org:sdtc"}

# # ---- Minimal utilities ----
# def _gettext(el):
#     return (el.text or "").strip() if el is not None else ""

# def _all_text(el):
#     if el is None:
#         return ""
#     return "".join(el.itertext()).strip()

# def _parse_table_rows_from_section(sec):
#     """Parse the first XHTML <table> under <cda:text>. Return (headers_lc, rows)."""
#     text_el = sec.find("./cda:text", ns)
#     if text_el is None:
#         return [], []

#     tables = text_el.findall(".//cda:table", ns)
#     if not tables:
#         tables = text_el.findall(".//table")
#     if not tables:
#         return [], []
#     table = tables[0]

#     # headers
#     headers = []
#     thead = table.find("./cda:thead", ns) or table.find("./thead")
#     if thead is not None:
#         tr = thead.find("./cda:tr", ns) or thead.find("./tr")
#         if tr is not None:
#             ths = tr.findall("./cda:th", ns)
#             if not ths:
#                 ths = tr.findall("./th")
#             headers = ["".join(th.itertext()).strip().lower() for th in ths]

#     # body rows
#     body_rows = []
#     tbody = table.find("./cda:tbody", ns) or table.find("./tbody")
#     if tbody is not None:
#         trs = tbody.findall("./cda:tr", ns) or tbody.findall("./tr")
#         for tr in trs:
#             tds = tr.findall("./cda:td", ns) or tr.findall("./td")
#             cells = ["".join(td.itertext()).strip() for td in tds]
#             if cells and any(c.strip() for c in cells):
#                 body_rows.append(cells)
#     else:
#         trs = table.findall("./cda:tr", ns) or table.findall("./tr")
#         if trs:
#             first_tds = trs[0].findall("./cda:td", ns) or trs[0].findall("./td")
#             if not headers:
#                 headers = ["".join(td.itertext()).strip().lower() for td in first_tds]
#                 trs = trs[1:]
#         for tr in trs:
#             tds = tr.findall("./cda:td", ns) or tr.findall("./td")
#             cells = ["".join(td.itertext()).strip() for td in tds]
#             if cells and any(c.strip() for c in cells):
#                 body_rows.append(cells)

#     return headers, body_rows

# def _col_index(headers, name_variants):
#     hlc = [h.strip().lower() for h in headers]
#     for nv in name_variants:
#         nv_l = nv.strip().lower()
#         if nv_l in hlc:
#             return hlc.index(nv_l)
#     return None

# def parse_hl7_ts(ts: str):
#     """
#     Convert HL7 TS 'YYYYMMDDHHMMSS[±ZZZZ]?' to datetime (tz-aware if offset exists).
#     """
#     if not ts:
#         return None
#     base = ts[:14]
#     offset = ts[14:] if len(ts) > 14 else None
#     try:
#         dt = datetime.strptime(base, "%Y%m%d%H%M%S")
#     except Exception:
#         return None
#     if offset:
#         if offset.upper() == "Z":
#             return dt.replace(tzinfo=timezone.utc)
#         if offset[0] in "+-":
#             try:
#                 hours = int(offset[1:3]); mins = int(offset[3:5])
#                 delta = timedelta(hours=hours, minutes=mins)
#                 if offset.startswith("-"):
#                     delta = -delta
#                 return dt.replace(tzinfo=timezone(delta))
#             except Exception:
#                 pass
#     return dt  # naive

# def to_iso(ts: str, default_tz="+0530") -> str:
#     """
#     Convert HL7 TS to ISO-8601 string.
#     If TS has embedded offset, use it; else apply default_tz (e.g., '+0530').
#     Also accept already ISO strings.
#     """
#     if not ts:
#         return ""
#     # HL7 compact first
#     dt = parse_hl7_ts(ts)
#     if dt:
#         if dt.tzinfo is None and default_tz:
#             try:
#                 hours = int(default_tz[1:3]); mins = int(default_tz[3:5])
#                 delta = timedelta(hours=hours, minutes=mins)
#                 if default_tz.startswith("-"):
#                     delta = -delta
#                 dt = dt.replace(tzinfo=timezone(delta))
#             except Exception:
#                 pass
#         return dt.isoformat()
#     # try Python ISO (±HH:MM)
#     try:
#         dt2 = datetime.fromisoformat(ts)
#         return dt2.isoformat()
#     except Exception:
#         return ""

# def get_start_stop(eff_el):
#     """
#     Read start/stop from <effectiveTime> supporting:
#     - <effectiveTime value="..."/>
#     - <effectiveTime><low value="..."/><high value="..."/></effectiveTime>
#     Returns (start_raw, stop_raw)
#     """
#     if eff_el is None:
#         return ("", "")
#     start = eff_el.attrib.get("value", "")
#     low = eff_el.find("cda:low", ns)
#     high = eff_el.find("cda:high", ns)
#     if not start and low is not None:
#         start = low.attrib.get("value", "")
#     stop = high.attrib.get("value", "") if high is not None else ""
#     return (start, stop)

# def _split_code_and_system(text):
#     """
#     For narrative cells that sometimes combine system+code as:
#       'http://snomed.info/sct 419199007'
#     Return (system_url, code_value) or ("", text) if only a code.
#     """
#     t = (text or "").strip()
#     if not t:
#         return "", ""
#     parts = t.split()
#     if len(parts) >= 2 and parts[0].startswith("http"):
#         return parts[0], parts[-1]
#     return "", t

# # ---- Core: Plan of Care only ----
# def parse_plan_of_care(text: str, *, default_tz="+0530") -> pd.DataFrame:
#     """
#     Parse ONLY the Plan of Treatment/Plan of Care section from a CCDA XML string.

#     Returns a DataFrame with columns:
#       start_raw, stop_raw, start_iso, stop_iso, description, code_system, code

#     It does not read or return any other sections.
#     """
#     # Validate XML
#     try:
#         root = ET.fromstring(text)
#     except Exception as e:
#         raise ValueError(f"XML parse failed: {e}")

#     if not root.tag.endswith("ClinicalDocument"):
#         raise ValueError("Root element is not ClinicalDocument.")

#     # Discover sections (both common paths to be safe)
#     sections = []
#     for sec in root.findall(".//cda:structuredBody/cda:component/cda:section", ns):
#         title = _gettext(sec.find("cda:title", ns))
#         code_el = sec.find("cda:code", ns)
#         sec_code = code_el.get("code") if code_el is not None else None
#         sections.append({"title": title, "code": sec_code, "el": sec})
#     for sec in root.findall(".//cda:component/cda:section", ns):
#         title = _gettext(sec.find("cda:title", ns))
#         code_el = sec.find("cda:code", ns)
#         sec_code = code_el.get("code") if code_el is not None else None
#         sections.append({"title": title, "code": sec_code, "el": sec})

#     rows = []

#     def _emit_row(start, stop, desc, code_text):
#         cs, cd = _split_code_and_system(code_text)
#         rows.append({
#             "start_raw": start or "",
#             "stop_raw": stop or "",
#             "start_iso": to_iso(start or "", default_tz),
#             "stop_iso": to_iso(stop or "", default_tz),
#             "description": (desc or "").strip(),
#             "code_system": cs or "",
#             "code": cd or ""
#         })

#     # Identify candidate Plan-of-Care sections (title, code 18776-5, templateId)
#     poc_secs = []
#     for s in sections:
#         title = (s.get("title") or "").lower()
#         code = (s.get("code") or "").strip()
#         sec_el = s["el"]
#         template_ids = [tid.attrib.get("root") for tid in sec_el.findall("./cda:templateId", ns)]
#         if (
#             "plan of care" in title
#             or "plan of treatment" in title
#             or "care plan" in title
#             or title.strip() == "plan"
#             or code == "18776-5"  # LOINC Plan of care note
#             or "2.16.840.1.113883.10.20.22.2.10" in template_ids  # Plan of Treatment Section (V2)
#         ):
#             poc_secs.append(sec_el)

#     for sec in poc_secs:
#         # Structured entries: act, procedure, observation (and nested organizer/component)
#         for entry in sec.findall("./cda:entry", ns):
#             node = (entry.find("./cda:act", ns)
#                     or entry.find("./cda:procedure", ns)
#                     or entry.find("./cda:observation", ns))
#             if node is None:
#                 org = entry.find("./cda:organizer", ns)
#                 if org is not None:
#                     for comp in org.findall("./cda:component", ns):
#                         node2 = (comp.find("./cda:act", ns)
#                                  or comp.find("./cda:procedure", ns)
#                                  or comp.find("./cda:observation", ns))
#                         if node2 is None:
#                             continue
#                         eff = node2.find("./cda:effectiveTime", ns)
#                         start_raw, stop_raw = get_start_stop(eff)
#                         code_el = node2.find("./cda:code", ns)
#                         desc = code_el.get("displayName", "") if code_el is not None else ""
#                         if not desc and code_el is not None:
#                             desc = _gettext(code_el.find("cda:originalText", ns))
#                         code_text = code_el.get("code", "") if code_el is not None else ""
#                         _emit_row(start_raw, stop_raw, desc, code_text)
#                     continue
#             if node is None:
#                 continue

#             eff = node.find("./cda:effectiveTime", ns)
#             start_raw, stop_raw = get_start_stop(eff)
#             code_el = node.find("./cda:code", ns)
#             desc = code_el.get("displayName", "") if code_el is not None else ""
#             if not desc and code_el is not None:
#                 desc = _gettext(code_el.find("cda:originalText", ns))
#             code_text = code_el.get("code", "") if code_el is not None else ""
#             _emit_row(start_raw, stop_raw, desc, code_text)

#         # Narrative fallback (Date/time, Goal, Instructions, Description)
#         if not rows:
#             headers, table_rows = _parse_table_rows_from_section(sec)
#             if headers and table_rows:
#                 i_start = _col_index(headers, ["start", "start date", "start time", "date/time", "date", "time"])
#                 i_stop  = _col_index(headers, ["stop", "stop date", "stop time", "end"])
#                 i_goal  = _col_index(headers, ["goal", "care goal"])
#                 i_instr = _col_index(headers, ["instructions", "plan", "care plan", "activity", "intervention"])
#                 i_desc  = _col_index(headers, ["description"])
#                 for cells in table_rows:
#                     start = cells[i_start] if i_start is not None and i_start < len(cells) else ""
#                     stop  = cells[i_stop]  if i_stop  is not None and i_stop  < len(cells) else ""
#                     goal  = cells[i_goal]  if i_goal  is not None and i_goal  < len(cells) else ""
#                     instr = cells[i_instr] if i_instr is not None and i_instr < len(cells) else ""
#                     desc  = cells[i_desc]  if i_desc  is not None and i_desc  < len(cells) else ""
#                     text = (goal + ("; " if goal and instr else "") + instr) or desc
#                     _emit_row(start, stop, text, "")
#             else:
#                 text_el = sec.find("./cda:text", ns)
#                 if text_el is not None:
#                     for item in text_el.findall(".//item"):
#                         _emit_row("", "", _all_text(item), "")

#     return pd.DataFrame(rows)


In [None]:

# # -- Read Bronze → Parse Plan of Care (only) → Load Silver & Manifest --

# from snowflake.snowpark.context import get_active_session
# session = get_active_session()
# assert session is not None, "No active Snowpark session."

# # 1) Read ALL raw CCDA docs from Bronze
# df = session.sql("""
#     SELECT FILE_NAME, TO_VARCHAR(DOC) AS XML_TEXT
#     FROM FINAL_ASSIGNMENT.CCDA.CCDA_BRONZE
# """)
# rows = df.collect()
# assert rows, "Bronze has no rows."

# import pandas as pd
# import xml.etree.ElementTree as ET
# import json

# from snowflake.connector.pandas_tools import write_pandas

# # 👉 IMPORTANT: import the Plan-of-Care-only parser
# # from plan_of_care_parser import parse_plan_of_care   # if in module
# # If you defined parse_plan_of_care inline, ensure it's in scope.

# DB = "FINAL_ASSIGNMENT"
# SCHEMA = "CCDA"

# # --- Only Plan of Care domain/table ---
# DOMAIN_TO_TABLE = {
#     "plan_of_care": "CCDA_PLAN_OF_CARE",
# }

# ISO_TIME_COLS = {"start_iso", "stop_iso"}

# # ---------------- Helper: minimal CCDA meta ----------------
# ns = {"cda": "urn:hl7-org:v3"}

# def get_ccda_meta(xml_text: str) -> dict:
#     """
#     Extract minimal meta without running the full parser:
#       - patient_id: first recordTarget/patientRole/id/@extension (or @root if no extension)
#       - document_id: ClinicalDocument/id/@extension (or @root)
#       - patient_ids_all: JSON array of {root, extension} for patientRole/id[]
#     """
#     meta = {"status": "parsed", "reason": ""}
#     try:
#         root = ET.fromstring(xml_text)
#     except Exception as e:
#         meta["status"] = "rejected"
#         meta["reason"] = f"XML parse failed: {e}"
#         return meta

#     # document_id
#     doc_id_el = root.find("./cda:id", ns)
#     if doc_id_el is not None:
#         meta["document_id"] = doc_id_el.attrib.get("extension") or doc_id_el.attrib.get("root")
#     else:
#         meta["document_id"] = None

#     # patient ids
#     pid_els = root.findall(".//cda:recordTarget/cda:patientRole/cda:id", ns)
#     ids_all = []
#     for el in pid_els:
#         ids_all.append({
#             "root": el.attrib.get("root"),
#             "extension": el.attrib.get("extension")
#         })
#     meta["patient_ids_all"] = json.dumps(ids_all, ensure_ascii=False)
#     # first usable patient_id
#     if ids_all:
#         meta["patient_id"] = ids_all[0].get("extension") or ids_all[0].get("root")
#     else:
#         meta["patient_id"] = None

#     return meta

# # -------- Normalize (unchanged; works for single domain) --------
# def normalize_domain_dfs(dfs: dict, file_name: str, meta: dict) -> dict:
#     """
#     Add FILE_NAME + context (patient_id, document_id, patient_ids_all),
#     convert *_ISO to datetimes, and uppercase columns.
#     Idempotent: does not create duplicate column names.
#     """
#     out = {}
#     patient_id = meta.get("patient_id")
#     document_id = meta.get("document_id")
#     patient_ids_all = meta.get("patient_ids_all")  # JSON string

#     for domain, pdf in dfs.items():
#         if pdf is None or pdf.empty:
#             out[domain] = pdf
#             continue

#         # --- Add columns only if missing ---
#         def ensure(col_name: str, value):
#             if col_name not in pdf.columns:
#                 pdf[col_name] = value

#         ensure("FILE_NAME", file_name)
#         ensure("PATIENT_ID", patient_id)
#         ensure("DOCUMENT_ID", document_id)
#         ensure("PATIENT_IDS_ALL", patient_ids_all)
#         ensure("SOURCE_SYSTEM", "CCDA")
#         ensure("RECORD_TYPE", domain)

#         # --- Convert ISO columns safely ---
#         for col in list(pdf.columns):
#             base = col.lower()
#             if base in ISO_TIME_COLS:
#                 pdf[col] = pd.to_datetime(pdf[col], errors="coerce")

#         # --- Uppercase column names ONCE ---
#         upper_cols = [c.upper() for c in pdf.columns]

#         # --- Ensure uniqueness after uppercasing ---
#         seen = set()
#         final_cols = []
#         for c in upper_cols:
#             if c in seen:
#                 continue  # drop exact duplicate name
#             seen.add(c)
#             final_cols.append(c)

#         # Rebuild the DataFrame with unique, uppercased columns, keeping first occurrence
#         pdf = pdf.loc[:, pdf.columns[:len(final_cols)]]
#         pdf.columns = final_cols
#         out[domain] = pdf

#     return out

# def write_df(table_name: str, pdf: pd.DataFrame) -> int:
#     """Write a pandas DataFrame to a Silver table via write_pandas."""
#     if pdf is None or pdf.empty:
#         return 0
#     success, nchunks, nrows, _ = write_pandas(
#         conn=session.connection,
#         df=pdf,
#         table_name=table_name,
#         database=DB,
#         schema=SCHEMA,
#         quote_identifiers=False,  # assumes Silver columns are unquoted UPPERCASE
#         overwrite=False
#     )
#     if not success:
#         raise RuntimeError(f"write_pandas failed for {DB}.{SCHEMA}.{table_name}")
#     return nrows

# def log_manifest_row(file_name: str, status: str, reason: str, counts: dict, meta: dict = None):
#     """Log one manifest row; other domain counts default to 0."""
#     manifest_row = {
#         "FILE_NAME": file_name,
#         "STATUS": status,
#         "REASON": reason or "",
#         # other domains = 0; only plan_of_care populated
#         "COUNT_MEDICATIONS":        0,
#         "COUNT_VITALS":             0,
#         "COUNT_RESULTS":            0,
#         "COUNT_PROBLEMS":           0,
#         "COUNT_PROCEDURES":         0,
#         "COUNT_ENCOUNTERS":         0,
#         "COUNT_IMMUNIZATIONS":      0,
#         "COUNT_FUNCTIONAL_STATUS":  0,
#         "COUNT_ALLERGIES":          0,
#         "COUNT_PLAN_OF_CARE":       counts.get("plan_of_care", 0),
#         "COUNT_SOCIAL_HISTORY":     0,
#     }
#     if meta:
#         manifest_row["PATIENT_ID"]  = meta.get("patient_id")
#         manifest_row["DOCUMENT_ID"] = meta.get("document_id")

#     manifest_pdf = pd.DataFrame([manifest_row])

#     ok, _, _, _ = write_pandas(
#         conn=session.connection,
#         df=manifest_pdf,
#         table_name="CCDA_INGEST_MANIFEST",
#         database=DB,
#         schema=SCHEMA,
#         quote_identifiers=False
#     )
#     if not ok:
#         raise RuntimeError("Failed to write manifest row")

# # (Idempotent) remove existing Plan of Care rows for the file (and optional document_id)
# def delete_plan_of_care_rows_for_file(file_name: str, document_id: str = None):
#     safe_file = (file_name or "").replace("'", "''")
#     safe_doc  = (document_id or "").replace("'", "''")
#     table = DOMAIN_TO_TABLE["plan_of_care"]
#     if document_id:
#         session.sql(
#             f"DELETE FROM {DB}.{SCHEMA}.{table} WHERE FILE_NAME = '{safe_file}' AND DOCUMENT_ID = '{safe_doc}'"
#         ).collect()
#     else:
#         session.sql(
#             f"DELETE FROM {DB}.{SCHEMA}.{table} WHERE FILE_NAME = '{safe_file}'"
#         ).collect()

# # ---------- Process ALL rows (Plan of Care only) ----------
# grand_counts = {"plan_of_care": 0}
# processed = 0
# failed = 0

# DEFAULT_TZ = "+0530"  # for to_iso conversion inside parse_plan_of_care

# for r in rows:
#     FILE_NAME = r["FILE_NAME"]
#     xml_text  = r["XML_TEXT"]

#     print(f"\n=== Processing (Plan of Care): {FILE_NAME} ===")

#     # Minimal meta (no full parser)
#     meta = get_ccda_meta(xml_text)

#     try:
#         if meta.get("status") != "parsed":
#             # rejected (bad XML, etc.)
#             log_manifest_row(FILE_NAME, status="rejected", reason=meta.get("reason",""), counts={}, meta=meta)
#             print(f"Rejected: {FILE_NAME} | Reason: {meta.get('reason','')}")
#             failed += 1
#             continue

#         # delete only from CCDA_PLAN_OF_CARE
#         delete_plan_of_care_rows_for_file(FILE_NAME, meta.get("document_id"))

#         # --- Parse ONLY Plan of Care ---
#         poc_df = parse_plan_of_care(xml_text, default_tz=DEFAULT_TZ)

#         # --- Normalize with patient/document context ---
#         dfs_norm = normalize_domain_dfs({"plan_of_care": poc_df}, FILE_NAME, meta)

#         # --- Write Plan of Care to Silver ---
#         count_poc = write_df(DOMAIN_TO_TABLE["plan_of_care"], dfs_norm["plan_of_care"])
#         grand_counts["plan_of_care"] += count_poc

#         # --- Log success ---
#         log_manifest_row(FILE_NAME, status="parsed", reason="OK", counts={"plan_of_care": count_poc}, meta=meta)
#         print(f"Loaded Plan of Care: {FILE_NAME} | Rows: {count_poc}")
#         processed += 1

#     except Exception as e:
#         log_manifest_row(FILE_NAME, status="failed", reason=str(e), counts={}, meta=meta if meta else None)
#         print(f"Failed: {FILE_NAME} | Error: {e}")
#         failed += 1

# print("\n=== Summary (Plan of Care only) ===")
# print(f"Processed: {processed} | Failed: {failed} | Total Plan of Care rows: {grand_counts['plan_of_care']}")



In [None]:

# Step 5 (Sanity tests): import package and optionally parse one local file

from ccda_parser import __version__
from ccda_parser.parse_text import parse_ccda_text
from pathlib import Path

print("ccda_parser version:", __version__)

# Optional quick self-check: if you already ran GET and have a file in /tmp/ccda_xml, parse it
candidate_dir = Path("/tmp/ccda_xml")
candidate_files = sorted(candidate_dir.glob("*.xml"))
# print(candidate_files)
if candidate_files:
    sample = candidate_files[0]
    print(f"[INFO] Parsing sample: {sample.name}")
    xml_text = sample.read_text(encoding="utf-8", errors="ignore")
    dfs, meta = parse_ccda_text(xml_text)
    print("Status:", meta.get("status"))
    if meta.get("status") != "parsed":
        # Surface why the document was rejected instead of silently showing nothing.
        print("Reason:", meta.get("reason"))
    # Derive the counts from the returned frames when meta does not carry them,
    # so this line never prints "None".
    counts = meta.get("counts") or {name: len(df) for name, df in dfs.items()}
    print("Section counts:", counts)
    # Show a peek
    for name, df in dfs.items():
        print(f"\n{name} -> {len(df)} rows")
        display(df.head(3))  # display() is provided by the notebook runtime
else:
    print("[INFO] No local XML found in /tmp/ccda_xml yet. Run Step 6 (GET from stage) and re-run this test.")




In [None]:

-- Report the session context (account, user, role, warehouse, database, schema) this notebook is running under.
SELECT
    CURRENT_ACCOUNT()   AS account_locator   -- legacy locator form
  , CURRENT_USER()      AS user_name
  , CURRENT_ROLE()      AS role_name
  , CURRENT_WAREHOUSE() AS warehouse_name
  , CURRENT_DATABASE()  AS database_name
  , CURRENT_SCHEMA()    AS schema_name
;


In [None]:

# from snowflake.snowpark import Session
# # from snowflake.snowpark.context import get_active_session

# # Fill these with your values (or use Notebook secrets/parameters)
# connection_parameters = {
#     "account":   "HKC53319",      # e.g., xy12345.ap-south-1
#     "user":      "SBAGUL077",
#     "password":  "<REDACTED>",                  # never commit real credentials; use Notebook secrets or keypair auth
#     "role":      "SYSADMIN",
#     "warehouse": "COMPUTE_WH",
#     "database":  "CAPSTONE_DB",
#        "schema":    "MEDALLION_SILVER",
# }

# session = Session.builder.configs(connection_parameters).create()

# # Optional sanity checks
# session.sql("SELECT CURRENT_USER(), CURRENT_ROLE(), CURRENT_DATABASE(), CURRENT_SCHEMA()").collect()
# print("[INFO] Snowpark session initialized.")



In [None]:
-- Inspect the raw Bronze rows.
-- NOTE(review): this notebook creates CCDA_BRONZE under both FINAL_ASSIGNMENT.CCDA and
-- CCDA_FINAL_ASSIGNMENT.CCDA; this query reads the former - confirm that is the intended copy.
select * from FINAL_ASSIGNMENT.CCDA.CCDA_BRONZE;

In [None]:
--NOT NEEDED MAYBE
-- -- Use your database/schema
-- USE DATABASE CAPSTONE_DB;
-- USE SCHEMA MEDALLION_SILVER;

-- -- Create the external stage (already fine)
-- CREATE OR REPLACE STAGE ccda_xml_stage
--   URL='s3://demosnowflakesanket/CCDA_Data/'
--   STORAGE_INTEGRATION=S3_CCDA_INTEGRATION;

-- -- Create a FILE FORMAT for XML (named constant)
-- CREATE OR REPLACE FILE FORMAT xml_ccda_ff
--   TYPE = XML
--   COMPRESSION = AUTO;

-- -- Verify the specific file is visible
-- LIST @ccda_xml_stage PATTERN='.*Nancey580_Melissa844_McGlynn426_b26b56d5-928d-13c0-8b47-080d343d0878\.xml$';

-- -- Bronze table
-- CREATE OR REPLACE TABLE ccda_bronze (
--   file_name STRING,
--   doc       VARIANT
-- );

-- -- Load the single XML into Bronze using the named file format
-- COPY INTO ccda_bronze (file_name, doc)
-- FROM (
--   SELECT METADATA$FILENAME, $1
--   FROM @ccda_xml_stage/Nancey580_Melissa844_McGlynn426_b26b56d5-928d-13c0-8b47-080d343d0878.xml
--        (FILE_FORMAT => 'xml_ccda_ff')
-- );


In [None]:
-- Describe the Bronze table. The name is unqualified, so it resolves against the
-- session's current database and schema rather than a fixed location.
DESC TABLE ccda_bronze;

In [None]:

from snowflake.snowpark.context import get_active_session

# Bind the notebook's active Snowpark session, then point it at CAPSTONE_DB.MEDALLION_SILVER.
# Unqualified object names in later statements resolve against this context.
session = get_active_session()
for context_statement in ("USE DATABASE CAPSTONE_DB", "USE SCHEMA MEDALLION_SILVER"):
    session.sql(context_statement).collect()


In [None]:
-- Reset the Silver layer and the ingest manifest ahead of a full reload.
-- Intentionally unfiltered: every row of each listed table is removed.
TRUNCATE TABLE FINAL_ASSIGNMENT.CCDA.CCDA_ALLERGIES;
TRUNCATE TABLE FINAL_ASSIGNMENT.CCDA.CCDA_MEDICATIONS;
TRUNCATE TABLE FINAL_ASSIGNMENT.CCDA.CCDA_VITALS;
TRUNCATE TABLE FINAL_ASSIGNMENT.CCDA.CCDA_RESULTS;
TRUNCATE TABLE FINAL_ASSIGNMENT.CCDA.CCDA_PROBLEMS;
TRUNCATE TABLE FINAL_ASSIGNMENT.CCDA.CCDA_PROCEDURES;
TRUNCATE TABLE FINAL_ASSIGNMENT.CCDA.CCDA_ENCOUNTERS;
TRUNCATE TABLE FINAL_ASSIGNMENT.CCDA.CCDA_IMMUNIZATIONS;
TRUNCATE TABLE FINAL_ASSIGNMENT.CCDA.CCDA_FUNCTIONAL_STATUS;
-- Plan of care and social history are also loaded by the pipeline (both appear in
-- DOMAIN_TO_TABLE and have COUNT_* manifest columns), so they are reset with the rest;
-- otherwise their old rows would outlive the manifest that describes them.
-- IF EXISTS keeps this cell runnable where those two tables have not been created yet.
TRUNCATE TABLE IF EXISTS FINAL_ASSIGNMENT.CCDA.CCDA_PLAN_OF_CARE;
TRUNCATE TABLE IF EXISTS FINAL_ASSIGNMENT.CCDA.CCDA_SOCIAL_HISTORY;
TRUNCATE TABLE FINAL_ASSIGNMENT.CCDA.CCDA_INGEST_MANIFEST;


In [None]:

-- Stored procedure: parse the CCDA documents exposed by CCDA_BRONZE_STREAM, load each
-- domain into its Silver table, and record one manifest row per file.
-- Returns a one-line summary string with the OK and failed/rejected counts.
-- NOTE(review): ccda_parser is not listed in PACKAGES and no IMPORTS clause is declared
-- here - confirm how that package is made available to the procedure.
CREATE OR REPLACE PROCEDURE CCDA.SP_PROCESS_CCDA_STREAM()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10'
PACKAGES = ('snowflake-snowpark-python', 'pandas', 'snowflake-connector-python')
HANDLER = 'run'
AS
$$
from snowflake.connector.pandas_tools import write_pandas
from ccda_parser.parse_text import parse_ccda_text  # from your package
import pandas as pd

DB = "FINAL_ASSIGNMENT"
SCHEMA = "CCDA"

# Domain -> table mapping (same as your script)
DOMAIN_TO_TABLE = {
  "medications": "CCDA_MEDICATIONS",
  "vitals": "CCDA_VITALS",
  "results": "CCDA_RESULTS",
  "problems": "CCDA_PROBLEMS",
  "procedures": "CCDA_PROCEDURES",
  "encounters": "CCDA_ENCOUNTERS",
  "immunizations": "CCDA_IMMUNIZATIONS",
  "functional_status": "CCDA_FUNCTIONAL_STATUS",
  "allergies": "CCDA_ALLERGIES",
  "plan_of_care": "CCDA_PLAN_OF_CARE",
  "social_history": "CCDA_SOCIAL_HISTORY",
}

# Column names (compared lower-cased) that are converted to datetimes.
ISO_TIME_COLS = {"start_iso", "stop_iso"}

def normalize_domain_dfs(dfs: dict, file_name: str, meta: dict) -> dict:
    """
    Prepare each parsed domain frame for loading.

    For every non-empty frame: add the context columns that are missing
    (FILE_NAME, PATIENT_ID, DOCUMENT_ID, PATIENT_IDS_ALL, SOURCE_SYSTEM,
    RECORD_TYPE), convert the ISO time columns to datetimes (unparseable
    values become NaT), upper-case the column names and drop later columns
    whose upper-cased name repeats an earlier one.

    None or empty frames are passed through unchanged. Non-empty input
    frames are modified in place before the de-duplicated frame is returned.
    """
    out = {}
    patient_id = meta.get("patient_id")
    document_id = meta.get("document_id")
    patient_ids_all = meta.get("patient_ids_all")
    for domain, pdf in dfs.items():
        if pdf is None or pdf.empty:
            out[domain] = pdf
            continue
        # add context columns if missing
        for col, val in [
            ("FILE_NAME", file_name),
            ("PATIENT_ID", patient_id),
            ("DOCUMENT_ID", document_id),
            ("PATIENT_IDS_ALL", patient_ids_all),
            ("SOURCE_SYSTEM", "CCDA"),
            ("RECORD_TYPE", domain),
        ]:
            if col not in pdf.columns:
                pdf[col] = val
        # convert *_ISO safely
        for col in list(pdf.columns):
            if col.lower() in ISO_TIME_COLS:
                pdf[col] = pd.to_datetime(pdf[col], errors="coerce")
        # uppercase & dedupe column names (safer variant)
        pdf.columns = pdf.columns.str.upper()
        pdf = pdf.loc[:, ~pdf.columns.duplicated()]  # keep first occurrence
        out[domain] = pdf
    return out

def write_df(session, table_name: str, pdf: pd.DataFrame) -> int:
    """
    Append `pdf` to DB.SCHEMA.table_name through write_pandas.

    Returns the row count reported by write_pandas, or 0 when `pdf` is None
    or empty (nothing is written). Raises RuntimeError when write_pandas
    reports failure.
    """
    if pdf is None or pdf.empty:
        return 0
    success, nchunks, nrows, _ = write_pandas(
        conn=session.connection,
        df=pdf,
        table_name=table_name,
        database=DB,
        schema=SCHEMA,
        quote_identifiers=False,
        overwrite=False
    )
    if not success:
        raise RuntimeError(f"write_pandas failed for {DB}.{SCHEMA}.{table_name}")
    return nrows

def log_manifest_row(session, file_name: str, status: str, reason: str, counts: dict, meta: dict = None):
    """
    Append one row to CCDA_INGEST_MANIFEST describing the outcome for a file.

    `counts` maps domain name -> rows written; domains that are absent are
    recorded as 0. PATIENT_ID and DOCUMENT_ID are included only when `meta`
    is truthy. Raises RuntimeError when write_pandas reports failure.
    """
    row = {
      "FILE_NAME": file_name, "STATUS": status, "REASON": reason or "",
      "COUNT_MEDICATIONS": counts.get("medications", 0),
      "COUNT_VITALS": counts.get("vitals", 0),
      "COUNT_RESULTS": counts.get("results", 0),
      "COUNT_PROBLEMS": counts.get("problems", 0),
      "COUNT_PROCEDURES": counts.get("procedures", 0),
      "COUNT_ENCOUNTERS": counts.get("encounters", 0),
      "COUNT_IMMUNIZATIONS": counts.get("immunizations", 0),
      "COUNT_FUNCTIONAL_STATUS": counts.get("functional_status", 0),
      "COUNT_ALLERGIES": counts.get("allergies", 0),
      "COUNT_PLAN_OF_CARE": counts.get("plan_of_care", 0),
      "COUNT_SOCIAL_HISTORY": counts.get("social_history", 0),
    }
    if meta:
        row["PATIENT_ID"]  = meta.get("patient_id")
        row["DOCUMENT_ID"] = meta.get("document_id")
    mdf = pd.DataFrame([row])
    ok, _, _, _ = write_pandas(
        conn=session.connection,
        df=mdf,
        table_name="CCDA_INGEST_MANIFEST",
        database=DB,
        schema=SCHEMA,
        quote_identifiers=False
    )
    if not ok:
        raise RuntimeError("Failed to write manifest row")

def delete_existing_rows_for_file(session, file_name: str, document_id: str = None):
    """
    Delete previously loaded rows for a file from every table in DOMAIN_TO_TABLE.

    With `document_id` the delete is narrowed to rows matching both FILE_NAME
    and DOCUMENT_ID; otherwise all rows for FILE_NAME are removed. The values
    are embedded in the SQL text, so single quotes are doubled to keep the
    string literal valid.
    """
    safe_file = (file_name or "").replace("'", "''")
    safe_doc  = (document_id or "").replace("'", "''")
    for table in DOMAIN_TO_TABLE.values():
        if document_id:
            session.sql(f"DELETE FROM {DB}.{SCHEMA}.{table} WHERE FILE_NAME = '{safe_file}' AND DOCUMENT_ID = '{safe_doc}'").collect()
        else:
            session.sql(f"DELETE FROM {DB}.{SCHEMA}.{table} WHERE FILE_NAME = '{safe_file}'").collect()

def run(session):
    """
    Procedure handler. Snowflake passes the Snowpark Session as the first
    argument of a Python stored-procedure handler, so it is accepted here
    rather than looked up.

    Each row read from CCDA_BRONZE_STREAM is parsed and loaded inside its own
    BEGIN/COMMIT; on error the transaction is rolled back and a best-effort
    "failed" manifest row is written. Returns a summary string.
    """
    assert session is not None, "No active Snowpark session."

    # Read only new rows from the STREAM (incremental)
    # NOTE(review): this is a plain SELECT; a stream's offset normally advances only when
    # the stream is read by a DML statement that commits. Confirm the stream is consumed
    # somewhere, otherwise every run will see the same rows again.
    df = session.sql("""
        SELECT FILE_NAME, TO_VARCHAR(DOC) AS XML_TEXT
        FROM CCDA_BRONZE_STREAM
    """)
    rows = df.collect()
    if not rows:
        return "No new rows in CCDA_BRONZE_STREAM."

    processed, failed = 0, 0

    for r in rows:
        FILE_NAME = r["FILE_NAME"]
        XML_TEXT  = r["XML_TEXT"]
        # Reset per file: if parsing raises, the failure manifest must not carry the
        # patient/document ids left over from the previous file.
        meta = None
        try:
            # NOTE(review): write_pandas stages data through helper objects; confirm whether
            # that implicitly commits the open transaction, in which case the ROLLBACK
            # below cannot undo writes made earlier for this file.
            session.sql("BEGIN").collect()
            dfs, meta = parse_ccda_text(XML_TEXT, file_name=FILE_NAME)  # uses your parser
            if meta.get("status") != "parsed":
                log_manifest_row(session, FILE_NAME, "rejected", meta.get("reason",""), {}, meta)
                session.sql("COMMIT").collect()
                failed += 1
                continue

            # Optional idempotency: clear previous rows for this file+document
            delete_existing_rows_for_file(session, FILE_NAME, meta.get("document_id"))

            dfs_norm = normalize_domain_dfs(dfs, FILE_NAME, meta)
            counts = {}
            for domain, table in DOMAIN_TO_TABLE.items():
                counts[domain] = write_df(session, table, dfs_norm.get(domain))

            log_manifest_row(session, FILE_NAME, "parsed", "OK", counts, meta)
            session.sql("COMMIT").collect()
            processed += 1
        except Exception as e:
            session.sql("ROLLBACK").collect()
            # best-effort failure manifest: a logging error must not stop the remaining files
            try:
                log_manifest_row(session, FILE_NAME, "failed", str(e), {}, meta)
            except Exception:
                pass
            failed += 1

    return f"Processed OK: {processed}, Failed/Rejected: {failed}"
$$;


In [None]:

-- Driver task: runs once per calendar month, at 00:00 UTC on the 1st (not on a fixed 30-day interval)
CREATE OR REPLACE TASK CCDA.TASK_PROCESS_CCDA_STREAM
  WAREHOUSE = COMPUTE_WH
  SCHEDULE = 'USING CRON 0 0 1 * * UTC'  -- At 00:00 UTC on the 1st of every month
AS
CALL CCDA.SP_PROCESS_CCDA_STREAM();

-- Start the task
ALTER TASK CCDA.TASK_PROCESS_CCDA_STREAM RESUME;


In [None]:

# -- Read Bronze -> Parse CCDA in Python -> Load Silver & Manifest (Python cell)

import pandas as pd
from snowflake.connector.pandas_tools import write_pandas
from snowflake.snowpark.context import get_active_session

from ccda_parser.parse_text import parse_ccda_text

session = get_active_session()
assert session is not None, "No active Snowpark session."

# Database/schema used for the Bronze read and for all Silver/manifest writes in this cell.
DB = "CCDA_FINAL_ASSIGNMENT"
SCHEMA = "CCDA"

# 1) Read ALL raw CCDA docs from Bronze.
# The table is fully qualified so the read does not depend on the session's current
# database/schema (another cell of this notebook switches it to CAPSTONE_DB.MEDALLION_SILVER).
df = session.sql(f"""
    SELECT FILE_NAME, TO_VARCHAR(DOC) AS XML_TEXT
    FROM {DB}.{SCHEMA}.CCDA_BRONZE
""")
rows = df.collect()
assert rows, "Bronze has no rows."

# Domain -> Silver table mapping (UPPERCASE Snowflake tables)
DOMAIN_TO_TABLE = {
    "medications":        "CCDA_MEDICATIONS",
    "vitals":             "CCDA_VITALS",
    "results":            "CCDA_RESULTS",
    "problems":           "CCDA_PROBLEMS",
    "procedures":         "CCDA_PROCEDURES",
    "encounters":         "CCDA_ENCOUNTERS",
    "immunizations":      "CCDA_IMMUNIZATIONS",
    "functional_status":  "CCDA_FUNCTIONAL_STATUS",
    # NEW
    "allergies":          "CCDA_ALLERGIES",
    "plan_of_care":       "CCDA_PLAN_OF_CARE",
    "social_history":     "CCDA_SOCIAL_HISTORY",
}

# Columns that should be parsed to datetime (ISO ones only)
ISO_TIME_COLS = {"start_iso", "stop_iso"}



# Column names (compared lower-cased) that are converted to datetimes.
ISO_TIME_COLS = {"start_iso", "stop_iso"}

def normalize_domain_dfs(dfs: dict, file_name: str, meta: dict) -> dict:
    """
    Prepare each parsed domain DataFrame for loading into its Silver table.

    For every non-empty frame:
      * add FILE_NAME, PATIENT_ID, DOCUMENT_ID, PATIENT_IDS_ALL, SOURCE_SYSTEM
        and RECORD_TYPE when a column of that exact name is not already present;
      * convert the ISO time columns (case-insensitive match against
        ISO_TIME_COLS) to datetimes, turning unparseable values into NaT;
      * upper-case the column names and, when two names collide after
        upper-casing, keep the first column together with its own data.

    Args:
        dfs: mapping of domain name -> DataFrame (None or empty frames are
            passed through unchanged).
        file_name: value stored in FILE_NAME.
        meta: parser metadata; "patient_id", "document_id" and
            "patient_ids_all" are read with .get().

    Returns:
        dict with the same keys as `dfs` and the normalized frames as values.

    Note: context columns and datetime conversion are applied to the input
    frames in place; the upper-casing/de-duplication happens on a new frame.
    """
    out = {}
    patient_id = meta.get("patient_id")
    document_id = meta.get("document_id")
    patient_ids_all = meta.get("patient_ids_all")  # JSON string from parser

    context_columns = (
        ("FILE_NAME", file_name),
        ("PATIENT_ID", patient_id),
        ("DOCUMENT_ID", document_id),
        ("PATIENT_IDS_ALL", patient_ids_all),
        ("SOURCE_SYSTEM", "CCDA"),
    )

    for domain, pdf in dfs.items():
        if pdf is None or pdf.empty:
            out[domain] = pdf
            continue

        # --- Add context columns only if missing ---
        for col_name, value in context_columns + (("RECORD_TYPE", domain),):
            if col_name not in pdf.columns:
                pdf[col_name] = value

        # --- Convert ISO columns safely (case-insensitive match) ---
        for col in list(pdf.columns):
            if col.lower() in ISO_TIME_COLS:
                pdf[col] = pd.to_datetime(pdf[col], errors="coerce")

        # --- Upper-case names and keep the FIRST column for each resulting name ---
        # The mask is computed on the upper-cased names and applied by position, so each
        # surviving column keeps its own data. Slicing the first N columns instead would
        # shift labels onto the wrong data whenever a duplicate is not the last column.
        upper_cols = pd.Index([c.upper() for c in pdf.columns])
        keep = ~upper_cols.duplicated()
        pdf = pdf.loc[:, keep]
        pdf.columns = upper_cols[keep]
        out[domain] = pdf

    return out

def write_df(table_name: str, pdf: pd.DataFrame) -> int:
    """
    Append `pdf` to {DB}.{SCHEMA}.{table_name} using write_pandas.

    Returns the number of rows write_pandas reports as written, or 0 when
    there is nothing to write (`pdf` is None or empty). Raises RuntimeError
    if write_pandas signals failure.
    """
    has_rows = pdf is not None and not pdf.empty
    if not has_rows:
        return 0

    write_result = write_pandas(
        conn=session.connection,
        df=pdf,
        table_name=table_name,
        database=DB,
        schema=SCHEMA,
        quote_identifiers=False,  # assumes Silver columns are unquoted UPPERCASE
        overwrite=False
    )
    ok, _chunk_count, rows_written, _copy_output = write_result
    if ok:
        return rows_written
    raise RuntimeError(f"write_pandas failed for {DB}.{SCHEMA}.{table_name}")


def log_manifest_row(file_name: str, status: str, reason: str, counts: dict, meta: dict = None):
    """
    Append a single outcome row for `file_name` to CCDA_INGEST_MANIFEST.

    `counts` maps domain name -> rows written; a domain missing from it is
    recorded as 0. PATIENT_ID and DOCUMENT_ID are added only when `meta` is
    truthy. Raises RuntimeError if write_pandas signals failure.
    """
    # (manifest column, key looked up in `counts`)
    count_columns = (
        ("COUNT_MEDICATIONS", "medications"),
        ("COUNT_VITALS", "vitals"),
        ("COUNT_RESULTS", "results"),
        ("COUNT_PROBLEMS", "problems"),
        ("COUNT_PROCEDURES", "procedures"),
        ("COUNT_ENCOUNTERS", "encounters"),
        ("COUNT_IMMUNIZATIONS", "immunizations"),
        ("COUNT_FUNCTIONAL_STATUS", "functional_status"),
        ("COUNT_ALLERGIES", "allergies"),
        ("COUNT_PLAN_OF_CARE", "plan_of_care"),
        ("COUNT_SOCIAL_HISTORY", "social_history"),
    )

    manifest_row = {"FILE_NAME": file_name, "STATUS": status, "REASON": reason or ""}
    for column, domain_key in count_columns:
        manifest_row[column] = counts.get(domain_key, 0)

    if meta:
        for column, meta_key in (("PATIENT_ID", "patient_id"), ("DOCUMENT_ID", "document_id")):
            manifest_row[column] = meta.get(meta_key)

    write_result = write_pandas(
        conn=session.connection,
        df=pd.DataFrame([manifest_row]),
        table_name="CCDA_INGEST_MANIFEST",
        database=DB,
        schema=SCHEMA,
        quote_identifiers=False
    )
    if not write_result[0]:
        raise RuntimeError("Failed to write manifest row")

# (Optional) idempotent reruns: clear whatever Silver rows a file loaded previously
def delete_existing_rows_for_file(file_name: str):
    """
    Delete all rows whose FILE_NAME equals `file_name` from every table in DOMAIN_TO_TABLE.

    The value is embedded in the SQL text, so single quotes are doubled to keep
    the string literal valid; a None file name is treated as the empty string.
    """
    safe = (file_name or "").replace("'", "''")
    statements = [
        f"DELETE FROM {DB}.{SCHEMA}.{table} WHERE FILE_NAME = '{safe}'"
        for table in DOMAIN_TO_TABLE.values()
    ]
    for statement in statements:
        session.sql(statement).collect()

# Defined ahead of the loop so the name is bound to this version while the loop runs.
# Called with only a file name it issues the same file-only DELETE as the helper
# defined earlier in this cell.
def delete_existing_rows_for_file(file_name: str, document_id: str = None):
    """
    Delete previously loaded Silver rows for one file from every table in DOMAIN_TO_TABLE.

    With `document_id` the delete is narrowed to rows matching both FILE_NAME
    and DOCUMENT_ID; without it, every row for FILE_NAME is removed. Values are
    embedded in the SQL text, so single quotes are doubled to keep the literal valid.
    """
    safe_file = (file_name or "").replace("'", "''")
    safe_doc  = (document_id or "").replace("'", "''")
    for table in DOMAIN_TO_TABLE.values():
        if document_id:
            session.sql(
                f"DELETE FROM {DB}.{SCHEMA}.{table} WHERE FILE_NAME = '{safe_file}' AND DOCUMENT_ID = '{safe_doc}'"
            ).collect()
        else:
            session.sql(
                f"DELETE FROM {DB}.{SCHEMA}.{table} WHERE FILE_NAME = '{safe_file}'"
            ).collect()


# ---------- Process ALL rows ----------
grand_counts = {k: 0 for k in DOMAIN_TO_TABLE.keys()}
processed = 0
failed = 0

for r in rows:
    FILE_NAME = r["FILE_NAME"]
    xml_text  = r["XML_TEXT"]
    # Reset per file: if parsing raises, the failure manifest must not carry the
    # patient/document ids left over from the previous file (or an earlier cell).
    meta = None

    print(f"\n=== Processing: {FILE_NAME} ===")
    delete_existing_rows_for_file(FILE_NAME)  # optional

    try:
        dfs, meta = parse_ccda_text(xml_text)

        # Rejected parse -> log and continue
        if meta.get("status") != "parsed":
            log_manifest_row(FILE_NAME, status="rejected", reason=meta.get("reason",""), counts={}, meta=meta)
            print(f"Rejected: {FILE_NAME} | Reason: {meta.get('reason','')}")
            failed += 1
            continue

        # Normalize with patient/document context
        dfs_norm = normalize_domain_dfs(dfs, FILE_NAME, meta)

        counts = {}
        for domain, table in DOMAIN_TO_TABLE.items():
            counts[domain] = write_df(table, dfs_norm.get(domain))

        # Update totals
        for domain in grand_counts:
            grand_counts[domain] += counts.get(domain, 0)

        # Log success including patient/doc
        log_manifest_row(FILE_NAME, status="parsed", reason="OK", counts=counts, meta=meta)
        print(f"Parsed & loaded: {FILE_NAME} | Counts: {counts}")
        processed += 1

    except Exception as e:
        # Best-effort failure manifest: a logging error is reported but must not
        # abort the remaining files.
        try:
            log_manifest_row(FILE_NAME, status="failed", reason=str(e), counts={}, meta=meta)
        except Exception as log_error:
            print(f"Could not write failure manifest for {FILE_NAME}: {log_error}")
        print(f"Failed: {FILE_NAME} | Error: {e}")
        failed += 1


print("\n=== Summary ===")
print(f"Processed OK: {processed}, Failed/Rejected: {failed}")
# Per-domain totals accumulated above across all successfully loaded files.
print(f"Rows loaded per domain: {grand_counts}")



In [None]:
-- SELECT * FROM FINAL_ASSIGNMENT.CCDA.CCDA_ALLERGIES;

-- SELECT count(1),FILE_NAME,	START_RAW,	STOP_RAW,	START_ISO,	STOP_ISO,	DESCRIPTION,	CODE_SYSTEM,	CODE FROM FINAL_ASSIGNMENT.CCDA.CCDA_MEDICATIONS
-- where file_name like '%James276_Bradtke547_052f9984-3e08-e364-872e-c2ed6284aaf6.xml'
-- group by FILE_NAME,	START_RAW,	STOP_RAW,	START_ISO,	STOP_ISO,	DESCRIPTION,	CODE_SYSTEM,	CODE
-- order by 1 desc;
-- select * FROM FINAL_ASSIGNMENT.CCDA.CCDA_MEDICATIONS;
-- SELECT * FROM FINAL_ASSIGNMENT.CCDA.CCDA_VITALS;
-- SELECT * FROM FINAL_ASSIGNMENT.CCDA.CCDA_RESULTS;
-- SELECT * FROM FINAL_ASSIGNMENT.CCDA.CCDA_PROBLEMS;
-- SELECT * FROM FINAL_ASSIGNMENT.CCDA.CCDA_PROCEDURES;
-- Active query: preview the Silver encounters table. Unlike the commented-out queries around it,
-- this one targets CCDA_FINAL_ASSIGNMENT rather than FINAL_ASSIGNMENT.
SELECT * FROM CCDA_FINAL_ASSIGNMENT.CCDA.CCDA_ENCOUNTERS;
-- SELECT * FROM FINAL_ASSIGNMENT.CCDA.CCDA_IMMUNIZATIONS;
-- SELECT * FROM FINAL_ASSIGNMENT.CCDA.CCDA_FUNCTIONAL_STATUS;
-- SELECT * FROM FINAL_ASSIGNMENT.CCDA.CCDA_INGEST_MANIFEST;
-- SELECT * FROM FINAL_ASSIGNMENT.CCDA.CCDA_PLAN_OF_CARE;
-- SELECT * FROM FINAL_ASSIGNMENT.CCDA.CCDA_SOCIAL_HISTORY;

In [None]:
-- select * from CCDA_FINAL_ASSIGNMENT.CCDA.CCDA_BRONZE;

-- Materialize patient demographics from the CCDA header of each Bronze document:
-- recordTarget/patientRole (ids, addr, telecom) and patientRole/patient (name, birth date,
-- gender, race, ethnicity, language). One output row per Bronze row.
-- XMLGET is called without an instance number, so only the first matching child element
-- (first addr, first telecom, first id, ...) is read.
-- Despite the V_ prefix this is a table (CREATE OR REPLACE TABLE ... AS), i.e. a snapshot
-- that must be re-run to pick up new Bronze rows.
CREATE OR REPLACE TABLE CCDA_FINAL_ASSIGNMENT.ccda.V_PATIENT_DEMOGRAPHICS AS
WITH
-- The file name is the document key. The elements are carried forward row by row instead
-- of being re-joined on the file name, so a file present more than once in Bronze yields
-- one row per Bronze row rather than a multiplied set of join combinations.
doc AS (
    SELECT
        FILE_NAME AS doc_id,
        XMLGET(XMLGET(DOC, 'recordTarget'), 'patientRole') AS patient_role
    FROM CCDA_FINAL_ASSIGNMENT.CCDA.CCDA_BRONZE
    -- rows without a file name could never match the former equality joins on the key
    WHERE FILE_NAME IS NOT NULL
),
-- Direct children of patientRole.
role_parts AS (
    SELECT
        doc_id,
        XMLGET(patient_role, 'id')                             AS id_el,
        XMLGET(patient_role, 'addr')                           AS addr,
        GET(XMLGET(patient_role, 'telecom'), '@value')::string AS telecom_value,
        XMLGET(patient_role, 'patient')                        AS patient
    FROM doc
),
-- Adds the name element nested under patient.
patient_parts AS (
    SELECT
        doc_id,
        id_el,
        addr,
        telecom_value,
        patient,
        XMLGET(patient, 'name') AS name_el
    FROM role_parts
)
SELECT
    doc_id AS DOC_ID,

    /* Patient Contact Details */
    GET(XMLGET(addr, 'streetAddressLine'), '$')::string AS street_address_line,
    GET(XMLGET(addr, 'city'), '$')::string              AS city,
    GET(XMLGET(addr, 'state'), '$')::string             AS state,
    GET(XMLGET(addr, 'postalCode'), '$')::string        AS postal_code,
    GET(XMLGET(addr, 'country'), '$')::string           AS country,
    -- Phone and e-mail come from the same telecom element, so the URI scheme decides the
    -- column: a mailto: value is an e-mail only, anything else is treated as a phone.
    CASE
        WHEN telecom_value LIKE 'mailto:%' THEN NULL
        ELSE REGEXP_REPLACE(telecom_value, '^tel:', '')
    END                                                 AS phone_raw,
    CASE
        WHEN telecom_value LIKE 'mailto:%' THEN REGEXP_REPLACE(telecom_value, '^mailto:', '')
        ELSE NULL
    END                                                 AS email_raw,

    /* Date of Birth */
    -- Only the leading YYYYMMDD is parsed, so values that also carry a time part
    -- (e.g. YYYYMMDDHHMMSS) still yield a date.
    TRY_TO_DATE(LEFT(GET(XMLGET(patient, 'birthTime'), '@value')::string, 8), 'YYYYMMDD') AS date_of_birth,

    /* Gender, Race, Ethnicity */
    GET(XMLGET(patient, 'administrativeGenderCode'), '@code')::string AS gender_code,
    GET(XMLGET(patient, 'raceCode'), '@code')::string                 AS race_code,
    GET(XMLGET(patient, 'ethnicGroupCode'), '@code')::string          AS ethnicity_code,

    /* Patient IDs */
    GET(id_el, '@root')::string                         AS patient_id_root,
    GET(id_el, '@extension')::string                    AS patient_id_extension,

    /* Language Communication */
    GET(
        XMLGET(XMLGET(patient, 'languageCommunication'), 'languageCode'),
        '@code'
    )::string                                           AS language_code,

    /* Optional Name */
    GET(XMLGET(name_el, 'given'),  '$')::string         AS given_name,
    GET(XMLGET(name_el, 'family'), '$')::string         AS family_name

FROM patient_parts;


In [None]:
-- select * from FINAL_ASSIGNMENT.CCDA.CCDA_BRONZE;

-- Preview the demographics table built from the Bronze documents.
SELECT * FROM CCDA_FINAL_ASSIGNMENT.CCDA.V_PATIENT_DEMOGRAPHICS

In [None]:
-- Review per-file ingest outcomes.
-- NOTE(review): this reads the manifest in FINAL_ASSIGNMENT (the database the stored procedure
-- writes to); the notebook's Python loader writes its manifest rows to CCDA_FINAL_ASSIGNMENT - confirm which is wanted.
SELECT * FROM FINAL_ASSIGNMENT.CCDA.CCDA_INGEST_MANIFEST;