## Usage
Place your PFT XML files inside an `input/` directory next to this notebook.
Update the paths in the code cells if needed. Generated CSV and JSON files are written to the paths set in each cell; by default that is the same `input/` directory as the XML file.

In [None]:
import pandas as pd
from lxml import etree

def extract_parameters(xml_path, output_csv=None):
    """Collect the attributes of every Parameter element in an XML file.

    Args:
        xml_path: Path of the XML file to parse.
        output_csv: Optional CSV destination; when truthy the table is also
            written there (without the index) and a confirmation is printed.

    Returns:
        A pandas DataFrame with one row per Parameter element. Each row holds
        the element's attributes plus a "TagPath" column containing the
        element's path in the document, or None when the path lookup fails.
    """
    document = etree.parse(xml_path)

    def describe(element):
        # Plain-dict copy of the attributes, extended with the element path.
        record = dict(element.attrib.items())
        try:
            location = document.getpath(element)
        except Exception:
            location = None
        record["TagPath"] = location
        return record

    table = pd.DataFrame(
        [describe(element) for element in document.getroot().iter("Parameter")]
    )

    if not output_csv:
        return table

    table.to_csv(output_csv, index=False)
    print(f"Saved to {output_csv}")
    return table

# Example usage
xml_file = "input/example.xml"
csv_file = "input/extracted_parameters.csv"

df = extract_parameters(xml_file, output_csv=csv_file)
# A file without any Parameter element yields a frame with no columns at all,
# so tolerate a missing "TagPath" column instead of raising KeyError.
# drop() already returns a new frame, so no extra copy is needed.
df = df.drop(columns=["TagPath"], errors="ignore")
print(df.head(100))

In [None]:
from lxml import etree
import matplotlib.pyplot as plt

# Input XML export (csv_file is assigned here but not used in this cell)
xml_file_path = "input/example.xml"
csv_file = "input/extracted_parameters.csv"

# Load the document
tree = etree.parse(xml_file_path)
root = tree.getroot()

# Collect (volume, flow) pairs from every Graph2 element whose axis labels are
# X="V (L)" and Y="F (L/s)"
points = []
for graph in root.iter("Graph2"):
    if (graph.attrib.get("X"), graph.attrib.get("Y")) != ("V (L)", "F (L/s)"):
        continue
    for point in graph.iter("Point"):
        x, y = point.attrib.get("X"), point.attrib.get("Y")
        if x is None or y is None:
            continue
        try:
            pair = (float(x), float(y))
        except ValueError:
            # Points with non-numeric coordinates are skipped.
            continue
        points.append(pair)

# Draw the loop, or report that nothing usable was found
if not points:
    print("No valid <Point> data found under <Graph2> with X='V (L)' and Y='F (L/s)'.")
else:
    vol, flow = zip(*points)
    plt.figure(figsize=(10, 6))
    plt.plot(vol, flow)
    plt.xlabel("Volume (L)")
    plt.ylabel("Flow (L/s)")
    plt.title("Flow-Volume Loop Extracted from XML")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

In [None]:
import json
from lxml import etree

# ---------- CONFIG ----------
# XML export to read
xml_file_path = "input/example.xml"
# JSON summary written by main()
output_json_path = "input/extracted_test_summary.json"
# ----------------------------


def extract_graph(graph_elem):
    """Turn a graph element into a plain dict of metadata and points.

    Returns a dict with the element's "X" and "Y" attributes (None when
    absent), "Count" as int and "SamplingInterval" as float (both defaulting
    to 0), and "Points": a list of [x, y] float pairs taken from the Point
    elements below it. Points missing a coordinate, or whose coordinates are
    not numeric, are left out.
    """
    attrs = graph_elem.attrib

    coords = []
    for node in graph_elem.iter("Point"):
        raw_x, raw_y = node.attrib.get("X"), node.attrib.get("Y")
        if raw_x is None or raw_y is None:
            continue
        try:
            pair = [float(raw_x), float(raw_y)]
        except ValueError:
            continue
        coords.append(pair)

    return {
        "X": attrs.get("X"),
        "Y": attrs.get("Y"),
        "Count": int(attrs.get("Count", "0")),
        "SamplingInterval": float(attrs.get("SamplingInterval", "0")),
        "Points": coords,
    }


