In [1]:
import os
os.environ['PYDEVD_DISABLE_FILE_VALIDATION'] = '1'
import dask.dataframe as dd
import datetime
import logging
import numpy as np
import pandas as pd
from dask.distributed import Client, progress
from numba import njit, types
from numba.typed import List
import time
from sqlalchemy import create_engine

# Configuração do logging
logging.basicConfig(level=logging.INFO)

# Configuração do cliente Dask
client = Client(n_workers=10, threads_per_worker=1, memory_limit='6.4GB')
logging.info(client)

# Caminhos dos arquivos
raw_dataset_path = '../datasets/dataset-raw-daily-compressed/'
output_base_path = '../output'
timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
output_path = f'{output_base_path}_v{timestamp}'

Perhaps you already have a cluster running?
Hosting the HTTP server on port 64191 instead
INFO:root:<Client: 'tcp://127.0.0.1:64192' processes=10 threads=10, memory=59.60 GiB>


In [2]:
def read_parquet_files_optimized(raw_dataset_path, file):
    """Lê arquivos Parquet de forma otimizada."""
    parquet_pattern = os.path.join(raw_dataset_path, file)
    df_dask = dd.read_parquet(
        parquet_pattern,
        columns=['price', 'qty', 'quoteQty', 'time'],
        engine='pyarrow',
        dtype={'price': 'float32', 'qty': 'float32', 'quoteQty': 'float32'}
    )
    return df_dask

def assign_side_optimized(df):
    """Atribui o lado da negociação com base na mudança de preço."""
    df['side'] = np.where(df['price'].shift() > df['price'], 1,
                          np.where(df['price'].shift() < df['price'], -1, np.nan))
    df['side'] = df['side'].ffill().fillna(1).astype('int8')
    return df

def apply_operations_optimized(df_dask, meta):
    """Aplica operações otimizadas no DataFrame."""
    df_dask = df_dask.map_partitions(assign_side_optimized, meta=meta)
    df_dask['dollar_imbalance'] = df_dask['quoteQty'] * df_dask['side']
    return df_dask

# Função compilada com Numba
@njit(
    types.Tuple((
        # Primeiro elemento: lista de barras (cada barra é uma tupla)
        types.ListType(types.Tuple((types.float64,)*10)),
        # Segundo elemento: estado final da partição atual (tupla com 10 valores float64)
        types.Tuple((types.float64,) * 10),
    ))(
        # Parâmetros de entrada
        types.float64[:],   # prices - Preços das operações
        types.float64[:],   # times - Horários (em milissegundos/seconds)
        types.float64[:],   # imbalances - Dados de desequilíbrio em USD
        types.int8[:],      # sides - Lado da negociação (-1, 0 ou +1)
        types.float64[:],   # qtys - Quantidade negociada (em ordens)
        types.Tuple((types.float64,) * 10),    # res_init: estado inicial
        types.float64       # dollar_volume: volume em dólar para formar a barra
    )
)
def process_partition_dollar_numba(
    prices, times, imbalances, sides, qtys, res_init, dollar_volume
):
    """Processa uma partição usando numba para aceleração."""
    threshold = dollar_volume

    bars = List()  # Lista tipada para armazenar as barras formadas

    # Desempacota res_init
    bar_open, bar_high, bar_low, bar_close, bar_start_time, bar_end_time, \
    current_imbalance, buy_volume_usd, total_volume_usd, total_volume = res_init

    # Verifica se res_init está inicializado (usando -1.0 como sentinela para não inicializado)
    if bar_open == -1.0:
        # Reseta as variáveis de agregação
        bar_open = np.nan
        bar_high = -np.inf
        bar_low = np.inf
        bar_close = np.nan
        bar_start_time = np.nan
        bar_end_time = np.nan
        current_imbalance = 0.0
        buy_volume_usd = 0.0
        total_volume_usd = 0.0
        total_volume = 0.0

    for i in range(len(prices)):
        if np.isnan(bar_open):
            bar_open = prices[i]
            bar_start_time = times[i]

        trade_price = prices[i]
        bar_high = max(bar_high, trade_price)
        bar_low = min(bar_low, trade_price)
        bar_close = trade_price

        trade_imbalance = imbalances[i]

        if sides[i] > 0:
            buy_volume_usd += trade_imbalance

        total_volume += qtys[i]
        total_volume_usd += abs(trade_imbalance)
        current_imbalance += trade_imbalance

        if total_volume_usd >= threshold:
            bar_end_time = times[i]

            # Salva a barra formada
            bars.append((
                bar_start_time, bar_end_time, bar_open, bar_high, bar_low, bar_close,
                current_imbalance, buy_volume_usd, total_volume_usd, total_volume
            ))

            # Reseta as variáveis de agregação
            bar_open = np.nan
            bar_high = -np.inf
            bar_low = np.inf
            bar_close = np.nan
            bar_start_time = np.nan
            bar_end_time = np.nan
            current_imbalance = 0.0
            buy_volume_usd = 0.0
            total_volume_usd = 0.0
            total_volume = 0.0

    # Prepara o estado final para a próxima partição
    final_state = (
        bar_open, bar_high, bar_low, bar_close,
        bar_start_time, bar_end_time, current_imbalance,
        buy_volume_usd, total_volume_usd, total_volume
    )

    return bars, final_state

