# 🛡️ Previsões com Stop-Loss, Take-Profit e Visualização com Setas
Neste notebook:
- Utilizamos um modelo RandomForest para prever altas
- Realizamos um backtest com Stop-Loss e Take-Profit
- Visualizamos as compras com setas verdes no gráfico
- Mostramos onde o SL ou TP foram ativados com ícones


In [44]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from imblearn.over_sampling import SMOTE
import plotly.graph_objects as go
import matplotlib.pyplot as plt
import ta
plt.style.use('ggplot')

In [45]:
ticker = 'AAPL'
df = pd.read_csv(f'../data/{ticker}_ativo_com_indicadores.csv')
df['Datetime'] = pd.to_datetime(df['Datetime'])
df.set_index('Datetime', inplace=True)
df.dropna(inplace=True)

In [46]:
df['target'] = df['close'].shift(-3)
df['target_class'] = np.where(df['target'] > df['close'] * 1.002, 1,
                         np.where(df['target'] < df['close'] * 0.998, 0, -1))
df = df[df['target_class'] != -1]

In [47]:
features = ['open', 'high', 'low', 'close', 'volume', 'rsi', 'macd', 'macd_signal',
            'sma_20', 'ema_20', 'bb_upper', 'bb_lower']

X = df[features]
y = df['target_class']
X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=False, test_size=0.2)

sm = SMOTE(random_state=42)
X_res, y_res = sm.fit_resample(X_train, y_train)

modelo = RandomForestClassifier(n_estimators=100, class_weight='balanced', random_state=42)
modelo.fit(X_res, y_res)

probs = modelo.predict_proba(X_test)[:, 1]
limiar = 0.7
df_test = df.iloc[-len(X_test):].copy()
df_test['proba_alta'] = probs
df_test['sinal_compra'] = (df_test['proba_alta'] > limiar).astype(int)

In [48]:
stop_loss = 0.01
take_profit = 0.02
capital = 10000
retornos = []
entradas = []
saidas = []
tipos_saida = []

for i in range(len(df_test) - 3):
    if df_test['sinal_compra'].iloc[i] == 1:
        preco_entrada = df_test['close'].iloc[i]
        preco_alvo = preco_entrada * (1 + take_profit)
        preco_stop = preco_entrada * (1 - stop_loss)
        saida = df_test['close'].iloc[i + 3]
        tipo_saida = 'Neutra'
        for j in range(1, 4):  # verifica os próximos 3 candles
            preco = df_test['close'].iloc[i + j]
            if preco >= preco_alvo:
                saida = preco
                tipo_saida = 'TP'
                break
            elif preco <= preco_stop:
                saida = preco
                tipo_saida = 'SL'
                break
        retorno = (saida - preco_entrada) / preco_entrada
        entradas.append((df_test.index[i], df_test['low'].iloc[i] * 0.995))
        saidas.append((df_test.index[i + j], df_test['high'].iloc[i + j] * 1.002))
        tipos_saida.append(tipo_saida)
        retornos.append(retorno)
    else:
        retornos.append(0)

In [49]:
fig = go.Figure()
fig.add_trace(go.Candlestick(
    x=df_test.index,
    open=df_test['open'], high=df_test['high'],
    low=df_test['low'], close=df_test['close'],
    name='Candlestick'
))

# Marcar compras
for entrada in entradas:
    fig.add_annotation(x=entrada[0], y=entrada[1],
        showarrow=True, arrowhead=3, arrowsize=1,
        arrowwidth=2, arrowcolor='green', text='🟢 Compra')

# Marcar saídas com SL ou TP
for saida, tipo in zip(saidas, tipos_saida):
    cor = 'red' if tipo == 'SL' else 'blue' if tipo == 'TP' else 'gray'
    emoji = '🔻 SL' if tipo == 'SL' else '🔺 TP' if tipo == 'TP' else '⏹️'
    fig.add_annotation(x=saida[0], y=saida[1],
        showarrow=True, arrowhead=2, arrowsize=1,
        arrowwidth=2, arrowcolor=cor, text=emoji)

fig.update_layout(
    title=f'📈 Entradas e Saídas com SL={stop_loss*100:.1f}%, TP={take_profit*100:.1f}%',
    xaxis_title='Data', yaxis_title='Preço',
    xaxis_rangeslider_visible=False, height=700
)
fig.show()

In [50]:
# 📈 Análise do retorno médio por tipo de saída

# Criar DataFrame com os dados das operações
df_resultados = pd.DataFrame({
    'retorno': retornos[:len(tipos_saida)],  # garantir tamanho correto
    'tipo_saida': tipos_saida
})