def main(xml_path, output_path):
    """Parse one XML export and write a nested JSON summary of it.

    The summary contains the subject fields, the visit fields and, for every
    Test element directly under the visit, its parameters and graphs.

    Args:
        xml_path: Path of the XML file to read.
        output_path: Path of the JSON file to write (opened in "w" mode).

    Raises:
        ValueError: If the document root has no Subject child, or the subject
            has no Visit child.
    """

    def extended_info(parent, tag):
        # ExtendedInfo attribute of a child element; None when the child is
        # absent, the same convention used for TestType further down.
        node = parent.find(tag)
        return None if node is None else node.attrib.get("ExtendedInfo")

    tree = etree.parse(xml_path)
    root = tree.getroot()

    subject = root.find("Subject")
    if subject is None:
        raise ValueError(f"{xml_path}: no Subject element under the document root")
    visit = subject.find("Visit")
    if visit is None:
        raise ValueError(f"{xml_path}: no Visit element under Subject")

    export_data = {
        "Subject": {
            "SubjectID": subject.findtext("ID"),
            "FirstName": subject.findtext("FirstName"),
            "LastName": subject.findtext("LastName"),
            "DOB": extended_info(subject, "DayOfBirth"),
            "Gender": extended_info(subject, "GenderID"),
            "Ethnicity": extended_info(subject, "ethnicID"),
        },
        "Visit": {
            "RecordID": visit.findtext("RecordID"),
            "CreatedOn": extended_info(visit, "CreatedOn"),
            "Smoker": visit.findtext("Smoker"),
            "CigarettesPerDay": visit.findtext("CigDie"),
            "SmokeYears": visit.findtext("SmokeYears"),
            "SmokeWhat": visit.findtext("SmokeWhat"),
            "NonSmokeYears": visit.findtext("NonSmokeYears"),
            "Height_cm": visit.findtext("Height"),
            "Weight_kg": visit.findtext("Weight"),
            "HRMax": visit.findtext("HRMax"),
            "Technician": visit.findtext("Technician"),
            "Physician": visit.findtext("Physician"),
            "ReferringPhysician": visit.findtext("ReferringPhysician"),
            "VisitReason": visit.findtext("VisitReason"),
            "Diabetes": visit.findtext("Diabetes"),
            "Tests": []
        }
    }

    for test in visit.findall("Test"):
        test_type_elem = test.find("TestType")
        test_type = test_type_elem.attrib.get("ExtendedInfo") if test_type_elem is not None else None
        test_id = test_type_elem.text if test_type_elem is not None else None

        test_record = {
            "TestType": test_type,
            "TestID": test_id,
            "Parameters": [],
            "Graphs": {}
        }

        additional_data = test.find("AdditionalData")
        if additional_data is not None:
            # Every Parameter element anywhere below AdditionalData
            for param in additional_data.iter("Parameter"):
                test_record["Parameters"].append(dict(param.attrib))

            # Direct children whose tag starts with "Graph". Comment and
            # processing-instruction nodes have a non-string tag; skip them.
            for graph in additional_data:
                if isinstance(graph.tag, str) and graph.tag.startswith("Graph"):
                    test_record["Graphs"][graph.tag] = extract_graph(graph)

        export_data["Visit"]["Tests"].append(test_record)

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(export_data, f, indent=2)

    print(f"✅ JSON saved to: {output_path}")

# Run the export using the paths from this cell's CONFIG section
main(xml_file_path, output_json_path)

In [None]:
import json
import matplotlib.pyplot as plt

# --- CONFIG ---
# Reuses the JSON path defined by the export cell
json_path = output_json_path
# --------------

# Read the JSON summary
with open(json_path, "r") as f:
    data = json.load(f)

subject = data.get("Subject", {})
visit = data.get("Visit", {})

subject_keys = ["SubjectID", "FirstName", "LastName", "DOB", "Gender", "Ethnicity"]
visit_keys = ["RecordID", "CreatedOn", "Smoker", "CigarettesPerDay", "SmokeYears", "SmokeWhat", "NonSmokeYears",
              "Height_cm", "Weight_kg", "HRMax", "Technician", "Physician", "ReferringPhysician", "VisitReason", "Diabetes"]

# Assemble the report line by line; only keys present in the JSON are listed
lines = ["=== SUBJECT INFORMATION ==="]
lines += [f"{key}: {subject[key]}" for key in subject_keys if key in subject]
lines.append("")

lines.append("=== VISIT INFORMATION ===")
lines += [f"{key}: {visit[key]}" for key in visit_keys if key in visit]
lines.append("")

# === Tests and Graphs ===
lines.append("=== TEST RESULTS ===")
flow_vol_curve = None  # points of the most recently matched flow-volume graph

for test in visit.get("Tests", []):
    lines.append(f"Test Type: {test.get('TestType')} ({test.get('TestID')})")

    # One indented line per parameter, listing all of its attributes
    parameters = test.get("Parameters")
    if parameters:
        lines.append("  Parameters:")
        lines.extend(
            "    " + ", ".join(f"{k}={v}" for k, v in param.items())
            for param in parameters
        )

    # First graph of this test with the flow-volume axis labels and points;
    # a match in a later test replaces the one stored from an earlier test
    for gname, gdata in test.get("Graphs", {}).items():
        has_fv_axes = gdata.get("X") == "V (L)" and gdata.get("Y") == "F (L/s)"
        if has_fv_axes and gdata.get("Points"):
            flow_vol_curve = gdata["Points"]
            lines.append(f"  Flow-volume graph detected in {gname}")
            break
    lines.append("")

# Print full report
print("\n".join(lines))

# Plot the curve when one was found
if not flow_vol_curve:
    print("No flow-volume curve found with X='V (L)' and Y='F (L/s)'.")
else:
    vol, flow = zip(*flow_vol_curve)
    plt.figure(figsize=(8, 6))
    plt.plot(vol, flow)
    plt.xlabel("Volume (L)")
    plt.ylabel("Flow (L/s)")
    plt.title("Flow-Volume Curve")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

In [None]:
import json
import pandas as pd

# --- CONFIG ---
json_path = "input/extracted_test_summary.json"
csv_output_path = "input/subject_flattened_row.csv"
# --------------

# Load JSON file
with open(json_path, "r") as f:
    data = json.load(f)

# Extract subject and visit info
subject = data.get("Subject", {})
visit = data.get("Visit", {})

# Base row: subject fields, then the two renamed visit fields, then the
# remaining visit fields under their own names (this fixes the column order)
subject_keys = ("SubjectID", "FirstName", "LastName", "DOB", "Gender", "Ethnicity")
visit_keys = ("Smoker", "CigarettesPerDay", "SmokeYears", "SmokeWhat", "NonSmokeYears",
              "Height_cm", "Weight_kg", "HRMax", "Technician", "Physician",
              "ReferringPhysician", "VisitReason", "Diabetes")
