# Basic statistics and data cleaning

In [None]:
# Folder holding the original dataset; later cells scan it recursively for .hea files.
dat_ori=r"original data path"

# Working folder; the per-record CSV export is written to "_triad_csv" below it.
dat_work=r"working folder path"

# Results folder; the summary tables are written to "_descriptive_summary" below it.
path_results=r"result folder path"

In [None]:
from pathlib import Path
import re
from collections import Counter, defaultdict
import pandas as pd
from tqdm import tqdm
import wfdb

In [None]:

ROOT = Path(dat_ori)  # dataset root searched for .hea files; "p00" presumably names the top-level data folder -- TODO confirm


# Normalised channel names that are accepted as ECG leads without further parsing.
ECG_LEADS = {
    # limb leads
    "I", "II", "III",
    # augmented limb leads
    "AVR", "AVL", "AVF",
    # chest leads V1..V6
    "V1", "V2", "V3", "V4", "V5", "V6",
    # MCL1..MCL6 labels
    "MCL1", "MCL2", "MCL3", "MCL4", "MCL5", "MCL6",
    # remaining V-prefixed labels
    "V", "VX", "VE", "VF",
}

# PPG colour label -> substrings that identify that colour in a normalised name.
# NOTE: the key "RED" is also a substring of "INFRARED", so plain substring
# matching against these keys can hit more than one colour.
PPG_COLOR_KEYS = {
    "RED": ["RED"],
    "IR": ["IR", "INFRA", "INFRARED"],
    "GREEN": ["GREEN"],
    "BLUE": ["BLUE"],
}

def norm_name(s: str) -> str:
    """Return *s* upper-cased with every run of whitespace removed."""
    uppercased = s.upper()
    without_spaces = re.sub(r"\s+", "", uppercased)
    return without_spaces

def classify_channel(name: str):
    """
    Classify a raw channel name into a ``(group, label)`` pair.

    The name is normalised with ``norm_name`` (upper-cased, whitespace
    removed) and then matched against the rules below, first hit wins.

    Groups and labels returned:
      * ``("ECG", lead)``   -- names listed in ``ECG_LEADS``, prefixed
        spellings such as ``LEADII`` / ``ECGII`` / ``ECGAVR``, and numeric
        spellings ``ECG1..3`` / ``LEAD1..3`` (mapped to ``I`` / ``II`` / ``III``).
      * ``("PPG", color)``  -- names containing PLETH, PPG or PHOTO; ``color``
        is a key of ``PPG_COLOR_KEYS`` or ``"UNKNOWN"``.
      * ``("BP", label)``   -- ABP/ART/AOP (label ``"ABP/ART"``), PAP, CVP,
        UAP, FAP, RAP, PCWP, ICP.
      * ``("RESP", "RESP")``, ``("CO2", "CO2")``, ``("O2", name)``.
      * ``("EEG", name)`` / ``("EMG", name)`` -- names starting with EEG / EMG.
      * ``("OTHER", name)`` -- anything else; here the label is the *raw*,
        un-normalised input.

    Parameters
    ----------
    name : str
        Channel name as found in the record header.

    Returns
    -------
    tuple
        ``(group, label)`` as described above.
    """
    n = norm_name(name)

    # --- ECG ---
    if n in ECG_LEADS:
        return ("ECG", n)
    # Prefixed Roman / named leads: LEADII, ECGII, LEADECGV1, ECGAVR, ...
    m = re.match(r"(LEAD)?(ECG)?(I{1,3}|V[1-6]|AVR|AVL|AVF)$", n)
    if m:
        return ("ECG", m.group(3))
    # Numeric spellings (ECG2, LEAD2, ...). A prefix is required so that a
    # bare digit is never mistaken for a lead. The digit is read as the limb
    # lead number, i.e. ECG2 -> II.
    m = re.match(r"(?:LEADECG|LEAD|ECG)([123])$", n)
    if m:
        return ("ECG", {"1": "I", "2": "II", "3": "III"}[m.group(1)])

    # --- PPG / Pleth ---
    if "PLETH" in n or "PPG" in n or "PHOTO" in n:
        # Use the colour whose keyword match is the longest: "RED" is a
        # substring of "INFRARED", so a first-match rule would label
        # infrared channels as RED. Ties keep the order of PPG_COLOR_KEYS.
        best_color, best_len = "UNKNOWN", 0
        for color, keys in PPG_COLOR_KEYS.items():
            for key in keys:
                if key in n and len(key) > best_len:
                    best_color, best_len = color, len(key)
        return ("PPG", best_color)

    # --- Blood pressure ---
    if n in {"ABP", "ART", "AOP"}:
        return ("BP", "ABP/ART")
    if n == "PAP":
        return ("BP", "PAP")
    if n == "CVP":
        return ("BP", "CVP")
    if n in {"UAP", "FAP", "RAP", "PCWP", "ICP"}:
        return ("BP", n)

    # --- Respiration / gases ---
    if n in {"RESP", "RESPIRATION"}:
        return ("RESP", "RESP")
    if "ETCO2" in n or n == "CO2":
        return ("CO2", "CO2")
    if n in {"SPO2", "O2"}:
        return ("O2", n)

    # --- Other named groups ---
    if n.startswith("EEG"):
        return ("EEG", n)
    if n.startswith("EMG"):
        return ("EMG", n)

    # Unrecognised: keep the raw name as the label (unchanged behaviour).
    return ("OTHER", name)

