In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pathlib import Path
import re
import pandas as pd

# Adjust these paths to your environment.
# CHMM_ROOT is scanned for "simulated_*" sub-directories; OUT_CSV receives the metrics table.
CHMM_ROOT = Path("/home-link/zxozk31/Analyse_cons_count/results_chmairra_sim6")
OUT_CSV   = Path("/home-link/zxozk31/Analyse_cons_count/Results_analyse/chmmairra_metrics.csv")

# Simulation ground truth: a sequence_id that ends in "_chim<number>" marks a simulated chimera.
RE_SIM_CHIM = re.compile(r"_chim\d+$")

def parse_sim_percent(dir_name: str) -> float:
    """Turn a directory name such as ``simulated_0_5p`` into the percentage ``0.5``.

    Everything after the first ``simulated_`` is taken, trailing ``p`` characters
    are removed and underscores are read as decimal points.
    """
    pieces = dir_name.split("simulated_", 1)
    raw = pieces[1].rstrip("p")
    as_decimal = raw.replace("_", ".")
    return float(as_decimal)

def sample_base_from_stem(stem: str) -> str:
    """Strip a trailing ``_<percent>p_ig`` marker (e.g. ``_0_5p_ig``) from a file stem.

    Stems without that trailing marker are returned unchanged.
    """
    percent_suffix = re.compile(r"_[0-9]+(?:_[0-9]+)?p_ig$")
    return percent_suffix.sub("", stem)

def main() -> None:
    """Score CHMMAIRRa calls against the simulated ground truth and write OUT_CSV.

    For every ``simulated_*`` directory below CHMM_ROOT, each ``*_chim.tsv`` is
    paired with its ``*_nonchim.tsv`` sibling. Rows whose ``sequence_id`` matches
    RE_SIM_CHIM count as truly chimeric, which yields TP/FP (chim file) and
    FN/TN (non-chim file). Samples without a non-chim file are skipped with a
    warning.
    """
    columns = [
        "sim_percent", "sample", "method", "n_total", "n_chim", "n_nonchim",
        "pct_chim", "sample_base", "TP", "FP", "TN", "FN",
    ]
    records = []

    sim_dirs = [d for d in CHMM_ROOT.iterdir() if d.is_dir() and d.name.startswith("simulated_")]
    for sim_dir in sorted(sim_dirs):
        sim_percent = parse_sim_percent(sim_dir.name)

        for chim_tsv in sim_dir.glob("*_chim.tsv"):
            common_stem = chim_tsv.name[:-len("_chim.tsv")]
            nonchim_tsv = chim_tsv.with_name(f"{common_stem}_nonchim.tsv")
            if not nonchim_tsv.exists():
                print(f"[WARN] Missing non-chim file for {chim_tsv}")
                continue

            called_chim = pd.read_csv(chim_tsv, sep="\t", dtype=str)
            called_non = pd.read_csv(nonchim_tsv, sep="\t", dtype=str)

            n_chim = len(called_chim)
            n_non = len(called_non)
            n_total = n_chim + n_non
            if n_total:
                pct_chim = n_chim / n_total * 100.0
            else:
                pct_chim = 0.0

            ids_chim = called_chim["sequence_id"].astype(str)
            ids_non = called_non["sequence_id"].astype(str)

            # Truly chimeric reads inside each call set, identified by the id suffix.
            tp = int(ids_chim.str.contains(RE_SIM_CHIM).sum())
            fn = int(ids_non.str.contains(RE_SIM_CHIM).sum())

            records.append({
                "sim_percent": sim_percent,
                "sample": common_stem,
                "method": "CHMMAIRRa",
                "n_total": n_total,
                "n_chim": n_chim,
                "n_nonchim": n_non,
                "pct_chim": pct_chim,
                "sample_base": sample_base_from_stem(common_stem),
                "TP": tp,
                "FP": int(n_chim - tp),
                "TN": int(n_non - fn),
                "FN": fn,
            })

    out = pd.DataFrame(records, columns=columns)
    out = out.sort_values(by=["sim_percent", "sample"], kind="stable")
    OUT_CSV.parent.mkdir(parents=True, exist_ok=True)
    out.to_csv(OUT_CSV, index=False)
    print(f"Wrote {len(out)} rows to {OUT_CSV}")

