# Xml analyzer

Single-run, streaming analysis of an mzML file that computes run summaries, infers DDA/DIA acquisition mode, builds a deduplicated Structure Report, extracts selectedIon summaries, and writes tidy artifacts (JSON + CSV/Parquet). Set inputs below and Run All.

## 1) Install and Import Dependencies

This notebook installs minimal deps if missing (lxml, pandas, tqdm). 

In [1]:
# Auto-install light deps if missing
import importlib, sys, subprocess

def _ensure(pkg: str, module: str | None = None):
    mod = module or pkg
    try:
        importlib.import_module(mod)
    except ImportError:
        try:
            print(f"Installing {pkg}...")
            subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])
        except Exception as e:
            print(f"Warning: failed to install {pkg}: {e}")

for pkg, mod in [("lxml", "lxml"), ("pandas", "pandas")]:
    _ensure(pkg, mod)

# Imports
import os, io, json, time, logging, struct, base64, zlib, math, csv
from pathlib import Path
from collections import defaultdict, Counter
from dataclasses import dataclass, field, asdict
from typing import Optional, Dict, List, Any, Iterable, Tuple

import pandas as pd

# Parser backend (try lxml)
try:
    from lxml import etree as ET
    PARSER_BACKEND = "lxml"
    def get_iterparse_kwargs() -> Dict[str, Any]:
        return dict(resolve_entities=False, no_network=True, huge_tree=False, recover=False, remove_pis=True, remove_comments=True)
except Exception:
    import xml.etree.ElementTree as ET
    PARSER_BACKEND = "xml.etree.ElementTree"
    def get_iterparse_kwargs() -> Dict[str, Any]:
        return {}

print(f"Using parser backend: {PARSER_BACKEND}")

Installing lxml...
Collecting lxml
  Downloading lxml-6.0.2-cp312-cp312-macosx_10_13_universal2.whl.metadata (3.6 kB)