# Calcular estatísticas
resumo = df_resultados.groupby("tipo_saida")["retorno"].agg(["count", "mean", "std", "min", "max"])
resumo = resumo.sort_values(by="mean", ascending=False)

# Exibir resultados
print("📊 Retorno médio por tipo de saída:")
display(resumo)


📊 Retorno médio por tipo de saída:


Unnamed: 0_level_0,count,mean,std,min,max
tipo_saida,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Neutra,12,0.0,0.0,0,0


In [51]:
# Melhorar o modelo preditivo com mais features e otimização de hiperparâmetros
from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import StandardScaler

# 1. Adicionar mais features técnicas
df['atr'] = df['high'] - df['low']  # Average True Range simplificado
df['volume_change'] = df['volume'].pct_change()
df['price_change'] = df['close'].pct_change()
df['volatility'] = df['close'].rolling(10).std()
df['momentum'] = df['close'] - df['close'].shift(5)

# 2. Criar features temporais
df['hour'] = df.index.hour
df['day_of_week'] = df.index.dayofweek

# 3. Normalizar os dados
features_extended = features + ['atr', 'volume_change', 'price_change', 'volatility',
                               'momentum', 'hour', 'day_of_week']
X = df[features_extended].copy()
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X.fillna(0))
X_scaled_df = pd.DataFrame(X_scaled, columns=features_extended, index=df.index)

# 4. Otimizar hiperparâmetros
param_grid = {
    'n_estimators': [100, 200, 300],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4]
}

grid_search = GridSearchCV(
    RandomForestClassifier(random_state=42, class_weight='balanced'),
    param_grid=param_grid,
    cv=5,
    scoring='f1',
    n_jobs=-1
)

grid_search.fit(X_res, y_res)
modelo_otimizado = grid_search.best_estimator_

In [54]:
def aplicar_filtros_mercado(df):
    # Evita alteração no DataFrame original
    df = df.copy()

    # Filtro de tendência
    df['sma_50'] = df['close'].rolling(50).mean()
    df['tendencia'] = df['close'] > df['sma_50']

    # Filtro de volatilidade: calcula se a volatilidade atual é maior que a média * 1.5
    df['atr'] = ta.volatility.AverageTrueRange(df['high'], df['low'], df['close']).average_true_range()
    df['volatilidade_alta'] = df['atr'] > df['atr'].rolling(20).mean() * 1.5

    # Filtro de volume
    df['volume_adequado'] = df['volume'] > df['volume'].rolling(20).mean() * 0.7

    # Filtro de horário (10h às 16h)
    if "Datetime" in df.columns:
        df["hora"] = pd.to_datetime(df["Datetime"]).dt.hour
    else:
        df["hora"] = df.index.hour
    df["horario_favoravel"] = df["hora"].between(10, 16)

    # Combinar filtros
    df["filtros_ok"] = (
        df["tendencia"] &
        ~df["volatilidade_alta"] &
        df["volume_adequado"] &
        df["horario_favoravel"]
    )

    return df

df_test = aplicar_filtros_mercado(df)

In [55]:
# Estratégia de entrada e saída mais sofisticada
def backtest_avancado(df_test, stop_loss=0.01, take_profit=0.02, trailing_stop=True):
    capital = 10000
    retornos = []
    entradas = []
    saidas = []
    tipos_saida = []
    posicao_aberta = False

    for i in range(len(df_test) - 5):
        # Verificar se já existe posição aberta
        if posicao_aberta:
            continue

        # Verificar sinal de entrada com filtros adicionais
        if (df_test['sinal_compra'].iloc[i] == 1 and
            df_test['filtros_ok'].iloc[i] and
            df_test['proba_alta'].iloc[i] > 0.75):  # Aumentar confiança mínima

            preco_entrada = df_test['close'].iloc[i]
            preco_alvo = preco_entrada * (1 + take_profit)
            preco_stop = preco_entrada * (1 - stop_loss)
            stop_atual = preco_stop  # Para trailing stop

            # Monitorar a posição por até 10 candles
            for j in range(1, min(10, len(df_test) - i)):
                preco_atual = df_test['close'].iloc[i + j]

                # Atualizar trailing stop se ativado
                if trailing_stop and preco_atual > preco_entrada:
                    novo_stop = preco_entrada + (preco_atual - preco_entrada) * 0.5
                    if novo_stop > stop_atual:
                        stop_atual = novo_stop

                # Verificar condições de saída
                if preco_atual >= preco_alvo:
                    saida = preco_atual
                    tipo_saida = 'TP'
                    posicao_aberta = False
                    break
                elif preco_atual <= stop_atual:
                    saida = preco_atual
                    tipo_saida = 'SL' if stop_atual == preco_stop else 'TS'  # TS = Trailing Stop
                    posicao_aberta = False
                    break

                # Saída por tempo (se chegar ao último candle monitorado)
                if j == min(10, len(df_test) - i) - 1:
                    saida = preco_atual
                    tipo_saida = 'Tempo'
                    posicao_aberta = False

            # Calcular retorno e registrar operação
            retorno = (saida - preco_entrada) / preco_entrada
            entradas.append((df_test.index[i], df_test['low'].iloc[i] * 0.995))
            saidas.append((df_test.index[i + j], df_test['high'].iloc[i + j] * 1.002))
            tipos_saida.append(tipo_saida)
            retornos.append(retorno)
        else:
            retornos.append(0)

    return retornos, entradas, saidas, tipos_saida

