In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import seaborn as sns
import matplotlib.pyplot as plt
import math

from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from sklearn.preprocessing import MinMaxScaler,StandardScaler
from sklearn.metrics import mean_squared_error

In [None]:
# Read the pre-split training and test CSVs; the first column of each file becomes the index.
df_train, df_test = (
    pd.read_csv(csv_path, index_col=0) for csv_path in ('treino.csv', 'teste.csv')
)

In [None]:
# Rolling window length (in rows) shared by the RSI averages and the RSI moving average.
window = 14

# 'Past_1_Days_Close' ... 'Past_15_Days_Close', most recent first.
past_close_cols = [f'Past_{day}_Days_Close' for day in range(1, 16)]

# Change between the two most recent past closes, split into its positive and negative parts.
daily_change = df_train['Past_1_Days_Close'] - df_train['Past_2_Days_Close']
df_train['Daily_Change'] = daily_change
df_train['Gain'] = np.where(daily_change > 0, daily_change, 0)
df_train['Loss'] = np.where(daily_change < 0, daily_change.abs(), 0)

# Simple (unweighted) rolling means of gains and losses.
for side in ('Gain', 'Loss'):
    df_train[f'Avg_{side}'] = df_train[side].rolling(window=window).mean()

# Relative strength and the RSI derived from it, plus a rolling mean of the RSI.
df_train['RS'] = df_train['Avg_Gain'] / df_train['Avg_Loss']
df_train['RSI'] = 100 - (100 / (1 + df_train['RS']))
df_train['SMA_RSI'] = df_train['RSI'].rolling(window=window).mean()

# Row-wise averages over the most recent 5, 9 and 15 past closes.
for span in (5, 9, 15):
    df_train[f'SMA_{span}'] = df_train[past_close_cols[:span]].mean(axis=1)

# The rolling windows leave NaNs in the leading rows; drop every incomplete row.
df_train = df_train.dropna()

df_train.info()

In [None]:
# Rolling window length (in rows) shared by the RSI averages and the RSI moving average.
window = 14

# 'Past_1_Days_Close' ... 'Past_15_Days_Close', most recent first.
past_close_cols = [f'Past_{day}_Days_Close' for day in range(1, 16)]

# Change between the two most recent past closes, split into its positive and negative parts.
daily_change = df_test['Past_1_Days_Close'] - df_test['Past_2_Days_Close']
df_test['Daily_Change'] = daily_change
df_test['Gain'] = np.where(daily_change > 0, daily_change, 0)
df_test['Loss'] = np.where(daily_change < 0, daily_change.abs(), 0)

# Simple (unweighted) rolling means of gains and losses.
for side in ('Gain', 'Loss'):
    df_test[f'Avg_{side}'] = df_test[side].rolling(window=window).mean()

# Relative strength and the RSI derived from it, plus a rolling mean of the RSI.
df_test['RS'] = df_test['Avg_Gain'] / df_test['Avg_Loss']
df_test['RSI'] = 100 - (100 / (1 + df_test['RS']))
df_test['SMA_RSI'] = df_test['RSI'].rolling(window=window).mean()

# Row-wise averages over the most recent 5, 9 and 15 past closes.
for span in (5, 9, 15):
    df_test[f'SMA_{span}'] = df_test[past_close_cols[:span]].mean(axis=1)

# The rolling windows leave NaNs in the leading rows; drop every incomplete row.
df_test = df_test.dropna()

df_test.info()

In [None]:
# Columns excluded from the model input: identifiers, raw/smoothed closes, the
# lagged closes 2..15 and the RSI intermediates.
lagged_close_cols = [f'Past_{day}_Days_Close' for day in range(2, 16)]
excluded_cols = ['Date', 'Smoothed_Close', 'RS', 'Close', *lagged_close_cols,
                 'Gain', 'Loss', 'Avg_Gain', 'Avg_Loss']

train_features = df_train.drop(columns=excluded_cols)

# Position of the target column inside the feature matrix (used when windowing).
feature_index = train_features.columns.get_loc('Label')

column_names = train_features.columns

# Turn +/-inf into NaN, then replace every NaN with the median of its column.
X = np.where(np.isinf(train_features), np.nan, train_features.values)
medians = np.nanmedian(X, axis=0)
nan_rows, nan_cols = np.where(np.isnan(X))
X[nan_rows, nan_cols] = medians[nan_cols]

