
# Projeto Integrado — Deep Learning (QuantumFinance) — **Solução em CSV**

Este notebook implementa a solução solicitada no enunciado:
- Treinar modelos **CNN 1D** e **LSTM** com **TensorFlow/Keras** usando os arquivos `treino.csv` e `teste.csv` para **cada um dos 4 ativos**.
- **Relatar métricas no conjunto de teste**: **acurácia**, **matriz de confusão**, **precision** e **recall**.
- **Backtest (opcional)** simples, aplicando o sinal previsto do dia T no retorno de T+1.
- **Salvar predições** por ativo/modelo em `output/` e gerar um **resumo consolidado**.

> Estrutura esperada de diretórios (exemplo):
```
bases/
 ├─ BBAS3SA/
 │   ├─ treino.csv
 │   ├─ teste.csv
 │   └─ visualizacao.html   (não modificado pelo notebook)
 ├─ VALE3SA/
 │   ├─ treino.csv
 │   ├─ teste.csv
 │   └─ visualizacao.html
 ├─ PETR4SA/
 └─ CSNA3SA/
output/   (será criado se não existir)
```

> Observação: o arquivo `visualizacao.html` **não é alterado**. Caso deseje EDA adicional,
> o notebook pode salvar um `eda_auto_<TICKER>.html` separado, sem sobrescrever o original.


In [1]:

# ================== SETUP ==================
import os, re, warnings, json
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

warnings.filterwarnings("ignore")

# TensorFlow / Keras
try:
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
except Exception as e:
    raise RuntimeError(
        "TensorFlow/Keras não está disponível neste ambiente. "
        "Instale com `pip install tensorflow` e reinicie o kernel.\n"
        f"Erro original: {e}"
    )

from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_recall_fscore_support
from sklearn.preprocessing import StandardScaler

SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Caminhos principais (ajuste se necessário)
DATA_ROOT = Path("bases")
OUT_ROOT  = Path("output")
OUT_ROOT.mkdir(parents=True, exist_ok=True)

# Aliases de colunas para autodetecção
LABEL_ALIASES = ["label", "rotulo", "rótulo", "target", "y"]
DATE_ALIASES  = ["date", "data", "day", "dia"]
CLOSE_ALIASES = ["close", "fechamento", "close_price", "preco_fechamento", "preço_fechamento"]

WINDOW = 15  # janela de lags D-1...D-15
EPOCHS = 25
BATCH  = 64


In [2]:

# ================== FUNÇÕES AUXILIARES ==================
def find_col(cols, aliases):
    cols_norm = [c.strip().lower() for c in cols]
    for a in aliases:
        a_norm = a.lower()
        # igualdade exata
        for c in cols:
            if c.strip().lower() == a_norm:
                return c
        # contém alias
        for c in cols:
            if a_norm in c.strip().lower():
                return c
    return None

def autodiscover_pairs(root: Path):
    """
    Descobre subpastas com treino.csv e teste.csv.
    Retorna dict: {TICKER: {'train': Path, 'test': Path}}
    """
    pairs = {}
    for d in sorted([p for p in root.iterdir() if p.is_dir()]):
        tr = d/"treino.csv"
        te = d/"teste.csv"
        if tr.exists() and te.exists():
            pairs[d.name.upper()] = {"train": tr, "test": te}
    return pairs

def load_pair(train_path: Path, test_path: Path):
    df_tr = pd.read_csv(train_path)
    df_te = pd.read_csv(test_path)
    return df_tr, df_te

def prepare_xy(df: pd.DataFrame):
    """
    Detecta coluna de rótulo e 15 features (lags) numéricas.
    Normaliza (StandardScaler) e devolve X em formato (N, 15, 1) para CNN/LSTM.
    """
    cols = list(df.columns)
    label_col = find_col(cols, LABEL_ALIASES)
    date_col  = find_col(cols, DATE_ALIASES)
    close_col = find_col(cols, CLOSE_ALIASES)

    if label_col is None:
        raise ValueError("Não foi possível detectar a coluna de rótulo (use label/rotulo/target/y)."

        )
    # Heurística: use as últimas 15 colunas numéricas (exceto label)
    feat_cols = [c for c in cols if c != label_col and pd.api.types.is_numeric_dtype(df[c])]
    if len(feat_cols) < 15:
        raise ValueError(f"Menos de 15 features numéricas encontradas. Colunas numéricas: {feat_cols}")
    feat_cols = feat_cols[-15:]

    X = df[feat_cols].values
    y = df[label_col].astype(int).values

    scaler = StandardScaler()
    X = scaler.fit_transform(X)
    X3 = X.reshape(X.shape[0], X.shape[1], 1)
    meta = {"label": label_col, "date": date_col, "close": close_col, "features": feat_cols}
    return X3, y, meta