retornos, entradas, saidas, tipos_saida = backtest_avancado(df_test)

KeyError: 'sinal_compra'

In [29]:
# Implementar gestão de risco e capital
def calcular_tamanho_posicao(capital, risco_por_trade=0.02, stop_loss=0.01):
    """Calcula o tamanho da posição baseado no risco por trade"""
    valor_risco = capital * risco_por_trade
    tamanho_posicao = valor_risco / stop_loss
    return min(tamanho_posicao, capital * 0.2)  # Limitar a 20% do capital

def analisar_desempenho(retornos, tipos_saida):
    """Analisa o desempenho da estratégia"""
    df_resultados = pd.DataFrame({
        'retorno': retornos,
        'tipo_saida': tipos_saida
    })

    # Métricas gerais
    total_trades = len([r for r in retornos if r != 0])
    trades_ganhos = len([r for r in retornos if r > 0])
    trades_perdas = len([r for r in retornos if r < 0])

    if total_trades > 0:
        win_rate = trades_ganhos / total_trades
        avg_gain = np.mean([r for r in retornos if r > 0]) if trades_ganhos > 0 else 0
        avg_loss = np.mean([r for r in retornos if r < 0]) if trades_perdas > 0 else 0
        profit_factor = abs(sum([r for r in retornos if r > 0]) / sum([r for r in retornos if r < 0])) if sum([r for r in retornos if r < 0]) != 0 else float('inf')

        # Drawdown
        capital_acumulado = [10000]
        for r in retornos:
            if r != 0:  # Apenas considerar trades reais
                capital_acumulado.append(capital_acumulado[-1] * (1 + r))

        peak = capital_acumulado[0]
        max_drawdown = 0
        for capital in capital_acumulado:
            if capital > peak:
                peak = capital
            drawdown = (peak - capital) / peak
            max_drawdown = max(max_drawdown, drawdown)

        return {
            'total_trades': total_trades,
            'win_rate': win_rate,
            'avg_gain': avg_gain,
            'avg_loss': avg_loss,
            'profit_factor': profit_factor,
            'max_drawdown': max_drawdown,
            'expectancy': (win_rate * avg_gain) - ((1 - win_rate) * abs(avg_loss))
        }
    else:
        return {'total_trades': 0}

In [18]:
# Validação cruzada temporal para evitar overfitting
from sklearn.model_selection import TimeSeriesSplit

def validacao_cruzada_temporal(df, features, target, n_splits=5):
    tscv = TimeSeriesSplit(n_splits=n_splits)
    resultados = []

    for train_index, test_index in tscv.split(df):
        df_train, df_test = df.iloc[train_index], df.iloc[test_index]

        X_train = df_train[features]
        y_train = df_train[target]
        X_test = df_test[features]
        y_test = df_test[target]

        # Balancear dados
        sm = SMOTE(random_state=42)
        X_res, y_res = sm.fit_resample(X_train, y_train)

        # Treinar modelo
        modelo = RandomForestClassifier(n_estimators=100, class_weight='balanced', random_state=42)
        modelo.fit(X_res, y_res)

        # Fazer previsões
        probs = modelo.predict_proba(X_test)[:, 1]
        df_test_copy = df_test.copy()
        df_test_copy['proba_alta'] = probs
        df_test_copy['sinal_compra'] = (df_test_copy['proba_alta'] > 0.7).astype(int)

        # Aplicar filtros e backtest
        df_test_copy = aplicar_filtros_mercado(df_test_copy)
        retornos, _, _, _ = backtest_avancado(df_test_copy)

        # Analisar resultados
        resultados.append(sum(retornos))

    return resultados