In [153]:
import numpy as np
from numpy import sqrt, log, exp, pi
import pandas as pd
import scipy.stats
from scipy.stats import norm

from math import sqrt

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
import matplotlib.pyplot as plt

import yfinance as yf

import json

import warnings
warnings.filterwarnings("ignore")


In [154]:
# Carregar dados históricos

ticker1 = "^BVSP"
data = yf.download(ticker1, start = "2012-01-01", end = "2024-12-31")
p = 20

# Calcular as variações diárias percentuais
data["Return"] = data["Adj Close"].pct_change(p)
data["Target"] = data["Return"].shift(-p)
data["MA"] = data["Adj Close"].rolling(52).mean()
data["RSL"] = (data["Adj Close"]/data["MA"]-1)

# Definir os limites para classificar as variações
limite_superior = 0.08 # Variação positiva significativa
limite_inferior = -0.08 # Variação negativa significativa

[*********************100%***********************]  1 of 1 completed


In [155]:
# Vamos treinar com 
year_train = "2020"
year_test = "2021"

start_train = year_train + "-01-01"
end_train = year_train + "-12-31"

# Vamos testar com 
start_test = year_test + "-01-01"
end_test = year_test + "-12-31"

data_train = data.loc[start_train : end_train]

data_test = data.loc[start_test : end_test]

In [156]:
# Definindo uma função para classificar as variações
def classify_change(x):
    # Se a variação é maior que o limite superior, classifica como "Aumento"
    if x > limite_superior:
        return "Aumento"
    # Se a variação é menor que o limite inferior, classifica como "Diminuição"
    elif x < limite_inferior:
        return "Diminuição"
    # Caso contrário, classifica como "Estável"
    else:
        return "Estável"

# Aplicando a função de classificação à coluna 'Return' do DataFrame 'data_train'
# e armazenando o resultado na nova coluna 'Class'
data_train["Class"] = data_train["Return"].apply(classify_change)

# Inicializando um dicionário para armazenar as probabilidades de transição
# O dicionário terá uma estrutura aninhada para cada combinação de estados
estados = ["Aumento", "Diminuição", "Estável"]
transicao = {estado: {e: 0 for e in estados} for estado in estados}

# Percorrendo o DataFrame para contar as transições de um estado para outro
for i in range(1, len(data_train)):
    # Capturando o estado atual e o próximo estado
    estado_atual = data_train["Class"].iloc[i-1]
    prox_estado = data_train["Class"].iloc[i]
    # Incrementando a contagem da transição correspondente
    transicao[estado_atual][prox_estado] += 1

# Normalizando as contagens para converter em probabilidades
for estado_atual, transicoes in transicao.items():
    # Calculando o total de transições para o estado atual
    total = sum(transicoes.values())
    # Atualizando o dicionário com as probabilidades para cada transição
    for estado in estados:
        transicao[estado_atual][estado] /= total

# Exibindo a matriz de transição
# A matriz mostra as probabilidades de transição de um estado para outro
for estado_atual, transicoes in transicao.items():
    print(f"De {estado_atual}:")
    for prox_estado, probabilidade in transicoes.items():
        print(f"  para {prox_estado}: {probabilidade:.2f}")
    print()


De Aumento:
  para Aumento: 0.81
  para Diminuição: 0.00
  para Estável: 0.19

De Diminuição:
  para Aumento: 0.00
  para Diminuição: 0.92
  para Estável: 0.08

De Estável:
  para Aumento: 0.08
  para Diminuição: 0.01
  para Estável: 0.90



In [157]:
data["Class"] = data["Return"].apply(classify_change)