def build_cnn1d(input_shape):
    m = keras.Sequential([
        layers.Input(shape=input_shape),
        layers.Conv1D(32, 3, padding="same", activation="relu"),
        layers.Conv1D(32, 3, padding="same", activation="relu"),
        layers.GlobalAveragePooling1D(),
        layers.Dense(32, activation="relu"),
        layers.Dense(1, activation="tanh")  # saída contínua [-1,1]
    ])
    m.compile(optimizer=keras.optimizers.Adam(1e-3), loss="mse")
    return m

def build_lstm(input_shape):
    m = keras.Sequential([
        layers.Input(shape=input_shape),
        layers.LSTM(32, return_sequences=False),
        layers.Dense(32, activation="relu"),
        layers.Dense(1, activation="tanh")
    ])
    m.compile(optimizer=keras.optimizers.Adam(1e-3), loss="mse")
    return m

def evaluate_on_test(model, Xte, yte, ticker, model_name, close_series=None):
    # contínuo -> classe {-1,+1}
    y_pred_cont = model.predict(Xte, verbose=0).ravel()
    y_pred = np.where(y_pred_cont >= 0, 1, -1)

    acc = accuracy_score(yte, y_pred)
    cm  = confusion_matrix(yte, y_pred, labels=[-1, 1])
    pre, rec, f1, sup = precision_recall_fscore_support(
        yte, y_pred, labels=[-1,1], zero_division=0
    )
    report = classification_report(yte, y_pred, target_names=["SELL(-1)", "BUY(+1)"], digits=4)

    # Backtest simples
    bt = None
    if close_series is not None:
        close = close_series.astype(float).values
        ret_next = np.zeros_like(close, dtype=float)
        safe = np.where(close[:-1]==0, 1e-12, close[:-1])
        ret_next[:-1] = (close[1:] - close[:-1]) / safe
        pos = y_pred.astype(float)
        strat_ret = np.zeros_like(ret_next)
        strat_ret[:-1] = pos[:-1] * ret_next[:-1]
        bt = {
            "bh_cum": float(np.prod(1+ret_next) - 1),
            "model_cum": float(np.prod(1+strat_ret) - 1),
            "bh_avg": float(np.mean(ret_next)),
            "model_avg": float(np.mean(strat_ret)),
            "bh_vol": float(np.std(ret_next)),
            "model_vol": float(np.std(strat_ret)),
        }

    # Salvar predições
    out_csv = Path("output") / f"predicoes_{ticker}_{model_name}.csv"
    out_csv.parent.mkdir(parents=True, exist_ok=True)
    import pandas as pd
    pd.DataFrame({"y_true": yte, "y_pred": y_pred}).to_csv(out_csv, index=False)

    # Retornar dicionário para consolidação
    return {
        "ticker": ticker,
        "model": model_name,
        "accuracy": acc,
        "precision_-1": pre[0], "recall_-1": rec[0],
        "precision_+1": pre[1], "recall_+1": rec[1],
        "confusion_matrix": cm.tolist(),
        "report": report,
        "backtest": bt,
        "pred_csv": str(out_csv)
    }


In [3]:

# ================== EXECUÇÃO PRINCIPAL ==================
pairs = autodiscover_pairs(DATA_ROOT)
if not pairs:
    raise SystemExit(f"Nenhuma pasta encontrada em {DATA_ROOT.resolve()} com treino.csv/teste.csv.")

print("Ativos detectados:")
for t, p in pairs.items():
    print(f"- {t}: {p['train']} / {p['test']}")

results = []