# Fit the [0, 1] min-max scaler on the training matrix; the same fitted scaler
# is reused for the test matrix later on.
scaler = MinMaxScaler(feature_range=(0, 1)).fit(X)
X_scaled = scaler.transform(X)

In [None]:
# Build the test feature matrix with exactly the same column selection as the
# training matrix X.
B = df_test.drop(['Date', 'Smoothed_Close', 'RS', 'Close', 'Past_2_Days_Close', 'Past_3_Days_Close', 
             'Past_4_Days_Close', 'Past_5_Days_Close', 'Past_6_Days_Close', 'Past_7_Days_Close', 'Past_8_Days_Close',
             'Past_9_Days_Close', 'Past_10_Days_Close', 'Past_11_Days_Close', 'Past_12_Days_Close', 'Past_13_Days_Close',
             'Past_14_Days_Close', 'Past_15_Days_Close', 'Gain', 'Loss', 'Avg_Gain', 'Avg_Loss'], axis=1)

# Position of the target column inside the feature matrix (used when windowing).
feature_index = B.columns.get_loc('Label')

column_names = B.columns

# Turn +/-inf into NaN so both are imputed the same way.
B = np.where(np.isinf(B), np.nan, B.values)

# The scaler below was fitted on X, so the matrices must have the same width.
assert B.shape[1] == X.shape[1], "train and test feature matrices have different column counts"

# Impute with medians of the TRAINING matrix, not of the test set: every
# preprocessing statistic (like the scaler below) is learned on train data only,
# so no information from the evaluation set leaks into its own preprocessing.
# X has already been median-imputed, which leaves its column medians unchanged.
medians = np.nanmedian(X, axis=0)
inds = np.where(np.isnan(B))
B[inds] = np.take(medians, inds[1])

# Apply (do not re-fit) the scaler that was fitted on the training matrix.
B_scaled = scaler.transform(B)

In [None]:
# Short aliases for the scaled train / test feature matrices.
train, test = X_scaled, B_scaled

In [None]:
# Quick visual sanity check of the scaled training matrix (all columns on one axes).
plt.plot(train)

In [None]:
# Quick visual sanity check of the scaled test matrix (all columns on one axes).
plt.plot(test)

In [None]:
def create_dataset(dataset, look_back, feature_index):
    """Slice a 2-D array into overlapping windows and next-row targets.

    Sample ``i`` pairs the window ``dataset[i : i + look_back, :]`` with the
    target ``dataset[i + look_back, feature_index]``, i.e. the value of the
    target column on the row immediately after the window.

    Args:
        dataset: 2-D array-like of shape (n_rows, n_features).
        look_back: number of consecutive rows in each input window.
        feature_index: column index of the value to predict.

    Returns:
        Tuple ``(dataX, dataY)`` of NumPy arrays. With at least one sample,
        ``dataX`` has shape (n_rows - look_back, look_back, n_features) and
        ``dataY`` has shape (n_rows - look_back,). Both are empty when
        ``n_rows <= look_back``.
    """
    dataset = np.asarray(dataset)
    # The last usable window ends on row n_rows - 2 and targets row n_rows - 1,
    # so there are exactly n_rows - look_back samples. (The previous bound of
    # n_rows - look_back - 1 silently discarded the final sample.)
    n_samples = max(len(dataset) - look_back, 0)
    dataX = [dataset[i:i + look_back, :] for i in range(n_samples)]
    dataY = [dataset[i + look_back, feature_index] for i in range(n_samples)]
    return np.array(dataX), np.array(dataY)

In [None]:
# Number of consecutive rows fed to the model for each prediction.
look_back = 15

# Window the train and test matrices the same way; the target is the 'Label' column.
(X_train, y_train), (X_test, y_test) = (
    create_dataset(split, look_back, feature_index) for split in (train, test)
)

In [None]:
# Display the windowed training inputs returned by create_dataset.
X_train

In [None]:
# Display the training targets returned by create_dataset.
y_train

In [None]:
from keras.layers import Dropout, BatchNormalization

# Take the feature count from the windowed data instead of hard-coding it, so
# the model keeps working when the set of input columns changes.
n_features = X_train.shape[2]