def create_dollar_bars_numba(partition, res_init, dollar_volume):
    """Função wrapper para processar uma partição com numba."""
    # Converte a partição para arrays numpy
    prices = partition['price'].values.astype(np.float64)
    times = partition['time'].values.astype(np.float64)
    imbalances = partition['dollar_imbalance'].values.astype(np.float64)
    sides = partition['side'].values.astype(np.int8)
    qtys = partition['qty'].values.astype(np.float64)

    # Inicializa res_init se vazio ou inválido
    if res_init is None or len(res_init) != 10:
        res_init = (-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 0.0, 0.0, 0.0, 0.0)

    # Processa a partição usando a função compilada com numba
    bars, res_init = process_partition_dollar_numba(  # Corrected to unpack two values
        prices, times, imbalances, sides, qtys, res_init, dollar_volume
    )

    # Converte as barras para um DataFrame
    if len(bars) > 0:
        bars_df = pd.DataFrame(bars, columns=[
            'start_time', 'end_time', 'open', 'high', 'low', 'close',
            'imbalance_col', 'total_volume_buy_usd', 'total_volume_usd', 'total_volume'
        ])
    else:
        # Retorna um DataFrame vazio com as colunas apropriadas
        bars_df = pd.DataFrame(columns=[
            'start_time', 'end_time', 'open', 'high', 'low', 'close',
            'imbalance_col', 'total_volume_buy_usd', 'total_volume_usd', 'total_volume'
        ])

    return bars_df, res_init

def batch_create_dollar_bars_optimized(df_dask, res_init, dollar_volume):
    """Processa partições em lote para criar barras de desequilíbrio em dólares."""
    results = []
    for partition in range(df_dask.npartitions):
        logging.info(f'Processando partição {partition+1} de {df_dask.npartitions}')
        part = df_dask.get_partition(partition).compute()

        bars, res_init = create_dollar_bars_numba(
            part, res_init, dollar_volume
        )
        results.append(bars)
    # Filtra DataFrames vazios
    results = [df for df in results if not df.empty]
    if results:
        results_df = pd.concat(results, ignore_index=True)
    else:
        # Retorna um DataFrame vazio com as colunas apropriadas se não houver resultados
        results_df = pd.DataFrame(columns=[
            'start_time', 'end_time', 'open', 'high', 'low', 'close',
            'imbalance_col', 'total_volume_buy_usd', 'total_volume_usd', 'total_volume'
        ])
    return results_df, res_init

In [3]:
# len(initial_state)*8/60/24
# initial_state = [40000000]  # Valores individuais

In [4]:
# import pandas as pd
# df_agg_m2 = pd.read_excel('agg_m2.xlsx')

In [5]:
# df_agg_m2

In [None]:
# Meta DataFrame para map_partitions
meta = pd.DataFrame({
    'price': pd.Series(dtype='float32'),
    'qty': pd.Series(dtype='float32'),
    'quoteQty': pd.Series(dtype='float32'),
    'time': pd.Series(dtype='float64'),  # Alterado para float64 para compatibilidade com Numba
    'side': pd.Series(dtype='int8')
})