if __name__ == "__main__":
    main()


Wrote 192 rows to /home-link/zxozk31/Analyse_cons_count/Results_analyse/chmmairra_metrics.csv


In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pathlib import Path
import re
import pandas as pd

# Adjust these paths to your environment.
# VSEARCH_ROOT holds one "<sample>_all_chimera_out" directory per run,
# TRUTH_ROOT holds the simulated input FASTAs, OUT_CSV receives the metrics table.
VSEARCH_ROOT = Path("/home-link/zxozk31/Analyse_cons_count/Results_vsearch_simulated6")
TRUTH_ROOT   = Path("/home-link/zxozk31/Analyse_cons_count/results_simulation6")
OUT_CSV      = Path("/home-link/zxozk31/Analyse_cons_count/Results_analyse/vsearch_metrics.csv")

# Method suffixes of the "chimeras_<method>.fa" / "nonchimeras_<method>.fa" files that are evaluated.
METHODS = ["denovo", "ref", "uchime2", "uchime3"]
# A truth FASTA header containing "_chim<digits>" (followed by a word boundary or end) marks a chimera.
RE_CHIM_HDR = re.compile(r"_chim\d+(?:\b|$)")

def parse_percent_token(name: str) -> tuple[str, float]:
    """Extract the ``_<percent>p`` token from a sample name.

    Returns the raw token (e.g. ``"0_5"``) together with its numeric value
    (``0.5``); underscores inside the token act as decimal points.

    Raises:
        ValueError: if the name contains no such token.
    """
    found = re.search(r"_([0-9]+(?:_[0-9]+)?)p(_|$)", name)
    if found is None:
        raise ValueError(f"Missing percent token: {name}")
    raw = found.group(1)
    value = float(raw.replace("_", "."))
    return raw, value

def derive_truth_path(result_dir: Path) -> tuple[str, str, float, Path]:
    """Map a ``<sample>_all_chimera_out`` result directory to its truth FASTA.

    Returns ``(sample, token, sim_percent, truth_fa)`` where ``truth_fa`` is
    ``TRUTH_ROOT/simulated_<token>p/<sample>_all.fasta``. A ValueError from
    ``parse_percent_token`` propagates when the name carries no percent token.
    """
    sample = result_dir.name.removesuffix("_all_chimera_out")
    token, sim_percent = parse_percent_token(sample)
    return sample, token, sim_percent, TRUTH_ROOT / f"simulated_{token}p" / f"{sample}_all.fasta"

def read_fasta_split_truth(path: Path) -> tuple[set[str], set[str]]:
    """Split a truth FASTA into (chimeric, non-chimeric) sets of sequences.

    A record is chimeric when its header matches RE_CHIM_HDR. Sequences are
    compared in normalised form (upper case, spaces and tabs removed). Records
    with an empty sequence and text before the first header are ignored. A
    missing file yields two empty sets.
    """
    chim, non = set(), set()
    if not path.exists():
        return chim, non

    def store(header, chunks):
        # Nothing to store before the first header or for a record without lines.
        if header is None or not chunks:
            return
        seq = "".join(chunks).replace(" ", "").replace("\t", "").strip().upper()
        if seq:
            target = chim if RE_CHIM_HDR.search(header) else non
            target.add(seq)

    header, chunks = None, []
    with path.open("r", encoding="utf-8", errors="ignore") as fh:
        for line in fh:
            if line.startswith(">"):
                store(header, chunks)
                header, chunks = line.strip(), []
            else:
                chunks.append(line.strip())
    store(header, chunks)
    return chim, non