# Two stacked LSTM layers, each followed by batch normalisation and light
# dropout, ending in a single sigmoid unit for the binary target.
model = Sequential()
# return_sequences=True so the second LSTM receives the full sequence.
model.add(LSTM(64, input_shape=(look_back, n_features), return_sequences=True))
model.add(BatchNormalization())
model.add(Dropout(0.1))
model.add(LSTM(units=32))
model.add(BatchNormalization())
model.add(Dropout(0.1))
model.add(Dense(1, activation='sigmoid'))

model.compile(loss='binary_crossentropy', optimizer='adam', metrics = [tf.keras.metrics.Precision(), tf.keras.metrics.Recall(), 'accuracy', tf.keras.metrics.AUC(), 'mae'])

In [None]:
from keras.utils import plot_model

# Write a diagram of the network (with tensor shapes and layer names) to a PNG file.
plot_model(
    model,
    to_file='LSTM_v1-4_CSNA3.png',
    show_shapes=True,
    show_layer_names=True,
)

In [None]:
from keras.callbacks import ModelCheckpoint

# Save the weights whenever the validation loss improves.
checkpointer = ModelCheckpoint(
    filepath='LTSM_v1-4_CSNA3.hdf5',
    monitor='val_loss',
    save_best_only=True,
    verbose=1,
)
# EarlyStopping / ReduceLROnPlateau callbacks are currently disabled.

# The test windows double as validation data during training.
fit_options = dict(batch_size=100, epochs=25, shuffle=True, verbose=1)
hist = model.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    callbacks=[checkpointer],
    **fit_options,
)

In [None]:
# Training vs. validation accuracy per epoch.
plt.figure(1)
for history_key in ('accuracy', 'val_accuracy'):
    plt.plot(hist.history[history_key])
plt.title('model accuracy')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

In [None]:
from sklearn.metrics import classification_report, confusion_matrix

# Display names for class 0 and class 1, in that order.
class_names = ['Short', 'Long']

# Predicted probabilities on the test windows, thresholded at 0.5 into 0/1 classes.
y_pred = model.predict(X_test)
y_pred_classes = (y_pred.reshape(-1) > 0.5).astype(int)

y_true = y_test

cm = confusion_matrix(y_true, y_pred_classes)

# Confusion matrix as an annotated heatmap.
plt.figure(figsize=(10, 7))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=class_names,
    yticklabels=class_names,
)
plt.title('Confusion Matrix')
plt.xlabel('Predicted Labels')
plt.ylabel('Actual Labels')
plt.show()

# Per-class precision / recall / F1.
report = classification_report(y_true, y_pred_classes, target_names=class_names)
print("Classification Report:\n", report)

In [None]:
# create_dataset pairs the window of rows [k, k + look_back) with the target on
# row k + look_back, so prediction k belongs to test row k + look_back - not to
# row k. Skip the first look_back rows (and trim to the number of predictions)
# so that close_prices.iloc[k] is the close on the row prediction k refers to.
close_prices = df_test['Close'].iloc[look_back:look_back + len(y_pred_classes)]

plt.figure(figsize=(14, 7))

plt.plot(close_prices, label=f'Preço de Fechamento CSNA3', color='blue', alpha=0.5)

# Positions (into y_pred_classes / close_prices) of the long and short signals.
buy_signals = np.where(y_pred_classes == 1)[0]
sell_signals = np.where(y_pred_classes == 0)[0]

# One scatter call per signal type instead of one call per point.
plt.scatter(close_prices.index[buy_signals], close_prices.iloc[buy_signals],
            color='green', marker='^', alpha=1)
plt.scatter(close_prices.index[sell_signals], close_prices.iloc[sell_signals],
            color='red', marker='v', alpha=1)

plt.title('Preço com cada sinal de compra e venda')
plt.xlabel('Tempo em Dias')
plt.ylabel('Preço de Fechamento')
plt.legend(loc='best')
plt.show()

In [None]:
# --- Backtest state -------------------------------------------------------
# Starting cash, running cash balance and the balance recorded after each closed trade.
initial_capital = 10000
capital, capital_history = initial_capital, [initial_capital]

# Shares currently held long, and the log of every trade opened so far.
stocks_owned, trades = 0, []