In [158]:
data

Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume,Return,Target,MA,RSL,Class
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
2012-01-03,57836.0,59288.0,57836.0,59265.0,59265.0,3083000,,0.089463,,,Estável
2012-01-04,59263.0,59519.0,58558.0,59365.0,59365.0,2252000,,0.088065,,,Estável
2012-01-05,59354.0,59354.0,57963.0,58546.0,58546.0,2351200,,0.113945,,,Estável
2012-01-06,58565.0,59261.0,58355.0,58600.0,58600.0,1659200,,0.113038,,,Estável
2012-01-09,58601.0,59220.0,58599.0,59083.0,59083.0,2244600,,0.115668,,,Estável
...,...,...,...,...,...,...,...,...,...,...,...
2024-01-15,130988.0,131606.0,130253.0,131521.0,131521.0,5746600,0.015881,,126521.923077,0.039512,Estável
2024-01-16,131515.0,131517.0,129147.0,129294.0,129294.0,11911300,-0.011831,,126829.480769,0.019432,Estável
2024-01-17,129293.0,129296.0,128312.0,128524.0,128524.0,9952500,-0.012850,,127137.019231,0.010909,Estável
2024-01-18,128524.0,129047.0,127316.0,127316.0,127316.0,12460800,-0.028745,,127409.557692,-0.000734,Estável


In [159]:
data["M_prob"] = np.where((data["Class"] == "Aumento") & (data["Class"].shift(1) == "Aumento")
                          , transicao["Aumento"]["Aumento"], 0)
data["M_prob"] = np.where((data["Class"] == "Aumento") & (data["Class"].shift(1) == "Diminuição")
                          , transicao["Aumento"]["Diminuição"], data["M_prob"])
data["M_prob"] = np.where((data["Class"] == "Aumento") & (data["Class"].shift(1) == "Estável")
                          , transicao["Aumento"]["Estável"], data["M_prob"])

data["M_prob"] = np.where((data["Class"] == "Diminuição") & (data["Class"].shift(1) == "Aumento")
                          , transicao["Diminuição"]["Aumento"], data["M_prob"])
data["M_prob"] = np.where((data["Class"] == "Diminuição") & (data["Class"].shift(1) == "Diminuição")
                          , transicao["Diminuição"]["Diminuição"], data["M_prob"])
data["M_prob"] = np.where((data["Class"] == "Aumento") & (data["Class"].shift(1) == "Estável")
                          , transicao["Diminuição"]["Estável"], data["M_prob"])

data["M_prob"] = np.where((data["Class"] == "Estável") & (data["Class"].shift(1) == "Aumento")
                          , transicao["Estável"]["Aumento"], data["M_prob"])
data["M_prob"] = np.where((data["Class"] == "Estável") & (data["Class"].shift(1) == "Diminuição")
                          , transicao["Estável"]["Diminuição"], data["M_prob"])
data["M_prob"] = np.where((data["Class"] == "Estável") & (data["Class"].shift(1) == "Estável")
                          , transicao["Estável"]["Estável"], data["M_prob"])

In [160]:
# Definição de uma função para obter a probabilidade de transição com base na linha atual
def get_transition_probability(row, transicao):
    # Obter a classe atual e a classe do registro anterior
    current_class = row["Class"]
    previous_class = row["Previous_Class"]
    # Retornar a probabilidade de transição do dicionário "transicao"
    # Se "current_class" ou "previous_class" não estiverem no dicionário, retorna 0
    return transicao.get(current_class, {}).get(previous_class, 0)

# Criar uma nova coluna "Previous_Class" que contém a classe da linha anterior
# O método shift(1) desloca as linhas para baixo, então a linha atual terá a classe da linha anterior
data["Previous_Class"] = data["Class"].shift(1)

# Agora aplicar a função a cada linha do DataFrame
# O método apply() percorre cada linha do DataFrame e aplica a função lambda fornecida
# A função lambda chama "get_transition_probability" passando a linha atual e o dicionário de transição
# O parâmetro axis=1 indica que a aplicação é feita linha por linha
data["M_prob"] = data.apply(lambda row: get_transition_probability(row, transicao), axis=1)


