In [None]:
# --- Parquet downsampler: keep ~10% rows, write ZSTD-compressed output --- #
# 1) pip install polars
# 2) Edit INPUT_FILES if needed, then run.

from pathlib import Path
import os, math, time
from typing import Sequence, Optional

try:
    import polars as pl
except ImportError as e:
    raise SystemExit(
        "Polars is required. Install via `pip install polars` and re-run this cell."
    ) from e

# --------------------------- CONFIG --------------------------------------- #
# Files to process (edit this to your paths)
INPUT_FILES: Sequence[str] = ["", ""]  # replace the empty placeholders with real paths

# Sampling
KEEP_FRACTION: float = 0.1             # share of rows retained (0.1 -> ~90% of rows dropped)
SEED: int = 42                         # fixed RNG seed so reruns pick the same rows
# Optional column to stratify by, so each group keeps roughly its original
# share of rows (e.g., "VendorID"); None samples the whole file uniformly.
STRATIFY_BY: Optional[str] = None

# Output encoding
COMPRESSION: str = "zstd"              # Parquet codec; 'snappy' writes faster but produces larger files
COMPRESSION_LEVEL: int = 7             # zstd accepts 1-22; mid-range levels balance speed and size
WRITE_STATISTICS: bool = True          # embed per-column statistics in the Parquet metadata
# ------------------------------------------------------------------------- #

def _fmt_mb(bytes_: int) -> float:
    return round(bytes_ / (1024 * 1024), 2)

def _out_path(p: Path, keep_fraction: float, compression: str) -> Path:
    stem = p.stem
    parent = p.parent
    suffix = p.suffix  # .parquet
    pct = int(keep_fraction * 100)
    return parent / f"{stem}_sample{pct}_{compression}.parquet"

def _sample_frame(
    df: "pl.DataFrame",
    keep_fraction: float,
    seed: int,
    stratify_by: Optional[str],
) -> "pl.DataFrame":
    """Return a reproducible random sample holding ~``keep_fraction`` of *df*'s rows.

    When *stratify_by* is set, each group of that column is sampled on its own,
    so every group keeps roughly the same share of rows as in *df*.

    Raises:
        ValueError: if *stratify_by* is set but is not a column of *df*.
    """
    if df.height == 0:
        # Nothing to sample; returning the empty frame keeps its schema for the write.
        return df
    if not stratify_by:
        return df.sample(
            fraction=keep_fraction,
            with_replacement=False,
            shuffle=True,
            seed=seed,
        )
    if stratify_by not in df.columns:
        raise ValueError(
            f"stratify_by column {stratify_by!r} not found; available columns: {df.columns}"
        )
    # DataFrame.sample has no stratification option, so sample each group separately.
    # maintain_order=True fixes the group order, keeping the output reproducible for a seed.
    return df.group_by(stratify_by, maintain_order=True).map_groups(
        lambda group: group.sample(
            fraction=keep_fraction,
            with_replacement=False,
            shuffle=True,
            seed=seed,
        )
    )


def downsample_parquet(
    in_path: str,
    keep_fraction: float = KEEP_FRACTION,
    seed: int = SEED,
    compression: str = COMPRESSION,
    compression_level: int = COMPRESSION_LEVEL,
    write_statistics: bool = WRITE_STATISTICS,
    stratify_by: Optional[str] = STRATIFY_BY,
) -> dict:
    """Write a reproducible random row sample of one Parquet file next to it.

    The whole file is read into memory. About ``keep_fraction`` of its rows
    are then sampled without replacement, optionally per group of
    *stratify_by*. The sample is written with the requested codec to the path
    built by ``_out_path``. An empty input is still written so that its schema
    is preserved.

    Args:
        in_path: Path of the input Parquet file.
        keep_fraction: Share of rows to keep, in [0, 1].
        seed: RNG seed for reproducible sampling.
        compression: Parquet codec passed to ``write_parquet``.
        compression_level: Codec level passed to ``write_parquet``.
        write_statistics: Whether to write column statistics into the file.
        stratify_by: Optional column whose groups are sampled separately.

    Returns:
        A summary dict. On success it has ``status == "OK"`` plus row counts,
        sizes in MB, ``size_reduction_pct``, ``output_file`` and
        ``elapsed_s``. If *in_path* is not an existing regular file it has
        ``status == "ERROR"`` and a ``message``.

    Raises:
        ValueError: if *keep_fraction* is outside [0, 1] or *stratify_by* is
            not a column of the file. Polars read/write errors propagate.
    """
    if not 0.0 <= keep_fraction <= 1.0:
        raise ValueError(f"keep_fraction must be in [0, 1], got {keep_fraction!r}")

    p = Path(in_path)
    # is_file() rather than exists(): Path("") resolves to the current directory,
    # which exists, so an unfilled INPUT_FILES entry would otherwise reach read_parquet.
    if not p.is_file():
        return {"file": p.name, "status": "ERROR", "message": "File not found."}

    start = time.perf_counter()
    orig_size = p.stat().st_size

    # Reads the entire file into memory.
    df = pl.read_parquet(str(p))
    original_rows = df.height
    df_small = _sample_frame(df, keep_fraction, seed, stratify_by)
    new_rows = df_small.height
    del df  # drop the full frame before writing to lower peak memory

    out_path = _out_path(p, keep_fraction, compression)
    df_small.write_parquet(
        str(out_path),
        compression=compression,
        compression_level=compression_level,
        statistics=write_statistics,
    )
    new_size = out_path.stat().st_size
    reduction_pct = 100.0 * (orig_size - new_size) / orig_size if orig_size > 0 else 0.0
    kept_fraction = round(new_rows / original_rows, 4) if original_rows else 0.0

    return {
        "file": p.name,
        "original_rows": original_rows,
        "new_rows": new_rows,
        "kept_fraction": kept_fraction,
        "original_size_mb": _fmt_mb(orig_size),
        "new_size_mb": _fmt_mb(new_size),
        "size_reduction_pct": round(reduction_pct, 2),
        "output_file": str(out_path),
        "status": "OK",
        "elapsed_s": round(time.perf_counter() - start, 2),
    }

# --------------------------- RUN ------------------------------------------ #
results = []
for in_file in INPUT_FILES:
    # An unfilled CONFIG placeholder ("") would otherwise be resolved as the
    # current directory; report it instead of trying to read it.
    if not str(in_file).strip():
        results.append(
            {"file": in_file, "status": "SKIPPED", "message": "Empty path; edit INPUT_FILES."}
        )
        continue
    try:
        res = downsample_parquet(in_file)
    except Exception as e:  # top-level boundary: record the failure and move on
        res = {"file": in_file, "status": "ERROR", "message": f"{type(e).__name__}: {e}"}
    results.append(res)

# Pretty print a compact summary
try:
    import pandas as pd

    summary_table = pd.DataFrame(results)
    # display() is provided by IPython/Jupyter; outside a notebook it is
    # presumably undefined and the NameError routes us to the fallback below.
    display(summary_table)
except Exception:
    # Best-effort fallback (no pandas or no IPython): one result dict per line.
    for row in results:
        print(row)

# Notes:
# - To target a *size* reduction precisely (e.g., 90% smaller file bytes),
#   start with KEEP_FRACTION=0.10. If the output is still too big, drop to 0.08 or 0.05,
#   then re-run. Size depends on column entropy and compression.
# - To keep each group's share of rows roughly the same as in the original
#   (stratified sampling, not class balancing), set STRATIFY_BY="your_column".
