In [6]:
import os, re, unicodedata, warnings, json
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd

from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression, RidgeCV
from sklearn.model_selection import GroupKFold, KFold
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error

# ---------------- Parameters ----------------
# Input workbook handed to load_base_xlsx.
PATH_XLSX = "base_logit_2001_2008.xlsx"
# Directory that receives every CSV/TXT written by this script.
OUT_DIR   = "./saidas_reg"
# Number of decimal places used whenever a reported number is rounded.
ROUND = 3
# Number of cross-validation folds requested from avaliar_regressao_cv.
N_SPLITS = 5
# Create the output directory up front; exist_ok avoids an error on re-runs.
os.makedirs(OUT_DIR, exist_ok=True)

# ---------------- Normalização de nomes ----------------
def unaccent(s: str) -> str:
    """Return ``str(s)`` with diacritics removed.

    The text is decomposed with NFKD so that each accent becomes a separate
    combining code point, and those combining code points are then dropped.
    Non-string input is converted with ``str()`` first.
    """
    decomposed = unicodedata.normalize("NFKD", str(s))
    base_chars = filter(lambda ch: unicodedata.combining(ch) == 0, decomposed)
    return "".join(base_chars)

def norm_colname(s: str) -> str:
    """Normalize a column header to a lowercase ``[a-z0-9]``-only key.

    Steps, in order: strip accents, lowercase, collapse whitespace runs,
    rewrite the misspelling "ebtida" as "ebitda", then delete every character
    that is not an ASCII letter or digit (spaces and punctuation included).
    """
    lowered = unaccent(s).lower()
    collapsed = re.sub(r"\s+", " ", lowered).strip()
    # Fix the common "ebtida" variant before separators are removed.
    respelled = collapsed.replace("ebtida", "ebitda")
    return re.sub(r"[^a-z0-9]", "", respelled)