row = {key: subject.get(key) for key in subject_keys}
row["VisitRecordID"] = visit.get("RecordID")
row["VisitDate"] = visit.get("CreatedOn")
row.update((key, visit.get(key)) for key in visit_keys)

# Process each test and all its parameters
for test in visit.get("Tests", []):
    # "TestType" is present but null when the XML had no usable TestType, so
    # a .get() default never applies; substitute "Unknown" explicitly.
    test_type = test.get("TestType")
    test_prefix = ("Unknown" if test_type is None else test_type).replace(" ", "_")

    for param in test.get("Parameters", []):
        name = param.get("Name", "Unnamed").replace(" ", "_")
        for key, val in param.items():
            if key != "Name":
                col_name = f"{test_prefix}_{name}_{key}".replace(" ", "_")
                row[col_name] = val

    # Flow-volume loop: first matching graph of this test; a match in a later
    # test overwrites the value stored from an earlier one
    for gname, gdata in test.get("Graphs", {}).items():
        if gdata.get("X") == "V (L)" and gdata.get("Y") == "F (L/s)" and gdata.get("Points"):
            row["FlowVolumeLoop"] = gdata["Points"]
            break

# Convert to single-row DataFrame
df = pd.DataFrame([row])

# Save
df.to_csv(csv_output_path, index=False)
print(f"✅ CSV saved to: {csv_output_path}")
df.head()

In [None]:
# Inspect the flow-volume loop held in `df`, the single-row DataFrame built by
# the flattening cell. The previous name `df_single` is not defined anywhere
# in this notebook, so the cell failed with NameError.
if "FlowVolumeLoop" in df.columns:
    varbs = df["FlowVolumeLoop"]
    varb = None
    for varb in varbs:
        for x, y in varb:
            print(f"Volume: {x}, Flow: {y}")
        print(varb)
    print(varbs)
    # Length of the last loop printed; skipped when the column has no rows
    if varb is not None:
        print(len(varb))
else:
    print("No FlowVolumeLoop column in df.")



In [None]:
import json
import pandas as pd
import json
from lxml import etree

# ---------- CONFIG ----------
# XML export to read and the JSON summary written by main()
xml_file_path = "input/example.xml"
output_json_path = "input/extracted_pft.json"
# ----------------------------
# --- CONFIG ---
# The flattening step reads the JSON written above and saves a one-row CSV
json_path = output_json_path
csv_output_path = "input/extracted_pft.csv"
# --------------


def extract_graph(graph_elem):
    """Build a dict describing one graph element and its numeric points.

    The result carries the "X" and "Y" attributes, "Count" (int, default 0),
    "SamplingInterval" (float, default 0) and "Points", a list of [x, y]
    float pairs. Point elements lacking X or Y, or holding values that cannot
    be converted to float, are omitted.
    """

    def numeric_points():
        # Yield one [x, y] pair per usable Point element, in document order.
        for pt in graph_elem.iter("Point"):
            px = pt.attrib.get("X")
            py = pt.attrib.get("Y")
            if px is None or py is None:
                continue
            try:
                yield [float(px), float(py)]
            except ValueError:
                continue

    meta = graph_elem.attrib
    return {
        "X": meta.get("X"),
        "Y": meta.get("Y"),
        "Count": int(meta.get("Count", "0")),
        "SamplingInterval": float(meta.get("SamplingInterval", "0")),
        "Points": list(numeric_points()),
    }


def main(xml_path, output_path):
    """Read one XML export and save its subject, visit and test data as JSON.

    Args:
        xml_path: Path of the XML file to read.
        output_path: Path of the JSON file to write (opened in "w" mode).

    Raises:
        ValueError: If the document root has no Subject child, or the subject
            has no Visit child.
    """

    def extended_info(parent, tag):
        # ExtendedInfo attribute of a child element, or None when the child
        # does not exist (mirrors the TestType handling below).
        node = parent.find(tag)
        return None if node is None else node.attrib.get("ExtendedInfo")

    tree = etree.parse(xml_path)
    root = tree.getroot()

    subject = root.find("Subject")
    if subject is None:
        raise ValueError(f"{xml_path}: no Subject element under the document root")
    visit = subject.find("Visit")
    if visit is None:
        raise ValueError(f"{xml_path}: no Visit element under Subject")

    export_data = {
        "Subject": {
            "SubjectID": subject.findtext("ID"),
            "FirstName": subject.findtext("FirstName"),
            "LastName": subject.findtext("LastName"),
            "DOB": extended_info(subject, "DayOfBirth"),
            "Gender": extended_info(subject, "GenderID"),
            "Ethnicity": extended_info(subject, "ethnicID"),
        },
        "Visit": {
            "RecordID": visit.findtext("RecordID"),
            "CreatedOn": extended_info(visit, "CreatedOn"),
            "Smoker": visit.findtext("Smoker"),
            "CigarettesPerDay": visit.findtext("CigDie"),
            "SmokeYears": visit.findtext("SmokeYears"),
            "SmokeWhat": visit.findtext("SmokeWhat"),
            "NonSmokeYears": visit.findtext("NonSmokeYears"),
            "Height_cm": visit.findtext("Height"),
            "Weight_kg": visit.findtext("Weight"),
            "HRMax": visit.findtext("HRMax"),
            "Technician": visit.findtext("Technician"),
            "Physician": visit.findtext("Physician"),
            "ReferringPhysician": visit.findtext("ReferringPhysician"),
            "VisitReason": visit.findtext("VisitReason"),
            "Diabetes": visit.findtext("Diabetes"),
            "Tests": []
        }
    }

    for test in visit.findall("Test"):
        test_type_elem = test.find("TestType")
        test_type = test_type_elem.attrib.get("ExtendedInfo") if test_type_elem is not None else None
        test_id = test_type_elem.text if test_type_elem is not None else None

        test_record = {
            "TestType": test_type,
            "TestID": test_id,
            "Parameters": [],
            "Graphs": {}
        }

        additional_data = test.find("AdditionalData")
        if additional_data is not None:
            # Every Parameter element anywhere below AdditionalData
            for param in additional_data.iter("Parameter"):
                test_record["Parameters"].append(dict(param.attrib))

            # Direct children whose tag starts with "Graph". Comment and
            # processing-instruction nodes have a non-string tag; skip them.
            for graph in additional_data:
                if isinstance(graph.tag, str) and graph.tag.startswith("Graph"):
                    test_record["Graphs"][graph.tag] = extract_graph(graph)

        export_data["Visit"]["Tests"].append(test_record)

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(export_data, f, indent=2)

    print(f"✅ JSON saved to: {output_path}")