# Listar arquivos
files = [f for f in os.listdir(raw_dataset_path) if os.path.isfile(os.path.join(raw_dataset_path, f))]
file_count = len(files)

initial_state = [[dollar_volume, number]
                  for dollar_volume in range(4_000_000, 40_000_000, 500_000)
                  for number in range(1, file_count)]

for dollar_volume, number in initial_state:
    if number == 1:
        start_time = time.time()
        output_file = f'dollar_bars_v{dollar_volume}'
        logging.info(output_file)

        results = pd.DataFrame()

        # init_T0 = 100_000_000
        init_dif = 1.0
        # alpha_volume = 0.5
        # alpha_imbalance = 0.9
        res_init = (-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 0.0, 0.0, 0.0, 0.0)  # Estado inicial

    # Dicionário para armazenar os tempos de processamento
    processing_times = {}

    logging.info(f"Dask n{number} de {file_count-1}")
    number = str(number).zfill(2)
    file = f'BTCUSDT-Trades-{number}.parquet'

    if not os.path.exists(os.path.join(raw_dataset_path, file)):
        logging.warning(f"Arquivo {file} não encontrado. Pulando para o próximo.")
        continue

    df_dask = read_parquet_files_optimized(raw_dataset_path, file)

    df_dask = apply_operations_optimized(df_dask, meta)

    bars, res_init = batch_create_dollar_bars_optimized(
        df_dask, res_init, dollar_volume
    )
    results = pd.concat([results, bars], ignore_index=True)

    if number == str(file_count - 1):
        bar_open, bar_high, bar_low, bar_close, bar_start_time, bar_end_time, \
        current_imbalance, buy_volume_usd, total_volume_usd, total_volume = res_init

        bar_end_time = df_dask['time'].tail().iloc[-1]

        lastbar = [[bar_start_time, bar_end_time, bar_open, bar_high, bar_low, bar_close,
                        current_imbalance, buy_volume_usd, total_volume_usd, total_volume]]

        lastbar = pd.DataFrame(lastbar, columns=['start_time', 'end_time', 'open', 'high', 'low', 'close', 'imbalance_col', 'total_volume_buy_usd', 'total_volume_usd', 'total_volume'])

        results = pd.concat([results, lastbar], ignore_index=True)

        results_ = results.copy()

        output_file = f'dollar_v{dollar_volume}'

        # results_['start_time'] = pd.to_datetime(results_['start_time'], unit='ns')  # Ajuste para 's' se 'time' era em segundos
        # results_['end_time'] = pd.to_datetime(results_['end_time'], unit='ns')

        results_['start_time'] = pd.to_datetime(results_['start_time'])  # Ajuste para 's' se 'time' era em segundos
        results_['end_time'] = pd.to_datetime(results_['end_time'])
        results_.drop(columns=['start_time'], inplace=True)

        results_['params'] = output_file

        results_['time_trial'] = timestamp
        # results_.to_excel(f'../output/{output_file}.xlsx', index=False)

        # # host = "superset-postgresql.default.svc.cluster.local"  # Nome do serviço no Kubernetes÷
        # host = "localhost"  # Agora o PostgreSQL está acessível via localhost
        # port = 5432  # Porta do PostgreSQL
        # dbname = "superset"  # Nome do banco de dados
        # user = "superset"  # Usuário do banco de dados
        # password = "superset"  # Senha do banco de dados


        # # Conectar ao banco de dados PostgreSQL usando SQLAlchemy
        # connection_string = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"

        # with create_engine(connection_string).connect() as connection:
        #     # Enviar o DataFrame para o PostgreSQL
        #     results_.to_sql('imbalance-bars-start', connection, if_exists='append', index=False)

        #     print("Dados enviados para o banco de dados com sucesso!")

        # Finaliza a medição do tempo
        end_time = time.time()
        elapsed_time = end_time - start_time
        elapsed_time_minutes = elapsed_time / 60  # Converte para minutos

        # Armazena o tempo de processamento
        processing_times[file] = elapsed_time_minutes
        logging.info(f"Tempo de processamento para {file}: {elapsed_time_minutes:.2f} minutos")

        results_.to_parquet(f'../output/{output_file}.parquet', index=False)

        results_


