In [21]:
! pip install polars zarr



In [22]:
"""
Phase 1: ETL per-symbol (CSV -> Parquet + optional Zarr)
- Chuẩn hoá cột, kiểu dữ liệu
- Kiểm tra gaps/duplicates/NaN
- Ghi Parquet "master" (kiểm toán)
- (Tuỳ chọn) Ghi Zarr cache per-symbol: [time, feature] float32
"""

import argparse, json
from pathlib import Path
import polars as pl
import numpy as np
import zarr

# Column names of the raw kline CSV. These are the same names used as keys of
# the dtype overrides in read_csv_standardize. Not referenced by the other
# cells visible in this notebook.
COLS_RAW = [
    "open_time","open","high","low","close","volume","close_time",
    "quote_asset_volume","number_of_trades",
    "taker_buy_base_asset_volume","taker_buy_quote_asset_volume","ignore"
]

# Standardized column names, in the order read_csv_standardize selects them
# ("close_time" and "ignore" are dropped, the rest are renamed).
COLS_STD = [
    "open_time","Open","High","Low","Close","Volume",
    "QuoteVolume","Trades","TakerBuyBase","TakerBuyQuote"
]

In [23]:

def read_csv_standardize(csv_path: Path, tz_utc=True) -> pl.DataFrame:
    """Load one raw kline CSV and return it with standardized columns.

    The raw columns are renamed (``open`` -> ``Open``,
    ``quote_asset_volume`` -> ``QuoteVolume``, ...), ``open_time`` is parsed
    from text into a microsecond-precision UTC datetime, rows whose
    ``open_time`` cannot be parsed are dropped, duplicate timestamps are
    removed, and the result is sorted by ``open_time`` ascending.

    Args:
        csv_path: Path of the CSV file to read. It must contain the raw
            columns selected below; ``close_time`` and ``ignore`` are read
            but not kept.
        tz_utc: Currently has no effect -- timestamps are always parsed as
            UTC. The parameter is kept so existing callers keep working.

    Returns:
        A ``pl.DataFrame`` with columns ``open_time, Open, High, Low, Close,
        Volume, QuoteVolume, Trades, TakerBuyBase, TakerBuyQuote``, one row
        per distinct ``open_time``, sorted ascending.
    """
    # NOTE(review): newer polars releases rename `dtypes` to
    # `schema_overrides` and deprecate `collect(streaming=True)` -- confirm
    # against the installed polars version before changing these keywords.
    df = pl.scan_csv(
        csv_path,
        try_parse_dates=True,
        dtypes={
            "open_time": pl.Utf8,  # parsed manually below so the result is explicitly UTC
            "open": pl.Float64, "high": pl.Float64, "low": pl.Float64, "close": pl.Float64,
            "volume": pl.Float64, "quote_asset_volume": pl.Float64,
            "number_of_trades": pl.Int64,
            "taker_buy_base_asset_volume": pl.Float64, "taker_buy_quote_asset_volume": pl.Float64,
            "close_time": pl.Utf8,  # not used; left as text in case it is needed for cross-checking
            "ignore": pl.Utf8
        }
    ).select([
        pl.col("open_time"),
        pl.col("open").alias("Open"),
        pl.col("high").alias("High"),
        pl.col("low").alias("Low"),
        pl.col("close").alias("Close"),
        pl.col("volume").alias("Volume"),
        pl.col("quote_asset_volume").alias("QuoteVolume"),
        pl.col("number_of_trades").alias("Trades"),
        pl.col("taker_buy_base_asset_volume").alias("TakerBuyBase"),
        pl.col("taker_buy_quote_asset_volume").alias("TakerBuyQuote")
    ]).collect(streaming=True)

    # Parse open_time into Datetime[us] UTC. strict=False turns unparseable
    # values into nulls, which are then dropped.
    dt = pl.col("open_time").str.to_datetime(
        time_unit="us", strict=False, time_zone="UTC"
    )
    df = df.with_columns(dt.alias("open_time")).drop_nulls(["open_time"])

    # De-duplicate first, then sort. `unique` only preserves the input order
    # when maintain_order=True, so the earlier sort -> unique sequence could
    # hand back rows out of order. With maintain_order=True, keep="first"
    # deterministically keeps the first occurrence in file order.
    df = df.unique(
        subset=["open_time"], keep="first", maintain_order=True
    ).sort("open_time")

    return df

