# Load & execute pipelines

In [None]:
from pathlib import Path
import pandas as pd
from loguru import logger

def _load_stage_files(paths_csv: str, stage: str, build_paths_fn, reader_fn, print_paths: bool = True) -> pd.DataFrame:
    """
    Universal loader for files referenced in `python_paths.csv`.

:param paths_csv: CSV file with columns: study_id, sample_id, stage, folder
:param stage: name of the stage (BINNING, COVERAGE, GTDBTK)
:param build_paths_fn: function (row: pd.Series) -> List[Path]
:param reader_fn: function (Path) -> pd.DataFrame (may return an empty DataFrame if the file is missing)
:param print_paths: whether to print the file paths
:return: concatenated DataFrame

    """
    df_paths = pd.read_csv(paths_csv, sep=",", dtype=str,)
    rows = df_paths[df_paths["stage"] == stage].copy()
    frames: list[pd.DataFrame] = []

    for _, row in rows.iterrows():
        for path in build_paths_fn(row):
            if print_paths:
                print(path)
            try:
                df_part = reader_fn(path)
                frames.append(df_part)
            except FileNotFoundError:
                logger.warning(f"File not found: {path}")
            except Exception as e:
                logger.error(f"Error reading file {path}: {e}")

    if not frames:
        return pd.DataFrame()

    return pd.concat(frames, ignore_index=True)


# PIPELINE: BINNING
def pipeline_Binning(paths_csv: str, print_paths: bool = True) -> pd.DataFrame:
    def build_paths(row):
        folder = Path(row["folder"])
        sample_id = row["sample_id"]
        return [
            folder / f"{sample_id}_DASTool_contig2bin.tsv",
            folder / f"{sample_id}_DASTool_summary.tsv"
        ]

    def reader(path: Path) -> pd.DataFrame:
        if path.name.endswith("contig2bin.tsv"):
            return pd.read_csv(
                path, sep="\t", header=None,
                names=["contig", "bin"],
                dtype={"contig": "string", "bin": "string"},
            )
        elif path.name.endswith("summary.tsv"):
            return pd.read_csv(path, sep="\t", dtype="string")
        else:
            return pd.DataFrame()

    # special contig2bin join with summary → outer join after bin
    df_paths = pd.read_csv(paths_csv, sep=",", dtype=str,)
    bin_rows = df_paths[df_paths["stage"] == "BINNING"].copy()
    frames: list[pd.DataFrame] = []

    for _, row in bin_rows.iterrows():
        folder = Path(row["folder"])
        sample_id = row["sample_id"]
        contig2bin_path = folder / f"{sample_id}_DASTool_contig2bin.tsv"
        summary_path   = folder / f"{sample_id}_DASTool_summary.tsv"

        if print_paths:
            logger.info(contig2bin_path)
            logger.info(summary_path)

        try:
            c2b = pd.read_csv(contig2bin_path, sep="\t", header=None,
                              names=["contig", "bin"], dtype="string")
        except FileNotFoundError:
            logger.warning(f"File not found: {contig2bin_path}")
            c2b = pd.DataFrame(columns=["contig", "bin"])

        try:
            summ = pd.read_csv(summary_path, sep="\t", dtype="string")
            if "bin" not in summ.columns:
                summ = pd.DataFrame(columns=["bin"])
        except FileNotFoundError:
            logger.warning(f"File not found: {summary_path}")
            summ = pd.DataFrame(columns=["bin"])

        merged = c2b.merge(summ, on="bin", how="outer")
        frames.append(merged)

    if not frames:
        return pd.DataFrame(columns=["contig", "bin"])

    return pd.concat(frames, ignore_index=True)


# PIPELINE: COVERAGE
def pipeline_COVERAGE(paths_csv: str, print_paths: bool = True) -> pd.DataFrame:
    def build_paths(row):
        folder = Path(row["folder"])
        sample_id = row["sample_id"]
        return [folder / f"{sample_id}_coverage.tsv"]

    def reader(path: Path) -> pd.DataFrame:
        df = pd.read_csv(path, sep="\t", dtype="string")
        df.columns = [col.lstrip("#") for col in df.columns]
        return df

    return _load_stage_files(paths_csv, "COVERAGE", build_paths, reader, print_paths)


# PIPELINE: GTDBTK
def pipeline_GTDBTK(paths_csv: str, print_paths: bool = True) -> pd.DataFrame:
    def build_paths(row):
        folder = Path(row["folder"])
        return [folder / "gtdbtk.bac120.summary.tsv"]

    def reader(path: Path) -> pd.DataFrame:
        return pd.read_csv(path, sep="\t", dtype="string")

    return _load_stage_files(paths_csv, "GTDBTK", build_paths, reader, print_paths)


# USE EXAMPLE
df_bin = pipeline_Binning("python_paths.csv", print_paths =False)
df_cov = pipeline_COVERAGE("python_paths.csv", print_paths =False)
df_gtdb = pipeline_GTDBTK("python_paths.csv", print_paths =False)


# Merge & save df

In [None]:
def _split_taxonomy(classif: str) -> dict:
    cols = {"Domain": None, "Phylum": None, "Class": None,
            "Order": None, "Family": None, "Genus": None, "Species": None}
    if not isinstance(classif, str):
        return cols
    for token in classif.split(";"):
        if "__" not in token:
            continue
        prefix, name = token.split("__", 1)
        if   prefix == "d": cols["Domain"]  = name
        elif prefix == "p": cols["Phylum"]  = name
        elif prefix == "c": cols["Class"]   = name
        elif prefix == "o": cols["Order"]   = name
        elif prefix == "f": cols["Family"]  = name
        elif prefix == "g": cols["Genus"]   = name
        elif prefix == "s": cols["Species"] = name
    return cols

