<a href="https://colab.research.google.com/github/mbenedicto99/RUNDECK_AI/blob/main/Rundeck_AI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# build_ai_json.py

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Gera ai_analysis.json a partir de:
  data/execucoes.csv (projeto, job, exec_id, inicio, status, duracao_s)
  data/score.csv     (exec_id, re)
"""

import argparse, json, sys
from pathlib import Path
import pandas as pd
import numpy as np

def _fail(msg: str, code: int = 2):
    """Report a fatal error on stderr and terminate the process.

    Args:
        msg: Human-readable description of what went wrong.
        code: Exit status handed to the interpreter (defaults to 2).

    Raises:
        SystemExit: Always, carrying ``code``.
    """
    sys.stderr.write(f"ERRO: {msg}\n")
    raise SystemExit(code)

def _read_execucoes(exec_path: Path) -> pd.DataFrame:
    """Load the executions CSV and normalise it to the expected layout.

    Every column is read as text; ``""``, ``"NA"`` and ``"NaN"`` count as
    missing. Header aliases (``project``, ``job_name``, ``start_time``,
    ``duration_sec``) are renamed to the expected names, ``duracao_s`` is
    converted to a number and ``inicio`` to a day-first datetime. Rows whose
    ``exec_id`` is missing or blank are dropped.

    Args:
        exec_path: Path of the executions CSV.

    Returns:
        DataFrame containing at least ``projeto``, ``job``, ``exec_id``,
        ``inicio``, ``status`` and ``duracao_s``.

    Raises:
        SystemExit: Through ``_fail`` when a required column is absent.
    """
    df = pd.read_csv(exec_path, dtype=str, keep_default_na=False, na_values=["", "NA", "NaN"])
    # Stray whitespace around a header must not make a required column "missing".
    df.columns = [str(c).strip() for c in df.columns]

    # Normalise common header aliases to the expected names. An alias is only
    # renamed when the target name is not already present, otherwise the frame
    # would end up with two columns sharing one label.
    aliases = {
        "project": "projeto",
        "job_name": "job",
        "start_time": "inicio",
        "duration_sec": "duracao_s",
    }
    lower = {c.lower(): c for c in df.columns}
    rename = {
        lower[alias]: target
        for alias, target in aliases.items()
        if alias in lower and target not in df.columns
    }
    df = df.rename(columns=rename)

    required = ["projeto", "job", "exec_id", "inicio", "status", "duracao_s"]
    missing = [c for c in required if c not in df.columns]
    if missing:
        _fail(f"Coluna obrigatória ausente em execucoes.csv: {', '.join(missing)}")

    # Minimal typing.
    df["duracao_s"] = pd.to_numeric(df["duracao_s"], errors="coerce")
    try:
        df["inicio"] = pd.to_datetime(df["inicio"], errors="coerce", dayfirst=True)
    except (ValueError, TypeError) as exc:
        # Best effort: keep the raw text rather than abort, but say so instead
        # of hiding the problem.
        print(f"AVISO: não foi possível converter 'inicio' para data: {exc}", file=sys.stderr)

    # Clean-up. The column is already text, so strip it directly: going through
    # astype(str) would turn missing ids into the literal "nan" and let them
    # slip past the filter below.
    df["exec_id"] = df["exec_id"].str.strip()
    df = df[df["exec_id"].notna() & (df["exec_id"] != "")]
    return df

def _read_score(score_path: Path) -> pd.DataFrame:
    """Load the score CSV (``exec_id``, ``re``) and keep only usable rows.

    Columns are read as text; ``re`` is converted to a number. Rows with a
    missing/blank ``exec_id`` or a non-numeric ``re`` are dropped.

    Args:
        score_path: Path of the score CSV.

    Returns:
        DataFrame with a stripped text ``exec_id`` and a numeric ``re``.

    Raises:
        SystemExit: Through ``_fail`` when ``exec_id`` or ``re`` is absent.
    """
    df = pd.read_csv(score_path, dtype=str, keep_default_na=False, na_values=["", "NA", "NaN"])
    if "exec_id" not in df.columns or "re" not in df.columns:
        _fail("Colunas obrigatórias ausentes em score.csv: exec_id, re")

    # The column is already text; astype(str) would turn a missing id into the
    # literal "nan", which the emptiness filter cannot catch.
    df["exec_id"] = df["exec_id"].str.strip()
    df["re"] = pd.to_numeric(df["re"], errors="coerce")

    usable = df["exec_id"].notna() & (df["exec_id"] != "") & df["re"].notna()
    return df[usable]

def build_analysis(df_exec: pd.DataFrame, df_score: pd.DataFrame) -> dict:
    """Join executions with their scores and build the analysis payload.

    Args:
        df_exec: Executions with ``projeto``, ``job``, ``exec_id``, ``inicio``,
            ``status`` and ``duracao_s``.
        df_score: Scores with ``exec_id`` and ``re``.

    Returns:
        A JSON-serialisable dict with:
          * ``resumo``: total rows, counts per status (missing status is
            counted as ``"desconhecido"``), mean duration and the global 95th
            percentile of ``re``;
          * ``risco_p95_por_job``: up to 200 jobs ordered by their 95th
            percentile of ``re``, highest first;
          * ``hotspots``: the 50 scored executions with the highest ``re``;
          * ``top_amostras``: the 100 scored executions with the highest ``re``.
        Missing values are emitted as ``None`` so the payload stays valid JSON.
    """
    max_jobs, max_hotspots, max_samples = 200, 50, 100

    def _ser(v):
        """Make one cell JSON-safe: NaN/NaT -> None, datetimes -> ISO 8601."""
        if pd.isna(v):
            return None
        if hasattr(v, "isoformat"):
            return v.isoformat()
        if isinstance(v, np.generic):
            return v.item()
        return v

    def _num(v):
        """Return ``v`` as a float, or None when it is missing."""
        return None if pd.isna(v) else float(v)

    df = df_exec.merge(df_score[["exec_id", "re"]], on="exec_id", how="left")
    scored = df.dropna(subset=["re"])

    # Explicit native types: numpy integers are not JSON-serialisable.
    status_counts = df["status"].fillna("desconhecido").value_counts(dropna=False)
    por_status = {str(k): int(v) for k, v in status_counts.items()}

    duracao = pd.to_numeric(df["duracao_s"], errors="coerce")
    duracao_med = float(duracao.mean()) if duracao.notna().any() else None
    re_p95_global = (
        float(np.percentile(scored["re"].to_numpy(dtype=float), 95))
        if not scored.empty else None
    )

    resumo = {
        "total_execucoes": int(len(df)),
        "por_status": por_status,
        "duracao_media_s": duracao_med,
        "re_p95_global": re_p95_global,
    }

    chave_job = ["projeto", "job"] if all(c in df.columns for c in ("projeto", "job")) else ["job"]
    p95_por_job = (
        scored.groupby(chave_job)["re"].quantile(0.95)
              .reset_index().rename(columns={"re": "re_p95"})
              .sort_values("re_p95", ascending=False).head(max_jobs)
    )
    risco_p95_por_job = [
        {**{k: _ser(row[k]) for k in chave_job}, "re_p95": _num(row["re_p95"])}
        for row in p95_por_job.to_dict(orient="records")
    ]

    # Rank once and take the larger slice; the hotspots are its first rows.
    # Slicing the 50-row hotspot list to 100 could never return more than 50.
    ranked = (
        scored.sort_values("re", ascending=False)
              .loc[:, ["projeto", "job", "exec_id", "inicio", "status", "duracao_s", "re"]]
              .head(max(max_hotspots, max_samples))
              .to_dict(orient="records")
    )
    amostras = [
        {
            "projeto": _ser(h.get("projeto")),
            "job": _ser(h.get("job")),
            "exec_id": _ser(h.get("exec_id")),
            "inicio": _ser(h.get("inicio")),
            "status": _ser(h.get("status")),
            "duracao_s": _num(h.get("duracao_s")),
            "re": float(h.get("re")),
        }
        for h in ranked
    ]

    return {
        "resumo": resumo,
        "risco_p95_por_job": risco_p95_por_job,
        "hotspots": amostras[:max_hotspots],
        "top_amostras": amostras[:max_samples],
    }

def main():
    """Command-line entry point: read the CSVs, build the analysis, write the JSON.

    Options:
        --data-dir: Directory holding ``execucoes.csv`` and ``score.csv``.
        --out: Destination of the generated JSON file.

    A one-line JSON summary (status, output path, ``resumo`` and list sizes)
    is printed to stdout once the file has been written.
    """
    cli = argparse.ArgumentParser(description="Gera ai_analysis.json a partir de .csv em 'data/'.")
    cli.add_argument("--data-dir", default="data")
    cli.add_argument("--out", default="app/ai_analysis.json")  # canonical default
    opts = cli.parse_args()

    base = Path(opts.data_dir)
    exec_path, score_path = base / "execucoes.csv", base / "score.csv"

    # Both inputs are mandatory; stop at the first one that is absent.
    for needed in (exec_path, score_path):
        if not needed.exists():
            _fail(f"Arquivo obrigatório não encontrado: {needed}")

    result = build_analysis(_read_execucoes(exec_path), _read_score(score_path))

    out_path = Path(opts.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")

    counts = {
        name: len(result[name])
        for name in ("hotspots", "risco_p95_por_job", "top_amostras")
    }
    summary = {
        "status": "ok",
        "out": str(out_path),
        "resumo": result["resumo"],
        "counts": counts,
    }
    print(json.dumps(summary, ensure_ascii=False))


if __name__ == "__main__":
    main()

# etl.py

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import hashlib
from pathlib import Path
import pandas as pd
import numpy as np
from dateutil import parser

# Inputs/outputs. Each path can be overridden through the environment variable
# named in the getenv call; the second argument is the default.
INPUT_FILE   = os.getenv("INPUT_CSV", "data/slice.txt")   # may be .txt or .csv
CLEAN_CSV    = os.getenv("OUTPUT_CSV", "data/clean.csv")
EXECUCOES_CSV = os.getenv("EXECUCOES_CSV", "data/execucoes.csv")

def _ensure_exists(p: str | Path):
    if not Path(p).exists():
        raise FileNotFoundError(f"Arquivo não encontrado: {p}")

def _try_read(path: str | Path) -> pd.DataFrame:
    """
    Lê CSV/TXT com robustez:
      1) tenta ; (slice.txt geralmente vem com ;) e BOM
      2) tenta , como fallback
    """
    # tentativa 1: ;
    try:
        return pd.read_csv(path, sep=';', encoding='utf-8-sig', quotechar='"', engine='python')
    except Exception:
        pass
    # tentativa 2: ,
    return pd.read_csv(path, sep=',', encoding='utf-8-sig', quotechar='"', engine='python')

def _parse_dt(x):
    """Parse one cell into a datetime, returning ``pd.NaT`` when impossible.

    Values are read day-first (pt-BR style, e.g. ``31/12/2024``), except
    year-first ISO strings (``YYYY-MM-DD...``), which are read as
    year-month-day. With ``dayfirst=True`` dateutil would interpret
    ``2024-03-04`` as year-day-month, i.e. 3 April.

    Args:
        x: Raw cell value (string, missing value, ...).

    Returns:
        A ``datetime`` on success, ``pd.NaT`` for missing or unparseable input.
    """
    from datetime import datetime

    if x is None or pd.isna(x):
        return pd.NaT

    text = str(x).strip()
    year_first = (
        text[:4].isdigit() and text[4:5] == "-"
        and text[5:7].isdigit() and text[7:8] == "-"
    )
    try:
        if year_first:
            try:
                return datetime.fromisoformat(text)
            except ValueError:
                # ISO-like but outside what fromisoformat accepts.
                return parser.parse(text, dayfirst=False)
        # pt-BR dates: dayfirst=True
        return parser.parse(text, dayfirst=True)
    except Exception:
        # Deliberately broad: this runs per cell over messy exports and an
        # unparseable value must become NaT instead of aborting the ETL.
        return pd.NaT

def _norm_status(s: pd.Series) -> pd.Series:
    """Normalise status labels to lower case and a small vocabulary.

    Success-like labels map to ``"success"`` and failure-like labels to
    ``"failed"``; any other label is kept (lower-cased and stripped). Missing
    values stay missing instead of becoming the text ``"nan"``/``"none"``,
    which would otherwise show up as a bogus status category.

    Args:
        s: Raw status column.

    Returns:
        Series with the same index holding the normalised labels.
    """
    mapping = {
        "succeeded": "success",
        "succeed":   "success",
        "successful": "success",
        "ok":        "success",
        "pass":      "success",
        "completed": "success",
        "done":      "success",
        "fail":      "failed",
        "failed":    "failed",
        "error":     "failed",
        "ko":        "failed",
    }
    missing = s.isna()
    st = s.astype(str).str.lower().str.strip().replace(mapping)
    # Restore the gaps that astype(str) turned into text.
    return st.mask(missing)

def _hash_exec_id(proj: str, job: str, inicio) -> str:
    """Derive a deterministic 16-hex-character id from project, job and start.

    Args:
        proj: Project name.
        job: Job name.
        inicio: Start value, embedded through its string form.

    Returns:
        The first 16 hex digits of the SHA-1 of ``"proj|job|inicio"``.
    """
    payload = "|".join(f"{part}" for part in (proj, job, inicio)).encode("utf-8")
    # SHA-1 only serves as a stable fingerprint here, not for security.
    digest = hashlib.sha1(payload, usedforsecurity=False)
    return digest.hexdigest()[:16]

def main():
    """Normalise the raw export and write ``clean.csv`` and ``execucoes.csv``.

    Steps:
      1. read ``INPUT_FILE`` and lower-case/strip its headers;
      2. map header aliases onto job_id, job, projeto, status, inicio, fim;
      3. parse the dates, derive ``duration_sec`` (missing or negative -> 0)
         and drop rows without a start time;
      4. derive date/hour/weekday and an ``exec_id`` per row;
      5. write ``CLEAN_CSV`` and ``EXECUCOES_CSV``.

    Raises:
        FileNotFoundError: When ``INPUT_FILE`` does not exist.
        ValueError: When ``INPUT_FILE`` has no rows.
    """
    _ensure_exists(INPUT_FILE)

    df_raw = _try_read(INPUT_FILE)
    if df_raw.empty:
        raise ValueError(f"{INPUT_FILE} lido mas sem linhas.")

    # Normalise headers so alias matching is case- and whitespace-insensitive.
    df_raw.columns = [str(c).strip().lower() for c in df_raw.columns]

    # Possible header names (aliases) coming from the export,
    # e.g. "Ended Status", "Start Time", "End Time", "Application", "Sub-Application".
    colmap = {
        "job_id":     ["job_id", "id", "execution_id"],
        "job":        ["job", "job_name", "name", "application"],
        "projeto":    ["projeto", "project", "project_name", "sub-application", "folder"],
        "status":     ["status", "result", "state", "ended status"],
        "inicio":     ["inicio", "start_time", "started_at", "start", "start time"],
        "fim":        ["fim", "end_time", "ended_at", "end", "finish_time", "end time"],
    }

    def pick(keys):
        """Return the first column of df_raw named in ``keys``, else an all-missing one."""
        for k in keys:
            if k in df_raw.columns:
                return df_raw[k]
        # Share df_raw's index so the fallback lines up with the real columns.
        return pd.Series([None] * len(df_raw), index=df_raw.index, dtype=object)

    df = pd.DataFrame({name: pick(aliases) for name, aliases in colmap.items()})

    # Date parsing and duration.
    df["inicio"] = df["inicio"].apply(_parse_dt)
    df["fim"]    = df["fim"].apply(_parse_dt)
    df["duration_sec"] = (df["fim"] - df["inicio"]).dt.total_seconds()

    # Status normalisation + sanitising. The copy keeps the assignments below
    # from writing into a view of the pre-filter frame.
    df["status"] = _norm_status(df["status"])
    df = df.dropna(subset=["inicio"]).copy()
    df["duration_sec"] = pd.to_numeric(df["duration_sec"], errors="coerce").fillna(0.0).clip(lower=0.0)

    # Text defaults.
    df["job"] = df["job"].fillna("UNKNOWN").astype(str).str.strip()
    df["projeto"] = df["projeto"].fillna("UNKNOWN").astype(str).str.strip()

    # Time derivations.
    df["date"]    = df["inicio"].dt.date
    df["hour"]    = df["inicio"].dt.hour
    df["weekday"] = df["inicio"].dt.weekday

    # exec_id: use job_id where present; otherwise a deterministic hash of
    # projeto|job|inicio (ISO). The fallback is applied per row, so a file with
    # only some job_ids missing does not give every such row the id "nan".
    hashed = pd.Series(
        [
            _hash_exec_id(proj, job, ini.isoformat() if hasattr(ini, "isoformat") else ini)
            for proj, job, ini in zip(df["projeto"], df["job"], df["inicio"])
        ],
        index=df.index,
        dtype=object,
    )
    job_id_txt = df["job_id"].astype(str).str.strip()
    has_job_id = df["job_id"].notna() & (job_id_txt != "")
    df["exec_id"] = job_id_txt.where(has_job_id, hashed)

    # Sort by start time (optional).
    df = df.sort_values("inicio").reset_index(drop=True)

    # Write clean.csv (keeps the columns useful to features.py).
    cols_clean = [
        "projeto", "job", "exec_id", "inicio", "fim", "status",
        "duration_sec", "date", "hour", "weekday"
    ]
    Path(CLEAN_CSV).parent.mkdir(parents=True, exist_ok=True)
    df[cols_clean].to_csv(CLEAN_CSV, index=False)
    print(f"[etl] Gravado {CLEAN_CSV} com {len(df)} linhas.")

    # Write execucoes.csv (layout expected by build_ai_json.py).
    execucoes = df.rename(columns={"duration_sec": "duracao_s"})[
        ["projeto", "job", "exec_id", "inicio", "status", "duracao_s"]
    ]

    Path(EXECUCOES_CSV).parent.mkdir(parents=True, exist_ok=True)
    execucoes.to_csv(EXECUCOES_CSV, index=False)
    print(f"[etl] Gravado {EXECUCOES_CSV} com {len(execucoes)} linhas.")

    # Quick diagnostics.
    print("[etl] Amostra clean.csv:")
    print(df[cols_clean].head(3).to_string(index=False))

if __name__ == "__main__":
    main()

# pipeline.py

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import subprocess
import sys
from pathlib import Path

# Ordered pipeline stages; each entry is the argv of one child process.
# sys.executable is used instead of a bare "python" so every stage runs under
# the interpreter (and virtualenv) that is running this pipeline, whatever
# "python" happens to resolve to on PATH.
STEPS = [
    [sys.executable, "scripts/etl.py"],
    [sys.executable, "scripts/features.py"],
    [sys.executable, "scripts/train_rbm.py"],
    [sys.executable, "scripts/detect_anomalies.py"],  # writes data/score.csv and (optionally) app/ai_analysis.json
    [sys.executable, "scripts/build_ai_json.py", "--data-dir", "data", "--out", "app/ai_analysis.json"],
]

def run_step(cmd):
    """Run one pipeline stage and abort the pipeline if it fails.

    Args:
        cmd: Argument vector of the child process.

    Raises:
        SystemExit: With the child's exit status when that status is non-zero.
    """
    shown = " ".join(cmd)
    print(f"[pipeline] Executando: {shown}", flush=True)
    finished = subprocess.run(cmd)
    if finished.returncode != 0:
        print(f"[pipeline] Erro no passo: {shown}", file=sys.stderr, flush=True)
        sys.exit(finished.returncode)

def main():
    """Create the working directories, run every stage in order, report the output path."""
    for folder in ("data", "models", "app"):
        Path(folder).mkdir(parents=True, exist_ok=True)

    # run_step exits the process on the first failing stage.
    for stage in STEPS:
        run_step(stage)

    final_json = Path("app/ai_analysis.json").resolve()
    print(f"[pipeline] ✔ Finalizado com sucesso.\n[pipeline] Saída: {final_json}")


if __name__ == "__main__":
    main()

# detect_anomalies.py

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
from pathlib import Path
import numpy as np
import pandas as pd
import joblib
import json

# Inputs/outputs (each can be overridden through the environment variable
# named in the getenv call; the second argument is the default).
FEATS_CSV  = os.getenv("FEATS_CSV", "data/features.csv")
SCALER_JOB = os.getenv("SCALER_JOB", "models/scalers.joblib")  # saved by train_rbm.py
RBM_JOB    = os.getenv("RBM_JOB", "models/rbm.joblib")

SCORE_CSV  = os.getenv("SCORE_CSV", "data/score.csv")          # <- required by build_ai_json.py
OUT_JSON   = os.getenv("OUT_JSON", "app/ai_analysis.json")      # <- canonical path read by the dashboard

def _ensure_exists(path: str | Path, kind: str):
    if not Path(path).exists():
        raise FileNotFoundError(f"{kind} não encontrado: {path}")

def _load_inputs():
    """Load the feature table, the scaler metadata and the trained model.

    Returns:
        Tuple ``(df, used_cols, scaler, rbm)``: the features DataFrame, the
        feature column names stored under ``"used_cols"``, the object stored
        under ``"scaler"`` and the object loaded from ``RBM_JOB``.

    Raises:
        FileNotFoundError: When any of the three input files is absent.
        ValueError: When the metadata lacks ``used_cols``/``scaler`` or the
            feature table lacks one of the required columns.
    """
    _ensure_exists(FEATS_CSV, "CSV de features")
    _ensure_exists(SCALER_JOB, "Scaler/metadata")
    _ensure_exists(RBM_JOB, "Modelo RBM")

    # exec_id is an identifier: read it as text so it is not reinterpreted as a
    # number (which would drop leading zeros or reformat the value).
    df = pd.read_csv(FEATS_CSV, dtype={"exec_id": str})

    # NOTE(security): joblib.load unpickles arbitrary objects; only load model
    # files that come from a trusted source.
    meta = joblib.load(SCALER_JOB)
    rbm  = joblib.load(RBM_JOB)

    used_cols = meta.get("used_cols")
    scaler    = meta.get("scaler")
    if used_cols is None or scaler is None:
        # Report the path actually loaded; it may have been overridden via env var.
        raise ValueError(f"{SCALER_JOB} não possui 'used_cols' e/ou 'scaler'.")

    faltando = [c for c in used_cols if c not in df.columns]
    if faltando:
        raise ValueError(f"Colunas de features ausentes no features.csv: {faltando}")

    return df, used_cols, scaler, rbm

def _prepare_matrix(df: pd.DataFrame, used_cols, scaler):
    """Build the numeric model input from the feature columns.

    Non-numeric columns are coerced to numbers (decimal commas accepted),
    gaps are filled with the column median, the matrix goes through
    ``scaler.transform`` and the result is clipped to [0, 1].

    Args:
        df: Feature table.
        used_cols: Names of the columns to use, in model order.
        scaler: Object exposing ``transform(ndarray) -> ndarray``.

    Returns:
        ``np.ndarray`` with one row per input row and one column per feature.
    """
    X = df[used_cols].copy()
    # Robust coercion to numeric.
    for c in X.columns:
        if not pd.api.types.is_numeric_dtype(X[c]):
            # Swap decimal comma for a dot, then try numeric.
            X[c] = pd.to_numeric(X[c].astype(str).str.replace(",", ".", regex=False), errors="coerce")
        # Median imputation. A column with no valid value has a NaN median,
        # which would leave NaN in the matrix; fall back to 0.0 in that case.
        med = X[c].median(skipna=True)
        if pd.isna(med):
            med = 0.0
        X[c] = X[c].fillna(med)
    # Scale with the given scaler, then clip to [0, 1].
    Xn = scaler.transform(X.values.astype(np.float64))
    Xn = np.clip(Xn, 0.0, 1.0)
    return Xn

def main():
    """Score every feature row and write ``score.csv`` plus a light JSON summary.

    The score ``re`` is the mean squared difference between each scaled input
    row and the output of ``rbm.gibbs`` for that row.
    """
    df, used_cols, scaler, rbm = _load_inputs()

    # Guarantee one identifier per row.
    if "exec_id" not in df.columns:
        df["exec_id"] = np.arange(len(df)).astype(str)

    X = _prepare_matrix(df, used_cols, scaler)

    # Reconstruction (one Gibbs step of the RBM).
    # NOTE(review): if rbm is scikit-learn's BernoulliRBM, gibbs() draws a
    # random sample, so scores may differ between runs — confirm this is intended.
    V_recon = rbm.gibbs(X)
    recon_err = np.asarray(np.mean((X - V_recon) ** 2, axis=1), dtype=float)

    # Save score.csv for the final build.
    out_df = pd.DataFrame({"exec_id": df["exec_id"].astype(str), "re": recon_err})
    Path(SCORE_CSV).parent.mkdir(parents=True, exist_ok=True)
    out_df.to_csv(SCORE_CSV, index=False)
    print(f"[detect_anomalies] Gravado {SCORE_CSV} com {len(out_df)} linhas.")

    # (Optional) light JSON at the canonical path; build_ai_json.py overwrites
    # it later with the full layout. NaN scores are left out of the percentile
    # and written as null, because a bare NaN is not valid JSON.
    valid = recon_err[~np.isnan(recon_err)]
    resumo = {
        "total_execucoes": int(len(out_df)),
        "re_p95_global": float(np.percentile(valid, 95)) if valid.size else None,
    }
    scores_sample = [
        {"exec_id": row["exec_id"], "re": None if pd.isna(row["re"]) else float(row["re"])}
        for row in out_df.head(10).to_dict(orient="records")
    ]
    payload = {"resumo": resumo, "scores_sample": scores_sample}
    Path(OUT_JSON).parent.mkdir(parents=True, exist_ok=True)
    with open(OUT_JSON, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)
    print(f"[detect_anomalies] Gravado JSON em {OUT_JSON}")

if __name__ == "__main__":
    main()

# features.py

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import hashlib
from pathlib import Path
import numpy as np
import pandas as pd

# Input and output paths. Each can be overridden through the environment
# variable named in the getenv call; the second argument is the default.
INPUT_CLEAN = os.getenv("INPUT_CLEAN", "data/clean.csv")
OUTPUT_FEATS = os.getenv("OUTPUT_FEATS", "data/features.csv")

def _ensure_exists(path: str | Path):
    if not Path(path).exists():
        raise FileNotFoundError(f"Arquivo não encontrado: {path}")

def _to_datetime(s: pd.Series):
    """Return ``s`` as a datetime series, parsing day-first text when needed.

    A series that already has a datetime dtype (naive or tz-aware) is returned
    unchanged; anything else is parsed with ``dayfirst=True`` and unparseable
    values become ``NaT``.

    Args:
        s: Column to convert.

    Returns:
        The datetime series.
    """
    # pandas' own dtype check is used because np.issubdtype raises TypeError on
    # pandas extension dtypes (tz-aware datetimes, string dtype, ...).
    if pd.api.types.is_datetime64_any_dtype(s):
        return s
    return pd.to_datetime(s, errors="coerce", dayfirst=True, utc=False)

def _hash_exec_id(row: pd.Series) -> str:
    """Derive a deterministic 16-hex-character id from one row.

    Args:
        row: Mapping-like row; ``projeto``, ``job`` and ``inicio`` are read
            with ``.get`` and default to an empty string when absent.

    Returns:
        The first 16 hex digits of the SHA-1 of ``"projeto|job|inicio"``.
    """
    fields = (row.get("projeto", ""), row.get("job", ""), row.get("inicio", ""))
    payload = "|".join(f"{value}" for value in fields).encode("utf-8")
    # SHA-1 only serves as a stable fingerprint here, not for security.
    return hashlib.sha1(payload, usedforsecurity=False).hexdigest()[:16]

def _minmax_01(x: pd.Series) -> pd.Series:
    """Rescale ``x`` linearly so its minimum maps to 0 and its maximum to 1.

    NaN entries are ignored when finding the range and stay NaN in the result.
    When the range is unusable (empty or all-NaN input, infinite bounds, or
    every value equal) a series of zeros is returned.

    Args:
        x: Values convertible to float.

    Returns:
        Float series with the same index as ``x``.
    """
    v = x.astype(float)
    # Series.min/max skip NaN and return NaN for empty or all-NaN input;
    # np.nanmin would raise ValueError on an empty array and warn on all-NaN.
    vmin, vmax = v.min(), v.max()
    if not np.isfinite(vmin) or not np.isfinite(vmax) or vmax == vmin:
        # All equal or invalid -> zeros.
        return pd.Series(np.zeros(len(v)), index=v.index, dtype=float)
    return (v - vmin) / (vmax - vmin)

def _zclip_to01(x: pd.Series, clip=3.0) -> pd.Series:
    """Z-score ``x``, clip to ``[-clip, clip]`` and remap linearly onto [0, 1].

    Mean and (population) standard deviation ignore NaN. When the standard
    deviation is zero or not finite, every entry is 0.5.

    Args:
        x: Values convertible to float.
        clip: Half-width of the accepted z-score window.

    Returns:
        Float series with the same index as ``x``.
    """
    v = x.astype(float)
    raw = v.values
    mu, sd = np.nanmean(raw), np.nanstd(raw)
    if np.isfinite(sd) and sd != 0:
        bounded = ((v - mu) / sd).clip(lower=-clip, upper=clip)
        return (bounded + clip) / (2 * clip)
    # No usable spread: put everything at the midpoint.
    return pd.Series(np.full(len(v), 0.5), index=v.index, dtype=float)

def _cyc_enc_01(vals: pd.Series, period: int):
    """Encode a cyclic quantity as a (sine, cosine) pair remapped onto [0, 1].

    Args:
        vals: Values on the cycle (taken modulo ``period``).
        period: Length of the cycle.

    Returns:
        Tuple ``(sin01, cos01)`` where each component is ``(f(angle) + 1) / 2``.
    """
    phase = (vals.astype(float) % period) * (2 * np.pi) / period
    # sin/cos live in [-1, 1]; shift and halve to land in [0, 1].
    return tuple((wave(phase) + 1.0) / 2.0 for wave in (np.sin, np.cos))

def _p95_flags_per_job(df: pd.DataFrame, dur_col="duration_sec", key_cols=("projeto","job")) -> pd.Series:
    """Flag rows whose duration exceeds the 95th percentile of their group.

    Durations are coerced to numbers (invalid or negative -> 0). When every
    key column exists, each row is compared with the 95th percentile of its
    ``key_cols`` group; a row without a group threshold, or a frame lacking
    the key columns, is compared with the global 95th percentile instead.

    Args:
        df: Input rows.
        dur_col: Name of the duration column.
        key_cols: Columns identifying a group.

    Returns:
        Integer (0/1) series indexed like ``df``.
    """
    keys = list(key_cols)
    work = df.copy()
    work[dur_col] = pd.to_numeric(work[dur_col], errors="coerce").fillna(0.0).clip(lower=0.0)

    # Global threshold; infinity on an empty frame so nothing gets flagged.
    overall = float(np.percentile(work[dur_col].values, 95)) if len(work) else np.inf

    if not all(k in work.columns for k in key_cols):
        # Direct global fallback.
        return (work[dur_col] > overall).astype(int)

    per_group = work.groupby(keys)[dur_col].quantile(0.95).rename("thr").reset_index()
    joined = work[keys + [dur_col]].merge(per_group, on=keys, how="left")
    # Rows whose key has no threshold fall back to the global one.
    joined["thr"] = joined["thr"].fillna(overall)
    flags = (joined[dur_col] > joined["thr"]).astype(int)
    # merge() renumbers the rows; restore the caller's index.
    flags.index = df.index
    return flags

def main():
    """Build the model feature table from ``clean.csv`` and write ``features.csv``.

    Output columns: ``exec_id`` plus eight features — min-max duration,
    clipped z-score duration, sine/cosine encodings of hour and weekday, a
    ``failed`` flag and a ``high_runtime`` flag (duration above the per-job
    95th percentile).

    Raises:
        FileNotFoundError: When ``INPUT_CLEAN`` does not exist.
        ValueError: When no duration column is available.
    """
    _ensure_exists(INPUT_CLEAN)
    # Identifiers are read as text so they are not reinterpreted as numbers
    # (which would drop leading zeros or reformat the value).
    df = pd.read_csv(INPUT_CLEAN, dtype={"exec_id": str, "job_id": str})

    # Normalise the names expected by the pipeline.
    # Expected (from the ETL): projeto, job, exec_id, inicio, status,
    # duration_sec, date, hour, weekday. Work with lower-case headers.
    df.columns = [c.strip().lower() for c in df.columns]

    # Map possible alternative names.
    rename_map = {}
    if "project" in df.columns: rename_map["project"] = "projeto"
    if "job_name" in df.columns: rename_map["job_name"] = "job"
    if "start_time" in df.columns: rename_map["start_time"] = "inicio"
    if "duration_sec" not in df.columns and "duracao_s" in df.columns:
        rename_map["duracao_s"] = "duration_sec"
    if rename_map:
        df = df.rename(columns=rename_map)

    # Fix the datetime column and derive hour/weekday when they are absent.
    if "inicio" in df.columns:
        df["inicio"] = _to_datetime(df["inicio"])
        df["hour"] = df["hour"] if "hour" in df.columns else df["inicio"].dt.hour
        df["weekday"] = df["weekday"] if "weekday" in df.columns else df["inicio"].dt.weekday
    else:
        # Without 'inicio', hour/weekday default to missing.
        df["hour"] = df.get("hour", pd.Series([np.nan]*len(df)))
        df["weekday"] = df.get("weekday", pd.Series([np.nan]*len(df)))

    # duration_sec
    if "duration_sec" not in df.columns:
        raise ValueError("Coluna 'duration_sec' ausente em data/clean.csv (verifique o ETL).")
    df["duration_sec"] = pd.to_numeric(df["duration_sec"], errors="coerce").fillna(0.0).clip(lower=0.0)

    # status -> failed
    if "status" in df.columns:
        st = df["status"].astype(str).str.lower().str.strip()
        failed = st.eq("failed").astype(int)
    else:
        failed = pd.Series(np.zeros(len(df), dtype=int), index=df.index)

    # projeto/job for the per-job aggregation.
    if "projeto" not in df.columns: df["projeto"] = "UNKNOWN"
    if "job" not in df.columns: df["job"] = "UNKNOWN"

    # exec_id: prefer the existing field, then job_id, then a deterministic hash.
    # fillna must run before astype(str): afterwards a missing value is already
    # the text "nan" and there is nothing left to fill.
    if "exec_id" in df.columns:
        exec_id = df["exec_id"].fillna("").astype(str).str.strip()
    elif "job_id" in df.columns:
        exec_id = df["job_id"].fillna("").astype(str).str.strip()
    else:
        # Build a deterministic id.
        if "inicio" not in df.columns:
            df["inicio"] = pd.NaT
        exec_id = df.apply(_hash_exec_id, axis=1)

    # Features.
    duration_sec_mm = _minmax_01(df["duration_sec"])
    duration_z_clipped_mm = _zclip_to01(df["duration_sec"], clip=3.0)

    # Hour and weekday (NaN treated as 0).
    hour = pd.to_numeric(df["hour"], errors="coerce").fillna(0).clip(lower=0, upper=23)
    wday = pd.to_numeric(df["weekday"], errors="coerce").fillna(0).clip(lower=0, upper=6)

    hour_sin_mm, hour_cos_mm = _cyc_enc_01(hour, period=24)
    wday_sin_mm, wday_cos_mm = _cyc_enc_01(wday, period=7)

    high_runtime = _p95_flags_per_job(df, dur_col="duration_sec", key_cols=("projeto","job"))

    feats = pd.DataFrame({
        "exec_id": exec_id.astype(str),
        "duration_sec_mm": duration_sec_mm.astype(float),
        "duration_z_clipped_mm": duration_z_clipped_mm.astype(float),
        "hour_sin_mm": hour_sin_mm.astype(float),
        "hour_cos_mm": hour_cos_mm.astype(float),
        "wday_sin_mm": wday_sin_mm.astype(float),
        "wday_cos_mm": wday_cos_mm.astype(float),
        "failed": failed.astype(int),
        "high_runtime": high_runtime.astype(int),
    })

    # Quick diagnostics.
    print("[features] linhas:", len(feats))
    print("[features] nulos por coluna:\n", feats.isna().sum())
    print("[features] amostra:\n", feats.head(3).to_string(index=False))

    # Write.
    Path(OUTPUT_FEATS).parent.mkdir(parents=True, exist_ok=True)
    feats.to_csv(OUTPUT_FEATS, index=False)
    print(f"[features] Gravado {OUTPUT_FEATS} com {len(feats.columns)-1} features (+ exec_id).")

if __name__ == "__main__":
    main()