In [1]:
import json
import os
import pandas as pd
import numpy as np
import datetime as dt
import matplotlib.pyplot as plt
import networkx as nx
import warnings
from itertools import combinations
from sqlalchemy import create_engine
import time
from functools import lru_cache
import random

warnings.simplefilter(action='ignore', category=FutureWarning)

engine = create_engine("mysql+pymysql://root:root@localhost:3306/tcc")

def dias_uteis_entre_datas(data_inicial, data_final, feriados=None):
    
    # Datas em que não houve pregão na B3 por motivos especiais desde 2000
    datas_sem_pregao = [
        '2001-09-12', '2002-06-17', '2002-06-21', '2002-06-26', '2002-06-30',
        '2006-06-27', '2006-06-30', '2010-06-25', '2014-06-12', '2014-06-17',
        '2014-06-23', '2014-06-28', '2014-07-04', '2018-06-22', '2018-06-27',
        '2018-07-02', '2018-07-06'
    ]

    feriados = datas_sem_pregao if feriados is None else feriados + datas_sem_pregao

    # Lista de feriados nacionais brasileiros fixos
    feriados_fixos = ['01-01', '04-21', '05-01', '09-07', '10-12', '11-02', '11-15', '12-25']
    
    data_inicial = np.datetime64(data_inicial)
    data_final = np.datetime64(data_final)
    anos = list(range(data_inicial.astype('M8[Y]').astype(int) + 1970, data_final.astype('M8[Y]').astype(int) + 1971))
    
    for ano in anos:
        for feriado_fixo in feriados_fixos:
            feriados.append(f"{ano}-{feriado_fixo}")
    
    feriados = [np.datetime64(feriado) for feriado in feriados]
    datas = np.arange(data_inicial, data_final + np.timedelta64(1, 'D'))
    dias_uteis = np.is_busday(datas)
    
    for feriado in feriados:
        dias_uteis &= (datas != feriado)

    return np.sum(dias_uteis)


# Implementação Grafo de Visibilidade

Nessa seção fazemos a implementação da geração do grafo de visibilidade.

In [2]:
# Função que transforma uma lista de conexões em um grafo
def list_to_graph(connections):
    graph = nx.Graph()
    graph.add_edges_from(connections)
    return graph

# Função que transforma um grafo em uma lista de conexões
def graph_to_list(graph):
    return [sorted(list(i)) for i in list(graph.edges())]

# Função que transforma uma série temporal em um grafo de visibilidade
def visibility_graph(time_series):

    n = len(time_series)
    graph = nx.Graph()

    for i in range(n):
        graph.add_node(i)

    for i in range(n):
        for j in range(i+1, n):
            if all((time_series[j]-time_series[i])/(j-i) > (time_series[k]-time_series[i])/(k-i) for k in range(i+1, j)):
                graph.add_edge(i, j)

    return graph


def plot_graph(graph, time_series):

    pos = {i: (i, y) for i, y in enumerate(time_series)}
    nx.draw(graph, pos, with_labels=False, node_color='red', edge_color='gray')
    plt.show()
    
# Função que a partir do código do ativo, uma data inicial e uma data final, retorna um grafo com os nodos normalizados
def grafo(ativo, dataA, dataB):
    query = f'''SELECT nodoA, nodoB 
    FROM grafos_conexoes 
    WHERE ativo = '{ativo}' AND dataA >= '{dataA}' AND dataB <= '{dataB}'
    ORDER BY dataA ASC;'''
    
    connections = list(engine.execute(query).fetchall())
    
    return list_to_graph([(i[0]-connections[0][0]+1, i[1] -connections[0][0]+1) for i in connections])

# Similaridades 

Implementações de cada método testado

In [None]:
# Implementação da GED para o contexto do trabalho: basicamente conta a quantidade de arestas incomuns aos grafos, que são as arestas a serem removidas/adicionadas para igualar os grafos.