def read_fasta_seqs(path: Path) -> set[str]:
    """Read a FASTA file into a set of normalised sequences.

    Sequences are upper-cased and stripped of spaces and tabs; headers are
    discarded, so identical sequences collapse into one entry.

    Records whose sequence is empty (a header followed only by blank lines,
    or blank lines before the first header) are skipped. Previously they added
    the empty string to the set and inflated the sequence counts.

    Args:
        path: FASTA file to read.

    Returns:
        The set of non-empty sequences; an empty set when the file is missing.
    """
    seqs: set[str] = set()
    if not path.exists():
        return seqs

    def add_record(chunks: list[str]) -> None:
        seq = "".join(chunks).replace(" ", "").replace("\t", "").strip().upper()
        if seq:  # never store the empty string as a sequence
            seqs.add(seq)

    buf: list[str] = []
    with path.open("r", encoding="utf-8", errors="ignore") as fh:
        for line in fh:
            if line.startswith(">"):
                add_record(buf)
                buf = []
            else:
                buf.append(line.strip())
        add_record(buf)
    return seqs

def main() -> None:
    """Score every VSEARCH method against the simulated truth and write OUT_CSV.

    Each ``*_all_chimera_out`` directory below VSEARCH_ROOT is matched to its
    truth FASTA. For every method in METHODS the called chimera and
    non-chimera sequence sets are intersected with the truth sets to obtain
    TP/FP/FN/TN. All counts are over unique sequences because sets are compared.
    """
    columns = [
        "sim_percent", "sample", "method", "n_total", "n_chim", "n_nonchim",
        "pct_chim", "sample_base", "TP", "FP", "TN", "FN",
    ]
    records = []

    result_dirs = [d for d in VSEARCH_ROOT.iterdir() if d.is_dir() and d.name.endswith("_all_chimera_out")]
    for rd in sorted(result_dirs):
        try:
            sample, token, sim_percent, truth_fa = derive_truth_path(rd)
        except ValueError as err:
            print(f"[WARN] {err}")
            continue

        truth_chim_set, truth_nonchim_set = read_fasta_split_truth(truth_fa)
        if not (truth_chim_set or truth_nonchim_set):
            # Only a warning: rows are still emitted, with all four confusion counts at zero.
            print(f"[WARN] Truth FASTA missing/empty: {truth_fa}")

        sample_base = re.sub(rf"_{re.escape(token)}p_.*$", "", sample)

        for method in METHODS:
            chim_fa = rd / f"chimeras_{method}.fa"
            non_fa = rd / f"nonchimeras_{method}.fa"
            has_chim, has_non = chim_fa.exists(), non_fa.exists()
            if not (has_chim or has_non):
                continue

            chim_set = read_fasta_seqs(chim_fa) if has_chim else set()
            non_set = read_fasta_seqs(non_fa) if has_non else set()

            n_chim = len(chim_set)
            n_non = len(non_set)
            n_total = n_chim + n_non
            if n_total:
                pct_chim = 100.0 * n_chim / n_total
            else:
                pct_chim = 0.0

            records.append({
                "sim_percent": sim_percent,
                "sample": sample,
                "method": f"VSEARCH_{method}",
                "n_total": n_total,
                "n_chim": n_chim,
                "n_nonchim": n_non,
                "pct_chim": pct_chim,
                "sample_base": sample_base,
                "TP": len(chim_set & truth_chim_set),
                "FP": len(chim_set & truth_nonchim_set),
                "TN": len(non_set & truth_nonchim_set),
                "FN": len(non_set & truth_chim_set),
            })

    df = pd.DataFrame(records, columns=columns)
    df = df.sort_values(["sim_percent", "sample", "method"], kind="stable")
    OUT_CSV.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(OUT_CSV, index=False)
    print(f"Wrote {len(df)} rows to {OUT_CSV}")