INFO:root:dollar_bars_v3000000
INFO:root:Dask n1 de 11
INFO:root:Processando partição 1 de 48
INFO:root:Processando partição 2 de 48
INFO:root:Processando partição 3 de 48
INFO:root:Processando partição 4 de 48
INFO:root:Processando partição 5 de 48
INFO:root:Processando partição 6 de 48
INFO:root:Processando partição 7 de 48
INFO:root:Processando partição 8 de 48
INFO:root:Processando partição 9 de 48
INFO:root:Processando partição 10 de 48
INFO:root:Processando partição 11 de 48
INFO:root:Processando partição 12 de 48
INFO:root:Processando partição 13 de 48
INFO:root:Processando partição 14 de 48
INFO:root:Processando partição 15 de 48
INFO:root:Processando partição 16 de 48
INFO:root:Processando partição 17 de 48
INFO:root:Processando partição 18 de 48
INFO:root:Processando partição 19 de 48
INFO:root:Processando partição 20 de 48
INFO:root:Processando partição 21 de 48
INFO:root:Processando partição 22 de 48
INFO:root:Processando partição 23 de 48
INFO:root:Processando partição 24 

ValueError: This sheet is too large! Your sheet size is: 1662741, 11 Max sheet size is: 1048576, 16384

2025-06-22 19:33:52,045 - tornado.application - ERROR - Exception in callback <bound method SystemMonitor.update of <SystemMonitor: cpu: 3 memory: 194 MB fds: 25>>
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.12/site-packages/tornado/ioloop.py", line 937, in _run
    val = self.callback()
          ^^^^^^^^^^^^^^^
  File "/opt/anaconda3/lib/python3.12/site-packages/distributed/system_monitor.py", line 168, in update
    net_ioc = psutil.net_io_counters()
              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/lib/python3.12/site-packages/psutil/__init__.py", line 2139, in net_io_counters
    rawdict = _psplatform.net_io_counters()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [Errno 12] Cannot allocate memory


In [13]:
results_

Unnamed: 0,end_time,open,high,low,close,imbalance_col,total_volume_buy_usd,total_volume_usd,total_volume,params,time_trial
0,2017-08-17 22:38:40.767000064,4261.48,4485.39,4200.74,4328.24,2.505353e+05,1.626170e+06,3.001805e+06,690.316375,dollar_v3000000,20250622-185844
1,2017-08-18 08:56:22.560999936,4328.24,4359.13,4134.61,4293.55,5.525937e+05,1.776412e+06,3.000231e+06,701.045414,dollar_v3000000,20250622-185844
2,2017-08-19 04:05:26.527000064,4293.55,4371.52,3938.77,4138.55,4.248407e+05,1.712459e+06,3.000077e+06,714.854092,dollar_v3000000,20250622-185844
3,2017-08-20 23:54:00.081999872,4156.00,4211.08,3850.00,4087.42,2.241448e+05,1.613485e+06,3.002826e+06,733.140932,dollar_v3000000,20250622-185844
4,2017-08-22 00:56:54.788000000,4106.36,4119.62,3860.49,3860.49,1.068438e+05,1.553629e+06,3.000414e+06,742.945162,dollar_v3000000,20250622-185844
...,...,...,...,...,...,...,...,...,...,...,...
1662736,2025-06-08 23:36:42.151849984,105768.33,105772.61,105656.50,105674.42,1.972748e+06,2.487835e+06,3.002922e+06,28.410520,dollar_v3000000,20250622-185844
1662737,2025-06-08 23:43:47.842355968,105674.42,105744.19,105655.26,105671.71,7.352472e+05,1.869734e+06,3.004221e+06,28.421850,dollar_v3000000,20250622-185844
1662738,2025-06-08 23:51:56.993444096,105671.70,105703.36,105617.52,105654.57,3.453606e+05,1.675314e+06,3.005267e+06,28.443490,dollar_v3000000,20250622-185844
1662739,2025-06-08 23:59:45.734322944,105654.57,105755.84,105609.56,105734.06,-1.153405e+05,1.445757e+06,3.006854e+06,28.453500,dollar_v3000000,20250622-185844