def GED(graph1,graph2, n):
    edges1 = set([tuple(sorted(list(i))) for i in list(graph1.edges())])
    edges2 = set([tuple(sorted(list(i))) for i in  list(graph2.edges())])
    
    intersection = len(edges1.intersection(edges2))
    union = len(edges1.union(edges2))
    
    d = union - intersection
    
    return n/(n+d)

def frobenius_distance(graph1, graph2, n = 1):

    A1 = nx.adjacency_matrix(graph1, nodelist=range(1, n+1)).toarray()
    A2 = nx.adjacency_matrix(graph2, nodelist=range(1, n+1)).toarray()
    
    
    diff = A1 - A2

    return (n-np.linalg.norm(diff))/n

def jaccard_coefficient(graph1, graph2):

    edges1 = set([tuple(sorted(list(i))) for i in list(graph1.edges())])
    edges2 = set([tuple(sorted(list(i))) for i in  list(graph2.edges())])
    
    intersection = len(edges1.intersection(edges2))
    union = len(edges1.union(edges2))
    
    return intersection / union if union != 0 else 0

# Simulação

In [3]:
# Função que calcula a média móvel da similaridade
def media_movel_dupla(grupo, tamanho_media):
    media_movel = pd.Series([0] * len(grupo), index=grupo.index)

    if len(grupo) > tamanho_media:
        media_movel[tamanho_media:] = grupo['similaridade'].rolling(window=tamanho_media, min_periods=1).mean()[tamanho_media:]

    return media_movel

# Função que extrai e trata os dados necessários para a simulação 
def get_data(limiar_var, data_inicial, data_final, n, metodo, tamanho_media): 
    
    query = f'''SELECT data, ativo, fechamento FROM dados_diarios_b3 WHERE data BETWEEN '{data_inicial}' AND '{data_final}';'''

    dados_diarios_b3 = pd.read_sql(query, engine)

    query = f'''SELECT *, 
    (IF(var >= -{limiar_var} AND var <= {limiar_var}, 'L', IF(var > {limiar_var}, 'A', 'B'))) AS estado
    FROM grafos 
    WHERE dataFinal BETWEEN '{data_inicial}' AND '{data_final}' AND n = {n} AND incluso = 1
    ORDER BY ativo,  dataFinal ASC;'''
    grafos = pd.read_sql(query,engine)

    grafos['transicao'] = grafos.groupby('ativo')['estado'].shift(1) != grafos['estado']
    grafos['transicao'] = grafos['transicao'].apply(lambda x: 1 if x else 0)
    grafos['transicao'] = grafos['transicao'].fillna(0)


    datas = grafos.sort_values(['dataFinal'], ascending = True)['dataFinal'].unique()

    query = f'''SELECT gA.dataFinal AS dataFinal, 
    similaridades.idGrafoA, 
    gA.ativo AS ativoA, 
    (IF(gA.var >= -{limiar_var} AND gA.var <= {limiar_var}, 'L', IF(gA.var > {limiar_var}, 'A', 'B'))) AS estadoA,
    similaridades.idGrafoB, 
    gB.ativo AS ativoB, 
    (IF(gB.var >= -{limiar_var} AND gB.var <= {limiar_var}, 'L', IF(gB.var  > {limiar_var}, 'A', 'B'))) AS estadoB,
    similaridades.{metodo} AS similaridade
    FROM similaridades 
    LEFT JOIN grafos AS gA ON similaridades.idGrafoA = gA.id 
    LEFT JOIN grafos AS gB ON similaridades.idGrafoB = gB.id
    WHERE similaridades.idGrafoA IN {tuple(grafos['id'].unique())} 
    OR similaridades.idGrafoB IN {tuple(grafos['id'].unique())};'''
    similaridades = pd.read_sql(query, engine)

    similaridades['similaridade'] = similaridades.apply(lambda x: -x['similaridade'] if ((x['estadoA'] == 'B' and x['estadoB'] == 'A') or (x['estadoA'] == 'A' and x['estadoB'] == 'B')) else x['similaridade'], axis = 1)
    similaridades['similaridadeMedia'] = 0


    for (ativoA, ativoB), grupo in similaridades.groupby(['ativoA', 'ativoB']):
        similaridades.loc[grupo.index, 'similaridadeMedia'] = media_movel_dupla(grupo, tamanho_media)

    return dados_diarios_b3, grafos, datas, similaridades

