In [ ]:
import os
os.environ['PYDEVD_DISABLE_FILE_VALIDATION'] = '1'
import dask.dataframe as dd
import datetime
import logging
import numpy as np
import pandas as pd
from dask.distributed import Client, progress
from numba import njit, types
from numba.typed import List
import time
from sqlalchemy import create_engine

# Configuração do logging
logging.basicConfig(level=logging.INFO)

# Configuração do cliente Dask
client = Client(n_workers=10, threads_per_worker=1, memory_limit='6.4GB')
logging.info(client)

# Caminhos dos arquivos - ajustados para o caminho correto
# Detecta o caminho base do projeto
notebook_dir = os.path.dirname(os.path.abspath(__file__)) if '__file__' in globals() else os.getcwd()
project_root = os.path.dirname(os.path.dirname(notebook_dir))

# Configuração de parâmetros
DATA_TYPE = 'futures'  # 'spot' ou 'futures'
FUTURES_TYPE = 'um'    # 'um' ou 'cm' (apenas para futures)
GRANULARITY = 'daily'  # 'daily' ou 'monthly'

# Constrói o caminho correto baseado nos parâmetros
if DATA_TYPE == 'spot':
    raw_dataset_path = os.path.join(project_root, 'datasets', f'dataset-raw-{GRANULARITY}-compressed-optimized', 'spot')
else:
    raw_dataset_path = os.path.join(project_root, 'datasets', f'dataset-raw-{GRANULARITY}-compressed-optimized', f'futures-{FUTURES_TYPE}')

output_base_path = os.path.join(project_root, 'output')
timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
output_path = f'{output_base_path}_v{timestamp}'

# Cria diretório de output se não existir
os.makedirs(output_base_path, exist_ok=True)

print(f"Raw dataset path: {raw_dataset_path}")
print(f"Output path: {output_base_path}")

# Verifica se o diretório existe
if not os.path.exists(raw_dataset_path):
    raise FileNotFoundError(f"O diretório de dados não foi encontrado: {raw_dataset_path}\n"
                          f"Certifique-se de que você executou as etapas anteriores do pipeline.")

# Imbalance Dollar Bars v3

## Descrição
Este notebook gera "imbalance dollar bars" a partir de dados de trades do Bitcoin. Ele usa processamento distribuído com Dask e otimizações com Numba para processar grandes volumes de dados eficientemente.

## Configuração
Antes de executar este notebook, certifique-se de:

1. **Executar o pipeline principal** (`main.py`) e completar as seguintes etapas:
   - Step 1: Download dos dados
   - Step 2: ZIP → CSV → Parquet Pipeline
   - Step 3: Optimize Parquet files

2. **Verificar os parâmetros** na primeira célula:
   - `DATA_TYPE`: 'spot' ou 'futures'
   - `FUTURES_TYPE`: 'um' ou 'cm' (apenas para futures)
   - `GRANULARITY`: 'daily' ou 'monthly'

3. **Dependências necessárias**:
   - dask
   - numba
   - pandas
   - numpy
   - pyarrow
   - openpyxl (para salvar Excel)

## Output
Os resultados são salvos como arquivos Excel no diretório `output/` na raiz do projeto.

In [ ]:
def read_parquet_files_optimized(raw_dataset_path, file):
    """Lê arquivos Parquet de forma otimizada."""
    parquet_pattern = os.path.join(raw_dataset_path, file)
    df_dask = dd.read_parquet(
        parquet_pattern,
        columns=['price', 'qty', 'quoteQty', 'time'],
        engine='pyarrow',
        dtype={'price': 'float32', 'qty': 'float32', 'quoteQty': 'float32'}
    )
    return df_dask

def assign_side_optimized(df):
    """Atribui o lado da negociação com base na mudança de preço."""
    df['side'] = np.where(df['price'].shift() > df['price'], 1,
                          np.where(df['price'].shift() < df['price'], -1, np.nan))
    df['side'] = df['side'].ffill().fillna(1).astype('int8')
    return df

