In [2]:
import logging
import sys
from pathlib import Path

from storage.base import LocalStorage
from etl.orchestrators.ccxt_segment_pipeline import CcxtSegmentPipeline

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

def main(
    test_file="data/raw/ready/ccxt_coinbaseadvanced/segment_20251206T21_00001.ndjson",
    channels=None,
):
    """Run the CCXT segment pipeline on one raw NDJSON segment and print per-channel stats.

    Args:
        test_file: path (str or Path) of the raw segment to process. Defaults to
            the Coinbase Advanced segment used for this smoke test.
        channels: iterable of channel names to run; defaults to ``["orderbook"]``.

    Returns:
        The mapping returned by ``CcxtSegmentPipeline.process_segment``
        (channel name -> stats), or ``None`` when ``test_file`` does not exist.
    """
    segment_path = Path(test_file)
    # Check the input first: constructing the pipeline initializes a
    # sub-pipeline per channel, which is wasted work if there is nothing to read.
    if not segment_path.exists():
        print(f"Test file not found: {test_file}")
        return None

    if channels is None:
        channels = ["orderbook"]

    storage = LocalStorage(base_path="./data")
    # Output lands under processed/test_ccxt/ (see the ParquetWriter log paths).
    pipeline = CcxtSegmentPipeline(
        storage=storage,
        output_base_path="processed/test_ccxt",
    )

    print(f"Processing {test_file}...")
    results = pipeline.process_segment(segment_path, channels=list(channels))

    print("\nResults:")
    for channel, stats in results.items():
        print(f"{channel}: {stats}")
    return results


results = main()

2025-12-07 23:07:32,465 - etl.processors.raw_parser - INFO - [RawParser] Initialized for source=ccxt, channel_filter=ticker
2025-12-07 23:07:32,465 - etl.writers.parquet_writer - INFO - [ParquetWriter] Initialized: storage=local, compression=snappy
2025-12-07 23:07:32,465 - etl.orchestrators.pipeline - INFO - [ETLPipeline] Initialized: reader=NDJSONReader, processors=RawParser → CcxtTickerProcessor, writer=ParquetWriter
2025-12-07 23:07:32,465 - etl.processors.raw_parser - INFO - [RawParser] Initialized for source=ccxt, channel_filter=trades
2025-12-07 23:07:32,465 - etl.writers.parquet_writer - INFO - [ParquetWriter] Initialized: storage=local, compression=snappy
2025-12-07 23:07:32,465 - etl.orchestrators.pipeline - INFO - [ETLPipeline] Initialized: reader=NDJSONReader, processors=RawParser → CcxtTradesProcessor, writer=ParquetWriter
2025-12-07 23:07:32,465 - etl.processors.raw_parser - INFO - [RawParser] Initialized for source=ccxt, channel_filter=orderbook
2025-12-07 23:07:32,465 -

Processing data/raw/ready/ccxt_coinbaseadvanced/segment_20251206T21_00001.ndjson...


2025-12-07 23:07:39,580 - etl.readers.ndjson_reader - INFO - [NDJSONReader] Read 249 records from segment_20251206T21_00001.ndjson (0 errors)
2025-12-07 23:07:42,232 - etl.writers.parquet_writer - INFO - [ParquetWriter] Wrote 137 records to processed/test_ccxt/orderbook/exchange=coinbaseadvanced/symbol=BTC-USD/date=2025-12-06/part_20251207T23_2555308e.parquet (49941.4 KB)
2025-12-07 23:07:42,232 - etl.writers.parquet_writer - INFO - [ParquetWriter] Wrote 137 records to processed/test_ccxt/orderbook/exchange=coinbaseadvanced/symbol=BTC-USD/date=2025-12-06/part_20251207T23_2555308e.parquet (49941.4 KB)
2025-12-07 23:07:43,016 - etl.writers.parquet_writer - INFO - [ParquetWriter] Wrote 90 records to processed/test_ccxt/orderbook/exchange=coinbaseadvanced/symbol=ETH-USD/date=2025-12-06/part_20251207T23_572635e9.parquet (17300.1 KB)
2025-12-07 23:07:43,016 - etl.writers.parquet_writer - INFO - [ParquetWriter] Total: 227 records written across 2 files
2025-12-07 23:07:43,016 - etl.orchestrat


Results:
orderbook: {'records_written': 227, 'files_written': 2, 'errors': 0}


In [None]:
# CCXT Implementation Plan
# 1. Add CcxtConfig to config/config.py
# 2. Create ingestion/collectors/ccxt_collector.py
# 3. Update ingestion/orchestrators/ingestion_pipeline.py
# 4. Update config/config.examples.yaml

In [1]:
import sys
import os
import duckdb
import logging
from tqdm import tqdm
import polars as pl
import pandas as pd
import numpy as np
from pathlib import Path
from config import load_config
import pyarrow.parquet as pq
import pyarrow as pa

logger = logging.getLogger(__name__)

config = load_config("config/config.yaml")

# force=True: basicConfig() is a no-op once the root logger already has
# handlers (e.g. after another cell in this kernel configured logging), which
# would silently ignore the level/format from config.yaml.
# NOTE(review): assumes config.log_level is an upper-case level name like "INFO".
logging.basicConfig(
    level=getattr(logging, config.log_level),
    format=config.log_format,
    force=True,
)

# TODO: define DatabentoConfig(BaseModel) in config/config.py. A body-less,
# indented stub of it here made this entire cell fail to parse.


In [None]:
# Lazily scan every Coinbase ticker Parquet file, then materialize only BTC-USD rows.
# NOTE(review): files are written hive-style (product_id=.../date=...). With a
# glob path polars may not infer hive partitions, so this filter relies on
# product_id also being stored inside the files. Confirm, or pass
# hive_partitioning=True to scan_parquet.
pl_df = pl.scan_parquet("F:/processed/coinbase/ticker/**/*.parquet")
df = (
    pl_df
    .filter(pl.col("product_id") == "BTC-USD")
    .collect()
)
df

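## Compaction

Merges small Parquet files within each partition. Note: the cell below runs with `dry_run=False` and `delete_source_files=True`, so the original files are removed.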

In [None]:
from etl.repartitioner import ParquetCompactor

# Compactor over the level2 dataset, aiming for ~100 MB output files.
compactor = ParquetCompactor(
    target_file_size_mb=100,
    dataset_dir="F:/processed/coinbase/level2/",
)

# Collapse each partition holding 2+ files into exactly one file.
# WARNING: this is a real run (dry_run=False) and deletes the source files.
stats = compactor.compact(
    dry_run=False,
    delete_source_files=True,
    min_file_count=2,
    target_file_count=1,
)
stats

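## Syncing Local → Cloud

Uploads processed Parquet files from the ETL input storage to the output storage and deletes each local copy after it has been uploaded.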

In [None]:
import logging
from pathlib import Path
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed

# Imported here (not only in an earlier cell) so this cell runs on a fresh kernel.
from tqdm.auto import tqdm

from config import load_config
from storage.factory import (
    create_etl_storage_input,
    create_etl_storage_output,
)
from etl.job import ETLJob

logger = logging.getLogger(__name__)

config = load_config("config/config.yaml")

# force=True so config.yaml's level/format apply even if another cell already
# configured the root logger in this kernel (otherwise basicConfig is a no-op).
logging.basicConfig(
    level=getattr(logging, config.log_level),
    format=config.log_format,
    force=True,
)

# Create storage backends
storage_input = create_etl_storage_input(config)
storage_output = create_etl_storage_output(config)

logger.info(f"Storage Input:  {storage_input.backend_type} @ {storage_input.base_path}")
logger.info(f"Storage Output: {storage_output.backend_type} @ {storage_output.base_path}")


SOURCE_PATHS = [
    "processed/coinbase/market_trades/",
    "processed/coinbase/ticker/",
    "processed/coinbase/level2/",
]

# Parallel upload threads; adjust based on your bandwidth and system.
max_workers = 10


def upload_file(file_info):
    """Upload one file from storage_input to storage_output, then delete the local copy.

    Args:
        file_info: an entry returned by ``storage_input.list_files``; only its
            ``'path'`` key is read.

    Returns:
        ``{'success': True, 'path': ...}`` on success. On failure,
        ``{'success': False, 'path': ..., 'stage': ..., 'error': ...}``, where
        ``stage`` is ``'upload'`` or ``'delete local copy of'``. A file that
        reached the remote but could not be removed locally is therefore not
        reported as an upload failure.
    """
    fpath = file_info['path']
    try:
        storage_output.write_file(
            local_path=storage_input.get_full_path(fpath),
            remote_path=fpath,
        )
    except Exception as e:
        return {'success': False, 'path': fpath, 'stage': 'upload', 'error': str(e)}
    try:
        storage_input.delete(fpath)
    except Exception as e:
        return {'success': False, 'path': fpath, 'stage': 'delete local copy of', 'error': str(e)}
    return {'success': True, 'path': fpath}


for upload_path in SOURCE_PATHS:
    print(f"\nUploading files from {upload_path}...")
    files_to_upload = storage_input.list_files(upload_path, pattern="**/*.parquet")
    print(f"Found {len(files_to_upload)} files to upload.")

    failed_uploads = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(upload_file, f) for f in files_to_upload]

        for future in tqdm(as_completed(futures), total=len(futures), desc="Uploading files"):
            result = future.result()
            if not result['success']:
                failed_uploads.append(result)
                # tqdm.write keeps the progress bar intact, unlike print().
                tqdm.write(f"Failed to {result['stage']} {result['path']}: {result['error']}")

    if failed_uploads:
        print(f"\n{len(failed_uploads)} of {len(files_to_upload)} files failed (see errors above)")
    else:
        print(f"\nSuccessfully uploaded all {len(files_to_upload)} files")

## ETL

In [14]:
import logging
import sys
from pathlib import Path
from datetime import datetime, timedelta
import os

from config import load_config
from storage.factory import (
    create_etl_storage_input,
    create_etl_storage_output,
    get_etl_input_path,
    get_etl_output_path,
    get_processing_path
)
from etl.job import ETLJob

logger = logging.getLogger(__name__)

config = load_config("config/config.yaml")

# force=True: basicConfig() is a no-op once the root logger has handlers, which
# in a long-lived kernel is typical after any earlier cell configured logging;
# forcing makes config.yaml's level and format actually take effect.
# NOTE(review): assumes config.log_level is an upper-case level name like "INFO".
logging.basicConfig(
    level=getattr(logging, config.log_level),
    format=config.log_format,
    force=True,
)

In [15]:
# Build the input/output storage backends and resolve the Coinbase ETL paths
# from config, then log where everything points.
storage_input = create_etl_storage_input(config)
storage_output = create_etl_storage_output(config)

input_path = get_etl_input_path(config, "coinbase")
output_path = get_etl_output_path(config, "coinbase")
processing_path = get_processing_path(config, "coinbase")

for storage_label, backend in (
    ("Storage Input: ", storage_input),
    ("Storage Output:", storage_output),
):
    logger.info(f"{storage_label} {backend.backend_type} @ {backend.base_path}")

for path_label, path_value in (
    ("Input", input_path),
    ("Output", output_path),
    ("Processing", processing_path),
):
    logger.info(f"{path_label} path: {path_value}")

2025-12-04 10:56:44,371 - storage.factory - INFO - [etl_input] Initializing local storage: F:/
2025-12-04 10:56:44,373 - storage.factory - INFO - [etl_output] Initializing S3 storage: market-data-vault
2025-12-04 10:56:44,455 - __main__ - INFO - Storage Input:  local @ F:/
2025-12-04 10:56:44,457 - __main__ - INFO - Storage Output: s3 @ market-data-vault
2025-12-04 10:56:44,457 - __main__ - INFO - Input path: raw/ready/coinbase
2025-12-04 10:56:44,460 - __main__ - INFO - Output path: processed/coinbase
2025-12-04 10:56:44,461 - __main__ - INFO - Processing path: raw/processing/coinbase


In [16]:
# Reduce config.etl.channels to the enabled channels, keeping only the
# partition columns and processor options for each. Remains None when the
# config has no (or an empty) channels section.
configured_channels = getattr(config.etl, "channels", None)
if configured_channels:
    channel_config = {
        name: {
            "partition_cols": cfg.partition_cols,
            "processor_options": cfg.processor_options,
        }
        for name, cfg in configured_channels.items()
        if cfg.enabled
    }
else:
    channel_config = None
channel_config

{'level2': {'partition_cols': ['product_id', 'date'],
  'processor_options': {'reconstruct_lob': False,
   'compute_features': False,
   'add_derived_fields': True}},
 'market_trades': {'partition_cols': ['product_id', 'date'],
  'processor_options': {'add_derived_fields': True, 'infer_aggressor': False}},
 'ticker': {'partition_cols': ['product_id', 'date'],
  'processor_options': {'add_derived_fields': True}}}

In [17]:
# One job wired to both storage backends, the three resolved paths, the
# per-channel settings above, and the delete-after-processing flag from config.
job = ETLJob(
    storage_input=storage_input,
    storage_output=storage_output,
    input_path=input_path,
    processing_path=processing_path,
    output_path=output_path,
    channel_config=channel_config,
    delete_after_processing=config.etl.delete_after_processing,
)

2025-12-04 10:57:14,545 - etl.processors.coinbase.level2_processor - INFO - [CoinbaseLevel2Processor] Initialized: reconstruct_lob=False, compute_features=False
2025-12-04 10:57:14,546 - etl.processors.raw_parser - INFO - [RawParser] Initialized for source=coinbase, channel_filter=level2
2025-12-04 10:57:14,546 - etl.writers.parquet_writer - INFO - [ParquetWriter] Initialized: storage=s3, compression=snappy
2025-12-04 10:57:14,548 - etl.orchestrators.pipeline - INFO - [ETLPipeline] Initialized: reader=NDJSONReader, processors=RawParser → Level2Processor, writer=ParquetWriter
2025-12-04 10:57:14,548 - etl.processors.coinbase.trades_processor - INFO - [CoinbaseTradesProcessor] Initialized: add_derived_fields=True, infer_aggressor=False
2025-12-04 10:57:14,550 - etl.processors.raw_parser - INFO - [RawParser] Initialized for source=coinbase, channel_filter=market_trades
2025-12-04 10:57:14,551 - etl.writers.parquet_writer - INFO - [ParquetWriter] Initialized: storage=s3, compression=snappy

In [18]:
# Process every segment found under input_path. Judging by the logs, each
# segment is read from processing_path and written per channel as partitioned
# Parquet to output_path; whether raw files are removed afterwards is governed
# by config.etl.delete_after_processing.
job.process_all()

2025-12-04 10:58:46,494 - etl.job - INFO - [ETLJob] Scanning for segments in raw/ready/coinbase
2025-12-04 10:58:46,496 - etl.job - INFO - [ETLJob] Found 3 segment(s) to process
2025-12-04 10:58:46,509 - etl.job - INFO - [ETLJob] Processing segment: segment_20251204T18_00011.ndjson
2025-12-04 10:58:46,510 - etl.orchestrators.coinbase_segment_pipeline - INFO - [CoinbaseSegmentPipeline] Processing segment: segment_20251204T18_00011.ndjson
2025-12-04 10:58:46,510 - etl.orchestrators.pipeline - INFO - [ETLPipeline] Executing: F:\raw\processing\coinbase\segment_20251204T18_00011.ndjson → processed/coinbase/level2 (partition_cols=['product_id', 'date'])
2025-12-04 10:58:56,576 - etl.readers.ndjson_reader - INFO - [NDJSONReader] Read 107200 records from segment_20251204T18_00011.ndjson (0 errors)
2025-12-04 10:59:00,366 - etl.writers.parquet_writer - INFO - [ParquetWriter] Wrote 10755 records to processed/coinbase/level2/product_id=ADA-USD/date=2025-12-04/part_20251204T10_114a4663.parquet (91