In [None]:
# Imports y definiciones de funciones para análisis de módulos KEGG

import pandas as pd
from pathlib import Path
import requests

def leer_emapper(archivo, muestra):
    """Read an eggNOG-mapper ``.emapper.annotations`` file in long (query, module) form.

    The file is parsed line by line instead of with ``pd.read_csv(comment="#")``:
    pandas treats ``#`` as a comment marker anywhere in a line, so a data row with
    ``#`` inside a field was truncated and lost its ``KEGG_Module`` value. Here only
    lines that *start* with ``#`` are treated as comments.

    Parameters
    ----------
    archivo : str or path-like
        Path to the tab-separated annotations file. It must contain a header line
        starting with ``#query``.
    muestra : str
        Sample identifier written to the ``sample_id`` column of every row.

    Returns
    -------
    pandas.DataFrame
        Columns ``sample_id``, ``query``, ``KEGG_Module``; one row per
        (query, module) pair, in file order, with a fresh 0..n-1 index.
        Rows without a module (empty or ``-``) are omitted. Field values are kept
        as literal text (no coercion of tokens such as ``NA`` to missing values).

    Raises
    ------
    ValueError
        If no ``#query`` header line exists, or the header lacks the ``query`` or
        ``KEGG_Module`` column.
    """
    header = None
    filas = []
    with open(archivo, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            line = line.rstrip("\r\n")
            if line.startswith("#"):
                # The first '#query...' line is the column header; every other
                # line starting with '#' is metadata and is skipped.
                if header is None and line.startswith("#query"):
                    header = line.lstrip("#").strip().split("\t")
                continue
            if line.strip():
                filas.append(line.split("\t"))

    if header is None:
        raise ValueError(f"No '#query' header line found in {archivo}")

    # NOTE(review): older eggNOG-mapper releases reportedly name the first column
    # 'query_name' — confirm against the emapper version in use.
    if header[0] == "query_name":
        header[0] = "query"

    faltantes = [col for col in ("query", "KEGG_Module") if col not in header]
    if faltantes:
        raise ValueError(f"Missing column(s) {faltantes} in header of {archivo}")

    i_query = header.index("query")
    i_mod = header.index("KEGG_Module")
    min_campos = max(i_query, i_mod) + 1

    registros = []
    for campos in filas:
        # Rows too short to carry both fields have no usable annotation.
        if len(campos) < min_campos:
            continue
        query = campos[i_query]
        if not query:
            continue
        # A cell may list several comma-separated modules; '-' means "none".
        for modulo in campos[i_mod].split(","):
            modulo = modulo.strip()
            if modulo and modulo != "-":
                registros.append((muestra, query, modulo))

    return pd.DataFrame(registros, columns=["sample_id", "query", "KEGG_Module"])


def resumen_modulos(df):
    """Count distinct proteins per (sample, module) and pivot the counts into a matrix.

    Parameters
    ----------
    df : pandas.DataFrame
        Long table with columns ``sample_id``, ``query`` and ``KEGG_Module``.

    Returns
    -------
    tuple of (pandas.DataFrame, pandas.DataFrame)
        ``conteos``: one row per (sample_id, KEGG_Module) with the number of
        distinct ``query`` values in ``n_proteins``.
        ``matriz``: samples as rows, modules as columns, integer counts, with 0
        for pairs that never occur.
    """
    por_par = df.groupby(["sample_id", "KEGG_Module"])["query"].nunique()
    conteos = por_par.reset_index(name="n_proteins")

    ancho = conteos.pivot(index="sample_id", columns="KEGG_Module", values="n_proteins")
    # Pairs absent from the long table come out of the pivot as NaN: they are zero counts.
    matriz = ancho.fillna(0).astype(int)
    return conteos, matriz


def obtener_kegg():
    """Download the KEGG module listing and return it as a lookup table.

    Issues a GET request to ``https://rest.kegg.jp/list/module`` and parses the
    tab-separated ``<id>\\t<description>`` lines of the response.

    Returns
    -------
    pandas.DataFrame
        One row per distinct module with columns:
        ``KEGG_Module`` (identifier with any ``prefix:`` qualifier removed),
        ``Description`` (full description text),
        ``Module_name`` (description text before the first comma) and
        ``Module_description`` (copy of ``Description``).

    Raises
    ------
    requests.HTTPError
        Propagated from ``raise_for_status()`` on a non-success HTTP status.
    ValueError
        If the response contains no module lines.
    """
    url = "https://rest.kegg.jp/list/module"
    r = requests.get(url, timeout=60)
    r.raise_for_status()

    registros = []
    for linea in r.text.splitlines():
        if not linea.strip():
            continue
        identificador, _, descripcion = linea.partition("\t")
        # Keep only the text after the last ':' so that any database qualifier
        # is removed, not just the literal "module:".
        # NOTE(review): KEGG REST listings are believed to have used an "md:"
        # prefix at some point — confirm against the current API output.
        identificador = identificador.strip().rpartition(":")[2]
        registros.append((identificador, descripcion.strip()))

    if not registros:
        # An empty listing would make every module look unmapped downstream.
        raise ValueError(f"Empty module listing received from {url}")

    df = pd.DataFrame(registros, columns=["KEGG_Module", "Description"])
    df["Module_name"] = df["Description"].str.split(",", n=1).str[0]
    df["Module_description"] = df["Description"]
    return df.drop_duplicates(subset=["KEGG_Module"])


def porcentaje_no_mapeado(modulos, ref):
    """Percentage of distinct observed modules that are missing from a reference table.

    Parameters
    ----------
    modulos : iterable
        Module identifiers observed in the data; duplicates are collapsed.
    ref : mapping or pandas.DataFrame
        Reference whose ``"KEGG_Module"`` entry lists the known identifiers.

    Returns
    -------
    float
        ``100 * missing / observed`` rounded to two decimals, or ``0.0`` when
        ``modulos`` is empty.
    """
    conocidos = set(ref["KEGG_Module"])
    observados = set(modulos)
    if not observados:
        return 0.0
    sin_referencia = observados - conocidos
    return round(100 * len(sin_referencia) / len(observados), 2)


def sparsidad(matriz):
    """Percentage of cells in the matrix that are equal to zero.

    Parameters
    ----------
    matriz : pandas.DataFrame
        Table whose cells are compared against 0.

    Returns
    -------
    float
        ``100 * zeros / cells`` rounded to two decimals, or ``0.0`` when the
        matrix has no cells.
    """
    n_celdas = matriz.size
    # Column-wise zero counts first, then the grand total over all columns.
    n_ceros = matriz.eq(0).sum().sum()
    if not n_celdas:
        return 0.0
    return round(100 * n_ceros / n_celdas, 2)


In [6]:
from pathlib import Path
import pandas as pd

# 1) Paths to the .emapper.annotations files (adjust if the directory layout changes).
# Every sample follows the same layout under the current working directory:
#   <cwd>/<sample>/functional_annotation/eggnog_mapper/<sample>.emapper.annotations
base = Path.cwd()
samples = {
    sid: base / sid / "functional_annotation" / "eggnog_mapper" / f"{sid}.emapper.annotations"
    for sid in ("sample1", "sample2", "sample3")
}

In [None]:
# Load every sample whose annotation file exists; warn about and skip the others.
mods_list = []
for sid, f in samples.items():
    if not f.exists():
        print(f"[aviso] no existe: {f} — omito {sid}")
        continue
    mods_list.append(leer_emapper(f, sid))

# Nothing to analyse if no annotation file was found.
if len(mods_list) == 0:
    raise SystemExit("No hay muestras válidas.")

# Stack the per-sample tables into one long table with a fresh index.
mods_long = pd.concat(mods_list, ignore_index=True)

In [None]:
# Counts per (sample, module) and the samples x modules abundance matrix.
long_counts, matriz = resumen_modulos(mods_long)

# NOTE(review): in the recorded output the sample1 and sample3 rows are identical
# in every displayed column — confirm that the two input files are distinct.
n_muestras, n_modulos = matriz.shape
print("Matriz (muestras x módulos):", (n_muestras, n_modulos))
display(matriz.head())

Matriz (muestras x módulos): (3, 219)


KEGG_Module,M00001,M00002,M00003,M00004,M00005,M00007,M00009,M00010,M00011,M00012,...,M00806,M00809,M00816,M00817,M00840,M00841,M00842,M00843,M00844,M00845
sample_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
sample1,28,13,15,13,2,6,7,2,5,1,...,2,1,1,1,5,4,1,1,1,3
sample2,31,13,18,21,4,15,7,3,4,2,...,2,0,2,2,4,6,2,2,3,4
sample3,28,13,15,13,2,6,7,2,5,1,...,2,1,1,1,5,4,1,1,1,3


In [12]:
# 4) Sparsity: share of zero cells in the abundance matrix, and its complement.
sp = sparsidad(matriz)
densidad = round(100 - sp, 2)
print(f"Sparsity global: {sp}%  |  Densidad (no-ceros): {densidad}%")


Sparsity global: 7.76%  |  Densidad (no-ceros): 92.24%


In [13]:
# Share of observed modules that are absent from the KEGG module listing, plus an
# annotated copy of the long counts table.
# NOTE(review): the recorded run reports 47.49% unmapped, which is high — check
# whether this reflects an identifier-format mismatch before interpreting it.
try:
    kmap = obtener_kegg()
    pct_unmapped = porcentaje_no_mapeado(matriz.columns, kmap)
    print(f"% de módulos presentes NO encontrados en la API de KEGG: {pct_unmapped}%")
    # (optional) attach module names/descriptions to the long table
    long_counts_annot = long_counts.merge(kmap, on="KEGG_Module", how="left")
except Exception as e:
    # Best-effort: the analysis continues without KEGG names if the lookup fails.
    print("[aviso] No se pudo consultar KEGG.")
    print("detalle:", e)
    kmap = None
    pct_unmapped = None
    # Same columns as the merge above, so the saved CSV has one schema either way.
    long_counts_annot = long_counts.assign(
        Description="NA",
        Module_name=long_counts["KEGG_Module"],
        Module_description="NA",
    )

% de módulos presentes NO encontrados en la API de KEGG: 47.49%


In [14]:
# Save every result table as CSV under a single output directory.
out = Path("kegg_multi_out")
out.mkdir(parents=True, exist_ok=True)

# (file name, table, write the index?) — the long tables carry no information in
# their index; the matrices keep theirs because it holds the row labels.
salidas = [
    ("mods_long.csv", mods_long, False),                          # sample_id, query, KEGG_Module (exploded)
    ("module_counts_long.csv", long_counts, False),
    ("abundance_matrix_samples_x_modules.csv", matriz, True),     # rows = samples
    ("abundance_matrix_modules_x_samples.csv", matriz.T, True),   # rows = modules
    ("module_counts_long_annot.csv", long_counts_annot, False),
]
for nombre, tabla, con_indice in salidas:
    tabla.to_csv(out / nombre, index=con_indice)

print("Guardados en:", out.resolve())

Guardados en: C:\Users\Andre\OneDrive\Escritorio\UE\TFM\test_results\test_results\kegg_multi_out
