In [7]:
!pip install yfinance pandas gym stable-baselines3[extra] tensorflow

^C




## Criação do ambiente

In [21]:
import gym
import numpy as np
import yfinance as yf
import time
import tensorflow as tf
from stable_baselines3 import ppo

In [22]:
class MercadoFinanceiro(gym.Env):
    
    def __init__(self, data):
        super(MercadoFinanceiro, self).__init__()
        # Inicializa o ambiente com a série temporal de preços, espaço de ação e espaço de observação.
        self.data = data
        self.action_space = gym.spaces.Discrete(2) # Comprar (0) ou Vender (1)
        self.observation_space = gym.spaces.Box(low=0, high=np.inf, shape=(2,), dtype=np.float32)
        
    def reset(self):
        # Reinicia o ambiente para o início de um episódio.
        self.current_step = 0
        self.profit = 0  # Lucro
        self.shares = 0  # Quantidade de ações da bolsa
        return self._get_observation()
    
    def step(self, action):
        # Executa uma etapa do ambiente com base na ação fornecida.
        assert self.action_space.contains(action), "Ação inválida!"
        done = False
        
        # Obtém os preços de compra e atual com base no histórico de preços.
        price_buy = self.data[self.current_step]
        price_current = self.data[self.current_step + 1]
        
        if action == 0:  # Comprar
            # Atualiza as ações e o lucro com base na ação de compra.
            self.shares += 1
            self.profit -= price_buy
            
        elif action == 1:  # Vender
            # Atualiza as ações e o lucro com base na ação de venda.
            if self.shares > 0:
                self.shares -= 1
                self.profit += price_current
                
        # Atualiza o passo atual.
        self.current_step += 1
        
        # Verifica se o episódio terminou.
        if self.current_step == len(self.data) - 1:
            done = True
            
        # Retorna a observação atualizada, o lucro, se o episódio terminou e informações adicionais.
        return self._get_observation(), self.profit, done, {}
    
    def _get_observation(self):
        # Retorna a observação atual, que consiste nos preços atual e próximo.
        if self.current_step == len(self.data) - 1:
            return np.zeros(2)
        
        return np.array([self.data[self.current_step], self.data[self.current_step + 1]])


## criação do agente: modelo

In [75]:
def train_agent(env, num_steps):
    # Define a política do agente como uma rede neural com duas camadas ocultas de 64 unidades cada e uma camada de saída com o número de ações no espaço de ação do ambiente.
    policy = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(env.action_space.n)
    ])

    # Define o otimizador Adam com uma taxa de aprendizado de 0.0001.
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)

    # Loop de treinamento principal.
    for iteration in range(num_steps):
        # Reseta o ambiente para o início de um episódio.
        obs = env.reset()

        # Variável para indicar se o episódio terminou.
        done = False

        # Inicia o tape para gravar operações para o cálculo do gradiente.
        with tf.GradientTape() as tape:
            # Loop para interagir com o ambiente até que o episódio termine.
            while not done:
                # Obtém as logits da política para a observação atual.
                logits = policy(tf.convert_to_tensor(obs[None, :], dtype=tf.float32))
                # Amostra uma ação estocasticamente a partir das logits.
                action = tf.random.categorical(logits, num_samples=1)[0, 0]
                action = int(action.numpy())  # Converte a ação para um inteiro.

                # Executa a ação no ambiente e obtém a próxima observação, recompensa, e se o episódio terminou.
                next_obs, reward, done, _ = env.step(action)

                # Atualiza a observação para a próxima iteração.
                obs = next_obs

                # Se o episódio terminou, sai do loop.
                if done:
                    break

            # Calcula a loss, penalizando ou recompensando com base na recompensa final do episódio.
            loss = -tf.reduce_mean(logits) if reward > 0 else tf.reduce_mean(logits)

        # Calcula os gradientes da loss em relação aos parâmetros da política.
        grads = tape.gradient(loss, policy.trainable_variables)
        # Aplica os gradientes usando o otimizador.
        optimizer.apply_gradients(zip(grads, policy.trainable_variables))

        # Calcula a recompensa total ao longo do episódio.
        total_reward = env.profit

        # Calcula o percentual de lucro em relação ao investimento inicial.
        percentual_lucro = ((total_reward / investimento_inicial) + 1) * 100

        # Imprime informações sobre a iteração atual.
        print(f"Iteração {iteration + 1}: | Lucro absoluto: R${total_reward:.2f} | Lucro percentual: {percentual_lucro:.2f}%")
        
    return policy