In [4]:
# Função que processa a simulação
def processing(dados_diarios_b3, grafos, datas, qtd_sinais, sim_min, similaridades, tamanho_media, limiar_var, concentracao_max):
    caixa = 1
    carteira = {}
    transacoes = []
    trades = 0
    trades_assertivos = 0

    
    grafos_agrupados = grafos.groupby('dataFinal')
    similaridades_agrupadas = similaridades.groupby('dataFinal')

    for data in datas[tamanho_media:]:
        if data in grafos_agrupados.groups and data in similaridades_agrupadas.groups:
            grafos_data = grafos_agrupados.get_group(data)
            similaridades_data = similaridades_agrupadas.get_group(data)
            ativos_data = grafos_data['ativo'].unique()
            
            sinais_compra_data = []

            for ativo in ativos_data:
                correlacionados_data = similaridades_data[(similaridades_data['similaridadeMedia'] >= sim_min) & ((similaridades_data['ativoA'] == ativo) | (similaridades_data['ativoB'] == ativo))]
                correlacionados = set(correlacionados_data['ativoA'].unique()).union(set(correlacionados_data['ativoB'].unique()))

                sinais_compra = grafos_data[grafos_data['ativo'].isin(correlacionados) & (grafos_data['estado'] == 'A')]['transicao'].sum()
                sinais_venda = grafos_data[grafos_data['ativo'].isin(correlacionados) & (grafos_data['estado'] == 'B')]['transicao'].sum()

                if (ativo not in carteira) and (sinais_compra - sinais_venda >= qtd_sinais):
                    sinais_compra_data.append({"ativo": ativo, "qtd": sinais_compra - sinais_venda})

                elif (ativo in carteira) and (sinais_venda - sinais_compra >= qtd_sinais):
                    var = dados_diarios_b3[(dados_diarios_b3['ativo'] == ativo) & (dados_diarios_b3['data'] == data)]['fechamento'].iloc[0] / dados_diarios_b3[(dados_diarios_b3['ativo'] == ativo) & (dados_diarios_b3['data'] == carteira[ativo]['data'])]['fechamento'].iloc[0]
                    valor_venda = carteira[ativo]['qtd'] * var
                    caixa += valor_venda

                    transacoes.append({"tipo": "venda", "ativo": ativo, "data": data, "sinais": sinais_venda - sinais_compra, "valor": valor_venda, 'rentabilidade': round(var - 1, 4)})
                    carteira.pop(ativo)

                    trades += 1
                    if var - 1 > limiar_var:
                        trades_assertivos += 1

            if sinais_compra_data:
                sinais_compra_data_df = pd.DataFrame(sinais_compra_data).sort_values(by='qtd', ascending=False)
                sinais_compra_data = sinais_compra_data_df.to_dict(orient='records')

                for sinal in sinais_compra_data:
                    if caixa > 0:
                        valor_investimento = concentracao_max if caixa - concentracao_max >= 0 else caixa
                        transacoes.append({"tipo": "compra", "ativo": sinal['ativo'], "data": data, "sinais": sinal['qtd'], "valor": valor_investimento, 'rentabilidade': '-'})
                        caixa -= valor_investimento
                        carteira[sinal['ativo']] = {"data": data, "qtd": valor_investimento}
                        continue
                    break

    ultima_data = datas[-1]

    for ativo in carteira:
        var = dados_diarios_b3[(dados_diarios_b3['ativo'] == ativo) & (dados_diarios_b3['data'] == ultima_data)]['fechamento'].iloc[0] / dados_diarios_b3[(dados_diarios_b3['ativo'] == ativo) & (dados_diarios_b3['data'] == carteira[ativo]['data'])]['fechamento'].iloc[0]
        valor_venda = carteira[ativo]['qtd'] * var
        caixa += valor_venda

        transacoes.append({"tipo": "venda", "ativo": ativo, "data": data, "sinais": 0, "valor": valor_venda, 'rentabilidade': round(var - 1, 4)})

    return transacoes, trades, trades_assertivos, round((caixa - 1), 4)