# Scan every header file (*.hea) found below ROOT.
hea_files = list(ROOT.rglob("*.hea"))

# One summary row per header that could be read, plus channel tallies.
records_info = []
counter_by_raw = Counter()     # normalised channel name -> occurrences
counter_by_group = Counter()   # channel group from classify_channel -> occurrences
counter_ecg_lead = Counter()   # label of channels classified as ECG -> occurrences
counter_ppg_color = Counter()  # label of channels classified as PPG -> occurrences

# Headers that raised while being read or summarised.
fail_list = []

for hea_path in tqdm(hea_files, desc="Scanning .hea"):
    try:
        # Drop the 4-character ".hea" suffix to get the record path handed to wfdb.
        rec_base = str(hea_path)[:-4]
        h = wfdb.rdheader(rec_base)
        sig_names = list(h.sig_name or [])
        units = list(h.units or [])
        fs, n_sig = h.fs, h.n_sig

        record_row = {
            "record": Path(rec_base).name,
            "rel_dir": str(hea_path.parent.relative_to(ROOT)),
            "fs": fs,
            "n_sig": n_sig,
            "channels": ";".join(sig_names),
            "units": ";".join(units),
        }
        records_info.append(record_row)

        for ch in sig_names:
            counter_by_raw[norm_name(ch)] += 1
            big, small = classify_channel(ch)
            counter_by_group[big] += 1
            if big == "ECG":
                counter_ecg_lead[small] += 1
            elif big == "PPG":
                counter_ppg_color[small] += 1

    except Exception as err:
        # Best-effort scan: note the failing header and carry on with the next one.
        fail_list.append({"hea": str(hea_path), "error": repr(err)})

# summarise
def _counts_frame(counter, label):
    """Return a (label, "count") DataFrame for *counter*, largest count first."""
    frame = pd.DataFrame(list(counter.items()), columns=[label, "count"])
    return frame.sort_values("count", ascending=False)


# Explicit columns keep the frame sortable when no header could be read:
# pd.DataFrame([]) has no "record" column and sort_values would raise KeyError.
df_records = pd.DataFrame(
    records_info,
    columns=["record", "rel_dir", "fs", "n_sig", "channels", "units"],
).sort_values("record")

df_raw = _counts_frame(counter_by_raw, "raw_channel")
df_group = _counts_frame(counter_by_group, "group")
df_ecg = _counts_frame(counter_ecg_lead, "ecg_lead")
df_ppg = _counts_frame(counter_ppg_color, "ppg_color")

print("total record ", len(df_records))
print("read fail ", len(fail_list))

# output: write every summary table as CSV below <path_results>/_descriptive_summary
out_dir = Path(path_results) / "_descriptive_summary"
# parents=True so the call also works when the results folder does not exist yet
# (mkdir without it raises FileNotFoundError for a missing parent).
out_dir.mkdir(parents=True, exist_ok=True)

for frame, file_name in (
    (df_records, "records_summary.csv"),
    (df_raw, "channel_raw_counts.csv"),
    (df_group, "channel_group_counts.csv"),
    (df_ecg, "ecg_lead_counts.csv"),
    (df_ppg, "ppg_color_counts.csv"),
):
    frame.to_csv(out_dir / file_name, index=False)

# The failure table is only written when at least one header failed.
if fail_list:
    pd.DataFrame(fail_list).to_csv(out_dir / "read_failures.csv", index=False)

# Bare expression: shows the output folder as the notebook cell result.
out_dir


In [None]:
# Select the records we will use (ECG II + PPG + ABP) and export them to CSV

from pathlib import Path
import re
from tqdm.notebook import tqdm
import pandas as pd
import wfdb
import numpy as np

ROOT = Path(dat_ori)     # dataset root that is searched for .hea files
OUT_DIR = Path(dat_work) / "_triad_csv"
# parents=True so the export folder can be created even when the working
# folder itself does not exist yet (otherwise mkdir raises FileNotFoundError).
OUT_DIR.mkdir(parents=True, exist_ok=True)


def norm(s: str) -> str:
    """Return *s* upper-cased with all whitespace removed; falsy input gives ""."""
    if not s:
        return ""
    return re.sub(r"\s+", "", s.upper())