In [69]:
def test_agent(env, policy):
    # Reseta o ambiente para o início de um episódio.
    obs = env.reset()
    
    # Variável para indicar se o episódio terminou.
    done = False
    
    # Loop para interagir com o ambiente até que o episódio termine.
    while not done:
        # Obtém as logits da política para a observação atual.
        logits = policy(tf.convert_to_tensor(obs[None, :], dtype=tf.float32))
        
        # Escolhe a ação com a maior probabilidade (argmax das logits).
        action = tf.argmax(logits, axis=1)[0]
        action = int(action.numpy())  # Converte a ação para um inteiro.
        
        # Executa a ação no ambiente e obtém a próxima observação, recompensa, e se o episódio terminou.
        obs, reward, done, _ = env.step(action)
    
    # Retorna o lucro total obtido durante o episódio.
    return env.profit


## Passando os dados: treinando e avaliando efetivamente o nosso projeto

In [79]:
#obtém a série histórica de ações do Banco do Brasil (BBAS3)
data = yf.download('BBAS3.SA', start='2021-12-25', end='2024-01-05')['Close'].values

# Normalizando os preços para o intervalo [0, 1]
normalized_data = (data - np.min(data)) / (np.max(data) - np.min(data))

# Criação do ambiente
env = MercadoFinanceiro(normalized_data)

# Definindo o valor do investimento inicial
investimento_inicial = 10000

# Treinamento do agente
policy = train_agent(env, num_steps=1000) # num_steps quantas repetições de teste

# Teste final do agente
final_profit = test_agent(env, policy)
percentual_lucro = (final_profit / investimento_inicial + 1) * 100

print(f"Lucro final em valor: R$ {final_profit:.2f}")
print(f"Lucro final em percentual: {percentual_lucro:.2f}%")

[*********************100%%**********************]  1 of 1 completed


Iteração 1: | Lucro absoluto: R$0.11 | Lucro percentual: 100.00%
Iteração 2: | Lucro absoluto: R$7.99 | Lucro percentual: 100.08%
Iteração 3: | Lucro absoluto: R$-3.23 | Lucro percentual: 99.97%
Iteração 4: | Lucro absoluto: R$4.35 | Lucro percentual: 100.04%
Iteração 5: | Lucro absoluto: R$-5.54 | Lucro percentual: 99.94%
Iteração 6: | Lucro absoluto: R$5.01 | Lucro percentual: 100.05%
Iteração 7: | Lucro absoluto: R$-1.63 | Lucro percentual: 99.98%
Iteração 8: | Lucro absoluto: R$-7.28 | Lucro percentual: 99.93%
Iteração 9: | Lucro absoluto: R$3.82 | Lucro percentual: 100.04%
Iteração 10: | Lucro absoluto: R$4.69 | Lucro percentual: 100.05%
Iteração 11: | Lucro absoluto: R$8.02 | Lucro percentual: 100.08%
Iteração 12: | Lucro absoluto: R$4.89 | Lucro percentual: 100.05%
Iteração 13: | Lucro absoluto: R$0.07 | Lucro percentual: 100.00%
Iteração 14: | Lucro absoluto: R$-4.14 | Lucro percentual: 99.96%
Iteração 15: | Lucro absoluto: R$4.92 | Lucro percentual: 100.05%
Iteração 16: | Luc