<a href="https://colab.research.google.com/github/souzamichel/ml_trading_test_code/blob/evolu%C3%A7%C3%A3o/ml_trading_por_reforco.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yfinance as yf
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random

class TradingEnv:
    """
    Long-only trading environment with three actions: 0=Hold, 1=Buy, 2=Sell.

    The observation is the tuple ``(return_bin, position, ma_return_bin)``:
    the bin of the one-step return at the current index, the position flag
    (0 = flat, 1 = long) and the bin of the rolling mean of returns.

    Trades pay a fixed fee, a percentage fee and slippage. Open positions are
    closed by ATR-based stop-loss / take-profit levels. The reward of a step
    is the percentage change of the portfolio value over that step.
    """
    def __init__(
        self,
        prices,
        highs,
        lows,
        n_return_bins=10,
        initial_capital=1_000,
        lot_size=1,
        transaction_cost_pct=0.001,
        slippage_pct=0.001,
        atr_window=14,
        sl_atr_mult=0.5,
        tp_atr_mult=4.0,
        risk_aversion=0.0,
        fixed_trade_cost=0.5,           # fixed fee charged on every fill
        momentum_filter=True,           # require a positive previous return to buy
        ma_window=5
    ):
        """
        Build the environment from close/high/low series of equal length.

        Args:
            prices, highs, lows: array-likes; flattened to 1-D.
            n_return_bins: number of bins used to discretise returns.
            initial_capital: cash available after ``reset``.
            lot_size: units bought or sold per BUY/SELL action.
            transaction_cost_pct: proportional fee applied to each fill.
            slippage_pct: adverse price adjustment applied to each fill.
            atr_window: rolling window of the ATR.
            sl_atr_mult, tp_atr_mult: stop-loss / take-profit distance from
                the entry price, in ATR multiples.
            risk_aversion: accepted for interface compatibility; not used.
            fixed_trade_cost: flat fee charged on every fill.
            momentum_filter: when True, BUY needs a positive previous return.
            ma_window: rolling window for the mean of returns.

        Raises:
            ValueError: if the series is shorter than ``atr_window + 1`` or
                the three series differ in length.
        """
        print(f"[Env] Inicializando TradingEnv: {len(prices)} candles, ATR window={atr_window}, SL_mult={sl_atr_mult}, TP_mult={tp_atr_mult}")
        self.prices = np.asarray(prices).flatten()
        self.highs  = np.asarray(highs).flatten()
        self.lows   = np.asarray(lows).flatten()
        if self.prices.size < atr_window + 1:
            raise ValueError("Série muito curta para ATR.")
        if self.highs.size != self.prices.size or self.lows.size != self.prices.size:
            raise ValueError("prices, highs e lows devem ter o mesmo tamanho.")

        # One-step simple returns and the equal-width bin edges spanning them.
        self.returns = np.diff(self.prices) / self.prices[:-1]
        self.n_bins  = n_return_bins
        self.bins    = np.linspace(self.returns.min(),
                                   self.returns.max(),
                                   self.n_bins + 1)

        # Rolling mean of returns (second market feature of the state).
        self.ma_window = ma_window
        self.return_ma = (
            pd.Series(self.returns)
            .rolling(window=ma_window, min_periods=1)
            .mean()
            .to_numpy()
        )

        # Trade parameters.
        self.initial_capital      = initial_capital
        self.lot_size             = lot_size
        self.transaction_cost_pct = transaction_cost_pct
        self.slippage_pct         = slippage_pct
        self.fixed_trade_cost     = fixed_trade_cost
        self.momentum_filter      = momentum_filter

        # ATR stop parameters.
        self.atr_window  = atr_window
        self.sl_atr_mult = sl_atr_mult
        self.tp_atr_mult = tp_atr_mult

        # True range = max(high-low, |high-prev close|, |low-prev close|).
        prev_close = self.prices[:-1]
        tr_max = np.maximum.reduce([
            self.highs[1:] - self.lows[1:],
            np.abs(self.highs[1:] - prev_close),
            np.abs(self.lows[1:] - prev_close),
        ])
        atr = (
            pd.Series(tr_max)
            .rolling(window=atr_window, min_periods=1)
            .mean()
            .bfill()
        )
        # The true range starts at index 1; repeat the first value so that
        # self.atr is aligned with self.prices.
        self.atr = np.concatenate([[atr.iloc[0]], atr.to_numpy()])
        # Volatility threshold: entries are only allowed when ATR is above it.
        self.atr_median = np.median(self.atr)

        self.reset()

    def reset(self):
        """Rewind to the first candle, go flat, restore cash; return the state."""
        self.t           = 0
        self.position    = 0
        self.cash        = self.initial_capital
        self.inventory   = 0
        self.entry_price = None
        return self._get_state()

    def _bin(self, value):
        """Map a return-like value to a bin index in ``[0, n_bins - 1]``."""
        return np.clip(np.digitize(value, self.bins) - 1, 0, self.n_bins - 1)

    def _get_state(self):
        """Return ``(return_bin, position, ma_return_bin)`` for index ``t``."""
        # Both feature arrays are one element shorter than prices, so the
        # last index falls back to their final value.
        ret    = self.returns[self.t] if self.t < len(self.returns) else self.returns[-1]
        ma_ret = self.return_ma[self.t] if self.t < len(self.return_ma) else self.return_ma[-1]
        return (self._bin(ret), self.position, self._bin(ma_ret))

    def _sell(self, price, volume):
        """Sell ``volume`` units at ``price`` net of slippage and fees."""
        fill     = price * (1 - self.slippage_pct)
        proceeds = fill * volume * (1 - self.transaction_cost_pct)
        self.cash      += proceeds - self.fixed_trade_cost
        self.inventory -= volume
        # Keep the position flag in sync with what is actually held, so that
        # remaining shares stay visible in the state and covered by stops.
        if self.inventory > 0:
            self.position = 1
        else:
            self.position    = 0
            self.entry_price = None

    def step(self, action):
        """
        Advance one candle and apply ``action`` at the new price.

        Returns:
            ``(next_state, reward, done)`` where ``reward`` is the fractional
            change in portfolio value and ``done`` is True on the last candle.
        """
        price_prev     = self.prices[self.t]
        self.t        += 1
        price_now      = self.prices[self.t]
        prev_portfolio = self.cash + self.inventory * price_prev

        volume = self.lot_size

        if action == 1:
            # BUY only in above-median volatility and, when the momentum
            # filter is on, after a positive previous return.
            prev_ret = self.returns[self.t-1] if self.t-1 < len(self.returns) else 0
            vol_ok   = self.atr[self.t] > self.atr_median
            if vol_ok and (not self.momentum_filter or prev_ret > 0):
                buy_price  = price_now * (1 + self.slippage_pct)
                total_cost = (buy_price * volume * (1 + self.transaction_cost_pct)
                              + self.fixed_trade_cost)
                if self.cash >= total_cost:
                    self.cash       -= total_cost
                    self.inventory  += volume
                    self.position    = 1
                    self.entry_price = price_now
        elif action == 2 and self.inventory >= volume:
            self._sell(price_now, volume)

        # Dynamic stops: a stop-loss / take-profit hit closes everything held,
        # not just one lot, so no shares are left behind unprotected.
        if self.position == 1 and self.entry_price is not None:
            atr      = self.atr[self.t]
            sl_price = self.entry_price - self.sl_atr_mult * atr
            tp_price = self.entry_price + self.tp_atr_mult * atr
            if price_now <= sl_price or price_now >= tp_price:
                self._sell(price_now, self.inventory)

        curr_portfolio = self.cash + self.inventory * price_now
        raw_reward     = curr_portfolio - prev_portfolio
        reward         = raw_reward / prev_portfolio if prev_portfolio > 0 else 0.0

        done       = (self.t == len(self.prices) - 1)
        next_state = self._get_state()
        return next_state, reward, done


