Teste

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Preprocessing.py

EN:
Main ingestion and preprocessing script for TB dataset.
Filters only pulmonary TB (forma == 1), detects date/case/population columns,
performs minimal preprocessing (missing handling & outlier marking),
and writes clean mnemonic CSVs into the folder 'processed_data/'.

PT-BR:
Script principal de ingestão e pré-processamento do conjunto de TB.
Filtra apenas tuberculose pulmonar (forma == 1), detecta colunas de data/casos/população,
executa pré-processamento mínimo (tratamento de missing e marcação de outliers),
e grava CSVs mnemônicos na pasta 'processed_data/'.
"""

# =======================================================
# GLOBAL DEBUGGING FLAGS
# =======================================================
# Both flags are read by debug(): DEBUG gates all output, LANGUAGE picks
# which of the two message texts is printed.
DEBUG = True          # True = show debug messages / False = silent
LANGUAGE = "EN"       # "EN" or "PT"
# =======================================================

from pathlib import Path
import pandas as pd
import numpy as np
from dateutil import parser
import unicodedata
from typing import Optional


# =======================================================
# DEBUG PRINT FUNCTION
# =======================================================
def debug(msg_en: str, msg_pt: str = ""):
    """
    Print a ``[DEBUG]``-prefixed message in the configured language.

    Nothing is printed when the module-level ``DEBUG`` flag is falsy.
    When ``LANGUAGE`` (compared case-insensitively) equals ``"EN"`` the
    English text is printed; for any other value the Portuguese text is
    printed, falling back to the English text when ``msg_pt`` is empty.

    Args:
        msg_en: English message text.
        msg_pt: Portuguese message text (optional).
    """
    if DEBUG:
        use_english = LANGUAGE.upper() == "EN"
        text = msg_en if (use_english or not msg_pt) else msg_pt
        print(f"[DEBUG] {text}")


# =======================================================
# CONFIG
# =======================================================

# Workbook passed to process_tb() as `path_xlsx` by the entry point.
PATH_XLSX = "tb.xlsx"           # original data file
# Sheet to read; None makes read_data() pick one via detect_best_sheet().
SHEET_NAME: Optional[str] = None
# Column holding the TB form code, and the code value kept by the filter
# (pulmonary TB, per the module docstring).
FORMA_COL = "forma"
PULMONARY_CODE = 1
# Folder that receives every CSV written by process_tb().
OUTPUT_FOLDER = Path("processed_data")

# NOTE(review): the two settings below are not referenced by any code
# visible in this file — confirm whether they are still needed.
MIN_ROWS_FOR_MONTHLY = 12
REMOVE_OUTLIERS = False


# =======================================================
# UTILITIES
# =======================================================

def normalize_colname(s: str) -> str:
    """
    Return an ASCII, lower-case, underscore-separated version of a column name.

    The value is converted with ``str()``, decomposed with Unicode NFKD so
    that accents can be dropped by the ASCII encode step, stripped of
    leading/trailing whitespace, lower-cased, and every space character is
    replaced by ``_``.
    """
    decomposed = unicodedata.normalize("NFKD", str(s))
    ascii_only = decomposed.encode("ASCII", "ignore").decode()
    trimmed = ascii_only.strip()
    return trimmed.lower().replace(" ", "_")


def detect_best_sheet(path: str) -> str:
    """
    Pick the worksheet that most looks like the TB case table.

    Each sheet gets a score: the number of keywords found in its
    (lower-cased) column headers plus the number of keywords found in its
    (lower-cased) sheet name. A sheet whose first rows cannot be read scores
    0. The highest-scoring sheet wins; ties go to the sheet that appears
    first in the workbook.

    Args:
        path: Path of the Excel workbook.

    Returns:
        The name of the selected sheet.

    Raises:
        ValueError: If the workbook reports no sheets at all.
    """
    keywords = (
        "tb", "tuberculose", "dados", "notificado", "notific",
        "registro", "caso", "case", "base", "raw",
    )

    def keyword_hits(text: str) -> int:
        # Number of keywords occurring as substrings of `text`.
        return sum(k in text for k in keywords)

    scores = []
    # Open the workbook once and make sure the handle is released; every
    # sheet header is read from this same open workbook.
    with pd.ExcelFile(path) as xls:
        sheets = xls.sheet_names
        for sh in sheets:
            try:
                df_head = pd.read_excel(xls, sheet_name=sh, nrows=5)
                cols_text = " ".join(str(c).lower() for c in df_head.columns)
                score = keyword_hits(cols_text) + keyword_hits(sh.lower())
            except Exception:
                # Best effort: an unreadable sheet stays a candidate with
                # score 0, but KeyboardInterrupt/SystemExit still propagate.
                score = 0
            scores.append((sh, score))

    if not scores:
        raise ValueError(f"Workbook '{path}' contains no sheets.")

    # max() returns the first maximal element, i.e. the earliest sheet
    # among those sharing the top score.
    chosen = max(scores, key=lambda item: item[1])[0]

    debug(
        f"Auto-selected sheet: {chosen}",
        f"Sheet selecionada automaticamente: {chosen}"
    )
    return chosen


def read_data(path: str, sheet: Optional[str]):
    """
    Load one worksheet into a DataFrame with normalized column names.

    Args:
        path: Path of the Excel workbook.
        sheet: Sheet to read; when ``None`` the sheet is chosen by
            ``detect_best_sheet``.

    Returns:
        The DataFrame read from the sheet, with every column name passed
        through ``normalize_colname``.
    """
    sheet = detect_best_sheet(path) if sheet is None else sheet

    df = pd.read_excel(path, sheet_name=sheet)
    df.columns = list(map(normalize_colname, df.columns))

    debug(
        f"Read sheet '{sheet}' with shape {df.shape}",
        f"Lida sheet '{sheet}' com shape {df.shape}"
    )
    return df


def try_parse_date_column(series: pd.Series) -> pd.DatetimeIndex:
    """
    Parse a column of heterogeneous date values, element by element.

    Handling per value:
      * missing values (``pd.isna``) become ``NaT``;
      * values that are already ``date``/``datetime``/``Timestamp`` objects
        are kept as they are. They must not be converted to text and parsed
        again: their text form is ISO-style ``YYYY-MM-DD``, which the
        day-first parser can read as year-day-month, silently swapping day
        and month whenever the day is 12 or less;
      * everything else is converted to ``str`` and parsed with dateutil
        using ``dayfirst=True``;
      * values that cannot be parsed become ``NaT``.

    Args:
        series: Column to parse.

    Returns:
        The result of ``pd.to_datetime`` on the parsed values, in the same
        order as ``series`` (positional; the input index is not carried over).
    """
    # Function-scope import: the module does not import datetime at file level.
    from datetime import date

    parsed = []
    for v in series:
        try:
            if pd.isna(v):
                parsed.append(pd.NaT)
            elif isinstance(v, date):
                # Covers datetime.date, datetime.datetime and pd.Timestamp.
                parsed.append(pd.Timestamp(v))
            else:
                parsed.append(parser.parse(str(v), dayfirst=True))
        except (ValueError, TypeError, OverflowError):
            # Unparseable text, ambiguous array-like cells, or out-of-range
            # dates: mark as missing instead of aborting the whole column.
            parsed.append(pd.NaT)
    return pd.to_datetime(parsed)


def find_column_by_candidates(df: pd.DataFrame, candidates: list) -> Optional[str]:
    """
    Return the first column whose name contains one of the candidate substrings.

    Candidates are tried in the order given; for each candidate the columns
    are scanned in DataFrame order, so an earlier candidate always wins over
    a later one. Returns ``None`` when nothing matches.
    """
    hits = (
        column
        for fragment in candidates
        for column in df.columns
        if fragment in column
    )
    return next(hits, None)


def mark_outliers_iqr(series: pd.Series, multiplier: float = 1.5) -> pd.Series:
    """
    Flag values lying outside the IQR fences.

    A value is flagged when it is strictly below ``Q1 - multiplier * IQR``
    or strictly above ``Q3 + multiplier * IQR``, where Q1/Q3 are the 25th
    and 75th percentiles of ``series``.

    Returns:
        A boolean Series aligned with ``series``; ``True`` marks an outlier.
    """
    first_quartile = series.quantile(0.25)
    third_quartile = series.quantile(0.75)
    whisker = multiplier * (third_quartile - first_quartile)

    too_low = series.lt(first_quartile - whisker)
    too_high = series.gt(third_quartile + whisker)
    return too_low | too_high


# =======================================================
# MAIN PIPELINE
# =======================================================

def process_tb(path_xlsx, sheet_name, forma_col, pulmonary_code, output_folder):
    """
    Run the pulmonary-TB preprocessing pipeline and write the CSV outputs.

    Steps, in order:
      1. Read the workbook with ``read_data`` (column names are normalized).
      2. Locate the clinical-form column: ``forma_col`` itself or, failing
         that, the first column containing "forma", "tipo", "class" or "clin".
      3. Keep only rows whose form equals ``pulmonary_code`` (numeric
         comparison first, string comparison as a fallback).
      4. Detect date / year / month / case / population columns by substring.
      5. Derive the ``year``, ``month`` and ``cases_raw`` columns.
      6. Write the annual aggregate, the monthly aggregate, the monthly
         aggregate with an IQR outlier flag, and the filtered rows.

    Args:
        path_xlsx: Path of the Excel workbook.
        sheet_name: Sheet to read, or ``None`` to auto-detect it.
        forma_col: Name of the clinical-form column (normalized before use).
        pulmonary_code: Value of the form column that identifies the rows
            to keep.
        output_folder: Destination folder for the CSV files (``Path`` or
            string); created, including missing parents, if necessary.

    Raises:
        SystemExit: With code 1 when no form column can be found, or when
            the filter keeps no rows (the first 100 unfiltered rows are
            then written to ``sample_inspection.csv`` for inspection).
    """
    # Accept plain strings as well as Path objects.
    output_folder = Path(output_folder)

    df = read_data(path_xlsx, sheet_name)
    output_folder.mkdir(parents=True, exist_ok=True)

    forma_col_norm = normalize_colname(forma_col)
    if forma_col_norm not in df.columns:
        candidate = find_column_by_candidates(df, ["forma", "tipo", "class", "clin"])
        if candidate:
            forma_col_norm = candidate
            debug(
                f"Detected forma column -> {candidate}",
                f"Coluna 'forma' detectada -> {candidate}"
            )
        else:
            debug(
                "ERROR: 'forma' column not found.",
                "ERRO: coluna 'forma' não encontrada."
            )
            raise SystemExit(1)

    # Show distribution (debug). Purely diagnostic, so a failure here is
    # reported and must never abort the pipeline.
    try:
        vc = df[forma_col_norm].value_counts()
        debug(
            f"Top forma values:\n{vc}",
            f"Valores na coluna forma:\n{vc}"
        )
    except Exception as exc:
        debug(
            f"Could not compute forma value counts: {exc}",
            f"Não foi possível calcular a distribuição da coluna forma: {exc}"
        )

    # Filter pulmonary TB: numeric comparison first, so that e.g. 1, 1.0 and
    # "1" all match a numeric code.
    df_filtered = pd.DataFrame()
    try:
        mask = pd.to_numeric(df[forma_col_norm], errors="coerce") == pulmonary_code
    except (TypeError, ValueError) as exc:
        debug(
            f"Numeric comparison on forma column failed: {exc}",
            f"Comparação numérica na coluna forma falhou: {exc}"
        )
    else:
        if mask.sum() > 0:
            df_filtered = df.loc[mask].copy()
            debug(
                f"Filtered {df_filtered.shape[0]} rows by numeric code {pulmonary_code}",
                f"Filtradas {df_filtered.shape[0]} linhas usando código numérico {pulmonary_code}"
            )

    # Fallback: compare the stripped, lower-cased text of each cell with the
    # text form of the code.
    if df_filtered.empty:
        mask = df[forma_col_norm].astype(str).str.strip().str.lower() == str(pulmonary_code)
        df_filtered = df.loc[mask].copy()
        debug(
            f"Filtered {df_filtered.shape[0]} rows by string match",
            f"Filtradas {df_filtered.shape[0]} linhas via comparação por string"
        )

    if df_filtered.empty:
        debug(
            "Filtering returned ZERO rows. Saving a sample for inspection.",
            "Filtro retornou ZERO linhas. Salvando amostra para inspeção."
        )
        df.head(100).to_csv(output_folder / "sample_inspection.csv", index=False)
        raise SystemExit(1)

    # Look for columns (substring match on the normalized column names).
    DATE_CAND = ["data", "date", "data_notificacao", "dt_notif"]
    YEAR_CAND = ["ano", "year"]
    MONTH_CAND = ["mes", "month"]
    CASE_CAND = ["casos", "cases", "n_casos", "count", "numero"]
    POP_CAND = ["pop", "population", "populacao"]

    date_col = find_column_by_candidates(df_filtered, DATE_CAND)
    year_col = find_column_by_candidates(df_filtered, YEAR_CAND)
    month_col = find_column_by_candidates(df_filtered, MONTH_CAND)
    case_col = find_column_by_candidates(df_filtered, CASE_CAND)
    pop_col = find_column_by_candidates(df_filtered, POP_CAND)

    debug(
        f"Detected: date={date_col}, year={year_col}, month={month_col}, cases={case_col}, pop={pop_col}",
        f"Detectado: data={date_col}, ano={year_col}, mes={month_col}, casos={case_col}, pop={pop_col}"
    )

    # Date parsing: a detected date column takes precedence; explicit
    # year/month columns are only used when no date column exists.
    if date_col:
        df_filtered["date_parsed"] = try_parse_date_column(df_filtered[date_col])
        df_filtered["year"] = df_filtered["date_parsed"].dt.year
        df_filtered["month"] = df_filtered["date_parsed"].dt.month
        debug(
            "Parsed date column and extracted year/month.",
            "Parseada coluna data e extraído ano/mês."
        )
    else:
        if year_col:
            df_filtered["year"] = pd.to_numeric(df_filtered[year_col], errors="coerce")
        if month_col:
            df_filtered["month"] = pd.to_numeric(df_filtered[month_col], errors="coerce")

    # Case handling: non-numeric or missing counts are treated as 0.
    if case_col:
        df_filtered["cases_raw"] = pd.to_numeric(df_filtered[case_col], errors="coerce").fillna(0).astype(int)
        debug(
            f"Using explicit case column '{case_col}'",
            f"Usando coluna explícita de casos '{case_col}'"
        )
    else:
        df_filtered["cases_raw"] = 1
        debug(
            "No case column found -> assuming one row = one case.",
            "Coluna de casos não encontrada -> assumindo 1 linha = 1 caso."
        )

    # Population mapping (year -> population), built from the unfiltered data.
    # NOTE(review): keys are the raw values of `year_col`, while the annual
    # table's "year" may come from the parsed date column — confirm that the
    # two use matching values/types.
    population_map = None
    if pop_col and year_col:
        try:
            df_pop = df[[year_col, pop_col]].drop_duplicates()
            df_pop = df_pop.dropna()
            population_map = dict(zip(df_pop[year_col], df_pop[pop_col]))
            debug(
                "Population mapping created.",
                "Mapeamento de população criado."
            )
        except Exception:
            # Best effort: without a map the incidence columns stay empty.
            debug(
                "Could not create population map.",
                "Não foi possível criar mapa de população."
            )

    # ---------------------------
    # Aggregations
    # ---------------------------
    # Annual: sum of cases_raw per year.
    if "year" in df_filtered.columns:
        annual = (
            df_filtered.groupby("year")["cases_raw"]
            .sum()
            .reset_index()
            .rename(columns={"cases_raw": "cases"})
        )

        if population_map:
            annual["population"] = annual["year"].map(population_map)
            annual["incidence_per_100k"] = (annual["cases"] / annual["population"]) * 1e5
        else:
            annual["population"] = pd.NA
            annual["incidence_per_100k"] = pd.NA

        annual_path = output_folder / "annual_pulmonary_tb_by_year.csv"
        annual.to_csv(annual_path, index=False)

        debug(
            f"Annual CSV saved: {annual_path}",
            f"CSV anual salvo: {annual_path}"
        )

    # Monthly
    # NOTE(review): unlike the annual table, monthly counts do not use
    # cases_raw — they count distinct "nu_notific" values (or rows). Confirm
    # this difference is intended.
    monthly = None
    if "year" in df_filtered.columns and "month" in df_filtered.columns:
        valid = df_filtered["month"].notna().sum()
        if valid > 0:
            if "nu_notific" in df_filtered.columns:
                monthly = (
                    df_filtered
                    .groupby(["year", "month"])["nu_notific"]
                    .nunique()
                    .reset_index()
                    .rename(columns={"nu_notific": "cases"})
                )
            else:
                # fallback: count rows if no notification ID exists
                monthly = (
                    df_filtered
                    .groupby(["year", "month"])
                    .size()
                    .reset_index(name="cases")
                )

            # First day of each (year, month) group as a proper date.
            monthly["date"] = pd.to_datetime(
                monthly["year"].astype(int).astype(str)
                + "-"
                + monthly["month"].astype(int).astype(str)
                + "-01",
                errors="coerce",
            )

            monthly_path = output_folder / "monthly_pulmonary_tb_by_month.csv"
            monthly.to_csv(monthly_path, index=False)

            debug(
                f"Monthly CSV saved: {monthly_path}",
                f"CSV mensal salvo: {monthly_path}"
            )

            # OPTIONAL outlier flag: written to a second file so the plain
            # monthly CSV stays free of the extra column.
            monthly["is_outlier_iqr"] = mark_outliers_iqr(monthly["cases"])
            flagged_path = output_folder / "monthly_pulmonary_tb_by_month_flagged_outliers.csv"
            monthly.to_csv(flagged_path, index=False)

            debug(
                f"Monthly with outlier flags saved: {flagged_path}",
                f"Mensal com outliers marcado salvo: {flagged_path}"
            )

    # Save filtered rows (including the derived columns added above).
    filtered_path = output_folder / "filtered_pulmonary_tb_rows.csv"
    df_filtered.to_csv(filtered_path, index=False)

    debug(
        f"Filtered pulmonary rows saved: {filtered_path}",
        f"Linhas filtradas salvas: {filtered_path}"
    )

    debug(
        "Processing complete.",
        "Processamento concluído."
    )

# =======================================================
# ENTRY POINT
# =======================================================
if __name__ == "__main__":
    # Run the pipeline with the module-level CONFIG values.
    process_tb(
        path_xlsx=PATH_XLSX,
        sheet_name=SHEET_NAME,
        forma_col=FORMA_COL,
        pulmonary_code=PULMONARY_CODE,
        output_folder=OUTPUT_FOLDER,
    )

[DEBUG] Auto-selected sheet: DADOSTB
[DEBUG] Read sheet 'DADOSTB' with shape (30282, 102)
[DEBUG] Top forma values:
forma
1.0    27749
2.0     2240
3.0      283
Name: count, dtype: int64
[DEBUG] Filtered 27749 rows by numeric code 1
[DEBUG] Detected: date=dt_notific, year=nu_ano, month=None, cases=nu_numero, pop=None
[DEBUG] Parsed date column and extracted year/month.
[DEBUG] Using explicit case column 'nu_numero'
[DEBUG] Annual CSV saved: processed_data\annual_pulmonary_tb_by_year.csv
[DEBUG] Monthly CSV saved: processed_data\monthly_pulmonary_tb_by_month.csv
[DEBUG] Monthly with outlier flags saved: processed_data\monthly_pulmonary_tb_by_month_flagged_outliers.csv
[DEBUG] Filtered pulmonary rows saved: processed_data\filtered_pulmonary_tb_rows.csv
[DEBUG] Processing complete.