def prepare_mag_table(df_gtdb: pd.DataFrame, df_cov: pd.DataFrame, df_bin: pd.DataFrame) -> pd.DataFrame:
    """
Builds the final MAG table as required.
Expected inputs:
- df_gtdb: columns at least ['user_genome','classification','closest_genome_reference','closest_genome_ani']
- df_cov: columns at least ['rname','endpos','numreads'] (+ optional 'sample_id')
- df_bin: columns at least ['contig','bin'] + (from DASTool_summary.tsv) 'bin_score'
Returns a DataFrame with columns:
['mag_id','genome_size','bin_score','relative_abundance',
'Domain','Phylum','Class','Order','Family','Genus','Species', 'closest_reference_genome_id','closest_reference_genome_ani']
and prints how many records were rejected due to missing values.
    """

    # 1) map contig->bin and connect to coverage
    # sanity dtype
    cov = df_cov.copy()
    cov.columns = [str(c).lstrip("#") for c in cov.columns]
    cov["endpos"]   = pd.to_numeric(cov["endpos"], errors="coerce")
    cov["numreads"] = pd.to_numeric(cov["numreads"], errors="coerce")

    contig2bin = df_bin[["contig", "bin"]].dropna().copy()

    cov_bin = cov.merge(contig2bin, left_on="rname", right_on="contig", how="inner")

# 2) genome_size: sum of contig lengths in the bin
# I take the contig length as endpos (coverage counted from 1 to endpos)
    contig_len = (cov_bin
                  .groupby(["bin", "rname"], as_index=False)["endpos"]
                  .max())  # na wypadek duplikatów rname w pliku
    genome_size = (contig_len
                   .groupby("bin", as_index=False)["endpos"]
                   .sum()
                   .rename(columns={"bin": "mag_id", "endpos": "genome_size"}))

    # 3) relative abundance: share of readings per bin
    if "sample_id" in cov_bin.columns:
        reads_per = (cov_bin.groupby(["sample_id", "bin"], as_index=False)["numreads"]
                     .sum()
                     .rename(columns={"numreads": "reads_in_bin"}))
        total_reads = (cov_bin.groupby("sample_id", as_index=False)["numreads"]
                       .sum()
                       .rename(columns={"numreads": "reads_total"}))
        rel = reads_per.merge(total_reads, on="sample_id", how="left")
        rel["relative_abundance"] = rel["reads_in_bin"] / rel["reads_total"]
        rel = rel.rename(columns={"bin": "mag_id"})[["mag_id", "relative_abundance"]]
# If I have multiple samples, duplicate mag_ids from different samples may result.
# Consolidate by sum (or average). By default, I'll take the sum of the contributions (typically 1 sample => no influence).
        rel = rel.groupby("mag_id", as_index=False)["relative_abundance"].sum()
    else:
        reads_per = (cov_bin.groupby("bin", as_index=False)["numreads"]
                     .sum()
                     .rename(columns={"numreads": "reads_in_bin"}))
        total_reads = reads_per["reads_in_bin"].sum()
        rel = reads_per.assign(relative_abundance=reads_per["reads_in_bin"] / total_reads)
        rel = rel.rename(columns={"bin": "mag_id"})[["mag_id", "relative_abundance"]]

    # 4) bin_score from DASTool_summary
    # Take unique bin_score per bin (sometimes repeated per contig).
    if "bin_score" in df_bin.columns:
        bs = (df_bin[["bin", "bin_score"]]
              .dropna(subset=["bin"])
              .drop_duplicates(subset=["bin"]))
        bs["bin_score"] = pd.to_numeric(bs["bin_score"], errors="coerce")
        bs = bs.rename(columns={"bin": "mag_id"})
    else:
        # if no column in input
        bs = pd.DataFrame(columns=["mag_id", "bin_score"])

    # 5) GTDB: taxonomy + closest genome
    gtdb = df_gtdb[["user_genome", "classification",
                    "closest_genome_reference", "closest_genome_ani"]].copy()

    tax = gtdb["classification"].apply(_split_taxonomy).apply(pd.Series)
    gtdb_clean = pd.concat([gtdb.drop(columns=["classification"]), tax], axis=1)
    gtdb_clean = gtdb_clean.rename(columns={
        "user_genome": "mag_id",
        "closest_genome_reference": "closest_reference_genome_id",
        "closest_genome_ani": "closest_reference_genome_ani"
    })
    gtdb_clean["closest_reference_genome_ani"] = pd.to_numeric(
        gtdb_clean["closest_reference_genome_ani"], errors="coerce"
    )

    # 6) Merging everything by mag_id
    merged = (genome_size
              .merge(rel, on="mag_id", how="left")
              .merge(bs, on="mag_id",  how="left")
              .merge(gtdb_clean, on="mag_id", how="left"))

    # 7) First select only the required columns
    wanted = [
        "mag_id",
        "genome_size",
        "bin_score",
        "relative_abundance",
        "Domain", "Phylum", "Class", "Order", "Family", "Genus", "Species",
        "closest_reference_genome_id",
        "closest_reference_genome_ani",
    ]
    out = merged[wanted].copy()

    # Now remove missing records and report
    before = len(out)
    out_clean = out.dropna()
    removed = before - len(out_clean)
    logger.info(f"Usunięto {removed} z {before} rekordów z brakami (NaN/NULL).")

    return out_clean

df_mag = prepare_mag_table(df_gtdb, df_cov, df_bin)

# Preview of the first few lines
print(df_mag.head())

# Writing to a file
df_mag.to_csv("MAG_table.csv", sep="\t", index=False)