# Run it
main(xml_file_path, output_json_path)



# Load JSON file
with open(json_path, "r") as f:
    data = json.load(f)

# Extract subject and visit info
subject = data.get("Subject", {})
visit = data.get("Visit", {})

# Base row: subject fields, then the two renamed visit fields, then the
# remaining visit fields under their own names (this fixes the column order)
subject_keys = ("SubjectID", "FirstName", "LastName", "DOB", "Gender", "Ethnicity")
visit_keys = ("Smoker", "CigarettesPerDay", "SmokeYears", "SmokeWhat", "NonSmokeYears",
              "Height_cm", "Weight_kg", "HRMax", "Technician", "Physician",
              "ReferringPhysician", "VisitReason", "Diabetes")
row = {key: subject.get(key) for key in subject_keys}
row["VisitRecordID"] = visit.get("RecordID")
row["VisitDate"] = visit.get("CreatedOn")
row.update((key, visit.get(key)) for key in visit_keys)

# Process each test and all its parameters
for test in visit.get("Tests", []):
    # main() always writes a "TestType" key, with null when the XML had no
    # usable TestType, so a .get() default never applies here.
    test_type = test.get("TestType")
    test_prefix = ("Unknown" if test_type is None else test_type).replace(" ", "_")

    for param in test.get("Parameters", []):
        name = param.get("Name", "Unnamed").replace(" ", "_")
        for key, val in param.items():
            if key != "Name":
                col_name = f"{test_prefix}_{name}_{key}".replace(" ", "_")
                row[col_name] = val

    # Flow-volume loop: first matching graph of this test; a match in a later
    # test overwrites the value stored from an earlier one
    for gname, gdata in test.get("Graphs", {}).items():
        if gdata.get("X") == "V (L)" and gdata.get("Y") == "F (L/s)" and gdata.get("Points"):
            row["FlowVolumeLoop"] = gdata["Points"]
            break

# Convert to single-row DataFrame
df = pd.DataFrame([row])

# Save
df.to_csv(csv_output_path, index=False)
print(f"✅ CSV saved to: {csv_output_path}")
df.head()

In [None]:
import json
import pandas as pd
from pathlib import Path
from lxml import etree

# ---------- CONFIG ----------
xml_file_path = Path("input/example.xml")
# Both outputs are placed in the same directory as the XML file
json_output_path = xml_file_path.with_name("extracted_pft.json")
csv_output_path = xml_file_path.with_name("extracted_pft.csv")
# ----------------------------

def extract_graph(graph_elem):
    """Return the metadata and numeric points of one graph element.

    The result holds the "X" and "Y" attributes (None when absent), "Count"
    as int and "SamplingInterval" as float (both default to 0), and "Points",
    a list of [x, y] float pairs from the Point elements below the graph.

    Points that lack X or Y, or whose coordinates are not numeric, are
    skipped, as in the loop-based extract_graph of the earlier cells, so a
    single malformed point no longer aborts the whole export.

    Raises:
        ValueError: If "Count" or "SamplingInterval" is present but not
            numeric.
    """
    points = []
    for p in graph_elem.iter("Point"):
        x = p.attrib.get("X")
        y = p.attrib.get("Y")
        if x is None or y is None:
            continue
        try:
            points.append([float(x), float(y)])
        except ValueError:
            continue

    return {
        "X": graph_elem.attrib.get("X"),
        "Y": graph_elem.attrib.get("Y"),
        "Count": int(graph_elem.attrib.get("Count", "0")),
        "SamplingInterval": float(graph_elem.attrib.get("SamplingInterval", "0")),
        "Points": points,
    }

