# CSV → Planilhas CCIRAS

Gera planilhas de monitoramento para o CCIRAS a partir dos CSVs `vigiram-*.csv` e das contagens de pacientes positivos/negativos. O fluxo utiliza os mesmos arquivos de entrada já exportados do Metabase.

In [None]:

import pandas as pd
from pathlib import Path

try:
    NB_DIR = Path(__file__).parent.resolve()
except NameError:
    NB_DIR = Path().resolve()  # Jupyter

REPO_ROOT = NB_DIR.parent
DATA_DIR = (REPO_ROOT / "data").resolve()
OUT_DIR = (REPO_ROOT / "outputs").resolve()
DATA_DIR.mkdir(exist_ok=True)
OUT_DIR.mkdir(exist_ok=True)

print("DATA_DIR =", DATA_DIR)
print("OUT_DIR =", OUT_DIR)


In [None]:

# =========================================================
# Função de leitura robusta de CSV
# =========================================================

def read_csv_robusto(path_csv: str, sep: str = ",") -> pd.DataFrame:
    for enc in ("utf-8-sig", "utf-8", "latin-1", "cp1252"):
        try:
            return pd.read_csv(
                path_csv, sep=sep, dtype=str, encoding=enc, engine="python"
            )
        except UnicodeDecodeError:
            continue
    return pd.read_csv(
        path_csv,
        sep=sep,
        dtype=str,
        encoding="utf-8",
        encoding_errors="replace",
        engine="python",
    )


In [None]:

# =========================================================
# 1. Planilha de Isolados Detalhados
# =========================================================

def planilha_isolados(df: pd.DataFrame) -> pd.DataFrame:
    cols = {
        "data_area_executora": "Data",
        "paciente": "Paciente",
        "solicitacao": "Prontuario",
        "unidade_solicitante": "Unidade/Setor",
        "desc_material_analise": "Tipo de Amostra",
        "microorganismo": "Microrganismo",
        "Antibiótico": "Antimicrobiano Testado",
        "RSI": "Resultado",
        "Perfil de Resistência": "Observações",
    }
    existe = [c for c in cols if c in df.columns]
    return df[existe].rename(columns=cols)


In [None]:

# =========================================================
# 2. Planilha de Sensibilidade por Microrganismo
# =========================================================

def planilha_sensibilidade(df: pd.DataFrame) -> pd.DataFrame:
    base = df.dropna(subset=["microorganismo", "Antibiótico", "RSI"])
    tab = base.pivot_table(
        index=["microorganismo", "Antibiótico"],
        columns="RSI",
        values="solicitacao",
        aggfunc="count",
        fill_value=0,
    ).reset_index()
    tab["Nº Testes"] = tab.filter(regex="^(S|R|I)$").sum(axis=1)
    for col in ["S", "R", "I"]:
        if col not in tab:
            tab[col] = 0
    tab["% Sensíveis"] = (tab["S"] / tab["Nº Testes"].replace(0, pd.NA) * 100).round(2)
    tab["% Resistentes"] = (tab["R"] / tab["Nº Testes"].replace(0, pd.NA) * 100).round(2)
    tab["% Intermediários"] = (tab["I"] / tab["Nº Testes"].replace(0, pd.NA) * 100).round(2)
    return tab[["microorganismo", "Antibiótico", "Nº Testes", "% Sensíveis", "% Resistentes", "% Intermediários"]]


In [None]:

# =========================================================
# 3. Planilha de Tendência Temporal
# =========================================================

def planilha_tendencia(df: pd.DataFrame) -> pd.DataFrame:
    if "data_area_executora" not in df.columns:
        raise ValueError("coluna 'data_area_executora' ausente")
    dt = pd.to_datetime(df["data_area_executora"], errors="coerce")
    df = df.assign(Ano=dt.dt.year, Mes=dt.dt.month)
    iso = df.groupby(["Ano", "Mes", "microorganismo"])["solicitacao"].nunique().reset_index(name="Total de Isolados")
    res = df[df["RSI"].str.upper().eq("R")].groupby(["Ano", "Mes", "microorganismo"]).size().reset_index(name="Resistentes")
    out = iso.merge(res, on=["Ano", "Mes", "microorganismo"], how="left").fillna(0)
    out["% Resistentes"] = (out["Resistentes"] / out["Total de Isolados"].replace(0, pd.NA) * 100).round(2)
    return out


In [None]:

# =========================================================
# 4. Planilha de Indicadores Epidemiológicos
# =========================================================

def carregar_contagens(pos_path: str, neg_path: str):
    df_pos = read_csv_robusto(pos_path)
    df_neg = read_csv_robusto(neg_path)
    df_pos = df_pos.rename(columns={df_pos.columns[0]: "unidade_solicitante", df_pos.columns[1]: "pacientes_positivos"})
    df_neg = df_neg.rename(columns={df_neg.columns[0]: "unidade_solicitante", df_neg.columns[1]: "pacientes_negativos"})
    return df_pos, df_neg


def planilha_indicadores(df: pd.DataFrame, df_pos: pd.DataFrame, df_neg: pd.DataFrame) -> pd.DataFrame:
    cont = (
        df_pos.merge(df_neg, on="unidade_solicitante", how="outer")
        .fillna(0)
    )
    cont["total_pacientes"] = cont["pacientes_positivos"].astype(float) + cont["pacientes_negativos"].astype(float)
    iso = (
        df.groupby(["unidade_solicitante", "microorganismo"])["solicitacao"].nunique().reset_index(name="total_isolados")
    )
    res = (
        df[df["RSI"].str.upper().eq("R")]
        .groupby(["unidade_solicitante", "microorganismo"])
        .size()
        .reset_index(name="resistentes")
    )
    out = iso.merge(res, on=["unidade_solicitante", "microorganismo"], how="left").merge(cont, on="unidade_solicitante", how="left").fillna(0)
    out["Taxa de Incidência (por 1000 pacientes)"] = (
        out["total_isolados"] / out["total_pacientes"].replace(0, pd.NA) * 1000
    ).round(3)
    out["Taxa de Resistência (%)"] = (
        out["resistentes"] / out["total_isolados"].replace(0, pd.NA) * 100
    ).round(2)
    cols = [
        "unidade_solicitante",
        "microorganismo",
        "Taxa de Incidência (por 1000 pacientes)",
        "Taxa de Resistência (%)",
    ]
    return out[cols]


In [None]:

# =========================================================
# Função principal para gerar Excel
# =========================================================

def gerar_planilhas_cciras(csv_in: str, pos_counts: str, neg_counts: str, xlsx_out: str):
    df = read_csv_robusto(csv_in)
    df_pos, df_neg = carregar_contagens(pos_counts, neg_counts)

    p1 = planilha_isolados(df)
    p2 = planilha_sensibilidade(df)
    p3 = planilha_tendencia(df)
    p4 = planilha_indicadores(df, df_pos, df_neg)

    with pd.ExcelWriter(xlsx_out, engine="openpyxl") as wr:
        p1.to_excel(wr, sheet_name="Isolados Detalhados", index=False)
        p2.to_excel(wr, sheet_name="Sensibilidade", index=False)
        p3.to_excel(wr, sheet_name="Tendência Temporal", index=False)
        p4.to_excel(wr, sheet_name="Indicadores", index=False)
    print(f"✔ Gerado: {xlsx_out}")


In [None]:

# =========================================================
# Processar todos os arquivos do ano (opcional)
# =========================================================

def processar_todos_arquivos_ano(ano_2d: str, pos_counts: str, neg_counts: str):
    import re, os
    r = re.compile(rf"vigiram-[a-z]{{3}}{ano_2d}\.csv$", re.IGNORECASE)
    for nome_arq in os.listdir(DATA_DIR):
        if r.match(nome_arq):
            csv_in = DATA_DIR / nome_arq
            base = os.path.splitext(nome_arq)[0]
            xlsx_out = OUT_DIR / f"CCIRAS_{base}.xlsx"
            gerar_planilhas_cciras(csv_in, pos_counts, neg_counts, xlsx_out)
    print("✔ Finalizado para todos os arquivos do ano 20" + ano_2d)


In [None]:

# =========================================================
# Exemplo de uso
# =========================================================
# gerar_planilhas_cciras(
#     DATA_DIR / "vigiram-jan25.csv",
#     DATA_DIR / "Contagem pacientes amostra positiva.csv",
#     DATA_DIR / "Contagem pacientes amostras negativas.csv",
#     OUT_DIR / "CCIRAS_vigiram-jan25.xlsx",
# )

# processar_todos_arquivos_ano(
#     "25",
#     DATA_DIR / "Contagem pacientes amostra positiva.csv",
#     DATA_DIR / "Contagem pacientes amostras negativas.csv",
# )