In [14]:
results_.to_parquet(f'../output/{output_file}.parquet', index=False)

In [None]:
# # df_eq = pd.DataFrame([[init_T, init_dif, output_file]], columns=['v-ewma', 'imbalance-ewma', 'params'])

# # host = "superset-postgresql.default.svc.cluster.local"  # Nome do serviço no Kubernetes÷

# host = "localhost"  # Agora o PostgreSQL está acessível via localhost
# port = 5432  # Porta do PostgreSQL
# dbname = "superset"  # Nome do banco de dados
# user = "superset"  # Usuário do banco de dados
# password = "superset"  # Senha do banco de dados


# # Conectar ao banco de dados PostgreSQL usando SQLAlchemy
# connection_string = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"

# with create_engine(connection_string).connect() as connection:
#     # Enviar o DataFrame para o PostgreSQL
#     results_.to_sql('imbalance-bars-start', connection, if_exists='append', index=False)

#     print("Dados enviados para o banco de dados com sucesso!")

OperationalError: (psycopg2.OperationalError) connection to server at "localhost" (::1), port 5432 failed: Connection refused
	Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused
	Is the server running on that host and accepting TCP/IP connections?

(Background on this error at: https://sqlalche.me/e/20/e3q8)

In [None]:
# with create_engine(connection_string).connect() as conn:
#     query = """ SELECT * FROM "imbalance-bars-start" """
#     df = pd.read_sql(query, conn)

# df_filt = df[df['params'] == 'dollar_v40000000']
# df_filt

In [None]:
# import mplfinance as mpf
# # import matplotlib
# import pandas as pd
# # matplotlib.use('TkAgg')  # Ou outro backend interativo

# df = results_.copy()

# # Convertendo a coluna 'start_time' para o índice do DataFrame
# df['end_time'] = pd.to_datetime(df['end_time'])
# df.set_index('end_time', inplace=True)

# # Selecionando apenas as colunas necessárias para o mplfinance
# df = df[['open', 'high', 'low', 'close', 'imbalance_col', 'total_volume_usd']]

# # Renomeando as colunas para um formato mais amigável
# df.rename(columns={
#     'open': 'Open',
#     'high': 'High',
#     'low': 'Low',
#     'close': 'Close',
#     'imbalance_col': 'Volume'}, inplace=True)

# ap = mpf.make_addplot(df['total_volume_usd'], panel=2, color='C0', ylabel='USD Total Volume', type='bar', width=1)
# # Plotando o gráfico OHLC em preto e branco
# fig = mpf.plot(df, type='candle', title='Gráfico OHLC', volume=True, addplot=ap, show_nontrading=False)

# # Show the plot
# mpf.show()


In [None]:
# import pandas as pd
# import plotly.graph_objects as go

# df = results.copy()

# df['end_time'] = pd.to_datetime(df['end_time'])

# # Criando uma nova coluna com data e hora formatadas como strings (YYYY-MM-DD HH:MM:SS%S)
# df['end_time_str'] = df['end_time'].dt.strftime('%Y-%m-%d %H:%M')

# # Criando o gráfico de candlestick
# fig = go.Figure(data=[go.Candlestick(x=df['end_time_str'],
#                                        open=df['open'],
#                                        high=df['high'],
#                                        low=df['low'],
#                                        close=df['close'],
#                                        increasing_line_color='slateblue',
#                                        decreasing_line_color='black')])

# # Adicionando título e rótulos
# fig.update_layout(title='Gráfico de Candlestick',
#                   xaxis_title='Data',
#                   yaxis_title='Preço',
#                   xaxis_rangeslider_visible=True,
#                   plot_bgcolor='white',
#                   paper_bgcolor='whitesmoke')


# fig.update_xaxes(type='category', tickangle=-45)

# fig.update_yaxes(
#     type='log',
#     tickmode='auto'# Escala logarítmica
# )

# # Exibindo o gráfico
# fig.show()