def parse_xml_to_json(xml_path, json_path):
    """Parse one XML export and write a nested JSON summary of it.

    The summary contains the subject fields, the visit fields and, for every
    Test element directly under the visit, its parameters and graphs.

    Args:
        xml_path: Path of the XML file to read (str or path-like).
        json_path: Path of the JSON file to write as UTF-8.

    Raises:
        ValueError: If the document root has no Subject child, or the subject
            has no Visit child.
    """

    def extended_info(parent, tag):
        # ExtendedInfo attribute of a child element; None when the child is
        # absent, the same convention used for TestType further down.
        node = parent.find(tag)
        return None if node is None else node.attrib.get("ExtendedInfo")

    tree = etree.parse(str(xml_path))
    root = tree.getroot()

    subject = root.find("Subject")
    if subject is None:
        raise ValueError(f"{xml_path}: no Subject element under the document root")
    visit = subject.find("Visit")
    if visit is None:
        raise ValueError(f"{xml_path}: no Visit element under Subject")

    export_data = {
        "Subject": {
            "SubjectID": subject.findtext("ID"),
            "FirstName": subject.findtext("FirstName"),
            "LastName": subject.findtext("LastName"),
            "DOB": extended_info(subject, "DayOfBirth"),
            "Gender": extended_info(subject, "GenderID"),
            "Ethnicity": extended_info(subject, "ethnicID"),
        },
        "Visit": {
            "RecordID": visit.findtext("RecordID"),
            "CreatedOn": extended_info(visit, "CreatedOn"),
            "Smoker": visit.findtext("Smoker"),
            "CigarettesPerDay": visit.findtext("CigDie"),
            "SmokeYears": visit.findtext("SmokeYears"),
            "SmokeWhat": visit.findtext("SmokeWhat"),
            "NonSmokeYears": visit.findtext("NonSmokeYears"),
            "Height_cm": visit.findtext("Height"),
            "Weight_kg": visit.findtext("Weight"),
            "HRMax": visit.findtext("HRMax"),
            "Technician": visit.findtext("Technician"),
            "Physician": visit.findtext("Physician"),
            "ReferringPhysician": visit.findtext("ReferringPhysician"),
            "VisitReason": visit.findtext("VisitReason"),
            "Diabetes": visit.findtext("Diabetes"),
            "Tests": []
        }
    }

    for test in visit.findall("Test"):
        test_type_elem = test.find("TestType")
        test_type = test_type_elem.attrib.get("ExtendedInfo") if test_type_elem is not None else None
        test_id = test_type_elem.text if test_type_elem is not None else None

        test_record = {
            "TestType": test_type,
            "TestID": test_id,
            "Parameters": [],
            "Graphs": {}
        }

        additional_data = test.find("AdditionalData")
        if additional_data is not None:
            for param in additional_data.iter("Parameter"):
                test_record["Parameters"].append(dict(param.attrib))

            # Comment and processing-instruction children have a non-string
            # tag; skip them rather than fail on .startswith.
            for graph in additional_data:
                if isinstance(graph.tag, str) and graph.tag.startswith("Graph"):
                    test_record["Graphs"][graph.tag] = extract_graph(graph)

        export_data["Visit"]["Tests"].append(test_record)

    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(export_data, f, indent=2)
    print(f"✅ JSON saved to: {json_path}")

def flatten_json_to_csv(json_path, csv_path):
    """Flatten a JSON summary into a one-row CSV file.

    The row starts with the subject and visit fields. Every attribute of
    every test parameter (except "Name") becomes a column named
    "<TestType>_<Name>_<attribute>" with spaces replaced by underscores; a
    test whose "TestType" is null uses the prefix "Unknown". The points of a
    graph with axis labels X="V (L)" and Y="F (L/s)" go into the
    "FlowVolumeLoop" column: the first match within a test is taken, and a
    match in a later test overwrites an earlier one.

    Args:
        json_path: Path of the JSON summary to read (UTF-8).
        csv_path: Path of the CSV file to write.
    """
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    subject = data.get("Subject", {})
    visit = data.get("Visit", {})

    # Column order: subject fields, the two renamed visit fields, then the
    # remaining visit fields under their own names.
    subject_keys = ("SubjectID", "FirstName", "LastName", "DOB", "Gender", "Ethnicity")
    visit_keys = ("Smoker", "CigarettesPerDay", "SmokeYears", "SmokeWhat", "NonSmokeYears",
                  "Height_cm", "Weight_kg", "HRMax", "Technician", "Physician",
                  "ReferringPhysician", "VisitReason", "Diabetes")
    row = {key: subject.get(key) for key in subject_keys}
    row["VisitRecordID"] = visit.get("RecordID")
    row["VisitDate"] = visit.get("CreatedOn")
    row.update((key, visit.get(key)) for key in visit_keys)

    for test in visit.get("Tests", []):
        # The key exists with a null value when the XML had no usable
        # TestType, so a .get() default would never apply.
        test_type = test.get("TestType")
        test_prefix = ("Unknown" if test_type is None else test_type).replace(" ", "_")
        for param in test.get("Parameters", []):
            param_name = param.get("Name", "Unnamed").replace(" ", "_")
            for key, val in param.items():
                if key != "Name":
                    col_name = f"{test_prefix}_{param_name}_{key}".replace(" ", "_")
                    row[col_name] = val

        for gname, gdata in test.get("Graphs", {}).items():
            if gdata.get("X") == "V (L)" and gdata.get("Y") == "F (L/s)" and gdata.get("Points"):
                row["FlowVolumeLoop"] = gdata["Points"]
                break

    df = pd.DataFrame([row])
    df.to_csv(csv_path, index=False)
    print(f"✅ CSV saved to: {csv_path}")

# Run both steps: write the JSON summary first, then flatten that file to CSV
parse_xml_to_json(xml_file_path, json_output_path)
flatten_json_to_csv(json_output_path, csv_output_path)

In [None]:
import json
import pandas as pd
from pathlib import Path
from lxml import etree

# ---------- CONFIG ----------
xml_file_path = Path("input/example.xml")
base_name = xml_file_path.stem.replace(" ", "_")  # file name without extension, spaces -> underscores
# Outputs are placed in the same directory as the XML file
json_output_path = xml_file_path.with_name(f"{base_name}_extracted.json")
csv_output_path = xml_file_path.with_name(f"{base_name}_extracted.csv")
# ----------------------------