In [24]:

def qc_report(df: pl.DataFrame, candle_ns: int) -> dict:
    """Summarize basic quality checks for a standardized candle frame.

    Args:
        df: Frame with an ``open_time`` datetime column and the numeric
            columns ``Open``, ``High``, ``Low``, ``Close`` and ``Volume``.
            Gaps are measured between consecutive rows, so the frame is
            expected to be sorted by ``open_time``.
        candle_ns: Expected spacing between consecutive ``open_time`` values
            in nanoseconds (e.g. 60_000_000_000 for 1-minute candles).

    Returns:
        A dict with keys:
        ``rows`` -- number of rows in ``df``;
        ``gaps`` -- number of consecutive timestamp pairs whose spacing
        differs from ``candle_ns``;
        ``nan``  -- per-column count of missing values (nulls, plus float
        NaN for float columns);
        ``neg``  -- per-column count of values below zero.
    """
    checked_cols = ["Open", "High", "Low", "Close", "Volume"]

    # Take the integer epoch straight from polars instead of going through
    # Python datetimes and float seconds: this avoids a per-row Python loop,
    # float rounding at nanosecond scale, and local-time interpretation of
    # naive datetimes by datetime.timestamp().
    open_ns = df.get_column("open_time").drop_nulls().dt.timestamp("ns").to_numpy()
    spacing = np.diff(np.asarray(open_ns, dtype=np.int64))
    gaps = int(np.count_nonzero(spacing != candle_ns))

    nan_counts = {}
    neg_counts = {}
    for name in checked_cols:
        col = df.get_column(name)
        missing = int(col.null_count())
        # null_count() does not see float NaN, so count those separately.
        if col.dtype in (pl.Float32, pl.Float64):
            missing += int(col.is_nan().sum() or 0)
        nan_counts[name] = missing
        neg_counts[name] = int((col < 0).sum() or 0)

    return {"rows": df.height, "gaps": gaps, "nan": nan_counts, "neg": neg_counts}


In [41]:
def write_parquet(df: pl.DataFrame, out_path: Path):
    """Write ``df`` to ``out_path`` as a zstd-compressed Parquet file.

    Missing parent directories of ``out_path`` are created first, and column
    statistics are written into the file.
    """
    target_dir = out_path.parent
    target_dir.mkdir(exist_ok=True, parents=True)
    df.write_parquet(out_path, statistics=True, compression="zstd")