In [5]:
# Função que une a extração de dados ao processamento

def simulacao(n, metodo, qtd_sinais, sim_min, tamanho_media, data_inicial, data_final, limiar_var = 0.0015, concentracao_max = 0.1):
    
    start = time.time()
    
    dados_diarios_b3, grafos, datas, similaridades = get_data(limiar_var, data_inicial, data_final, n, metodo, tamanho_media)
    
    transacoes, trades, trades_assertivos, rentabilidade = processing(dados_diarios_b3, grafos, datas, qtd_sinais, sim_min, similaridades, tamanho_media, limiar_var, concentracao_max)
    
    tempo = round(time.time()-start, 2)
    
    print('----------------------------------RESULTADOS----------------------------------')
    print('Número de Trades: ', trades)
    print('Número de Trades Assertivos: ', trades_assertivos, f' - Assertividade {round(100*(trades_assertivos/trades if trades else 0), 2)}%')
    print(f'Rentabilidade {round(100*rentabilidade, 2)}%')
    print(f'Tempo total da simulação: {tempo} segundos')
    
    return transacoes, trades, trades_assertivos, rentabilidade, tempo

# Testando Parâmetros

In [10]:
metodos = ['sJaccard', 'sFrobenius', 'sEdicao']
ns = [5, 10, 25, 35, 50]
tamanhos_media = [5, 10, 15]
concentracoes_max = [0.1, 0.075, 0.05]
qtds_minima_sinais = [3, 5, 7, 10]
similaridades_minimas = [0.65, 0.75, 0.85, 0.9]

def gerar_combinacoes_aleatorias(num_combinacoes):
    combinacoes = []
    for _ in range(num_combinacoes):
        combinacao = {
            'metodo': random.choice(metodos),
            'n': random.choice(ns),
            'tamanho_media': random.choice(tamanhos_media),
            'janela': random.choice(janelas),
            'concentracao_max': random.choice(concentracoes_max),
            'qtd_minima_sinais': random.choice(qtds_minima_sinais),
            'similaridade_minima': random.choice(similaridades_minimas)
        }
        combinacoes.append(combinacao)
    return combinacoes

combinacoes_aleatorias = gerar_combinacoes_aleatorias(2500)

In [12]:
def testa(combinacao): 
    
    transacoes, trades, trades_assertivos, rentabilidade, tempo = simulacao(
        combinacao['n'],
        combinacao['metodo'],
        combinacao['qtd_minima_sinais'],
        combinacao['similaridade_minima'],
        combinacao['tamanho_media'],
        combinacao['janela']['dataInicial'],
        combinacao['janela']['dataFinal'],
        concentracao_max=combinacao['concentracao_max']
    )
    
    
    if trades > 0:
    
        resultado_df = pd.DataFrame([{
            'n': combinacao['n'],
            'metodo': combinacao['metodo'],
            'qtd_minima_sinais': combinacao['qtd_minima_sinais'],
            'similaridade_minima': combinacao['similaridade_minima'],
            'tamanho_media': combinacao['tamanho_media'],
            'data_inicial': combinacao['janela']['dataInicial'],
            'data_final': combinacao['janela']['dataFinal'],
            'limiar_variacao': 0.0015,
            'concentracao_max': combinacao['concentracao_max'],
            'numero_trades': trades,
            'numero_trades_assertivos': trades_assertivos,
            'rentabilidade': rentabilidade,
            'tempo_simulacao': tempo,
            'transacoes': str(transacoes)
        }])

        resultado_df.to_sql('simulacoes', engine, index=False, if_exists='append')

In [None]:
for combinacao in combinacoes_aleatorias: 
    testa(combinacao)