# Anchored spellings (of a normalised name) that are accepted as ECG lead II.
# NOTE(review): MCL2 is accepted here as lead II -- confirm this is intended.
ECG_II_PATTERNS = [
    r"^II$",
    r"^ECGII$",
    r"^LEADII$",
    r"^LEAD2$",
    r"^ECG2$",
    r"^MCL2$",
]
# Single alternation of all patterns above, compiled once for reuse.
ecg_ii_regex = re.compile("|".join(ECG_II_PATTERNS))

# PPG
def is_ppg(name_norm: str) -> bool:
    """Return True when the normalised name contains PLETH, PPG or PHOTO."""
    markers = ("PLETH", "PPG", "PHOTO")
    return any(marker in name_norm for marker in markers)

# ABP
def is_abp(name_norm: str) -> bool:
    """Return True for ABP / ART / AOP, or ABP / ART followed by digits."""
    exact_names = {"ABP", "ART", "AOP"}
    numbered = re.match(r"^(ABP|ART)\d+$", name_norm)
    return name_norm in exact_names or numbered is not None

def find_channel_indices(sig_names):
    """
    Locate the first ECG-II, PPG and ABP channel among *sig_names*.

    Names are normalised with ``norm`` before matching. Returns the tuple
    ``(ECG_II_idx, PPG_idx, ABP_idx)``, or ``None`` when any of the three
    channel types is missing.
    """
    normalised = [norm(raw) for raw in sig_names]

    def first_index(predicate):
        # Position of the first normalised name accepted by predicate, else None.
        return next(
            (pos for pos, label in enumerate(normalised) if predicate(label)),
            None,
        )

    ecg_pos = first_index(ecg_ii_regex.match)
    ppg_pos = first_index(is_ppg)
    abp_pos = first_index(is_abp)

    # All three signals are required; a single missing one rejects the record.
    if ecg_pos is None or ppg_pos is None or abp_pos is None:
        return None
    return ecg_pos, ppg_pos, abp_pos

# select useful data: keep the records that offer ECG II, PPG and ABP together
hea_files = list(ROOT.rglob("*.hea"))
selected = []    # (rec_base, (i_ecg, i_ppg, i_abp), sig_names) per usable record
scan_fail = []   # headers that could not be read or inspected

for hea_path in tqdm(hea_files, desc="Scanning headers"):
    # Drop the 4-character ".hea" suffix to get the record path handed to wfdb.
    rec_base = str(hea_path)[:-4]
    try:
        h = wfdb.rdheader(rec_base)
        sig_names = list(h.sig_name) if h.sig_name else []
        idxs = find_channel_indices(sig_names)
    except Exception as e:
        # Best-effort scan: skip the record, but remember why instead of
        # dropping it silently.
        scan_fail.append({"hea": str(hea_path), "error": repr(e)})
        continue
    if idxs is not None:
        selected.append((rec_base, idxs, sig_names))

# The message below reads "number of records with (ECG II + PPG + ABP)".
print(f"满足(ECG II + PPG + ABP)的记录数：{len(selected)}")
if scan_fail:
    print("header read fail ", len(scan_fail))

# output into csv: one file per selected record with time, ECG II, PPG and ABP
fail = []
cols = ["ECG_II", "PPG", "ABP"]
for rec_base, (i_ecg, i_ppg, i_abp), _sig_names in tqdm(selected, desc="Exporting CSV"):
    try:
        # Read only the three channels of interest.
        rec = wfdb.rdrecord(rec_base, channels=[i_ecg, i_ppg, i_abp])

        data = rec.p_signal  # expected shape (N, 3): one column per requested channel
        n_samples = data.shape[0]

        # Time axis in seconds, derived from the sampling frequency.
        t_sec = np.arange(n_samples) / float(rec.fs)

        # Append the unit to each column name when the record provides one.
        # A missing, None or empty units attribute falls back to plain names
        # instead of failing the whole record.
        units = getattr(rec, "units", None) or [None] * len(cols)
        colnames = [f"{c} ({u})" if u else c for c, u in zip(cols, units)]

        df = pd.DataFrame(data, columns=colnames)
        df.insert(0, "t_sec", t_sec)

        rec_id = Path(rec_base).name
        out_path = OUT_DIR / f"{rec_id}.csv"
        df.to_csv(out_path, index=False)
    except Exception as e:
        # Best-effort export: log the record and continue with the next one.
        fail.append({"record": Path(rec_base).name, "error": repr(e)})

print("output finish. fail num ", len(fail))
if fail:
    pd.DataFrame(fail).to_csv(OUT_DIR / "_export_failures.csv", index=False)

# Bare expression: shows the export folder as the notebook cell result.
OUT_DIR