for ticker, paths in pairs.items():
    print(f"\n=== {ticker} ===")
    df_tr, df_te = load_pair(paths["train"], paths["test"])

    # Preparar dados
    Xtr, ytr, meta_tr = prepare_xy(df_tr)
    Xte, yte, meta_te = prepare_xy(df_te)
    assert meta_tr["features"] == meta_te["features"], "Features de treino e teste não coincidem."

    # CNN 1D
    cnn = build_cnn1d(Xtr.shape[1:])
    hist1 = cnn.fit(Xtr, ytr, validation_split=0.1, epochs=EPOCHS, batch_size=BATCH, verbose=0)
    res1 = evaluate_on_test(cnn, Xte, yte, ticker, "CNN1D",
                            close_series=df_te[meta_te["close"]] if meta_te["close"] else None)
    results.append(res1)

    # LSTM
    lstm = build_lstm(Xtr.shape[1:])
    hist2 = lstm.fit(Xtr, ytr, validation_split=0.1, epochs=EPOCHS, batch_size=BATCH, verbose=0)
    res2 = evaluate_on_test(lstm, Xte, yte, ticker, "LSTM",
                            close_series=df_te[meta_te["close"]] if meta_te["close"] else None)
    results.append(res2)

# ================== CONSOLIDADO ==================
import pandas as pd, json
res_df = pd.DataFrame([{
    "Ticker": r["ticker"],
    "Modelo": r["model"],
    "Accuracy": r["accuracy"],
    "Precision(-1)": r["precision_-1"],
    "Recall(-1)": r["recall_-1"],
    "Precision(+1)": r["precision_+1"],
    "Recall(+1)": r["recall_+1"],
    "PredicoesCSV": r["pred_csv"]
} for r in results]).sort_values(["Ticker","Modelo"])

display(res_df)

# Salva CSV consolidado
consol_csv = Path("output") / "resultados_consolidados.csv"
res_df.to_csv(consol_csv, index=False)
print(f"\n[OK] Consolidado salvo em: {consol_csv}")

# Opcional: salvar relatório simples em Markdown
md_lines = ["# Resultados — Projeto Integrado (CSV)\n"]
for r in results:
    md_lines.append(f"## {r['ticker']} — {r['model']}")
    md_lines.append(f"- Accuracy: {r['accuracy']:.4f}")
    md_lines.append(f"- Predições: `{r['pred_csv']}`")
    md_lines.append("\n**Classification report:**\n")
    md_lines.append("```\n" + r["report"] + "\n```")
    if r["backtest"]:
        md_lines.append("**Backtest (retornos cumulativos aproximados):**")
        md_lines.append("```\n" + json.dumps(r["backtest"], indent=2) + "\n```")
    md_lines.append("\n")
md_path = Path("output") / "RELATORIO_RESULTADOS.md"
md_path.write_text("\n".join(md_lines), encoding="utf-8")
print(f"[OK] Relatório Markdown salvo em: {md_path}")


Ativos detectados:
- BBAS3SA: bases\BBAS3SA\treino.csv / bases\BBAS3SA\teste.csv
- CSNA3SA: bases\CSNA3SA\treino.csv / bases\CSNA3SA\teste.csv
- PETRA4SA: bases\PETRA4SA\treino.csv / bases\PETRA4SA\teste.csv
- VALE3SA: bases\VALE3SA\treino.csv / bases\VALE3SA\teste.csv

=== BBAS3SA ===

=== CSNA3SA ===

=== PETRA4SA ===

=== VALE3SA ===


Unnamed: 0,Ticker,Modelo,Accuracy,Precision(-1),Recall(-1),Precision(+1),Recall(+1),PredicoesCSV
0,BBAS3SA,CNN1D,0.84557,0.909385,0.815675,0.776014,0.887097,output\predicoes_BBAS3SA_CNN1D.csv
1,BBAS3SA,LSTM,0.752743,0.827815,0.725689,0.674699,0.790323,output\predicoes_BBAS3SA_LSTM.csv
2,CSNA3SA,CNN1D,0.852445,0.840637,0.920058,0.872979,0.759036,output\predicoes_CSNA3SA_CNN1D.csv
3,CSNA3SA,LSTM,0.844013,0.892356,0.831395,0.787156,0.861446,output\predicoes_CSNA3SA_LSTM.csv
4,PETRA4SA,CNN1D,0.80798,0.798635,0.805508,0.816856,0.810289,output\predicoes_PETRA4SA_CNN1D.csv
5,PETRA4SA,LSTM,0.791355,0.818533,0.729776,0.770803,0.848875,output\predicoes_PETRA4SA_LSTM.csv
6,VALE3SA,CNN1D,0.831255,0.781073,0.920133,0.90303,0.742525,output\predicoes_VALE3SA_CNN1D.csv
7,VALE3SA,LSTM,0.718204,0.652681,0.93178,0.881159,0.504983,output\predicoes_VALE3SA_LSTM.csv