def apply_operations_optimized(df_dask, meta):
    """Aplica operações otimizadas no DataFrame."""
    df_dask = df_dask.map_partitions(assign_side_optimized, meta=meta)
    df_dask['dollar_imbalance'] = df_dask['quoteQty'] * df_dask['side']
    return df_dask

# Função compilada com numba
@njit(
    types.Tuple((
        types.ListType(types.Tuple((
            types.float64,  # start_time
            types.float64,  # end_time
            types.float64,  # open
            types.float64,  # high
            types.float64,  # low
            types.float64,  # close
            types.float64,  # imbalance_col
            types.float64,  # total_volume_buy_usd
            types.float64,  # total_volume_usd
            types.float64   # total_volume
        ))),
        types.float64,  # exp_T
        types.float64,  # exp_dif
        types.Tuple((
            types.float64,  # bar_open
            types.float64,  # bar_high
            types.float64,  # bar_low
            types.float64,  # bar_close
            types.float64,  # bar_start_time
            types.float64,  # bar_end_time
            types.float64,  # current_imbalance
            types.float64,  # buy_volume_usd
            types.float64,  # total_volume_usd
            types.float64   # total_volume
        )),
        types.ListType(types.Tuple((
            types.float64,  # exp_T
            types.float64,  # exp_dif
            types.float64   # thres
        ))),
        types.int64     # warm_up_count
    ))(
        types.float64[:],  # prices
        types.float64[:],  # times
        types.float64[:],  # imbalances
        types.int8[:],     # sides
        types.float64[:],  # qtys
        types.float64,     # init_T
        types.float64,     # init_dif
        types.float64,     # alpha_volume
        types.float64,     # alpha_imbalance
        types.Tuple((
            types.float64,  # bar_open
            types.float64,  # bar_high
            types.float64,  # bar_low
            types.float64,  # bar_close
            types.float64,  # bar_start_time
            types.float64,  # bar_end_time
            types.float64,  # current_imbalance
            types.float64,  # buy_volume_usd
            types.float64,  # total_volume_usd
            types.float64   # total_volume
        )),
        types.int64        # warm_up_count
    )
)
def process_partition_imbalance_numba(
    prices, times, imbalances, sides, qtys,
    init_T, init_dif, alpha_volume, alpha_imbalance, res_init, warm_up_count
):
    """Processa uma partição usando numba para aceleração com período de warm-up."""
    exp_T = init_T
    exp_dif = init_dif
    threshold = exp_T * abs(exp_dif)
    
    # Contador de barras formadas (incluindo warm-up)
    bars_formed = warm_up_count

    bars = List()  # Lista tipada para armazenar as barras formadas
    params = List()

    # Desempacota res_init
    bar_open, bar_high, bar_low, bar_close, bar_start_time, bar_end_time, \
    current_imbalance, buy_volume_usd, total_volume_usd, total_volume = res_init

    # Verifica se res_init está inicializado (usando -1.0 como sentinela para não inicializado)
    if bar_open == -1.0:
        # Reseta as variáveis de agregação
        bar_open = np.nan
        bar_high = -np.inf
        bar_low = np.inf
        bar_close = np.nan
        bar_start_time = np.nan
        bar_end_time = np.nan
        current_imbalance = 0.0
        buy_volume_usd = 0.0
        total_volume_usd = 0.0
        total_volume = 0.0

    for i in range(len(prices)):
        if np.isnan(bar_open):
            bar_open = prices[i]
            bar_start_time = times[i]

        trade_price = prices[i]
        bar_high = max(bar_high, trade_price)
        bar_low = min(bar_low, trade_price)
        bar_close = trade_price

        trade_imbalance = imbalances[i]

        if sides[i] > 0:
            buy_volume_usd += trade_imbalance

        total_volume += qtys[i]
        total_volume_usd += abs(trade_imbalance)
        current_imbalance += trade_imbalance
        imbalance = abs(current_imbalance)

        if imbalance >= threshold:
            bar_end_time = times[i]
            bars_formed += 1

            # Só salva a barra se já passou o período de warm-up (5 barras)
            if bars_formed > 5:
                bars.append((
                    bar_start_time, bar_end_time, bar_open, bar_high, bar_low, bar_close,
                    current_imbalance, buy_volume_usd, total_volume_usd, total_volume
                ))

            # Atualiza os valores exponenciais (sempre, mesmo durante warm-up)
            if exp_dif == 1.0:
                exp_T = total_volume_usd
                exp_dif = abs(2 * buy_volume_usd / total_volume_usd - 1)
            else:
                exp_T += alpha_volume * (total_volume_usd - exp_T)
                exp_dif += alpha_imbalance * (abs(2 * buy_volume_usd / total_volume_usd - 1) - exp_dif)

            threshold = exp_T * abs(exp_dif)

            # Só salva os parâmetros se já passou o período de warm-up
            if bars_formed > 5:
                params.append((
                    exp_T, exp_dif, threshold
                ))

            # Reseta as variáveis de agregação
            bar_open = np.nan
            bar_high = -np.inf
            bar_low = np.inf
            bar_close = np.nan
            bar_start_time = np.nan
            bar_end_time = np.nan
            current_imbalance = 0.0
            buy_volume_usd = 0.0
            total_volume_usd = 0.0
            total_volume = 0.0

    # Prepara o estado final para a próxima partição
    final_state = (
        bar_open, bar_high, bar_low, bar_close,
        bar_start_time, bar_end_time, current_imbalance,
        buy_volume_usd, total_volume_usd, total_volume
    )

    return bars, exp_T, exp_dif, final_state, params, bars_formed