def extract_graph(graph_elem):
    """Return the metadata and numeric points of one graph element.

    The result holds the "X" and "Y" attributes (None when absent), "Count"
    as int and "SamplingInterval" as float (both default to 0), and "Points",
    a list of [x, y] float pairs from the Point elements below the graph.

    Points that lack X or Y, or whose coordinates are not numeric, are
    skipped, so one malformed point no longer aborts the whole export.

    Raises:
        ValueError: If "Count" or "SamplingInterval" is present but not
            numeric.
    """
    points = []
    for p in graph_elem.iter("Point"):
        x = p.attrib.get("X")
        y = p.attrib.get("Y")
        if x is None or y is None:
            continue
        try:
            points.append([float(x), float(y)])
        except ValueError:
            continue

    return {
        "X": graph_elem.attrib.get("X"),
        "Y": graph_elem.attrib.get("Y"),
        "Count": int(graph_elem.attrib.get("Count", "0")),
        "SamplingInterval": float(graph_elem.attrib.get("SamplingInterval", "0")),
        "Points": points,
    }

def parse_xml_to_dict(xml_path):
    """Parse an XML export into a dict of subject and visit fields.

    Args:
        xml_path: Path of the XML file to read (str or path-like).

    Returns:
        A dict with a "Subject" section and a "Visit" section. The visit's
        "Tests" list is returned empty for the caller to fill. Fields whose
        element is missing from the document are None.

    Raises:
        ValueError: If the document root has no Subject child, or the subject
            has no Visit child.
    """

    def extended_info(parent, tag):
        # ExtendedInfo attribute of a child element; None when the child is
        # absent, instead of failing on None.attrib.
        node = parent.find(tag)
        return None if node is None else node.attrib.get("ExtendedInfo")

    tree = etree.parse(str(xml_path))
    root = tree.getroot()
    subject = root.find("Subject")
    if subject is None:
        raise ValueError(f"{xml_path}: no Subject element under the document root")
    visit = subject.find("Visit")
    if visit is None:
        raise ValueError(f"{xml_path}: no Visit element under Subject")

    return {
        "Subject": {
            "SubjectID": subject.findtext("ID"),
            "FirstName": subject.findtext("FirstName"),
            "LastName": subject.findtext("LastName"),
            "DOB": extended_info(subject, "DayOfBirth"),
            "Gender": extended_info(subject, "GenderID"),
            "Ethnicity": extended_info(subject, "ethnicID"),
        },
        "Visit": {
            "RecordID": visit.findtext("RecordID"),
            "CreatedOn": extended_info(visit, "CreatedOn"),
            "Smoker": visit.findtext("Smoker"),
            "CigarettesPerDay": visit.findtext("CigDie"),
            "SmokeYears": visit.findtext("SmokeYears"),
            "SmokeWhat": visit.findtext("SmokeWhat"),
            "NonSmokeYears": visit.findtext("NonSmokeYears"),
            "Height_cm": visit.findtext("Height"),
            "Weight_kg": visit.findtext("Weight"),
            "HRMax": visit.findtext("HRMax"),
            "Technician": visit.findtext("Technician"),
            "Physician": visit.findtext("Physician"),
            "ReferringPhysician": visit.findtext("ReferringPhysician"),
            "VisitReason": visit.findtext("VisitReason"),
            "Diabetes": visit.findtext("Diabetes"),
            "Tests": []
        }
    }

def add_tests_from_xml(root_dict, visit_elem):
    """Append one record per Test child of the visit to root_dict in place.

    Each record holds "TestType" (the TestType element's ExtendedInfo
    attribute), "TestID" (its text), "Parameters" (attribute dicts of every
    Parameter element below AdditionalData) and "Graphs" (direct children of
    AdditionalData whose tag starts with "Graph", keyed by tag and converted
    with extract_graph). TestType and TestID are None when the Test has no
    TestType element.

    Args:
        root_dict: Dict whose ["Visit"]["Tests"] list receives the records.
        visit_elem: The visit element whose Test children are read.
    """
    for test in visit_elem.findall("Test"):
        test_type_elem = test.find("TestType")
        test_type = test_type_elem.attrib.get("ExtendedInfo") if test_type_elem is not None else None
        test_id = test_type_elem.text if test_type_elem is not None else None

        test_record = {
            "TestType": test_type,
            "TestID": test_id,
            "Parameters": [],
            "Graphs": {}
        }

        additional_data = test.find("AdditionalData")
        if additional_data is not None:
            for param in additional_data.iter("Parameter"):
                test_record["Parameters"].append(dict(param.attrib))
            for graph in additional_data:
                # Comment and processing-instruction children have a
                # non-string tag; skip them rather than fail on .startswith.
                if isinstance(graph.tag, str) and graph.tag.startswith("Graph"):
                    test_record["Graphs"][graph.tag] = extract_graph(graph)

        root_dict["Visit"]["Tests"].append(test_record)