# Whether a position is open and, if so, its direction ('long' / 'short').
in_position, position_type = False, None

# Largest single-trade gain and largest single-trade loss seen so far.
biggest_win = biggest_loss = 0

def calculate_max_drawdown(capital_history):
    """Return the largest relative drop from a running peak.

    Args:
        capital_history: sequence of capital values in chronological order.

    Returns:
        The minimum of ``(value - running_peak) / running_peak`` over the
        sequence: 0 when capital never falls below a previous peak, negative
        otherwise (e.g. -0.25 for a 25% drop).
    """
    equity = np.array(capital_history)
    # Highest value reached up to and including each point.
    running_peak = np.maximum.accumulate(equity)
    return ((equity - running_peak) / running_peak).min()

# Always-in-the-market backtest driven by the predicted classes: a position is
# closed when the prediction flips against it and, in the same iteration, the
# opposite position is opened at the same price.
# Iteration starts at 1, so the prediction at position 0 is never acted on.
# NOTE(review): close_prices.iloc[i] assumes close_prices is positionally
# aligned with y_pred_classes - confirm against how close_prices was built.
for i in range(1, len(y_pred_classes)):
    current_price = close_prices.iloc[i]
    # Stays 0 unless a position is closed in this iteration.
    profit = 0
    if in_position:
        # Close a long when the model predicts class 0.
        if position_type == 'long' and y_pred_classes[i] == 0:
            profit = stocks_owned * (current_price - trades[-1]['price'])
            # The purchase cost was debited at entry, so credit the full sale proceeds.
            capital += stocks_owned * current_price 
            # Capital is only recorded when a trade is closed.
            capital_history.append(capital)
            exit_index = i
            entry_index = trades[-1]['index']
            duration = exit_index - entry_index
            # Exit details are written onto the trade record created at entry.
            trades[-1]['exit_index'] = exit_index
            trades[-1]['duration'] = duration
            trades[-1]['exit_price'] = current_price
            trades[-1]['profit'] = profit
            stocks_owned = 0
            in_position = False
            position_type = None
        # Close a short when the model predicts class 1.
        elif position_type == 'short' and y_pred_classes[i] == 1:
            profit = trades[-1]['stocks'] * (trades[-1]['price'] - current_price)
            # No cash moved when the short was opened, so only the P&L is applied.
            capital += trades[-1]['stocks'] * (trades[-1]['price'] - current_price)
            capital_history.append(capital)
            exit_index = i
            entry_index = trades[-1]['index']
            duration = exit_index - entry_index
            trades[-1]['exit_index'] = exit_index
            trades[-1]['duration'] = duration
            trades[-1]['exit_price'] = current_price
            trades[-1]['profit'] = profit
            in_position = False
            position_type = None

        # Track the best and worst single-trade result.
        if profit > biggest_win:
            biggest_win = profit
        if profit < biggest_loss:
            biggest_loss = profit
    
    # Not an else-branch: this also runs right after a position was closed above.
    if not in_position:
        # Open a long with as many whole shares as the available cash buys.
        if y_pred_classes[i] == 1 and capital >= current_price: 
            stocks_to_buy = int(capital // current_price)
            if stocks_to_buy > 0:
                capital -= stocks_to_buy * current_price
                stocks_owned += stocks_to_buy
                trades.append({'type': 'long', 'stocks': stocks_to_buy, 'price': current_price,'entry': close_prices.iloc[i], 'index': i})
                in_position = True
                position_type = 'long'
        # Open a short sized by the available cash; capital itself is left untouched.
        elif y_pred_classes[i] == 0:  
            stocks_to_sell = int(capital // current_price)
            if stocks_to_sell > 0:
                trades.append({'type': 'short', 'stocks': stocks_to_sell, 'price': current_price,'entry': close_prices.iloc[i], 'index': i})
                in_position = True
                position_type = 'short'

# Mark any shares still held long to market at the last available close.
final_portfolio_value = capital + (stocks_owned * close_prices.iloc[-1] if stocks_owned else 0)

print(f"Valor bruto final do Portifolio: R${final_portfolio_value:.2f}")
print(f"Total bruto Profit/Loss: R${final_portfolio_value - initial_capital:.2f}")
print(f"Total de Ações em Posse: {stocks_owned}")
print(f"Maior ganho bruto: R${biggest_win:.2f}")
print(f"Maior Loss bruto: R${biggest_loss:.2f}")

# Build the trade table with an explicit column list. The exit columns only
# exist on trades that were closed, so without this the lookups below raise
# KeyError when no trade was ever closed (e.g. the model predicts one class
# throughout) or when no trade was opened at all. The order matches the order
# in which the keys are written to each trade record.
trade_columns = ['type', 'stocks', 'price', 'entry', 'index',
                 'exit_index', 'duration', 'exit_price', 'profit']
trades_df = pd.DataFrame(trades, columns=trade_columns)
# Missing exit details (a still-open trade) are shown as 'N/A' in the table / export.
trades_df = trades_df.fillna('N/A')

# Profit and duration must stay numeric for the statistics below; 'N/A' becomes NaN again.
trades_df['profit'] = pd.to_numeric(trades_df['profit'], errors='coerce')
trades_df['duration'] = pd.to_numeric(trades_df['duration'], errors='coerce')

# Average holding time overall, per direction and per outcome (NaNs are skipped by mean()).
mean_duration_all = trades_df['duration'].mean()
mean_duration_long = trades_df[trades_df['type'] == 'long']['duration'].mean()
mean_duration_short = trades_df[trades_df['type'] == 'short']['duration'].mean()
mean_duration_profit = trades_df[trades_df['profit'] > 0]['duration'].mean()
mean_duration_loss = trades_df[trades_df['profit'] <= 0]['duration'].mean()
print(f"Tempo médio das operações: {mean_duration_all:.0f} pregões")
print(f"Tempo médio das compras: {mean_duration_long:.0f} pregões")
print(f"Tempo médio das vendas: {mean_duration_short:.0f} pregões")
print(f"Tempo médio gain: {mean_duration_profit:.0f} pregões")
print(f"Tempo médio loss: {mean_duration_loss:.0f} pregões")

# Drawdown is computed on the capital recorded after each closed trade.
max_drawdown = calculate_max_drawdown(capital_history)
print(f"Drawdown Máximo: {max_drawdown * 100:.2f}%")

display(trades_df)

trades_df.to_excel(f"trades_CSNA3.xlsx", index=False)

# Price series with a marker at every trade entry.
plt.figure(figsize=(14, 7))
plt.plot(close_prices.index, close_prices, label='Preço', color='blue', alpha=0.5)

for trade in trades:
    is_long = trade['type'] == 'long'
    plt.scatter(close_prices.index[trade['index']], trade['entry'],
                color='green' if is_long else 'red',
                marker='^' if is_long else 'v', alpha=1)

# Empty scatters so each marker type appears exactly once in the legend.
plt.scatter([], [], color='green', label='Compra', marker='^', alpha=1)
plt.scatter([], [], color='red', label='Venda', marker='v', alpha=1)

plt.title(f'Gráfico de preço de CSNA3, com entradas e saídas')
plt.xlabel('Dias')
plt.ylabel('Preço')
plt.legend()
plt.show()

In [None]:
# Print the signal and both class probabilities for every test prediction.
for i, (signal, prob) in enumerate(zip(y_pred_classes, y_pred.flatten())):
    if signal == 1:
        class_label = "Sinal de Compra"
    else:
        class_label = "Sinal de Venda"
    print(f"Index {i}: {class_label}, Probabilidade de Compra = {prob:.4f}, Probabilidade de Venda = {1 - prob:.4f}")

In [None]:
# One record per prediction: its position, the signal name and both class probabilities.
data = [
    {
        'index': i,
        'Class Label': "Sinal de Compra" if y_pred_classes[i] == 1 else "Sinal de Venda",
        'Probabilidade de Compra': prob,
        'Probabilidade de Venda': 1 - prob,
    }
    for i, prob in enumerate(y_pred.flatten())
]

predictions_df = pd.DataFrame(data)

In [None]:
# Make the join key an integer on both sides before merging.
for frame in (trades_df, predictions_df):
    frame['index'] = frame['index'].astype(int)

# Attach the prediction made at each trade's entry position; every trade is kept (left join).
merged_df = pd.merge(trades_df, predictions_df, how='left', on='index')

merged_df.to_excel('Trades_CSNA3_withProb.xlsx', index=False, engine='openpyxl')