def write_zarr(
    df: pl.DataFrame,
    out_dir: Path,
    features=("Open","High","Low","Close","Volume", "QuoteVolume","Trades","TakerBuyBase","TakerBuyQuote"),
):
    """Write ``df`` into a Zarr store at ``out_dir`` (opened with mode "w").

    Three datasets are created:
      * ``values``   -- the ``features`` columns as a float32 matrix with one
        row per row of ``df`` and one column per feature;
      * ``time``     -- ``open_time`` as ISO-8601 strings, with a "+00:00"
        offset rewritten to "Z";
      * ``features`` -- the feature names, in matrix column order.

    Args:
        df: Frame holding ``open_time`` and every column named in ``features``.
        out_dir: Directory of the Zarr store; created if missing.
        features: Names of the columns to store in ``values``.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    # Feature matrix, cast to float32.
    matrix = df.select(list(features)).to_numpy().astype("float32")
    n_rows = len(matrix)
    # Chunk along the row axis only: between 1024 and 16384 rows per chunk,
    # every feature column in the same chunk.
    rows_per_chunk = max(1024, min(n_rows, 16384))

    root = zarr.open(out_dir.as_posix(), mode="w")
    root.create_dataset(
        "values",
        data=matrix,
        chunks=(rows_per_chunk, len(features)),
    )

    # Timestamps as variable-length UTF-8 strings so rows can be mapped back
    # to their open_time.
    iso_stamps = []
    for stamp in df["open_time"]:
        iso_stamps.append(stamp.isoformat().replace("+00:00","Z"))
    root.create_dataset(
        "time",
        data=np.array(iso_stamps, dtype=object),
        object_codec=zarr.codecs.VLenUTF8(),
    )

    # Column labels for the second axis of "values".
    root.create_dataset(
        "features",
        data=np.array(features, dtype=object),
        object_codec=zarr.codecs.VLenUTF8(),
    )


In [26]:
def main(args):
    """Run the per-symbol ETL: read CSV, print the QC line, write outputs.

    Args:
        args: Namespace providing ``data_dir``, ``out_root``, ``symbol``,
            ``candle_level`` and ``write_zarr``.

    Raises:
        FileNotFoundError: if no file in ``data_dir`` matches
            ``{symbol}_{candle_level}_*.csv``.
    """
    candle_ns = 60_000_000_000  # 1m
    symbol = args.symbol
    source_dir = Path(args.data_dir)
    output_root = Path(args.out_root)

    # Among the matching files, take the one whose path sorts last.
    glob_pattern = f"{symbol}_{args.candle_level}_*.csv"
    candidates = sorted(source_dir.glob(glob_pattern))
    if len(candidates) == 0:
        raise FileNotFoundError(f"Không thấy file raw cho {symbol} ({glob_pattern})")
    latest_csv = candidates[-1]

    frame = read_csv_standardize(latest_csv)
    report = qc_report(frame, candle_ns)

    print(f"[QC] {symbol}: rows={report['rows']}, gaps={report['gaps']}, nan={report['nan']}, neg={report['neg']}")

    # The Parquet copy is always written.
    data_root = output_root / "data"
    write_parquet(frame, data_root / "raw_parquet" / f"{symbol}.parquet")

    # The Zarr cache is written only when requested.
    if args.write_zarr:
        write_zarr(frame, data_root / "zarr_per_symbol" / f"{symbol}.zarr")


In [27]:
import argparse
# Argument parser for the per-symbol ETL. `ap` is reused by the batch cell
# further down to build one `args` namespace per symbol.
ap = argparse.ArgumentParser()
ap.add_argument("--data-dir", required=True, help="Thư mục chứa CSV raw (Binance 1m)")
ap.add_argument("--symbol", required=True)
ap.add_argument("--out-root", default=".", help="Root để ghi data/...")
ap.add_argument("--write-zarr", action="store_true")
# Only 1-minute candles are accepted; main() hard-codes the 1m spacing.
ap.add_argument("--candle-level", default="1m", choices=["1m"])
# Reading the real command line is disabled here; the argument list is
# passed explicitly below instead.
# args = ap.parse_args()

args = ap.parse_args([
    "--data-dir", "../../../work/data/binance/spot/1m",
    "--symbol", "BTCUSDT",
    "--out-root", "../../../work/processed/binance",
    "--write-zarr",
    "--candle-level", "1m",
])

# Display the parsed namespace as the cell output.
args

Namespace(data_dir='../../../work/data/binance/spot/1m', symbol='BTCUSDT', out_root='../../../work/processed/binance', write_zarr=True, candle_level='1m')

In [28]:
# Run the ETL once for the single symbol configured in `args`.
main(args)

  df = pl.scan_csv(
  df = pl.scan_csv(


[QC] BTCUSDT: rows=3507322, gaps=22, nan={'Open': 0, 'High': 0, 'Low': 0, 'Close': 0, 'Volume': 0}, neg={'Open': 0, 'High': 0, 'Low': 0, 'Close': 0, 'Volume': 0}


In [42]:
from pathlib import Path
import argparse

# Batch configuration: where the raw CSVs live, where outputs go, which
# candle level to process, and whether to also write the Zarr cache.
data_dir = Path("../../../work/data/binance/spot/1m")
out_root = Path("../../../work/processed/binance")
candle_level = "1m"
write_zarr_flag = True

# A symbol is the part of the file name before the first underscore; the set
# removes repeats when a symbol has several files.
symbols = sorted(set(
    raw_file.name.split("_", 2)[0]
    for raw_file in data_dir.glob(f"*_{candle_level}_*.csv")
))

print(f"Tìm thấy {len(symbols)} symbols trong {data_dir}")
print(symbols)

print(f"Tổng {len(symbols)} symbols:", symbols[:5], "...")
for symbol in symbols:
    # Build the same namespace the single-symbol run uses, one per symbol;
    # "--write-zarr" is appended only when the flag is set.
    args = ap.parse_args(
        [
            "--data-dir", str(data_dir),
            "--symbol", symbol,
            "--out-root", str(out_root),
            "--candle-level", candle_level,
        ]
        + (["--write-zarr"] if write_zarr_flag else [])
    )
    print(f"\n=== Processing {symbol} ===")
    main(args)


Tìm thấy 4 symbols trong ../../../work/data/binance/spot/1m
['BNBUSDT', 'BTCUSDT', 'ETHUSDT', 'SOLUSDT']
Tổng 4 symbols: ['BNBUSDT', 'BTCUSDT', 'ETHUSDT', 'SOLUSDT'] ...

=== Processing BNBUSDT ===


  df = pl.scan_csv(
  df = pl.scan_csv(


[QC] BNBUSDT: rows=3507365, gaps=22, nan={'Open': 0, 'High': 0, 'Low': 0, 'Close': 0, 'Volume': 0}, neg={'Open': 0, 'High': 0, 'Low': 0, 'Close': 0, 'Volume': 0}

=== Processing BTCUSDT ===
[QC] BTCUSDT: rows=3507322, gaps=22, nan={'Open': 0, 'High': 0, 'Low': 0, 'Close': 0, 'Volume': 0}, neg={'Open': 0, 'High': 0, 'Low': 0, 'Close': 0, 'Volume': 0}

=== Processing ETHUSDT ===
[QC] ETHUSDT: rows=3507344, gaps=22, nan={'Open': 0, 'High': 0, 'Low': 0, 'Close': 0, 'Volume': 0}, neg={'Open': 0, 'High': 0, 'Low': 0, 'Close': 0, 'Volume': 0}

=== Processing SOLUSDT ===
[QC] SOLUSDT: rows=2662968, gaps=10, nan={'Open': 0, 'High': 0, 'Low': 0, 'Close': 0, 'Volume': 0}, neg={'Open': 0, 'High': 0, 'Low': 0, 'Close': 0, 'Volume': 0}


In [43]:
import polars as pl
import zarr
from pathlib import Path

def preview_symbol(
    symbol: str,
    zarr_root="../../../work/processed/binance/data/zarr_per_symbol",
):
    """Print a short preview of one symbol's Zarr store.

    Reads the ``features``, ``time`` and ``values`` datasets from
    ``<zarr_root>/<symbol>.zarr``, rebuilds a DataFrame with a UTC
    ``open_time`` column followed by the feature columns, and prints its
    shape, first and last three rows, and the per-column minimum and maximum.

    Args:
        symbol: Symbol name; the store is expected at ``<symbol>.zarr``.
        zarr_root: Directory containing the per-symbol stores. Defaults to
            the location this notebook writes to, so existing calls with
            only ``symbol`` behave as before.
    """
    zarr_path = Path(zarr_root) / f"{symbol}.zarr"
    z = zarr.open(zarr_path, mode="r")

    features = z["features"][:].tolist()
    times = z["time"][:]
    values = z["values"][:]

    # Column names come from the stored feature list; the ISO strings in
    # "time" are parsed back into UTC datetimes.
    df_zarr = (
        pl.DataFrame(values, schema=features)
        .with_columns(pl.Series("open_time", times).str.to_datetime(time_zone="UTC"))
        .select(["open_time", *features])
    )

    print(f"\n=== {symbol} ===")
    print("Shape:", df_zarr.shape)
    print("Head:\n", df_zarr.head(3))
    print("Tail:\n", df_zarr.tail(3))
    # One-row summary: every column's min (suffix "_min") next to its max
    # (suffix "_max").
    mins = df_zarr.select(pl.all().min().name.suffix("_min"))
    maxs = df_zarr.select(pl.all().max().name.suffix("_max"))
    summary = pl.concat([mins, maxs], how="horizontal")
    print("Summary:\n", summary)



# Preview each symbol's store. Note this rebinds the notebook-level `symbols`
# list to a hard-coded set of names.
symbols = ["BNBUSDT", "BTCUSDT", "ETHUSDT", "SOLUSDT"]
for sym in symbols:
    preview_symbol(sym)



=== BNBUSDT ===
Shape: (3507365, 10)
Head:
 shape: (3, 10)
┌──────────────┬────────┬────────┬────────┬───┬──────────────┬────────┬──────────────┬─────────────┐
│ open_time    ┆ Open   ┆ High   ┆ Low    ┆ … ┆ QuoteVolume  ┆ Trades ┆ TakerBuyBase ┆ TakerBuyQuo │
│ ---          ┆ ---    ┆ ---    ┆ ---    ┆   ┆ ---          ┆ ---    ┆ ---          ┆ te          │
│ datetime[μs, ┆ f32    ┆ f32    ┆ f32    ┆   ┆ f32          ┆ f32    ┆ f32          ┆ ---         │
│ UTC]         ┆        ┆        ┆        ┆   ┆              ┆        ┆              ┆ f32         │
╞══════════════╪════════╪════════╪════════╪═══╪══════════════╪════════╪══════════════╪═════════════╡
│ 2019-01-01   ┆ 6.1139 ┆ 6.1228 ┆ 6.1119 ┆ … ┆ 6965.244141  ┆ 29.0   ┆ 691.22998    ┆ 4227.453125 │
│ 00:00:00 UTC ┆        ┆        ┆        ┆   ┆              ┆        ┆              ┆             │
│ 2019-01-01   ┆ 6.1174 ┆ 6.1218 ┆ 6.1007 ┆ … ┆ 12859.867188 ┆ 46.0   ┆ 1303.689941  ┆ 7961.539551 │
│ 00:01:00 UTC ┆        ┆      

In [29]:
from pathlib import Path
import zarr
import polars as pl

# Open one symbol's Zarr store read-only and load its three datasets.
symbol = "BTCUSDT"
out_root = Path("../../../work/processed/binance")
zarr_path = out_root / "data" / "zarr_per_symbol" / f"{symbol}.zarr"

z = zarr.open(zarr_path, mode="r")
print(z.tree())                 # quick look at the store structure
values = z["values"][:]         # numpy array (n_time, n_features)
times = z["time"][:]            # ISO-8601 strings ending in 'Z'
features = z["features"][:]     # column names


/
 ├── features (5,) object
 ├── time (3507322,) object
 └── values (3507322, 5) float32


In [31]:
# Rebuild a DataFrame from the arrays loaded in the previous cell: feature
# columns from `values`, plus `open_time` parsed from the ISO strings as UTC
# and moved to the first position.
df_zarr = (
    pl.DataFrame(values, schema=features.tolist())
    .with_columns(
        pl.Series("open_time", times).str.to_datetime(time_zone="UTC")
    )
    .select(["open_time", *features])
)
print(df_zarr.head())
print(df_zarr.shape)
print(df_zarr.tail())

shape: (5, 6)
┌─────────────────────────┬─────────────┬─────────────┬─────────────┬─────────────┬───────────┐
│ open_time               ┆ Open        ┆ High        ┆ Low         ┆ Close       ┆ Volume    │
│ ---                     ┆ ---         ┆ ---         ┆ ---         ┆ ---         ┆ ---       │
│ datetime[μs, UTC]       ┆ f32         ┆ f32         ┆ f32         ┆ f32         ┆ f32       │
╞═════════════════════════╪═════════════╪═════════════╪═════════════╪═════════════╪═══════════╡
│ 2019-01-01 00:00:00 UTC ┆ 3701.22998  ┆ 3703.719971 ┆ 3701.090088 ┆ 3702.459961 ┆ 17.100109 │
│ 2019-01-01 00:01:00 UTC ┆ 3702.439941 ┆ 3702.629883 ┆ 3695.659912 ┆ 3697.040039 ┆ 23.700603 │
│ 2019-01-01 00:02:00 UTC ┆ 3699.419922 ┆ 3702.040039 ┆ 3696.080078 ┆ 3698.139893 ┆ 14.488615 │
│ 2019-01-01 00:03:00 UTC ┆ 3697.48999  ┆ 3698.189941 ┆ 3695.969971 ┆ 3696.51001  ┆ 8.499966  │
│ 2019-01-01 00:04:00 UTC ┆ 3697.199951 ┆ 3697.620117 ┆ 3695.0      ┆ 3696.320068 ┆ 21.782887 │
└─────────────────────────

In [32]:
# Load the Parquet copy of the same symbol, restricted to open_time plus the
# feature columns stored in the Zarr cache, so the two can be compared.
df_parquet = pl.read_parquet(
    out_root / "data" / "raw_parquet" / f"{symbol}.parquet"
).select(["open_time", *features])

# check that both frames have the same shape (rows and columns)
print(df_parquet.shape == df_zarr.shape)

# approximate comparison, because the Zarr values are stored as float32;
# np.allclose also applies its default relative tolerance on top of atol
import numpy as np
np.allclose(df_parquet.select(features).to_numpy(),
            df_zarr.select(features).to_numpy(),
            equal_nan=True, atol=1e-6)


True


True