[OK] Consolidado salvo em: output\resultados_consolidados.csv
[OK] Relatório Markdown salvo em: output\RELATORIO_RESULTADOS.md


In [4]:

# ================== EDA AUTOMÁTICA (gera eda_auto_<TICKER>.html) ==================
# Não altera o visualizacao.html fornecido. Salva um arquivo novo por pasta.

import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from plotly.offline import plot as plot_offline

def _detect_cols_for_eda(df):
    label = next((c for c in df.columns if str(c).lower() in {"label","rotulo","rótulo","target","y"}), None)
    date  = next((c for c in df.columns if str(c).lower() in {"date","data","day","dia"}), None)
    close = next((c for c in df.columns if "close" in str(c).lower() or "fech" in str(c).lower()), None)
    feats = [c for c in df.columns if c != label and pd.api.types.is_numeric_dtype(df[c])]
    feats = feats[-15:] if len(feats) >= 15 else feats
    return label, date, close, feats

def gerar_eda_html_por_ticker(DATA_ROOT=DATA_ROOT):
    for d in sorted([p for p in DATA_ROOT.iterdir() if p.is_dir()]):
        train = d / "treino.csv"; test = d / "teste.csv"
        if not (train.exists() and test.exists()):
            continue
        df_tr = pd.read_csv(train); df_te = pd.read_csv(test)
        label, date, close, feats = _detect_cols_for_eda(df_tr)
        figs = []

        # 1) Distribuição do rótulo
        if label:
            figs.append(px.histogram(df_tr, x=label, title=f"{d.name} · Distribuição do rótulo (Treino)"))
            figs.append(px.histogram(df_te, x=label, title=f"{d.name} · Distribuição do rótulo (Teste)"))

        # 2) Série de fechamento (se existir)
        if close:
            x_tr = df_tr[date] if (date and date in df_tr.columns) else df_tr.index
            x_te = df_te[date] if (date and date in df_te.columns) else df_te.index
            fig_close = make_subplots(rows=2, cols=1, shared_xaxes=False,
                                      subplot_titles=("Treino - Close", "Teste - Close"))
            fig_close.add_trace(go.Scatter(x=x_tr, y=df_tr[close], mode="lines", name="Close (Treino)"), row=1, col=1)
            fig_close.add_trace(go.Scatter(x=x_te, y=df_te[close], mode="lines", name="Close (Teste)"), row=2, col=1)
            fig_close.update_layout(title_text=f"{d.name} · Série de Fechamento")
            figs.append(fig_close)

        # 3) Correlação entre features
        if len(feats) >= 2:
            corr = df_tr[feats].corr()
            figs.append(go.Figure(data=go.Heatmap(z=corr.values, x=corr.columns, y=corr.index)).update_layout(
                title=f"{d.name} · Correlação (treino)"
            ))

        # 4) Scatter-matrix (últimas 3 features vs label)
        if label and len(feats) >= 3:
            last3 = feats[-3:]
            mdf = df_tr[last3 + [label]].copy()
            mdf[label] = mdf[label].astype(str)
            figs.append(px.scatter_matrix(mdf, dimensions=last3, color=label,
                                          title=f"{d.name} · Scatter matrix (treino)"))

        # salvar HTML concatenado
        out_html = d / f"eda_auto_{d.name.upper()}.html"
        parts = []
        for i, f in enumerate(figs, 1):
            parts.append(plot_offline(f, include_plotlyjs=(i==1), output_type="div"))
        html = "<html><head><meta charset='utf-8'><title>EDA Auto {}</title></head><body>{}</body></html>".format(
            d.name.upper(), "\n<hr/>\n".join(parts)
        )
        out_html.write_text(html, encoding="utf-8")
        print(f"[OK] EDA salvo: {out_html}")

# Descomente para rodar quando quiser gerar os HTMLs
# gerar_eda_html_por_ticker()


ModuleNotFoundError: No module named 'plotly'

In [None]:

# ================== COMPARATIVO DESTACADO (CNN 1D vs LSTM) ==================
# Usa o 'res_df' criado na execução principal.
import pandas as pd

if 'res_df' in globals():
    pivot = res_df.pivot(index="Ticker", columns="Modelo", values="Accuracy")
    styled = pivot.style.highlight_max(axis=1)
    html_path = Path("output") / "comparativo_modelos.html"
    styled.to_html(html_path)
    display(pivot)
    print(f"[OK] Comparativo salvo em: {html_path}")
else:
    print("Execute a seção principal primeiro para criar 'res_df'.")