def create_imbalance_dollar_bars_numba(partition, init_T, init_dif, res_init, alpha_volume, alpha_imbalance, warm_up_count):
    """Função wrapper para processar uma partição com numba."""
    # Converte a partição para arrays numpy
    prices = partition['price'].values.astype(np.float64)
    times = partition['time'].values.astype(np.float64)
    imbalances = partition['dollar_imbalance'].values.astype(np.float64)
    sides = partition['side'].values.astype(np.int8)
    qtys = partition['qty'].values.astype(np.float64)

    # Inicializa res_init se vazio ou inválido
    if res_init is None or len(res_init) != 10:
        res_init = (-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 0.0, 0.0, 0.0, 0.0)

    # Processa a partição usando a função compilada com numba
    bars, exp_T, exp_dif, res_init, params, warm_up_count = process_partition_imbalance_numba(
        prices, times, imbalances, sides, qtys,
        init_T, init_dif, alpha_volume, alpha_imbalance, res_init, warm_up_count
    )

    # Converte as barras para um DataFrame
    if len(bars) > 0:
        bars_df = pd.DataFrame(bars, columns=[
            'start_time', 'end_time', 'open', 'high', 'low', 'close',
            'imbalance_col', 'total_volume_buy_usd', 'total_volume_usd', 'total_volume'
        ])
        params_df = pd.DataFrame(params, columns=['ewma_volume', 'ewma_dif', 'thres'])
    else:
        # Retorna um DataFrame vazio com as colunas apropriadas
        bars_df = pd.DataFrame(columns=[
            'start_time', 'end_time', 'open', 'high', 'low', 'close',
            'imbalance_col', 'total_volume_buy_usd', 'total_volume_usd', 'total_volume'
        ])
        params_df = pd.DataFrame(columns=['ewma_volume', 'ewma_dif', 'thres'])

    return bars_df, exp_T, exp_dif, res_init, params_df, warm_up_count