Downloading lxml-6.0.2-cp312-cp312-macosx_10_13_universal2.whl (8.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.7/8.7 MB[0m [31m21.8 MB/s[0m  [33m0:00:00[0m eta [36m0:00:01[0m
[?25hInstalling collected packages: lxml
Successfully installed lxml-6.0.2
Using parser backend: lxml


## 2) Inputs & Controls

Set your inputs once, then Run All.

In [11]:
# User-friendly controls

# Defaults
MZML_FILE: str = "/Users/maxvandenboom/Docs/Coding/AI/ProfileTensor/data/2449SAB_0002_A1.mzML"  # change to your file
OUTPUT_DIR: str = "../out/2449SAB_0002_A1"
MAX_SPECTRA_SAMPLE: int = 0         # 0 = process all spectra
PREVIEW_ROWS: int = 10
ACQ_DDA_THRESHOLD: float = 0.80
ACQ_DIA_THRESHOLD: float = 0.80

print("Config:")
print("- MZML_FILE:", MZML_FILE)
print("- OUTPUT_DIR:", OUTPUT_DIR)
print("- MAX_SPECTRA_SAMPLE:", MAX_SPECTRA_SAMPLE)
print("- PREVIEW_ROWS:", PREVIEW_ROWS)
print("- ACQ_DDA_THRESHOLD:", ACQ_DDA_THRESHOLD)
print("- ACQ_DIA_THRESHOLD:", ACQ_DIA_THRESHOLD)

Config:
- MZML_FILE: /Users/maxvandenboom/Docs/Coding/AI/ProfileTensor/data/2449SAB_0002_A1.mzML
- OUTPUT_DIR: ../out/2449SAB_0002_A1
- MAX_SPECTRA_SAMPLE: 0
- PREVIEW_ROWS: 10
- ACQ_DDA_THRESHOLD: 0.8
- ACQ_DIA_THRESHOLD: 0.8


## 3) Secure Path Validation and Output Directory Prep

In [12]:
from dataclasses import dataclass

def validate_path(file_path: str, allowed_root: Path | None = None) -> Path:
    p = Path(file_path).expanduser()
    if not p.exists() or not p.is_file():
        raise FileNotFoundError(f"File not found: {file_path}")
    if allowed_root is not None:
        try:
            p.resolve(strict=True).relative_to(Path(allowed_root).resolve())
        except Exception:
            # Allow explicit absolute path but warn
            print(f"Warning: {p} is outside the allowed root {allowed_root}. Proceeding anyway.")
    return p.resolve(strict=True)

# Prepare output directory and artifact paths
OUTDIR = Path(OUTPUT_DIR).resolve()
OUTDIR.mkdir(parents=True, exist_ok=True)
if not os.access(OUTDIR, os.W_OK):
    raise PermissionError(f"Cannot write to output directory: {OUTDIR}")

RUN_SUMMARY_JSON = OUTDIR / 'run_summary.json'
ACQ_REPORT_JSON = OUTDIR / 'acquisition_report.json'
STRUCTURE_JSON = OUTDIR / 'structure_report.json'
SPECTRA_META_CSV = OUTDIR / 'spectra_meta.csv'

print("Artifacts will be written to:")
print("-", RUN_SUMMARY_JSON)
print("-", ACQ_REPORT_JSON)
print("-", STRUCTURE_JSON)
print("-", SPECTRA_META_CSV)

Artifacts will be written to:
- /Users/maxvandenboom/Docs/Coding/AI/ProfileTensor/out/2449SAB_0002_A1/run_summary.json
- /Users/maxvandenboom/Docs/Coding/AI/ProfileTensor/out/2449SAB_0002_A1/acquisition_report.json
- /Users/maxvandenboom/Docs/Coding/AI/ProfileTensor/out/2449SAB_0002_A1/structure_report.json
- /Users/maxvandenboom/Docs/Coding/AI/ProfileTensor/out/2449SAB_0002_A1/spectra_meta.csv


## 4) Parser Backend and Namespace Utilities

In [13]:
MZML_NS = {"mzml": "http://psi.hupo.org/ms/mzml"}

def strip_ns(tag: str) -> str:
    return tag.split('}', 1)[1] if '}' in tag else tag

def get_ns_tag(local_name: str) -> str:
    return f"{{{MZML_NS['mzml']}}}{local_name}"

def _local(tag: str) -> str:
    return tag.split('}', 1)[-1] if '}' in tag else tag

def _findall(elem, xpath: str):
    # Use wildcard ns
    return elem.findall(xpath)

## 5) Data Models and Structured Logger

In [14]:
@dataclass
class SpectrumSummary:
    index: int
    id: str
    ms_level: Optional[int]
    polarity: Optional[str] = None
    centroid: Optional[bool] = None
    tic: Optional[float] = None
    base_peak_mz: Optional[float] = None
    base_peak_intensity: Optional[float] = None
    rt_seconds: Optional[float] = None

@dataclass
class MzMLSummary:
    file_path: str
    file_size_bytes: int
    parser_backend: str
    mzml_id: Optional[str] = None
    mzml_version: Optional[str] = None
    run_id: Optional[str] = None
    start_timestamp: Optional[str] = None
    cv_count: int = 0
    software_count: int = 0
    instrument_count: int = 0
    data_processing_count: int = 0
    total_spectra: int = 0
    spectra_sampled: int = 0
    ms_level_distribution: Dict[int, int] = field(default_factory=dict)
    centroid_count: int = 0
    profile_count: int = 0
    tic_range: Optional[Tuple[float, float]] = None
    base_peak_mz_range: Optional[Tuple[float, float]] = None
    base_peak_intensity_range: Optional[Tuple[float, float]] = None
    rt_range: Optional[Tuple[float, float]] = None
    compression_counts: Dict[str, int] = field(default_factory=dict)   # {'zlib': n, 'none': n}
    bit_depth_counts: Dict[str, int] = field(default_factory=dict)     # {'32': n, '64': n}
    parse_duration_sec: float = 0.0

class StructuredLogger:
    def __init__(self, name: str = "mzml_end2end", level: int = logging.INFO):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)
        if not self.logger.handlers:
            h = logging.StreamHandler(sys.stdout)
            h.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(h)
    def _emit(self, level: str, event: str, **kv):
        payload = {"ts": time.time(), "level": level, "event": event, **kv}
        self.logger.log(getattr(logging, level), json.dumps(payload))
    def info(self, event: str, **kv):
        self._emit("INFO", event, **kv)
    def warning(self, event: str, **kv):
        self._emit("WARNING", event, **kv)
    def error(self, event: str, **kv):
        self._emit("ERROR", event, **kv)

logger = StructuredLogger()

## 6) Single-Pass Streaming Analyzer (Run Metrics + Acquisition + Structure)

In [15]:
# CV accessions of interest
ACC_MS_LEVEL = "MS:1000511"
ACC_SELECTED_ION_MZ = "MS:1000744"
ACC_ISO_TARGET = "MS:1000827"
ACC_ISO_WIDTH = {"MS:1000826", "MS:1000828", "MS:1000829"}  # offsets + width related

class Reservoir:
    def __init__(self, k: int = 200):
        self.k = k
        self.buf: List[float] = []
        self.n = 0
    def add(self, x: float):
        import random
        self.n += 1
        if len(self.buf) < self.k:
            self.buf.append(x)
        else:
            j = random.randint(1, self.n)
            if j <= self.k:
                self.buf[j - 1] = x
    def sample(self) -> List[float]:
        return list(self.buf)

@dataclass
class AcquisitionEvidence:
    total_ms2: int = 0
    dda_selectedIon_cnt1: int = 0
    dda_selectedIon_cnt_gt1: int = 0
    dia_iso_no_selected: int = 0
    ms2_no_precursor: int = 0
    selectedIonList_counts: Counter = field(default_factory=Counter)
    selectedIon_mz_min: float = math.inf
    selectedIon_mz_max: float = -math.inf
    selectedIon_seen: int = 0
    selectedIon_mz_sample: Reservoir = field(default_factory=lambda: Reservoir(200))

    def record_mz(self, mz: float):
        self.selectedIon_seen += 1
        if mz < self.selectedIon_mz_min:
            self.selectedIon_mz_min = mz
        if mz > self.selectedIon_mz_max:
            self.selectedIon_mz_max = mz
        self.selectedIon_mz_sample.add(mz)

@dataclass
class StructureReport:
    adjacency: Dict[str, set] = field(default_factory=lambda: defaultdict(set))
    tag_counts: Counter = field(default_factory=Counter)
    def add_edge(self, parent: str, child: str):
        self.adjacency[parent].add(child)
    def add_tag(self, tag: str):
        self.tag_counts[tag] += 1
    def as_dict(self) -> Dict[str, List[str]]:
        return {k: sorted(list(v)) for k, v in self.adjacency.items()}


def _cv_params(elem) -> List[Dict[str, Any]]:
    out = []
    for cv in elem.findall(get_ns_tag('cvParam')) or elem.findall('cvParam'):
        out.append({
            'accession': cv.get('accession'),
            'name': cv.get('name'),
            'value': cv.get('value'),
            'unitName': cv.get('unitName'),
        })
    return out

def _first_cv_by_name(params: List[Dict[str, Any]], needle: str):
    n = needle.lower()
    for p in params:
        if n in (p.get('name') or '').lower():
            return p
    return None

def _get_cv_value_by_accession(elem, accession: str) -> Optional[str]:
    for cv in _findall(elem, ".//{*}cvParam"):
        if cv.get('accession') == accession:
            return cv.get('value')
    return None


def _convert_scan_time_seconds(scan_elem) -> Optional[float]:
    if scan_elem is None:
        return None
    params = _cv_params(scan_elem)
    p = _first_cv_by_name(params, 'scan start time')
    if not p:
        return None
    try:
        v = float(p.get('value')) if p.get('value') is not None else None
    except ValueError:
        return None
    if v is None:
        return None
    unit = (p.get('unitName') or '').lower()
    if 'minute' in unit:
        return v * 60.0
    return v


def _update_compression_bitdepth_counts(bda_elem, compression_counts: Dict[str, int], bit_counts: Dict[str, int]):
    params = _cv_params(bda_elem)
    names = [(p.get('name') or '').lower() for p in params]
    if any('zlib' in n for n in names):
        compression_counts['zlib'] = compression_counts.get('zlib', 0) + 1
    else:
        compression_counts['none'] = compression_counts.get('none', 0) + 1
    if any('64-bit' in n for n in names):
        bit_counts['64'] = bit_counts.get('64', 0) + 1
    elif any('32-bit' in n for n in names):
        bit_counts['32'] = bit_counts.get('32', 0) + 1


def analyze_stream(mzml_path: Path, max_spectra: int = 0,
                   acq_dda_thr: float = 0.80, acq_dia_thr: float = 0.80) -> Tuple[MzMLSummary, Dict[str, Any], Dict[str, Any]]:
    start = time.time()
    logger.info('parse_start', file=str(mzml_path))

    summary = MzMLSummary(
        file_path=str(mzml_path),
        file_size_bytes=mzml_path.stat().st_size,
        parser_backend=PARSER_BACKEND,
    )
    ev = AcquisitionEvidence()
    structure = StructureReport()

    iter_kwargs = get_iterparse_kwargs()
    with open(mzml_path, 'rb') as f:
        if PARSER_BACKEND == 'lxml':
            context = ET.iterparse(f, events=('start','end'), **iter_kwargs)
        else:
            context = ET.iterparse(f, events=('start','end'))
        stack: List[str] = []
        processed = 0
        for event, elem in context:
            tag = _local(elem.tag)
            if event == 'start':
                structure.add_tag(tag)
                if stack:
                    structure.add_edge(stack[-1], tag)
                stack.append(tag)
                if tag == 'mzML':
                    summary.mzml_id = elem.get('id')
                    summary.mzml_version = elem.get('version')
                elif tag == 'cv':
                    summary.cv_count += 1
                elif tag == 'software':
                    summary.software_count += 1
                elif tag == 'instrumentConfiguration':
                    summary.instrument_count += 1
                elif tag == 'dataProcessing':
                    summary.data_processing_count += 1
                elif tag == 'run':
                    summary.run_id = elem.get('id')
                    summary.start_timestamp = elem.get('startTimeStamp')
                continue

            # event == 'end'
            if tag == 'spectrum':
                if max_spectra > 0 and summary.spectra_sampled >= max_spectra:
                    elem.clear()
                    stack.pop();
                    continue

                summary.spectra_sampled += 1
                processed += 1

                # ms level, polarity, centroid/profile, TIC, base peaks
                params = _cv_params(elem)
                p_ms = _first_cv_by_name(params, 'ms level')
                ms_level = int(p_ms.get('value')) if p_ms and p_ms.get('value') else None
                if ms_level is not None:
                    summary.ms_level_distribution[ms_level] = summary.ms_level_distribution.get(ms_level, 0) + 1
                if _first_cv_by_name(params, 'centroid'):
                    summary.centroid_count += 1
                if _first_cv_by_name(params, 'profile'):
                    summary.profile_count += 1
                p_tic = _first_cv_by_name(params, 'total ion current')
                tic = float(p_tic.get('value')) if p_tic and p_tic.get('value') else None
                if tic is not None:
                    if summary.tic_range is None:
                        summary.tic_range = (tic, tic)
                    else:
                        summary.tic_range = (min(summary.tic_range[0], tic), max(summary.tic_range[1], tic))
                p_bpmz = _first_cv_by_name(params, 'base peak m/z')
                bpmz = float(p_bpmz.get('value')) if p_bpmz and p_bpmz.get('value') else None
                if bpmz is not None:
                    if summary.base_peak_mz_range is None:
                        summary.base_peak_mz_range = (bpmz, bpmz)
                    else:
                        summary.base_peak_mz_range = (min(summary.base_peak_mz_range[0], bpmz), max(summary.base_peak_mz_range[1], bpmz))
                p_bpint = _first_cv_by_name(params, 'base peak intensity')
                bpint = float(p_bpint.get('value')) if p_bpint and p_bpint.get('value') else None
                if bpint is not None:
                    if summary.base_peak_intensity_range is None:
                        summary.base_peak_intensity_range = (bpint, bpint)
                    else:
                        summary.base_peak_intensity_range = (min(summary.base_peak_intensity_range[0], bpint), max(summary.base_peak_intensity_range[1], bpint))

                # RT from scanList/scan
                scan_list = elem.find(get_ns_tag('scanList')) or elem.find('scanList')
                rt = None
                if scan_list is not None:
                    scan = scan_list.find(get_ns_tag('scan')) or scan_list.find('scan')
                    rt = _convert_scan_time_seconds(scan)
                    if rt is not None:
                        if summary.rt_range is None:
                            summary.rt_range = (rt, rt)
                        else:
                            summary.rt_range = (min(summary.rt_range[0], rt), max(summary.rt_range[1], rt))

                # Acquisition evidence on MS2+
                ms_level_val = _get_cv_value_by_accession(elem, ACC_MS_LEVEL)
                try:
                    ms_level_acc = int(ms_level_val) if ms_level_val is not None else None
                except ValueError:
                    ms_level_acc = None
                if ms_level_acc and ms_level_acc >= 2:
                    ev.total_ms2 += 1
                    precursors = list(_findall(elem, ".//{*}precursor"))
                    if not precursors:
                        ev.ms2_no_precursor += 1
                    seen_selected = False
                    seen_dia = False
                    for prec in precursors:
                        sel_list = prec.find('./{*}selectedIonList')
                        if sel_list is not None:
                            try:
                                cnt = int(sel_list.get('count', '0'))
                            except ValueError:
                                cnt = 0
                            ev.selectedIonList_counts[cnt] += 1
                            if cnt == 1:
                                ev.dda_selectedIon_cnt1 += 1
                                seen_selected = True
                            elif cnt > 1:
                                ev.dda_selectedIon_cnt_gt1 += 1
                                seen_selected = True
                            sel = sel_list.find('./{*}selectedIon')
                            if sel is not None:
                                val = _get_cv_value_by_accession(sel, ACC_SELECTED_ION_MZ)
                                if val is not None:
                                    try:
                                        ev.record_mz(float(val))
                                    except ValueError:
                                        pass
                        iso = prec.find('./{*}isolationWindow')
                        if iso is not None and not seen_selected:
                            # Count DIA-like if target present
                            has_target = _get_cv_value_by_accession(iso, ACC_ISO_TARGET) is not None
                            if has_target:
                                seen_dia = True
                    if seen_dia:
                        ev.dia_iso_no_selected += 1

                # compression/bit-depth from binaryDataArrayList
                bdal = elem.find(get_ns_tag('binaryDataArrayList')) or elem.find('binaryDataArrayList')
                if bdal is not None:
                    for bda in bdal.findall(get_ns_tag('binaryDataArray')) or bdal.findall('binaryDataArray'):
                        _update_compression_bitdepth_counts(bda, summary.compression_counts, summary.bit_depth_counts)

                elem.clear()
                while hasattr(elem, 'getprevious') and elem.getprevious() is not None:
                    try:
                        del elem.getparent()[0]
                    except Exception:
                        break

            # pop stack
            if stack:
                stack.pop()

    summary.parse_duration_sec = time.time() - start

    # Total spectra from ms_level_distribution if not set elsewhere
    summary.total_spectra = sum(summary.ms_level_distribution.values())

    # Acquisition classification
    if ev.total_ms2 == 0:
        classification = 'UNKNOWN'
        dda_ratio = dia_ratio = 0.0
    else:
        dda_ratio = ev.dda_selectedIon_cnt1 / ev.total_ms2
        dia_ratio = ev.dia_iso_no_selected / ev.total_ms2
        if dda_ratio >= acq_dda_thr and dia_ratio <= (1 - acq_dda_thr):
            classification = 'DDA'
        elif dia_ratio >= acq_dia_thr and dda_ratio <= (1 - acq_dia_thr):
            classification = 'DIA'
        else:
            classification = 'MIXED'

    run_summary = asdict(summary)
    # normalize tuple ranges to lists for JSON friendliness
    def _tuple_to_list(d: Dict[str, Any], keys: List[str]):
        for k in keys:
            if d.get(k) is not None:
                d[k] = list(d[k])
    _tuple_to_list(run_summary, ['tic_range','base_peak_mz_range','base_peak_intensity_range','rt_range'])

    acquisition_report = {
        'classification': classification,
        'total_ms2': ev.total_ms2,
        'evidence': {
            'dda_selectedIon_cnt1': ev.dda_selectedIon_cnt1,
            'dda_selectedIon_cnt_gt1': ev.dda_selectedIon_cnt_gt1,
            'dia_iso_no_selected': ev.dia_iso_no_selected,
            'ms2_no_precursor': ev.ms2_no_precursor,
        },
        'ratios': {
            'dda_selectedIon_cnt1_per_ms2': (ev.dda_selectedIon_cnt1 / ev.total_ms2) if ev.total_ms2 else 0.0,
            'dia_iso_no_selected_per_ms2': (ev.dia_iso_no_selected / ev.total_ms2) if ev.total_ms2 else 0.0,
        },
        'thresholds': {
            'dda_ratio_threshold': acq_dda_thr,
            'dia_ratio_threshold': acq_dia_thr,
        },
    }

    selected_ion_summary = {
        'selectedIonList_count_histogram': dict(ev.selectedIonList_counts),
        'selectedIon_seen': ev.selectedIon_seen,
        'selectedIon_mz_min': (None if ev.selectedIon_seen == 0 else ev.selectedIon_mz_min),
        'selectedIon_mz_max': (None if ev.selectedIon_seen == 0 else ev.selectedIon_mz_max),
        'selectedIon_mz_sample': ev.selectedIon_mz_sample.sample(),
    }

    structure_report = {
        'elements_hierarchy': structure.as_dict(),
        'element_occurrences': dict(structure.tag_counts),
        'selected_ion_summary': selected_ion_summary,
    }

    logger.info('parse_done', duration_sec=summary.parse_duration_sec, spectra=summary.spectra_sampled)
    return run_summary, acquisition_report, structure_report

## 7) Execute Analysis (Run All)

Validates inputs, runs the analyzer, and keeps dicts in memory for writing.

In [16]:
# Validate path and run
a_mzml = validate_path(MZML_FILE, allowed_root=None)
run_summary, acquisition_report, structure_report = analyze_stream(
    a_mzml,
    max_spectra=MAX_SPECTRA_SAMPLE,
    acq_dda_thr=ACQ_DDA_THRESHOLD,
    acq_dia_thr=ACQ_DIA_THRESHOLD,
)
print(json.dumps({
    'classification': acquisition_report['classification'],
    'spectra_sampled': run_summary.get('spectra_sampled'),
    'ms_level_distribution': run_summary.get('ms_level_distribution')
}, indent=2))

{"ts": 1767618040.5465841, "level": "INFO", "event": "parse_start", "file": "/Users/maxvandenboom/Docs/Coding/AI/ProfileTensor/data/2449SAB_0002_A1.mzML"}


  scan_list = elem.find(get_ns_tag('scanList')) or elem.find('scanList')
  scan = scan_list.find(get_ns_tag('scan')) or scan_list.find('scan')
  bdal = elem.find(get_ns_tag('binaryDataArrayList')) or elem.find('binaryDataArrayList')


{"ts": 1767618042.173748, "level": "INFO", "event": "parse_done", "duration_sec": 1.6271018981933594, "spectra": 16228}
{
  "classification": "DDA",
  "spectra_sampled": 16228,
  "ms_level_distribution": {
    "1": 2723,
    "2": 13505
  }
}


## 8) Write Artifacts (JSON)

In [17]:
with open(RUN_SUMMARY_JSON, 'w', encoding='utf-8') as f:
    json.dump(run_summary, f, indent=2)
with open(ACQ_REPORT_JSON, 'w', encoding='utf-8') as f:
    json.dump(acquisition_report, f, indent=2)
with open(STRUCTURE_JSON, 'w', encoding='utf-8') as f:
    json.dump(structure_report, f, indent=2)
print("Wrote:")
print("-", RUN_SUMMARY_JSON)
print("-", ACQ_REPORT_JSON)
print("-", STRUCTURE_JSON)

Wrote:
- /Users/maxvandenboom/Docs/Coding/AI/ProfileTensor/out/2449SAB_0002_A1/run_summary.json
- /Users/maxvandenboom/Docs/Coding/AI/ProfileTensor/out/2449SAB_0002_A1/acquisition_report.json
- /Users/maxvandenboom/Docs/Coding/AI/ProfileTensor/out/2449SAB_0002_A1/structure_report.json


## 9) Stream Spectra Metadata to CSV/Parquet

In [18]:
def stream_spectra_meta(mzml_path: Path, max_spectra: int = 0):
    iter_kwargs = get_iterparse_kwargs()
    records = 0
    
    f = open(SPECTRA_META_CSV, 'w', newline='', encoding='utf-8')
    w = csv.writer(f)
    header = ['spectrum_index','spectrum_id','ms_level','rt_seconds','polarity','centroid','tic','base_peak_mz','base_peak_intensity','precursor_selected_ion_mz','isolation_window_target_mz','acquisition_mode_hint']
    w.writerow(header)

    with open(mzml_path, 'rb') as fin:
        if PARSER_BACKEND == 'lxml':
            context = ET.iterparse(fin, events=('end',), **iter_kwargs)
        else:
            context = ET.iterparse(fin, events=('end',))
        for event, elem in context:
            if strip_ns(elem.tag) != 'spectrum':
                continue
            if max_spectra > 0 and records >= max_spectra:
                break

            idx = int(elem.get('index', '0'))
            sid = elem.get('id')
            params = _cv_params(elem)
            p_ms = _first_cv_by_name(params, 'ms level')
            ms_level = int(p_ms.get('value')) if p_ms and p_ms.get('value') else None
            pol = 'positive' if _first_cv_by_name(params, 'positive') else ('negative' if _first_cv_by_name(params, 'negative') else None)
            centroid = True if _first_cv_by_name(params, 'centroid') else (False if _first_cv_by_name(params, 'profile') else None)
            p_tic = _first_cv_by_name(params, 'total ion current')
            tic = float(p_tic.get('value')) if p_tic and p_tic.get('value') else None
            p_bpmz = _first_cv_by_name(params, 'base peak m/z')
            bpmz = float(p_bpmz.get('value')) if p_bpmz and p_bpmz.get('value') else None
            p_bpint = _first_cv_by_name(params, 'base peak intensity')
            bpint = float(p_bpint.get('value')) if p_bpint and p_bpint.get('value') else None

            scan_list = elem.find(get_ns_tag('scanList')) or elem.find('scanList')
            rt = None
            if scan_list is not None:
                scan = scan_list.find(get_ns_tag('scan')) or scan_list.find('scan')
                rt = _convert_scan_time_seconds(scan)

            # precursor info
            precursor_mz = None
            iso_target = None
            acq_hint = None
            precursor_list = elem.find(get_ns_tag('precursorList')) or elem.find('precursorList')
            if precursor_list is not None:
                prec = precursor_list.find(get_ns_tag('precursor')) or precursor_list.find('precursor')
                if prec is not None:
                    sel_list = prec.find(get_ns_tag('selectedIonList')) or prec.find('selectedIonList')
                    if sel_list is not None:
                        try:
                            cnt = int(sel_list.get('count','0'))
                        except ValueError:
                            cnt = 0
                        if cnt >= 1:
                            acq_hint = 'DDA'
                        sel = sel_list.find(get_ns_tag('selectedIon')) or sel_list.find('selectedIon')
                        if sel is not None:
                            val = _get_cv_value_by_accession(sel, ACC_SELECTED_ION_MZ)
                            if val is not None:
                                try:
                                    precursor_mz = float(val)
                                except ValueError:
                                    pass
                    iso = prec.find(get_ns_tag('isolationWindow')) or prec.find('isolationWindow')
                    if iso is not None:
                        val = _get_cv_value_by_accession(iso, ACC_ISO_TARGET)
                        if val is not None:
                            try:
                                iso_target = float(val)
                                if acq_hint is None:
                                    acq_hint = 'DIA'
                            except ValueError:
                                pass

            row = [idx, sid, ms_level, rt, pol, centroid, tic, bpmz, bpint, precursor_mz, iso_target, acq_hint]
            w.writerow(row)
            records += 1
            elem.clear()
            while hasattr(elem, 'getprevious') and elem.getprevious() is not None:
                try:
                    del elem.getparent()[0]
                except Exception:
                    break

    f.close()
    return SPECTRA_META_CSV, records

meta_path, meta_n = stream_spectra_meta(a_mzml, MAX_SPECTRA_SAMPLE)
print(f"Wrote spectra metadata ({meta_n} rows) to {meta_path}")

  scan_list = elem.find(get_ns_tag('scanList')) or elem.find('scanList')
  scan = scan_list.find(get_ns_tag('scan')) or scan_list.find('scan')
  precursor_list = elem.find(get_ns_tag('precursorList')) or elem.find('precursorList')
  prec = precursor_list.find(get_ns_tag('precursor')) or precursor_list.find('precursor')
  sel_list = prec.find(get_ns_tag('selectedIonList')) or prec.find('selectedIonList')
  sel = sel_list.find(get_ns_tag('selectedIon')) or sel_list.find('selectedIon')
  iso = prec.find(get_ns_tag('isolationWindow')) or prec.find('isolationWindow')


Wrote spectra metadata (16228 rows) to /Users/maxvandenboom/Docs/Coding/AI/ProfileTensor/out/2449SAB_0002_A1/spectra_meta.csv


## 10) Validate Outputs (Quick Preview)

In [19]:
# Quick previews
try:
    with open(RUN_SUMMARY_JSON, 'r', encoding='utf-8') as f:
        rs = json.load(f)
    with open(ACQ_REPORT_JSON, 'r', encoding='utf-8') as f:
        ar = json.load(f)
    with open(STRUCTURE_JSON, 'r', encoding='utf-8') as f:
        sr = json.load(f)
    print('Classification:', ar.get('classification'))
    print('Total MS2:', ar.get('total_ms2'))
    print('MS level distribution:', rs.get('ms_level_distribution'))
    print('RT range (s):', rs.get('rt_range'))
    print('Compression:', rs.get('compression_counts'))
    print('Bit depth:', rs.get('bit_depth_counts'))
    
    if SPECTRA_META_CSV.exists():
        df = pd.read_csv(SPECTRA_META_CSV, nrows=max(PREVIEW_ROWS, 200))
        print('spectra_meta.csv rows (first chunk):', len(df))
        print(df.head(PREVIEW_ROWS).to_string())
    else:
        print('No spectra metadata file found yet.')

    # Smoke checks
    cls_ok = ar.get('classification') in {'DDA','DIA','MIXED','UNKNOWN'}
    assert cls_ok, 'Classification out of range'
    for k in ['cv_count','software_count','instrument_count','data_processing_count']:
        v = rs.get(k, 0)
        assert isinstance(v, int) and v >= 0
    print('Basic checks passed.')
except Exception as e:
    print('Preview error:', e)

Classification: DDA
Total MS2: 13505
MS level distribution: {'1': 2723, '2': 13505}
RT range (s): [0.075, 899.82499999998]
Compression: {'zlib': 32456}
Bit depth: {'64': 16228, '32': 16228}
spectra_meta.csv rows (first chunk): 200
   spectrum_index                             spectrum_id  ms_level  rt_seconds  polarity  centroid       tic  base_peak_mz  base_peak_intensity  precursor_selected_ion_mz  isolation_window_target_mz acquisition_mode_hint
0               0  sample=1 period=1 cycle=1 experiment=1         1       0.075  positive      True  325240.0    198.095246              37926.0                        NaN                         NaN                   NaN
1               1  sample=1 period=1 cycle=1 experiment=2         2       0.143  positive      True  722706.0     42.034084             183094.0                     105.15                      105.15                   DDA
2               2  sample=1 period=1 cycle=1 experiment=3         2       0.184  positive      True  63