def flatten_dict_to_dataframe(data_dict):
    """Flatten the nested summary dict into a single-row DataFrame.

    The row starts with the subject and visit fields. Every attribute of
    every test parameter (except "Name") becomes a column named
    "<TestType>_<Name>_<attribute>" with spaces replaced by underscores; a
    test whose "TestType" is None uses the prefix "Unknown". The points of a
    graph with axis labels X="V (L)" and Y="F (L/s)" go into the
    "FlowVolumeLoop" column: the first match within a test is taken, and a
    match in a later test overwrites an earlier one.

    Args:
        data_dict: Dict with optional "Subject" and "Visit" sections.

    Returns:
        A pandas DataFrame with exactly one row.
    """
    subject = data_dict.get("Subject", {})
    visit = data_dict.get("Visit", {})

    # Column order: subject fields, the two renamed visit fields, then the
    # remaining visit fields under their own names.
    subject_keys = ("SubjectID", "FirstName", "LastName", "DOB", "Gender", "Ethnicity")
    visit_keys = ("Smoker", "CigarettesPerDay", "SmokeYears", "SmokeWhat", "NonSmokeYears",
                  "Height_cm", "Weight_kg", "HRMax", "Technician", "Physician",
                  "ReferringPhysician", "VisitReason", "Diabetes")
    row = {key: subject.get(key) for key in subject_keys}
    row["VisitRecordID"] = visit.get("RecordID")
    row["VisitDate"] = visit.get("CreatedOn")
    row.update((key, visit.get(key)) for key in visit_keys)

    for test in visit.get("Tests", []):
        # The "TestType" key is present with value None when the XML had no
        # usable TestType, so a .get() default would never apply.
        test_type = test.get("TestType")
        test_prefix = ("Unknown" if test_type is None else test_type).replace(" ", "_")
        for param in test.get("Parameters", []):
            param_name = param.get("Name", "Unnamed").replace(" ", "_")
            for key, val in param.items():
                if key != "Name":
                    col = f"{test_prefix}_{param_name}_{key}".replace(" ", "_")
                    row[col] = val
        for gname, gdata in test.get("Graphs", {}).items():
            if gdata.get("X") == "V (L)" and gdata.get("Y") == "F (L/s)" and gdata.get("Points"):
                row["FlowVolumeLoop"] = gdata["Points"]
                break
    return pd.DataFrame([row])

# -------- Pipeline --------
# This parse is only used to locate the Visit element; parse_xml_to_dict
# reads the same file again on its own.
tree = etree.parse(str(xml_file_path))
root = tree.getroot()
visit_elem = root.find(".//Visit")

# Subject and visit fields first, then the tests are appended in place
data_dict = parse_xml_to_dict(xml_file_path)
add_tests_from_xml(data_dict, visit_elem)

# Keep a JSON copy of the nested structure for inspection
json_text = json.dumps(data_dict, indent=2)
with open(json_output_path, "w", encoding="utf-8") as f:
    f.write(json_text)
print(f"✅ Saved JSON to: {json_output_path.name}")

# One-row table written as CSV
df = flatten_dict_to_dataframe(data_dict)
df.to_csv(csv_output_path, index=False)
print(f"✅ Saved CSV to: {csv_output_path.name}")

In [None]:
import json
import pandas as pd
from pathlib import Path
from lxml import etree

# ---------- CONFIG ----------
# XML exports to process; add one Path per file
xml_file_list = [
    Path("input/example.xml")
]
# All outputs go to the directory of the FIRST listed file
# (the list must therefore not be empty)
output_dir = xml_file_list[0].parent  # save alongside inputs
# ----------------------------

def extract_graph(graph_elem):
    """Return the metadata and numeric points of one graph element.

    The result holds the "X" and "Y" attributes (None when absent), "Count"
    as int and "SamplingInterval" as float (both default to 0), and "Points",
    a list of [x, y] float pairs from the Point elements below the graph.

    Points that lack X or Y, or whose coordinates are not numeric, are
    skipped, so one malformed point in one file no longer stops the batch.

    Raises:
        ValueError: If "Count" or "SamplingInterval" is present but not
            numeric.
    """
    points = []
    for p in graph_elem.iter("Point"):
        x = p.attrib.get("X")
        y = p.attrib.get("Y")
        if x is None or y is None:
            continue
        try:
            points.append([float(x), float(y)])
        except ValueError:
            continue

    return {
        "X": graph_elem.attrib.get("X"),
        "Y": graph_elem.attrib.get("Y"),
        "Count": int(graph_elem.attrib.get("Count", "0")),
        "SamplingInterval": float(graph_elem.attrib.get("SamplingInterval", "0")),
        "Points": points,
    }

def parse_xml_to_dict(root):
    """Build the subject/visit dict from an already parsed document root.

    Args:
        root: Root element of the XML export.

    Returns:
        A dict with a "Subject" section and a "Visit" section. The visit's
        "Tests" list is returned empty for the caller to fill. Fields whose
        element is missing from the document are None.

    Raises:
        ValueError: If root has no Subject child, or the subject has no
            Visit child.
    """

    def extended_info(parent, tag):
        # ExtendedInfo attribute of a child element; None when the child is
        # absent, instead of failing on None.attrib.
        node = parent.find(tag)
        return None if node is None else node.attrib.get("ExtendedInfo")

    subject = root.find("Subject")
    if subject is None:
        raise ValueError("no Subject element under the document root")
    visit = subject.find("Visit")
    if visit is None:
        raise ValueError("no Visit element under Subject")

    return {
        "Subject": {
            "SubjectID": subject.findtext("ID"),
            "FirstName": subject.findtext("FirstName"),
            "LastName": subject.findtext("LastName"),
            "DOB": extended_info(subject, "DayOfBirth"),
            "Gender": extended_info(subject, "GenderID"),
            "Ethnicity": extended_info(subject, "ethnicID"),
        },
        "Visit": {
            "RecordID": visit.findtext("RecordID"),
            "CreatedOn": extended_info(visit, "CreatedOn"),
            "Smoker": visit.findtext("Smoker"),
            "CigarettesPerDay": visit.findtext("CigDie"),
            "SmokeYears": visit.findtext("SmokeYears"),
            "SmokeWhat": visit.findtext("SmokeWhat"),
            "NonSmokeYears": visit.findtext("NonSmokeYears"),
            "Height_cm": visit.findtext("Height"),
            "Weight_kg": visit.findtext("Weight"),
            "HRMax": visit.findtext("HRMax"),
            "Technician": visit.findtext("Technician"),
            "Physician": visit.findtext("Physician"),
            "ReferringPhysician": visit.findtext("ReferringPhysician"),
            "VisitReason": visit.findtext("VisitReason"),
            "Diabetes": visit.findtext("Diabetes"),
            "Tests": []
        }
    }

