# Combine local dagster partitions

For experimentation purposes, we can use Dagster to fetch data locally. However, this may sometimes fail for large datasets. This notebook collects the partitions that were fetched and combines them into a complete dataset for each symbol.


In [None]:
from glob import glob

import polars as pl

from homelab_pipelines.utils.paths import Paths

In [None]:
# Load the symbol definitions, reading ``launch_time`` as a UTC datetime with
# nanosecond precision.
symbols_csv = Paths.defs_data / "bybit_symbols.csv"
launch_time_dtype = pl.Datetime("ns", "UTC")
bybit_symbols = pl.read_csv(
    symbols_csv, schema_overrides={"launch_time": launch_time_dtype}
)
bybit_symbols

In [None]:
def get_dataset_for_symbol(symbol: str) -> pl.DataFrame:
    """Combine the locally stored price partitions for one symbol.

    Every ``*.parquet`` file under
    ``output/raw_bybit_prices_15min_weekly/<symbol>`` is read and concatenated.
    If ``output/raw_bybit_prices_15min_recent/<symbol>.parquet`` exists, its
    rows whose ``start_time_utc`` is strictly later than the newest weekly row
    are appended.

    Args:
        symbol: Symbol name; used as the weekly sub-directory name and as the
            stem of the recent-data file.

    Returns:
        The weekly partitions, followed by any newer rows of recent data.

    Raises:
        ValueError: If no weekly partition files exist for ``symbol``.
    """
    output_dir = Paths.repo_root / "output"
    weekly_symbol_dir = output_dir / "raw_bybit_prices_15min_weekly" / symbol
    recent_path = output_dir / "raw_bybit_prices_15min_recent" / f"{symbol}.parquet"

    # glob yields names in arbitrary order; sort so the combined dataframe has
    # a reproducible row order between runs.
    partition_names = sorted(glob("*.parquet", root_dir=weekly_symbol_dir))
    if not partition_names:
        # pl.concat rejects an empty list; fail with a message naming the cause.
        raise ValueError(
            f"No weekly partitions found for {symbol} in {weekly_symbol_dir}"
        )

    result = pl.concat(
        [pl.read_parquet(weekly_symbol_dir / name) for name in partition_names]
    )
    print(f"Loaded weekly partitions into dataframe of shape {result.shape}")

    # Only the read itself is expected to fail when the recent file is absent,
    # so keep the try body limited to it.
    try:
        recent_data = pl.read_parquet(recent_path)
    except FileNotFoundError:
        print("Could not find recent data")
        return result

    # Keep only rows newer than what the weekly partitions already cover.
    latest_weekly_start = result.get_column("start_time_utc").max()
    recent_data = recent_data.filter(pl.col("start_time_utc") > latest_weekly_start)
    print(f"Adding {len(recent_data)} rows of recent data")
    return pl.concat([result, recent_data])


def dataset_has_gaps(df: pl.DataFrame) -> bool:
    """Report whether the ``start_time_utc`` values are unevenly spaced.

    The timestamps are sorted and the differences between consecutive values
    are collected; more than one distinct difference is treated as a gap.
    """
    timestamps = df.get_column("start_time_utc").sort()
    # The first element has no predecessor, so its null difference is dropped.
    steps = timestamps.diff(1, null_behavior="drop")
    distinct_steps = steps.unique()
    return len(distinct_steps) > 1

In [None]:
# Build one combined parquet file per listed symbol, skipping any symbol whose
# timestamps are not evenly spaced.
for row in bybit_symbols.iter_rows(named=True):
    symbol = row["symbol"]
    print()
    print(f"Processing {symbol}...")
    result = get_dataset_for_symbol(symbol)

    if not dataset_has_gaps(result):
        destination = Paths.repo_root / "data" / f"{symbol}.parquet"
        result.write_parquet(destination)
        print(f"Written dataset of shape {result.shape}")
    else:
        print("Dataset has gaps, skipping this symbol")