def normalize_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` whose column labels went through ``norm_colname``.

    The input frame is left untouched; only the labels of the copy change.
    """
    renamed = df.copy()
    renamed.columns = list(map(norm_colname, df.columns))
    return renamed

def load_base_xlsx(path: str):
    """Load the working sheet of an Excel workbook.

    The sheet named "base" is used when present; otherwise the sheet with the
    most rows is chosen (the first one wins on ties).

    Parameters
    ----------
    path : str
        Path of the workbook to open.

    Returns
    -------
    tuple
        ``(raw, norm, sheet)``: the sheet as parsed with the first row as
        header, the same data with column names passed through
        ``normalize_df``, and the name of the sheet that was used.
    """
    # The context manager closes the workbook handle even if parsing fails.
    with pd.ExcelFile(path) as xl:
        if "base" in xl.sheet_names:
            sheet = "base"
            raw = xl.parse(sheet, header=0)
        else:
            # Parse each sheet once and reuse the winner instead of reading it
            # a second time after ranking.
            parsed = {name: xl.parse(name, header=0) for name in xl.sheet_names}
            sheet = max(parsed, key=lambda name: parsed[name].shape[0])
            raw = parsed[sheet]
    norm = normalize_df(raw)
    return raw, norm, sheet

# ---------------- Localizadores ----------------
def first_match(df_norm: pd.DataFrame, tokens: list[str]):
    """Return the first column name that contains every token as a substring.

    Columns are scanned in frame order. Returns ``None`` when no column
    qualifies.
    """
    def holds_all_tokens(name) -> bool:
        return all(tok in name for tok in tokens)

    return next(filter(holds_all_tokens, df_norm.columns), None)

def pick_ops_col(df_norm: pd.DataFrame):
    """Pick the column that identifies the operator (used as the CV group).

    An exact column name is preferred, tried in the priority order of the
    candidate list. Failing that, the first column containing one of the
    candidates as a delimited word is returned. Returns ``None`` if nothing
    matches.
    """
    candidates = ("ops", "idops", "codigoops", "codops", "cnpj")
    columns = df_norm.columns

    exact = next((name for name in candidates if name in columns), None)
    if exact is not None:
        return exact

    delimited = re.compile(r"(?:^|[^a-z0-9])(ops|idops|codigoops|codops|cnpj)(?:$|[^a-z0-9])")
    return next((col for col in columns if delimited.search(col)), None)

# Target (DADOS_CONTAB_AUSENTES) — token patterns.
# find_target_col tries the patterns in this order; a column satisfies a
# pattern when its normalized name contains every token of that pattern.
TARGET_PATTERNS = [
    ["dados","contab","ausent"],
    ["falt","contab"],
    ["nulo","contab"]
]

def find_target_col(df_norm: pd.DataFrame) -> str:
    """Locate the target column using ``TARGET_PATTERNS``.

    Patterns are tried in order and the first one that yields a column wins.

    Raises
    ------
    ValueError
        If no pattern matches any column.
    """
    hits = (first_match(df_norm, tokens) for tokens in TARGET_PATTERNS)
    column = next((hit for hit in hits if hit), None)
    if column is None:
        raise ValueError("Não encontrei a coluna alvo 'dados_contab_ausentes' (ou variações) na planilha.")
    return column

# Predictors — canonical indicator label -> tokens that must all occur in the
# normalized column name (list supplied by the analyst).
PRED_TOKENS = {
    "contrap_efetivas":            ["contrap","efetiv"],
    "resultado_bruto":             ["resultado","bruto"],
    "resultado_liquido":           ["resultado","liquid"],
    "ebit":                        ["ebit"],            # must not be confused with ebitda
    "ebitda":                      ["ebitda"],
    "divida_liquida":              ["divida","liquid"],
    "resultado_financeiro_liquido":["result","finance","liquid"],
    "roa":                         ["roa"],
    "roe":                         ["roe"],
    "mlb":                         ["mlb"],
    "mll":                         ["mll"],
    "margem_ebit":                 ["margem","ebit"],
    "margem_ebitda":               ["margem","ebitda"],
    "imob":                        ["imob"],
    "endiv":                       ["endiv"],
    "ce":                          ["ce"],
    "dm":                          ["dm"],
    "dc":                          ["dc"],
    "da":                          ["da"],
    "comb":                        ["comb"],
    "comba":                       ["comba"],
    "pcmr":                        ["pcmr"],
    "pmpe":                        ["pmpe"],
    "lg":                          ["lg"],
    "lc":                          ["lc"],
    "divida_bruta_ativos":         ["divida","brut","ativo"],
    "divida_liquida_ativos":       ["divida","liquid","ativo"],
    "divida_liquida_ebit":         ["divida","liquid","ebit"],
    "divida_liquida_ebitda":       ["divida","liquid","ebitda"],
    "pl_ativos":                   ["pl","ativo"],
    "divida_bruta_pl":             ["divida","brut","pl"],
    "divida_liquida_pl":           ["divida","liquid","pl"],
    "gat":                         ["gat"],
}
# Acronyms matched as whole names only: as substrings they would hit unrelated
# columns (e.g. "da" occurs inside "dadoscontabausentes").
SHORT_EXACT = {"roa","roe","mlb","mll","ce","dm","dc","da","comb","comba","pcmr","pmpe","lg","lc","gat","ebit","ebitda","pl"}


def _is_ebitda_clash(toks, col):
    """Tell whether an EBIT-only label is being offered an EBITDA column."""
    return "ebit" in toks and "ebitda" not in toks and "ebitda" in col


def _match_acronym(cols, token, used):
    """Return the first unused column holding `token` as a delimited word, else None."""
    word = re.compile(rf"(?:^|[^a-z0-9]){re.escape(token)}(?:$|[^a-z0-9])")
    for c in cols:
        if c in used or _is_ebitda_clash([token], c):
            continue
        if word.search(c):
            return c
    return None


def _match_tokens(cols, toks, used):
    """Return the first unused column whose name contains every token, else None."""
    for c in cols:
        if c in used or _is_ebitda_clash(toks, c):
            continue
        if all(t in c for t in toks):
            return c
    return None


def find_predictors(df_norm: pd.DataFrame):
    """Map each canonical indicator in ``PRED_TOKENS`` to a column of ``df_norm``.

    Matching runs in two passes so that the result does not depend on the
    order of the columns in the sheet:

    1. A column whose name is exactly the label without underscores
       (e.g. ``dividaliquida`` for ``divida_liquida``) is assigned first.
    2. Remaining labels fall back to pattern matching over the columns that
       are still free. Single acronyms listed in ``SHORT_EXACT`` must appear
       as a whole delimited word; every other label needs all of its tokens
       to occur as substrings.

    In both fallbacks a label that mentions "ebit" but not "ebitda" never
    takes a column containing "ebitda", and columns already assigned to
    another label are skipped rather than causing the label to be dropped.

    Parameters
    ----------
    df_norm : pd.DataFrame
        Frame whose column names were already normalized.

    Returns
    -------
    tuple
        ``(found_cols, mapping)`` where ``mapping`` has one entry per label
        (the matched column name or ``None``) and ``found_cols`` lists the
        matched columns in ``PRED_TOKENS`` order.
    """
    cols = df_norm.columns.tolist()
    mapping = dict.fromkeys(PRED_TOKENS)
    used = set()

    # Pass 1: exact canonical names win over any looser match.
    for label in PRED_TOKENS:
        canonical = label.replace("_", "")
        if canonical in cols and canonical not in used:
            mapping[label] = canonical
            used.add(canonical)

    # Pass 2: pattern fallback for whatever is still unassigned.
    for label, toks in PRED_TOKENS.items():
        if mapping[label] is not None:
            continue
        if len(toks) == 1 and toks[0] in SHORT_EXACT:
            found = _match_acronym(cols, toks[0], used)
        else:
            found = _match_tokens(cols, toks, used)
        if found is not None:
            mapping[label] = found
            used.add(found)

    found_cols = [c for c in mapping.values() if c]
    return found_cols, mapping

# ---------------- Avaliação em CV ----------------
def avaliar_regressao_cv(pipe, X, y, grupos=None, n_splits=5):
    """Cross-validate ``pipe`` and return the mean R2, MAE and RMSE over folds.

    Parameters
    ----------
    pipe : estimator
        Object with ``fit`` and ``predict``; it is refitted in place on the
        training part of every fold.
    X : pd.DataFrame
        Features; rows are selected positionally with ``.iloc``.
    y : array-like
        Target, indexed with the integer arrays produced by the splitter.
    grupos : array-like or None
        When given, folds come from ``GroupKFold`` (capped at the number of
        distinct groups); otherwise from a shuffled ``KFold`` with seed 42.
    n_splits : int
        Requested number of folds.

    Returns
    -------
    dict
        Keys ``R2_CV``, ``MAE_CV`` and ``RMSE_CV``, each the plain average of
        the per-fold metric.
    """
    if grupos is None:
        splitter = KFold(n_splits=n_splits, shuffle=True, random_state=42)
        folds = splitter.split(X, y)
    else:
        n_groups = len(np.unique(grupos))
        splitter = GroupKFold(n_splits=min(n_splits, n_groups))
        folds = splitter.split(X, y, grupos)

    per_fold = {"R2_CV": [], "MAE_CV": [], "RMSE_CV": []}
    for train_idx, test_idx in folds:
        pipe.fit(X.iloc[train_idx], y[train_idx])
        predicted = pipe.predict(X.iloc[test_idx])
        per_fold["R2_CV"].append(r2_score(y[test_idx], predicted))
        per_fold["MAE_CV"].append(mean_absolute_error(y[test_idx], predicted))
        per_fold["RMSE_CV"].append(np.sqrt(mean_squared_error(y[test_idx], predicted)))

    return {metric: float(np.mean(values)) for metric, values in per_fold.items()}

# ---------------- Load and prepare ----------------
df_raw, df_norm, sheet = load_base_xlsx(PATH_XLSX)
col_ops = pick_ops_col(df_norm)

# Target
col_y = find_target_col(df_norm)
y_all = pd.to_numeric(df_norm[col_y], errors="coerce")

# errors="coerce" turns missing/non-numeric targets into NaN, and the
# estimators reject NaN in y, so those rows are dropped (and reported) here.
has_target = y_all.notna().to_numpy()
n_sem_alvo = int((~has_target).sum())
if n_sem_alvo:
    print(f"[Aviso] {n_sem_alvo} linha(s) sem alvo numérico foram descartadas.")
df_model = df_norm.loc[has_target]
y = y_all[has_target].to_numpy()

# Predictors (WITHOUT dummy_ano)
pred_cols, pred_map = find_predictors(df_norm)
# Predictor lookup matches on name fragments, so make sure it never hands back
# the target or the grouping identifier as a feature.
reservadas = {c for c in (col_y, col_ops) if c}
pred_cols = [c for c in pred_cols if c not in reservadas]
pred_map = {label: (c if c not in reservadas else None) for label, c in pred_map.items()}
if not pred_cols:
    raise ValueError("Nenhum preditor da lista foi encontrado. Verifique os nomes na planilha.")
X = df_model[pred_cols].apply(pd.to_numeric, errors="coerce")

# SimpleImputer discards columns that have no observed value at all, which
# would leave the coefficient arrays shorter than the feature-name list.
X = X.dropna(axis=1, how="all")
sem_valores = [c for c in pred_cols if c not in X.columns]
if sem_valores:
    print("[Aviso] Preditores sem nenhum valor numérico foram removidos:", sem_valores)
pred_cols = X.columns.tolist()
if not pred_cols:
    raise ValueError("Nenhum preditor da lista foi encontrado. Verifique os nomes na planilha.")

# Groups for CV (by OPS when such a column exists)
grupos = df_model[col_ops].astype(str).values if col_ops else None

# ---------------- Models ----------------
def _make_preprocess():
    """Build a fresh median-impute + standardize pipeline.

    Each model gets its own instance so that fitting one model never
    overwrites the fitted state used by the other.
    """
    return Pipeline(steps=[
        ("imp", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler()),
    ])

# Kept as a module-level name; this instance belongs to the OLS pipeline.
preprocess = _make_preprocess()

model_ols = Pipeline(steps=[
    ("prep", preprocess),
    ("ols", LinearRegression())
])

# `store_cv_values` is no longer passed: it was only ever set to its default
# and the keyword has been dropped from recent scikit-learn releases.
model_ridge = Pipeline(steps=[
    ("prep", _make_preprocess()),
    ("ridge", RidgeCV(alphas=np.logspace(-3, 3, 13)))
])

# ---------------- Evaluation (CV) ----------------
met_ols  = avaliar_regressao_cv(model_ols,  X, y, grupos=grupos, n_splits=N_SPLITS)
met_rdg  = avaliar_regressao_cv(model_ridge, X, y, grupos=grupos, n_splits=N_SPLITS)

print("\n=== CV (OLS padronizado) ===", {k: round(v, ROUND) for k,v in met_ols.items()})
print("=== CV (Ridge) ===",          {k: round(v, ROUND) for k,v in met_rdg.items()})

# ---------------- Final fit (Ridge) ----------------
model_ridge.fit(X, y)
yhat = model_ridge.predict(X)

# In-sample
r2_in   = r2_score(y, yhat)
mae_in  = mean_absolute_error(y, yhat)
# float() so the value prints as a plain number rather than np.float64(...).
rmse_in = float(np.sqrt(mean_squared_error(y, yhat)))
print("\n=== In-sample (Ridge) ===", {"R2_in": round(r2_in, ROUND),
                                      "MAE_in": round(mae_in, ROUND),
                                      "RMSE_in": round(rmse_in, ROUND)})

# Standardized coefficients (effect per 1 standard deviation).
# The pipeline was fitted just above, so its steps are read as they are
# instead of being fitted a second time on the same data.
prep = model_ridge.named_steps["prep"]
X_std = prep.transform(X)
ridge = model_ridge.named_steps["ridge"]
coef_ridge = ridge.coef_
feat_names = X.columns.tolist()
coef_ridge_df = pd.DataFrame({"feature": feat_names, "coef_std": coef_ridge}).sort_values("coef_std", key=np.abs, ascending=False)
coef_ridge_df["coef_std"] = coef_ridge_df["coef_std"].round(ROUND)

# Standardized OLS as well (coefficients)
model_ols.fit(X, y)
ols = model_ols.named_steps["ols"]
coef_ols = ols.coef_
coef_ols_df = pd.DataFrame({"feature": feat_names, "coef_std": coef_ols}).sort_values("coef_std", key=np.abs, ascending=False)
coef_ols_df["coef_std"] = coef_ols_df["coef_std"].round(ROUND)

# Predictions
preds = pd.DataFrame({"y_true": np.round(y, ROUND), "y_hat": np.round(yhat, ROUND)})
if col_ops: preds["ops"] = grupos

# ---------------- Save ----------------
coef_ridge_df.to_csv(os.path.join(OUT_DIR, "coeficientes_padronizados_ridge.csv"), index=False, encoding="utf-8")
coef_ols_df.to_csv(os.path.join(OUT_DIR, "coeficientes_padronizados_ols.csv"), index=False, encoding="utf-8")

pd.DataFrame([{
    **{f"ols_{k}": round(v, ROUND) for k,v in met_ols.items()},
    **{f"ridge_{k}": round(v, ROUND) for k,v in met_rdg.items()},
    "R2_in": round(r2_in, ROUND),
    "MAE_in": round(mae_in, ROUND),
    "RMSE_in": round(rmse_in, ROUND),
    "n_preditores": int(X.shape[1]),
    "n_obs": int(len(y)),
    "cv_por_ops": bool(col_ops)
}]).to_csv(os.path.join(OUT_DIR, "metricas.csv"), index=False, encoding="utf-8")

preds.to_csv(os.path.join(OUT_DIR, "predicoes_in_sample_ridge.csv"), index=False, encoding="utf-8")

# Indicator -> matched column. Taken from the mapping that actually selected
# the features, so the report cannot disagree with what the models were fed.
mapping = [
    {"indicador_canonico": label,
     "coluna_encontrada_norm": pred_map.get(label) or "(não encontrado)"}
    for label in PRED_TOKENS
]
pd.DataFrame(mapping).to_csv(os.path.join(OUT_DIR, "mapeamento_colunas.csv"), index=False, encoding="utf-8")

# (Optional) OLS with p-values (statsmodels, HC3 robust errors).
# Best effort by design: any failure here is reported and the script goes on.
try:
    import statsmodels.api as sm
    X_imp = pd.DataFrame(SimpleImputer(strategy="median").fit_transform(X), columns=X.columns, index=X.index)
    # A constant column has zero spread; dividing by it would fill the design
    # matrix with NaN, so its scale is set to 1 (the column becomes all zeros).
    desvios = X_imp.std(ddof=0).replace(0, 1.0)
    X_std_sm = (X_imp - X_imp.mean()) / desvios
    X_sm = sm.add_constant(X_std_sm)
    ols_sm = sm.OLS(y, X_sm).fit(cov_type="HC3")
    with open(os.path.join(OUT_DIR, "ols_resumo.txt"), "w", encoding="utf-8") as f:
        f.write(str(ols_sm.summary()))
    tbl = pd.DataFrame({"coef_std": ols_sm.params, "std_err": ols_sm.bse, "t": ols_sm.tvalues, "pval": ols_sm.pvalues}).round(ROUND)
    tbl.to_csv(os.path.join(OUT_DIR, "ols_coeficientes_pvalues.csv"), encoding="utf-8")
    print("Resumo OLS (HC3) salvo.")
except Exception as e:
    print("[Aviso] Statsmodels indisponível/erro; p-valores não gerados:", e)

print("\nArquivos salvos em:", OUT_DIR)



=== CV (OLS padronizado) === {'R2_CV': -6460503.018, 'MAE_CV': 2.865, 'RMSE_CV': 136.874}
=== CV (Ridge) === {'R2_CV': -5060261.805, 'MAE_CV': 2.62, 'RMSE_CV': 125.188}

=== In-sample (Ridge) === {'R2_in': 0.031, 'MAE_in': 0.082, 'RMSE_in': np.float64(0.111)}
Resumo OLS (HC3) salvo.

Arquivos salvos em: ./saidas_reg