if __name__ == "__main__":
    main()


  """


Wrote 768 rows to /home-link/zxozk31/Analyse_cons_count/Results_analyse/vsearch_metrics.csv


In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pathlib import Path
import re
import pandas as pd


# Directory scanned for "<sample>_all_chimera_out" result folders.
# NOTE(review): this is the VSEARCH result directory although the rows written by this
# cell are labelled "USEARCH" - confirm the USEARCH outputs (*_all_ch.fa / *_all_nonch.fa) live here.
ROOT        = Path("/home-link/zxozk31/Analyse_cons_count/Results_vsearch_simulated6")
# Root of the simulated input FASTAs used as ground truth.
TRUTH_ROOT  = Path("/home-link/zxozk31/Analyse_cons_count/results_simulation6")
# Destination of the USEARCH metrics table.
OUT_CSV     = Path("/home-link/zxozk31/Analyse_cons_count/Results_analyse/usearch_metrics.csv")

# A truth FASTA header containing "_chim<digits>" (followed by a word boundary or end) marks a chimera.
RE_CHIM_HDR = re.compile(r"_chim\d+(?:\b|$)")

def parse_percent_token(name: str) -> tuple[str, float]:
    """Return the ``_<percent>p`` token of a sample name as ``(raw, value)``.

    ``"S1_0_5p_ig"`` gives ``("0_5", 0.5)``: underscores inside the token are
    read as decimal points. A ValueError is raised when no token is present.
    """
    hit = re.search(r"_([0-9]+(?:_[0-9]+)?)p(_|$)", name)
    if hit is None:
        raise ValueError(f"Kein Prozent-Token in: {name}")
    raw_token = hit.group(1)
    return raw_token, float(raw_token.replace("_", "."))

def derive_truth_path(result_dir: Path) -> tuple[str, str, float, Path]:
    """Resolve the truth FASTA that belongs to a result directory.

    A directory ``<sample>_<token>p_all_chimera_out`` maps to
    ``TRUTH_ROOT/simulated_<token>p/<sample>_<token>p_all.fasta``.
    Returns ``(sample, token, sim_percent, truth_fa)``.
    """
    sample = result_dir.name.removesuffix("_all_chimera_out")
    token, sim_percent = parse_percent_token(sample)
    truth_dir = TRUTH_ROOT / f"simulated_{token}p"
    truth_fa = truth_dir / f"{sample}_all.fasta"
    return sample, token, sim_percent, truth_fa

def read_fasta_split_truth(path: Path) -> tuple[set[str], set[str]]:
    r"""
    Split the truth FASTA by header:
      - header containing '_chim\d+'  -> first returned set (chimeric)
      - anything else                 -> second returned set (non-chimeric)
    Sequences are compared upper-cased and without spaces or tabs; empty
    sequences are dropped. A missing file gives two empty sets.
    """
    chim, non = set(), set()
    if not path.exists():
        return chim, non

    def commit(header, pieces):
        # Skip text seen before the first header and records without any lines.
        if header is None or not pieces:
            return
        seq = "".join(pieces).replace(" ", "").replace("\t", "").strip().upper()
        if not seq:
            return
        if RE_CHIM_HDR.search(header):
            chim.add(seq)
        else:
            non.add(seq)

    current_header, pieces = None, []
    with path.open("r", encoding="utf-8", errors="ignore") as fh:
        for raw_line in fh:
            if not raw_line.startswith(">"):
                pieces.append(raw_line.strip())
                continue
            commit(current_header, pieces)
            current_header, pieces = raw_line.strip(), []
    commit(current_header, pieces)
    return chim, non

def read_fasta_set(path: Path) -> set[str]:
    """Read a FASTA file into a set of sequences (upper case, no spaces or tabs).

    Headers are discarded, so identical sequences collapse into one entry.
    Records with an empty sequence (a header followed only by blank lines, or
    blank lines before the first header) are skipped. Previously they added the
    empty string to the set and inflated the sequence counts.

    Args:
        path: FASTA file to read.

    Returns:
        The set of non-empty sequences; an empty set when the file is missing.
    """
    s: set[str] = set()
    if not path.exists():
        return s

    def add_record(chunks: list[str]) -> None:
        seq = "".join(chunks).replace(" ", "").replace("\t", "").strip().upper()
        if seq:  # never store the empty string as a sequence
            s.add(seq)

    buf: list[str] = []
    with path.open("r", encoding="utf-8", errors="ignore") as fh:
        for line in fh:
            if line.startswith(">"):
                add_record(buf)
                buf = []
            else:
                buf.append(line.strip())
        add_record(buf)
    return s

def find_first(p: Path, patterns: list[str]) -> Path | None:
    for pat in patterns:
        hits = list(p.glob(pat))
        if hits:
            return hits[0]
    return None

# Collect one metrics row per result directory; the loop variables are notebook-global.
rows = []

# Iterate over all result folders named "*_all_chimera_out" (rows are labelled USEARCH).
# NOTE(review): ROOT is the VSEARCH result directory - confirm USEARCH outputs are stored there.
for rd in sorted(p for p in ROOT.iterdir() if p.is_dir() and p.name.endswith("_all_chimera_out")):
    try:
        sample, token, sim_percent, truth_fa = derive_truth_path(rd)
    except ValueError as e:
        # Folder name without a percent token: warn and skip it.
        print(f"[WARN] {e}")
        continue

    truth_chim_set, truth_nonchim_set = read_fasta_split_truth(truth_fa)
    if not truth_chim_set and not truth_nonchim_set:
        # Only a warning: the row is still written, with all four confusion counts at zero.
        print(f"[WARN] Truth FASTA leer/nicht gefunden: {truth_fa}")

    # Locate the chimera / non-chimera FASTA files of this run.
    chim_fa = find_first(rd, ["*_all_ch.fa"])
    non_fa  = find_first(rd, ["*_all_nonch.fa"])

    # Nothing to evaluate when neither output file is present.
    if chim_fa is None and non_fa is None:
        continue

    chim_set = read_fasta_set(chim_fa) if chim_fa else set()
    non_set  = read_fasta_set(non_fa)  if non_fa  else set()

    # Counts are over unique sequences because the readers return sets.
    n_chim   = len(chim_set)
    n_non    = len(non_set)
    n_total  = n_chim + n_non
    pct_chim = (100.0 * n_chim / n_total) if n_total else 0.0

    # Confusion counts: called sets intersected with the truth sets.
    TP = len(chim_set & truth_chim_set)
    FP = len(chim_set & truth_nonchim_set)
    FN = len(non_set & truth_chim_set)
    TN = len(non_set & truth_nonchim_set)

    # Sample name with "_<token>p_" and everything after it removed.
    sample_base = re.sub(rf"_{re.escape(token)}p_.*$", "", sample)

    rows.append({
        "sim_percent": sim_percent,
        "sample": sample,                  
        "method": "USEARCH",
        "n_total": n_total,
        "n_chim": n_chim,
        "n_nonchim": n_non,
        "pct_chim": pct_chim,
        "sample_base": sample_base,
        "TP": TP, "FP": FP, "TN": TN, "FN": FN,
    })

# Write the CSV (fixed column order, sorted by sim_percent then sample).
df = pd.DataFrame(rows, columns=[
    "sim_percent","sample","method","n_total","n_chim","n_nonchim",
    "pct_chim","sample_base","TP","FP","TN","FN"
])
df.sort_values(["sim_percent","sample"], inplace=True, kind="stable")
OUT_CSV.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(OUT_CSV, index=False)
print(f"Wrote {len(df)} rows to {OUT_CSV}")


Wrote 192 rows to /home-link/zxozk31/Analyse_cons_count/Results_analyse/usearch_metrics.csv


In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pathlib import Path
import pandas as pd

# Inputs: the per-tool metric tables that get merged.
FILES = [
    Path("/home-link/zxozk31/Analyse_cons_count/Results_analyse/vsearch_metrics.csv"),
    Path("/home-link/zxozk31/Analyse_cons_count/Results_analyse/usearch_metrics.csv"),
    Path("/home-link/zxozk31/Analyse_cons_count/Results_analyse/chmmairra_metrics.csv"),
]

OUT_CSV = Path("/home-link/zxozk31/Analyse_cons_count/Results_analyse/all_metrics.csv")

# Load every table that exists; missing ones only trigger a warning.
dfs = []
for f in FILES:
    if not f.exists():
        print(f"[WARN] Datei fehlt: {f}")
        continue
    df = pd.read_csv(f)
    dfs.append(df)

# Stop when there is nothing to merge.
if len(dfs) == 0:
    raise SystemExit("Keine Eingabedateien gefunden!")

# Concatenate all tables into one frame with a fresh index.
out = pd.concat(dfs, ignore_index=True)

# Sort by sim_percent, then sample, then method (only the columns that are present).
sort_cols = [c for c in ["sim_percent", "sample", "method"] if c in out.columns]
if sort_cols:
    out = out.sort_values(by=sort_cols, kind="stable")

# Write the merged table.
OUT_CSV.parent.mkdir(parents=True, exist_ok=True)
out.to_csv(OUT_CSV, index=False)
print(f"Wrote merged CSV with {len(out)} rows -> {OUT_CSV}")


Wrote merged CSV with 1152 rows -> /home-link/zxozk31/Analyse_cons_count/Results_analyse/all_metrics.csv


In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pathlib import Path
import re
import pandas as pd


# Directory scanned for "<sample>_chimera_out" run folders of the unsimulated data.
VSEARCH_ROOT = Path("/home-link/zxozk31/Analyse_cons_count/Results_vsearch_unsimulated2")
# Destination of the per-sample, per-method count table.
OUT_CSV      = Path("/home-link/zxozk31/Analyse_cons_count/Results_analyse/unsim_vsearch_combined.csv")

# Matches "chimeras_<method>.fa" file names; <method> must not contain a dot.
RE_CHIMERAS = re.compile(r"^chimeras_(?P<method>[^.]+)\.fa$")

def fasta_count(path: Path) -> int:
    """Count the records of a FASTA file, i.e. the lines starting with ``>``.

    Returns 0 when ``path`` is falsy (e.g. None) or the file does not exist.
    """
    if path and path.exists():
        with path.open("r", encoding="utf-8", errors="ignore") as fh:
            return sum(1 for line in fh if line.startswith(">"))
    return 0

def _unsim_vsearch_row(sample: str, method: str, chim_fa: Path, non_fa: Path) -> dict:
    """Build the summary row for one (sample, method) pair from its two FASTA files."""
    has_chim, has_non = chim_fa.exists(), non_fa.exists()
    n_chim = fasta_count(chim_fa) if has_chim else 0
    n_non = fasta_count(non_fa) if has_non else 0
    n_tot = n_chim + n_non
    return {
        "sample": sample,
        "method": f"VSEARCH_{method}",
        "n_total": n_tot,
        "n_chim": n_chim,
        "n_nonchim": n_non,
        "pct_chim": round(100.0 * n_chim / n_tot, 2) if n_tot else 0.0,
        # A missing counterpart file is reported as an empty path.
        "path_chim": str(chim_fa) if has_chim else "",
        "path_nonchim": str(non_fa) if has_non else "",
    }


def collect_vsearch_unsim(root: Path) -> list[dict]:
    """Count chimeras / non-chimeras per sample and VSEARCH method.

    Every ``*_chimera_out`` directory below ``root`` is one sample. Methods are
    discovered from ``chimeras_<method>.fa`` files; methods for which only a
    ``nonchimeras_<method>.fa`` exists are picked up in a second pass. Counts
    are FASTA record counts, not unique sequences.

    Args:
        root: directory containing the run folders.

    Returns:
        One dict per (sample, method) with the keys sample, method, n_total,
        n_chim, n_nonchim, pct_chim (rounded to 2 decimals), path_chim and
        path_nonchim. Files are visited in sorted order, so the row order is
        reproducible.
    """
    rows = []

    run_dirs = sorted(p for p in root.iterdir() if p.is_dir() and p.name.endswith("_chimera_out"))
    for rd in run_dirs:
        sample = rd.name.removesuffix("_chimera_out")
        # Methods already emitted for this sample; replaces a scan over all rows.
        seen_methods: set[str] = set()

        for chim_fa in sorted(rd.glob("chimeras_*.fa")):
            m = RE_CHIMERAS.match(chim_fa.name)
            if not m:
                continue
            method = m.group("method")  # e.g. "uchime2", "denovo", "ref", "uchime3", ...
            seen_methods.add(method)
            rows.append(_unsim_vsearch_row(sample, method, chim_fa, rd / f"nonchimeras_{method}.fa"))

        # Second pass: methods that did not get a row in the first pass.
        for non_fa in sorted(rd.glob("nonchimeras_*.fa")):
            method = non_fa.stem.replace("nonchimeras_", "")
            if method in seen_methods:
                continue
            seen_methods.add(method)
            rows.append(_unsim_vsearch_row(sample, method, rd / f"chimeras_{method}.fa", non_fa))

    return rows

def main():
    """Write the unsimulated VSEARCH count table to OUT_CSV.

    An empty table (header only) is written when VSEARCH_ROOT does not exist.
    """
    columns = [
        "sample", "method", "n_total", "n_chim", "n_nonchim", "pct_chim", "path_chim", "path_nonchim",
    ]
    if VSEARCH_ROOT.exists():
        records = collect_vsearch_unsim(VSEARCH_ROOT)
    else:
        records = []

    df = pd.DataFrame(records, columns=columns)
    if not df.empty:
        df = df.sort_values(["sample", "method"], kind="stable")

    OUT_CSV.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(OUT_CSV, index=False)
    print(f"Wrote {len(df)} rows to {OUT_CSV}")

if __name__ == "__main__":
    main()


Wrote 48 rows to /home-link/zxozk31/Analyse_cons_count/Results_analyse/unsim_vsearch_combined.csv


In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from pathlib import Path
import pandas as pd

# === Adjust the inputs below ===
# Three per-tool tables for the unsimulated data that get merged into OUT.
CSV1 = Path("/home-link/zxozk31/Analyse_cons_count/Results_analyse/unsim_usearch_only.csv")
CSV2 = Path("/home-link/zxozk31/Analyse_cons_count/Results_analyse/unsim_vsearch_combined.csv")
CSV3 = Path("/home-link/zxozk31/Analyse_cons_count/results_unsimulated_chmmairra/chimera_eval_unsim_summary.csv")

OUT  = Path("/home-link/zxozk31/Analyse_cons_count/Results_analyse/unsimulated_combined_allmethods.csv")

# Expected columns (every input is normalised to exactly this set and order).
COLUMNS = ["sample","method","n_total","n_chim","n_nonchim","pct_chim","path_chim","path_nonchim"]

def load_csv(p: Path) -> pd.DataFrame:
    """Load one summary CSV and normalise it to the COLUMNS layout.

    Missing columns are added (``""`` for ``path_*`` columns, ``0`` otherwise),
    extra columns are dropped, and the column order follows COLUMNS.
    """
    frame = pd.read_csv(p)
    for name in COLUMNS:
        if name in frame.columns:
            continue
        # Fill value depends on the column kind: paths are text, the rest are counts.
        frame[name] = "" if name.startswith("path_") else 0
    return frame[COLUMNS]

def main():
    """Merge the three unsimulated summary tables, drop exact duplicates and write OUT."""
    sources = (CSV1, CSV2, CSV3)
    merged = pd.concat([load_csv(src) for src in sources], ignore_index=True)

    # Remove rows that are identical in every column.
    merged = merged.drop_duplicates()

    # Sort by sample, then method, when both columns are available.
    sort_keys = ["sample", "method"]
    if set(sort_keys).issubset(merged.columns):
        merged = merged.sort_values(sort_keys, kind="stable")

    OUT.parent.mkdir(parents=True, exist_ok=True)
    merged.to_csv(OUT, index=False)
    print(f"OK: {len(merged)} Zeilen -> {OUT}")

if __name__ == "__main__":
    main()


OK: 72 Zeilen -> /home-link/zxozk31/Analyse_cons_count/Results_analyse/unsimulated_combined_allmethods.csv