In [161]:
data.tail(20)

Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume,Return,Target,MA,RSL,Class,M_prob,Previous_Class
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
2023-12-20,131851.0,132341.0,130710.0,130804.0,130804.0,10335800,0.037839,,121453.980769,0.076984,Estável,0.901961,Estável
2023-12-21,130826.0,132277.0,130822.0,132182.0,132182.0,8681000,0.04429,,121817.403846,0.085083,Estável,0.901961,Estável
2023-12-22,132553.0,133035.0,132094.0,132753.0,132753.0,8231100,0.057439,,122174.769231,0.086583,Estável,0.901961,Estável
2023-12-26,132753.0,133645.0,132753.0,133533.0,133533.0,0,0.062459,,122528.173077,0.089815,Estável,0.901961,Estável
2023-12-27,133523.0,134195.0,133328.0,134194.0,134194.0,6168200,0.060504,,122863.884615,0.092217,Estável,0.901961,Estável
2023-12-28,134194.0,134392.0,133832.0,134185.0,134185.0,7812700,0.064107,,123193.384615,0.089222,Estável,0.901961,Estável
2024-01-02,134186.0,134195.0,132095.0,132697.0,132697.0,8437800,0.042142,,123519.211538,0.074303,Estável,0.901961,Estável
2024-01-03,132697.0,133576.0,132250.0,132834.0,132834.0,8702400,0.036931,,123832.673077,0.072689,Estável,0.901961,Estável
2024-01-04,132831.0,132885.0,131024.0,131226.0,131226.0,8972300,0.034881,,124127.25,0.057189,Estável,0.901961,Estável
2024-01-05,131218.0,132635.0,130579.0,132023.0,132023.0,9199700,0.040346,,124472.692308,0.060658,Estável,0.901961,Estável


In [162]:
custo_op = 0.001

data_test = data.loc[start_test : end_test]

import random

data_test.loc[: , "Model_Return"] = np.where(((data_test.loc[: , "Class"] == "Diminuição") & (data_test.loc[: , "Previous_Class"] == "Estável"))
                                       , data_test["Target"] - custo_op
                                       , np.where(((data_test.loc[: , "Class"] == "Aumento") & (data_test.loc[: , "Previous_Class"] == "Estável"))
                                                 , -data_test["Target"] - custo_op 
                                                 , 0 ))

data_test.loc[: , "Model_Return_Acc"] = data_test["Model_Return"].cumsum()*100

data_test.loc[: , "NoMarkov"] = np.where(data_test.loc[: , "Return"] < -limite_superior
                                       , data_test["Target"] - custo_op
                                       , np.where(data_test.loc[: , "Return"] > limite_superior
                                                 , -data_test["Target"] - custo_op 
                                                 , 0 ))

data_test.loc[: , "NoMarkov_Acc"] = data_test["NoMarkov"].cumsum()*100


fig = make_subplots(rows = 1, cols = 1
                    , shared_xaxes = True
                    , vertical_spacing = 0.05)

fig.add_trace(go.Scatter(x = data_test.index, y = data_test["Model_Return_Acc"]
                                , name = "Markov Chain"
                                , line = dict(color = "blue"))
              , row = 1, col = 1)

fig.add_trace(go.Scatter(x = data_test.index, y = data_test["NoMarkov_Acc"]
                                , name = "Simple Reversion"
                                , line = dict(color = "red"))
              , row = 1, col = 1)

fig.add_vline(x = end_train, line_width = 3, line_dash="dash", line_color = "black")

fig.update_layout(height = 600, width = 600
                  , title_text = "Markov Chain " + ticker1 + " - Accumulated Returns"
                  , font_color = "blue"
                  , title_font_color = "black"
                  , xaxis_title = "Time"
                  , yaxis_title = "Accumulated returns (%)"
                  , font = dict(size = 15, color = "Black")
                 )

fig.update_layout(hovermode = "x unified")

# Code to exclude empty dates from the chart
dt_all = pd.date_range(start = data_test.index[0]
                       , end = data_test.index[-1]
                       , freq = "D")
dt_all_py = [d.to_pydatetime() for d in dt_all]
dt_obs_py = [d.to_pydatetime() for d in data_test.index]

dt_breaks = [d for d in dt_all_py if d not in dt_obs_py]

fig.update_xaxes(
    rangebreaks = [dict(values = dt_breaks)]
)


fig.show()