def batch_create_imbalance_dollar_bars_optimized(df_dask, init_T, init_dif, res_init, alpha_volume, alpha_imbalance, warm_up_count=0):
    """Processa partições em lote para criar barras de desequilíbrio em dólares."""
    results = []
    params_save = []
    for partition in range(df_dask.npartitions):
        logging.info(f'Processando partição {partition+1} de {df_dask.npartitions}')
        part = df_dask.get_partition(partition).compute()

        bars, init_T, init_dif, res_init, params, warm_up_count = create_imbalance_dollar_bars_numba(
            part, init_T, init_dif, res_init, alpha_volume, alpha_imbalance, warm_up_count
        )
        results.append(bars)
        params_save.append(params)
    # Filtra DataFrames vazios
    results = [df for df in results if not df.empty]
    params_save = [df for df in params_save if not df.empty]
    if results:
        results_df = pd.concat(results, ignore_index=True)
        params_df = pd.concat(params_save, ignore_index=True)
    else:
        # Retorna um DataFrame vazio com as colunas apropriadas se não houver resultados
        results_df = pd.DataFrame(columns=[
            'start_time', 'end_time', 'open', 'high', 'low', 'close',
            'imbalance_col', 'total_volume_buy_usd', 'total_volume_usd', 'total_volume'
        ])
        params_df = pd.DataFrame(columns=['ewma_volume', 'ewma_dif', 'thres'])
    return results_df, init_T, init_dif, res_init, params_df, warm_up_count

In [ ]:
# Meta DataFrame para map_partitions
meta = pd.DataFrame({
    'price': pd.Series(dtype='float32'),
    'qty': pd.Series(dtype='float32'),
    'quoteQty': pd.Series(dtype='float32'),
    'time': pd.Series(dtype='float64'),  # Alterado para float64 para compatibilidade com Numba
    'side': pd.Series(dtype='int8')
})

In [ ]:
# Verificar se o diretório existe e listar arquivos disponíveis
if os.path.exists(raw_dataset_path):
    files = [f for f in os.listdir(raw_dataset_path) if f.endswith('.parquet')]
    file_count = len(files)
    print(f"Encontrados {file_count} arquivos Parquet no diretório:")
    for i, f in enumerate(files[:5]):  # Mostra apenas os primeiros 5
        print(f"  {i+1}. {f}")
    if file_count > 5:
        print(f"  ... e mais {file_count - 5} arquivos")
else:
    print(f"ERRO: O diretório {raw_dataset_path} não existe!")
    print("Verifique se você executou as etapas anteriores do pipeline:")
    print("  1. Download dos dados")
    print("  2. Conversão para Parquet")
    print("  3. Otimização dos arquivos Parquet")
    files = []
    file_count = 0

In [ ]:
# Apenas cria initial_state se houver arquivos
if file_count > 0:
    initial_state = [[init_T0, alpha_volume/100, alpha_imbalance/100, number]
                      for init_T0 in range(1_000_000, 1_000_000_000, 200_000_000)
                      for alpha_volume in range(10, 100, 25)
                      for alpha_imbalance in range(10, 100, 25)
                      for number in range(1, file_count)]

    initial_state = initial_state[:file_count-1]
    print(f"Criados {len(initial_state)} estados iniciais para processamento")
    initial_state[:5]  # Mostra apenas os primeiros 5
else:
    initial_state = []
    print("Nenhum arquivo encontrado para processar!")

In [ ]:
processing_times = {}

# Verifica se há estados para processar
if not initial_state:
    print("Nenhum estado inicial para processar. Verifique se há arquivos no diretório de dados.")