class DQNAgent:
    """
    DQN agent with epsilon-greedy exploration, a replay buffer and a
    periodically synchronised target network.

    States are tuples ``(return_bin, position, ma_return_bin)`` and are fed to
    the network as the concatenation of three one-hot vectors.
    """
    def __init__(
        self,
        n_return_bins,
        n_position_states,
        n_actions,
        lr=1e-3,
        gamma=0.99,
        epsilon=1.0,
        epsilon_decay=0.9999,
        epsilon_min=0.01,
        batch_size=64,
        target_update=200,
        memory_size=10000,
        device=None
    ):
        """
        Args:
            n_return_bins: number of bins of each return feature.
            n_position_states: number of distinct position values.
            n_actions: size of the discrete action space.
            lr: Adam learning rate.
            gamma: discount factor.
            epsilon, epsilon_decay, epsilon_min: exploration schedule;
                epsilon is multiplied by ``epsilon_decay`` after every
                learning update and floored at ``epsilon_min``.
            batch_size: replay minibatch size.
            target_update: the target network is synced whenever
                ``steps_done`` is a multiple of this value.
            memory_size: replay buffer capacity.
            device: torch device; defaults to CUDA when available, else CPU.
        """
        # Two return features (instant + moving average) plus the position.
        state_dim = 2 * n_return_bins + n_position_states
        print(f"[Agent] state_dim={state_dim}, actions={n_actions}")
        self.n_return_bins     = n_return_bins
        self.n_position_states = n_position_states
        self.state_dim         = state_dim
        self.n_actions         = n_actions
        self.gamma             = gamma
        self.epsilon           = epsilon
        self.epsilon_decay     = epsilon_decay
        self.epsilon_min       = epsilon_min
        self.batch_size        = batch_size
        self.target_update     = target_update
        self.memory            = deque(maxlen=memory_size)
        self.steps_done        = 0
        self.device            = device or ("cuda" if torch.cuda.is_available() else "cpu")

        class Net(nn.Module):
            """Two-hidden-layer MLP mapping a one-hot state to Q-values."""
            def __init__(self, inp, out):
                super().__init__()
                self.layers = nn.Sequential(
                    nn.Linear(inp, 128),
                    nn.ReLU(),
                    nn.Linear(128, 128),
                    nn.ReLU(),
                    nn.Linear(128, out)
                )
            def forward(self, x):
                return self.layers(x)

        self.policy_net = Net(self.state_dim, n_actions).to(self.device)
        self.target_net = Net(self.state_dim, n_actions).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.loss_fn   = nn.MSELoss()

    def _one_hot(self, state):
        """Encode ``state`` as ``[return one-hot | position one-hot | MA one-hot]``."""
        vec = np.zeros(self.state_dim, dtype=np.float32)
        vec[state[0]] = 1.0
        vec[self.n_return_bins + state[1]] = 1.0
        offset = self.n_return_bins + self.n_position_states
        vec[offset + state[2]] = 1.0
        return vec

    def select_action(self, state):
        """Return a random action with probability epsilon, else the greedy one."""
        self.steps_done += 1
        if random.random() < self.epsilon:
            return random.randrange(self.n_actions)
        v = torch.tensor(self._one_hot(state),
                         dtype=torch.float32,
                         device=self.device).unsqueeze(0)
        with torch.no_grad():
            q = self.policy_net(v)
        return q.argmax().item()

    def remember(self, s, a, r, ns, done):
        """Append one transition to the replay buffer."""
        self.memory.append((s, a, r, ns, done))

    def learn(self):
        """
        Run one gradient step on a random minibatch from the replay buffer.

        Does nothing until the buffer holds at least ``batch_size``
        transitions. Also syncs the target network when due and decays
        epsilon.
        """
        if len(self.memory) < self.batch_size:
            return
        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states_np      = np.array([self._one_hot(s) for s in states], dtype=np.float32)
        next_states_np = np.array([self._one_hot(s) for s in next_states], dtype=np.float32)

        states      = torch.from_numpy(states_np).to(self.device)
        next_states = torch.from_numpy(next_states_np).to(self.device)
        # gather() needs int64 indices; be explicit instead of relying on inference.
        actions     = torch.tensor(actions, dtype=torch.long, device=self.device).unsqueeze(1)
        rewards     = torch.tensor(rewards, dtype=torch.float32, device=self.device).unsqueeze(1)
        dones       = torch.tensor(dones,   dtype=torch.float32, device=self.device).unsqueeze(1)

        q_values = self.policy_net(states).gather(1, actions)
        # The bootstrap target is a constant for this update: computing it
        # without autograd keeps gradients off the frozen target network.
        with torch.no_grad():
            next_q_values = self.target_net(next_states).max(1)[0].unsqueeze(1)
            q_targets     = rewards + self.gamma * next_q_values * (1 - dones)

        loss = self.loss_fn(q_values, q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        if self.steps_done % self.target_update == 0:
            self.target_net.load_state_dict(self.policy_net.state_dict())

        self.epsilon = max(self.epsilon * self.epsilon_decay, self.epsilon_min)

def train_agent(env, agent, n_episodes=200):
    """
    Train ``agent`` on ``env`` for ``n_episodes`` full passes.

    Every step stores the transition in the agent's replay buffer and runs
    one learning update. Returns ``{'episode_rewards': [...]}`` with the
    summed reward of each episode.
    """
    print(f"[Train] {n_episodes} episódios...")
    episode_rewards = []
    history = {'episode_rewards': episode_rewards}

    for ep in range(1, n_episodes + 1):
        obs = env.reset()
        total_reward = 0
        # Each episode runs at least one step and ends when the env says so.
        while True:
            act = agent.select_action(obs)
            nxt, rew, finished = env.step(act)
            agent.remember(obs, act, rew, nxt, finished)
            agent.learn()
            total_reward += rew
            obs = nxt
            if finished:
                break

        episode_rewards.append(total_reward)
        # Progress report every 50 episodes.
        if ep % 50 == 0:
            print(f"[Train] Ep {ep}/{n_episodes}, Rew={total_reward:.2f}, ε={agent.epsilon:.3f}")

    print("[Train] Concluído.")
    return history


def evaluate_agent(env, agent, exploration=True):
    """
    Run one episode of ``agent`` on ``env`` and report the results.

    With ``exploration=True`` the episode uses epsilon = 0.1; with
    ``exploration=False`` it is fully greedy (epsilon = 0). The agent's
    original epsilon is always restored afterwards, even if the rollout
    raises.

    Side effects: prints a summary, shows two matplotlib figures and writes
    the completed trades to ``trade_logs.csv``.

    Returns:
        ``(portfolio, action_log)``: the portfolio value after every step
        (starting with the initial capital) and the action chosen at each
        step.
    """
    print("[Eval] Teste com ε exploratório" if exploration else "[Eval] Teste sem exploração")
    orig_eps = agent.epsilon
    # "No exploration" must really be greedy, not whatever epsilon training left.
    agent.epsilon = 0.1 if exploration else 0.0

    portfolio     = [env.initial_capital]
    action_log    = []
    trade_logs    = []
    position_hist = []
    entry         = None
    price_series  = env.prices.copy()
    tc            = env.transaction_cost_pct
    sl            = env.slippage_pct
    vol           = env.lot_size
    fixed         = env.fixed_trade_cost

    cum_reward = 0
    try:
        state = env.reset()
        done = False
        while not done:
            action = agent.select_action(state)
            action_log.append(action)
            next_state, reward, done = env.step(action)

            # Track position changes to reconstruct per-trade PnL.
            position_hist.append(env.position)
            if env.position == 1 and entry is None:
                entry = {'price': env.entry_price, 'idx': env.t}
            if entry is not None and env.position == 0:
                idx_exit  = env.t
                mkt_price = price_series[idx_exit]
                # Mirror the env's fills: slippage, percentage fee and the
                # fixed fee on both the entry and the exit leg.
                buy_p  = entry['price'] * (1 + sl)
                cost   = buy_p * vol * (1 + tc) + fixed
                sell_p = mkt_price * (1 - sl)
                prof   = sell_p * vol * (1 - tc) - fixed - cost
                trade_logs.append({
                    'entry_idx': entry['idx'],
                    'exit_idx':  idx_exit,
                    'pnl':       prof
                })
                entry = None

            state = next_state
            cum_reward += reward
            portfolio.append(env.cash + env.inventory * env.prices[env.t])
    finally:
        agent.epsilon = orig_eps

    print(f"[Eval] Lucro final: {cum_reward:.2f}")
    plt.figure(figsize=(8,4))
    plt.plot(portfolio, label='Portfólio')
    plt.title('Evolução no teste')
    plt.show()

    # Fixed columns keep the CSV header present even when no trade closed.
    df_tr = pd.DataFrame(trade_logs, columns=['entry_idx', 'exit_idx', 'pnl'])
    print("\nTrades:\n", df_tr)
    df_tr.to_csv("trade_logs.csv", index=False)

    plt.figure(figsize=(12,2))
    plt.plot(position_hist, drawstyle='steps-post')
    plt.yticks([0,1], ['Flat','Long'])
    plt.title('Posição ao longo do tempo')
    plt.show()

    return portfolio, action_log


def compute_metrics(portfolio, action_log):
    """
    Summarise an evaluation run.

    Returns ``(sharpe, trades, dd)``:
    - ``sharpe``: mean over standard deviation of the step-to-step portfolio
      changes, scaled by sqrt(252); 0 when the deviation is not positive.
    - ``trades``: number of logged actions equal to 1 (buy) or 2 (sell).
    - ``dd``: running peak minus portfolio value at every step.
    """
    step_changes = np.diff(portfolio)
    sigma = np.std(step_changes)
    if sigma > 0:
        sharpe = np.mean(step_changes) / sigma * np.sqrt(252)
    else:
        sharpe = 0

    trades = sum(a in [1, 2] for a in action_log)

    running_peak = np.maximum.accumulate(portfolio)
    dd = running_peak - portfolio
    return sharpe, trades, dd


def main(ticker="AAPL", start="2015-01-01", end="2025-01-01", n_episodes=1000):
    """
    Walk-forward experiment: train a DQN agent on 60% of the window, test on
    the following 20%, slide forward by the test size and repeat.

    Args:
        ticker: symbol passed to ``yf.download``.
        start, end: date range passed to ``yf.download``.
        n_episodes: training episodes per walk-forward period.

    Raises:
        ValueError: if no data is downloaded or the test window is too short
            for the ATR computation.
    """
    df = yf.download(ticker, start=start, end=end, auto_adjust=False)
    if df is None or len(df) == 0:
        raise ValueError(f"Nenhum dado retornado para {ticker} entre {start} e {end}.")
    # ravel(): the column lookup may yield an (N, 1) array depending on how
    # the downloaded columns are indexed.
    prices = np.ravel(df["Close"].to_numpy())
    highs  = np.ravel(df["High"].to_numpy())
    lows   = np.ravel(df["Low"].to_numpy())

    n_bins     = 20
    capital    = 1_000
    lot        = 1     # one share per trade
    tc, sl     = 0.001, 0.002
    atr_w      = 14
    sl_mult, tp_mult = 0.5, 4.0

    N          = len(prices)
    train_sz   = int(N*0.6)
    test_sz    = int(N*0.2)
    step       = test_sz
    # Fail before training rather than after it: the test env needs at least
    # atr_w + 1 candles, and a zero step would break range() below.
    if test_sz < atr_w + 1:
        raise ValueError("Série muito curta para walk-forward.")

    # Settings shared by the training and the test environment.
    env_kwargs = dict(
        n_return_bins=n_bins,
        initial_capital=capital,
        lot_size=lot,
        transaction_cost_pct=tc,
        slippage_pct=sl,
        atr_window=atr_w,
        sl_atr_mult=sl_mult,
        tp_atr_mult=tp_mult,
        risk_aversion=0.0,
        ma_window=5,
    )

    all_results = []
    last_log    = []
    last_pf     = []
    # Defined up front so the final plot never hits an unbound name.
    history     = {'episode_rewards': []}

    for i in range(0, N-train_sz-test_sz+1, step):
        tr = slice(i, i+train_sz)
        te = slice(i+train_sz, i+train_sz+test_sz)

        env_tr = TradingEnv(prices[tr], highs[tr], lows[tr], **env_kwargs)

        agent = DQNAgent(n_return_bins=n_bins,
                         n_position_states=3,
                         n_actions=3)

        history = train_agent(env_tr, agent, n_episodes=n_episodes)

        env_te = TradingEnv(prices[te], highs[te], lows[te], **env_kwargs)

        pf, log = evaluate_agent(env_te, agent, exploration=True)
        all_results.append(pf[-1])
        last_log    = log
        last_pf     = pf

        sharpe, trades, _ = compute_metrics(pf, log)
        print(f"[Metrics] Sharpe={sharpe:.2f}, Trades={trades}")

    print("\nWalk-forward finalizado. Lucros:", all_results)

    # Action distribution of the last test period.
    plt.figure(figsize=(6,4))
    plt.hist(last_log, bins=[-0.5,0.5,1.5,2.5], rwidth=0.8, edgecolor='k')
    plt.xticks([0,1,2], ['Hold','Buy','Sell'])
    plt.title('Ações no último teste')
    plt.show()

    # Training rewards and test portfolio of the last period.
    plt.figure(figsize=(8,4))
    plt.plot(history['episode_rewards'], label='Recompensa por episódio')
    plt.plot(last_pf, label='Portfólio (último período)')
    plt.legend()
    plt.show()


# Run the walk-forward experiment only when executed as a script / notebook
# cell, not when this module is imported.
if __name__ == "__main__":
    main()


[*********************100%***********************]  1 of 1 completed


[Env] Inicializando TradingEnv: 1509 candles, ATR window=14, SL_mult=0.5, TP_mult=4.0
[Agent] state_dim=43, actions=3
[Train] 1000 episódios...


KeyboardInterrupt: 