def add_tests_from_xml(data_dict, visit_elem):
    """Append one record per Test child of the visit to data_dict in place.

    Each record holds "TestType" (the TestType element's ExtendedInfo
    attribute), "TestID" (its text), "Parameters" (attribute dicts of every
    Parameter element below AdditionalData) and "Graphs" (direct children of
    AdditionalData whose tag starts with "Graph", keyed by tag and converted
    with extract_graph). TestType and TestID are None when the Test has no
    TestType element.

    Args:
        data_dict: Dict whose ["Visit"]["Tests"] list receives the records.
        visit_elem: The visit element whose Test children are read.
    """
    for test in visit_elem.findall("Test"):
        test_type_elem = test.find("TestType")
        test_type = test_type_elem.attrib.get("ExtendedInfo") if test_type_elem is not None else None
        test_id = test_type_elem.text if test_type_elem is not None else None

        test_record = {
            "TestType": test_type,
            "TestID": test_id,
            "Parameters": [],
            "Graphs": {}
        }

        additional_data = test.find("AdditionalData")
        if additional_data is not None:
            for param in additional_data.iter("Parameter"):
                test_record["Parameters"].append(dict(param.attrib))
            for graph in additional_data:
                # Comment and processing-instruction children have a
                # non-string tag; skip them rather than fail on .startswith.
                if isinstance(graph.tag, str) and graph.tag.startswith("Graph"):
                    test_record["Graphs"][graph.tag] = extract_graph(graph)

        data_dict["Visit"]["Tests"].append(test_record)

def flatten_dict_to_row(data_dict):
    """Flatten the nested summary dict into one flat row dict.

    The row starts with the subject and visit fields. Every attribute of
    every test parameter (except "Name") becomes an entry named
    "<TestType>_<Name>_<attribute>" with spaces replaced by underscores; a
    test whose "TestType" is None uses the prefix "Unknown". The points of a
    graph with axis labels X="V (L)" and Y="F (L/s)" are stored under
    "FlowVolumeLoop": the first match within a test is taken, and a match in
    a later test overwrites an earlier one.

    Args:
        data_dict: Dict with "Subject" and "Visit" sections.

    Returns:
        A dict mapping column names to values.

    Raises:
        KeyError: If data_dict has no "Subject" or no "Visit" key.
    """
    subject = data_dict["Subject"]
    visit = data_dict["Visit"]

    # Key order: subject fields, the two renamed visit fields, then the
    # remaining visit fields under their own names.
    subject_keys = ("SubjectID", "FirstName", "LastName", "DOB", "Gender", "Ethnicity")
    visit_keys = ("Smoker", "CigarettesPerDay", "SmokeYears", "SmokeWhat", "NonSmokeYears",
                  "Height_cm", "Weight_kg", "HRMax", "Technician", "Physician",
                  "ReferringPhysician", "VisitReason", "Diabetes")
    row = {key: subject.get(key) for key in subject_keys}
    row["VisitRecordID"] = visit.get("RecordID")
    row["VisitDate"] = visit.get("CreatedOn")
    row.update((key, visit.get(key)) for key in visit_keys)

    for test in visit.get("Tests", []):
        # The "TestType" key is present with value None when the XML had no
        # usable TestType, so a .get() default would never apply.
        test_type = test.get("TestType")
        test_prefix = ("Unknown" if test_type is None else test_type).replace(" ", "_")
        for param in test.get("Parameters", []):
            name = param.get("Name", "Unnamed").replace(" ", "_")
            for k, v in param.items():
                if k != "Name":
                    col = f"{test_prefix}_{name}_{k}".replace(" ", "_")
                    row[col] = v
        for gname, gdata in test.get("Graphs", {}).items():
            if gdata.get("X") == "V (L)" and gdata.get("Y") == "F (L/s)" and gdata.get("Points"):
                row["FlowVolumeLoop"] = gdata["Points"]
                break
    return row

# ---------- MAIN LOOP ----------

for xml_path in xml_file_list:
    print(f"📂 Processing: {xml_path.name}")

    # Load the document and locate the first Visit element anywhere in it
    tree = etree.parse(str(xml_path))
    root = tree.getroot()
    visit_elem = root.find(".//Visit")

    # Subject and visit fields first, then the tests are appended in place
    data_dict = parse_xml_to_dict(root)
    add_tests_from_xml(data_dict, visit_elem)

    # Output names share one stem: the input name with spaces as underscores
    safe_stem = xml_path.stem.replace(' ', '_')

    # JSON copy of the nested structure, one file per input
    json_path = output_dir / f"{safe_stem}_extracted.json"
    json_text = json.dumps(data_dict, indent=2)
    with open(json_path, "w", encoding="utf-8") as f:
        f.write(json_text)
    print(f"✅ JSON saved to: {json_path}")

    # One-row CSV, one file per input
    row = flatten_dict_to_row(data_dict)
    df = pd.DataFrame([row])
    csv_path = output_dir / f"{safe_stem}_extracted.csv"
    df.to_csv(csv_path, index=False)
    print(f"✅ CSV saved to: {csv_path}")