else:
    for init_T0, alpha_volume, alpha_imbalance, number in initial_state:
        if number == 1:
            start_time = time.time()
            output_file = f'imbalance_dolar_{init_T0}-{alpha_volume}-{alpha_imbalance}'
            results = pd.DataFrame()
            params = pd.DataFrame()
            init_dif = 1.0
            res_init = (-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 0.0, 0.0, 0.0, 0.0)  # Estado inicial
            init_T = init_T0
            warm_up_count = 0  # Inicializa o contador de warm-up
            logging.info(f"params_{output_file}")

        number = str(number).zfill(3)
        file = f'BTCUSDT-Trades-Optimized-{number}.parquet'

        logging.info(f"Dask n{number} de {file_count-1}")

        if not os.path.exists(os.path.join(raw_dataset_path, file)):
            logging.warning(f"Arquivo {file} não encontrado. Pulando para o próximo.")
            continue

        df_dask = read_parquet_files_optimized(raw_dataset_path, file)

        df_dask = apply_operations_optimized(df_dask, meta)

        bars, init_T, init_dif, res_init, params_df, warm_up_count = batch_create_imbalance_dollar_bars_optimized(
            df_dask, init_T, init_dif, res_init, alpha_volume, alpha_imbalance, warm_up_count
        )
        results = pd.concat([results, bars], ignore_index=True)
        params = pd.concat([params, params_df], ignore_index=True)

        if number == str(file_count - 1):
            bar_open, bar_high, bar_low, bar_close, bar_start_time, bar_end_time, \
            current_imbalance, buy_volume_usd, total_volume_usd, total_volume = res_init

            bar_end_time = df_dask['time'].tail().iloc[-1]

            lastbar = [[bar_start_time, bar_end_time, bar_open, bar_high, bar_low, bar_close,
                            current_imbalance, buy_volume_usd, total_volume_usd, total_volume]]

            lastbar = pd.DataFrame(lastbar, columns=['start_time', 'end_time', 'open', 'high', 'low', 'close', 'imbalance_col', 'total_volume_buy_usd', 'total_volume_usd', 'total_volume'])

            results = pd.concat([results, lastbar], ignore_index=True)

            results_ = results.copy()

            # results_['start_time'] = pd.to_datetime(results_['start_time'], unit='ns')  # Ajuste para 's' se 'time' era em segundos
            # results_['end_time'] = pd.to_datetime(results_['end_time'], unit='ns')

            results_['start_time'] = pd.to_datetime(results_['start_time'])  # Ajuste para 's' se 'time' era em segundos
            results_['end_time'] = pd.to_datetime(results_['end_time'])
            results_.drop(columns=['start_time'], inplace=True)

            results_['params'] = output_file

            results_['time_trial'] = timestamp
            
            # Salva o arquivo Excel no diretório de output
            output_excel_path = os.path.join(output_base_path, f'{output_file}.xlsx')
            results_.to_excel(output_excel_path, index=False)
            logging.info(f"Arquivo salvo em: {output_excel_path}")
            logging.info(f"Total de barras formadas (incluindo warm-up): {warm_up_count}")
            logging.info(f"Total de barras salvas (excluindo warm-up): {len(results_)}")

            # df_eq = pd.DataFrame([[init_T, init_dif, output_file]], columns=['v-ewma', 'imbalance-ewma', 'params'])

            # # host = "superset-postgresql.default.svc.cluster.local"  # Nome do serviço no Kubernetes÷
            # host = "localhost"  # Agora o PostgreSQL está acessível via localhost
            # port = 5432  # Porta do PostgreSQL
            # dbname = "superset"  # Nome do banco de dados
            # user = "superset"  # Usuário do banco de dados
            # password = "superset"  # Senha do banco de dados


            # # Conectar ao banco de dados PostgreSQL usando SQLAlchemy
            # connection_string = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"

            # with create_engine(connection_string).connect() as connection:
            #     # Enviar o DataFrame para o PostgreSQL
            #     results_.to_sql('imbalance-bars-start', connection, if_exists='append', index=False)

            #     print("Dados enviados para o banco de dados com sucesso!")
            #     # Finaliza a medição do tempo

            end_time = time.time()
            elapsed_time = end_time - start_time
            elapsed_time_minutes = elapsed_time / 60  # Converte para minutos
            # Armazena o tempo de processamento
            processing_times[file] = elapsed_time_minutes
            logging.info(f"Tempo de processamento para {file}: {elapsed_time_minutes:.2f} minutos")

In [ ]:
processing_times = {}

for init_T0, alpha_volume, alpha_imbalance, number in initial_state:
    if number == 1:
        start_time = time.time()
        output_file = f'imbalance_dolar_{init_T0}-{alpha_volume}-{alpha_imbalance}'
        results = pd.DataFrame()
        params = pd.DataFrame()
        init_dif = 1.0
        res_init = (-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 0.0, 0.0, 0.0, 0.0)  # Estado inicial
        init_T = init_T0
        warm_up_count = 0  # Inicializa o contador de warm-up
        logging.info(f"params_{output_file}")

    number = str(number).zfill(3)
    file = f'BTCUSDT-Trades-Optimized-{number}.parquet'

    logging.info(f"Dask n{number} de {file_count-1}")

    if not os.path.exists(os.path.join(raw_dataset_path, file)):
        logging.warning(f"Arquivo {file} não encontrado. Pulando para o próximo.")
        continue

    df_dask = read_parquet_files_optimized(raw_dataset_path, file)

    df_dask = apply_operations_optimized(df_dask, meta)

    bars, init_T, init_dif, res_init, params_df, warm_up_count = batch_create_imbalance_dollar_bars_optimized(
        df_dask, init_T, init_dif, res_init, alpha_volume, alpha_imbalance, warm_up_count
    )
    results = pd.concat([results, bars], ignore_index=True)
    params = pd.concat([params, params_df], ignore_index=True)

    if number == str(file_count - 1):
        bar_open, bar_high, bar_low, bar_close, bar_start_time, bar_end_time, \
        current_imbalance, buy_volume_usd, total_volume_usd, total_volume = res_init

        bar_end_time = df_dask['time'].tail().iloc[-1]

        lastbar = [[bar_start_time, bar_end_time, bar_open, bar_high, bar_low, bar_close,
                        current_imbalance, buy_volume_usd, total_volume_usd, total_volume]]

        lastbar = pd.DataFrame(lastbar, columns=['start_time', 'end_time', 'open', 'high', 'low', 'close', 'imbalance_col', 'total_volume_buy_usd', 'total_volume_usd', 'total_volume'])

        results = pd.concat([results, lastbar], ignore_index=True)

        results_ = results.copy()

        # results_['start_time'] = pd.to_datetime(results_['start_time'], unit='ns')  # Ajuste para 's' se 'time' era em segundos
        # results_['end_time'] = pd.to_datetime(results_['end_time'], unit='ns')

        results_['start_time'] = pd.to_datetime(results_['start_time'])  # Ajuste para 's' se 'time' era em segundos
        results_['end_time'] = pd.to_datetime(results_['end_time'])
        results_.drop(columns=['start_time'], inplace=True)

        results_['params'] = output_file

        results_['time_trial'] = timestamp
        results_.to_excel(f'../output/{output_file}.xlsx', index=False)
        logging.info(f"Arquivo salvo em: ../output/{output_file}.xlsx")
        logging.info(f"Total de barras formadas (incluindo warm-up): {warm_up_count}")
        logging.info(f"Total de barras salvas (excluindo warm-up): {len(results_)}")

        # df_eq = pd.DataFrame([[init_T, init_dif, output_file]], columns=['v-ewma', 'imbalance-ewma', 'params'])

        # # host = "superset-postgresql.default.svc.cluster.local"  # Nome do serviço no Kubernetes÷
        # host = "localhost"  # Agora o PostgreSQL está acessível via localhost
        # port = 5432  # Porta do PostgreSQL
        # dbname = "superset"  # Nome do banco de dados
        # user = "superset"  # Usuário do banco de dados
        # password = "superset"  # Senha do banco de dados


        # # Conectar ao banco de dados PostgreSQL usando SQLAlchemy
        # connection_string = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"

        # with create_engine(connection_string).connect() as connection:
        #     # Enviar o DataFrame para o PostgreSQL
        #     results_.to_sql('imbalance-bars-start', connection, if_exists='append', index=False)

        #     print("Dados enviados para o banco de dados com sucesso!")
        #     # Finaliza a medição do tempo

        end_time = time.time()
        elapsed_time = end_time - start_time
        elapsed_time_minutes = elapsed_time / 60  # Converte para minutos
        # Armazena o tempo de processamento
        processing_times[file] = elapsed_time_minutes
        logging.info(f"Tempo de processamento para {file}: {elapsed_time_minutes:.2f} minutos")