In [None]:
import os
import time
import uuid
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, as_completed, wait
from pathlib import Path

import pandas as pd
import psutil

from transform import transform_chunk


def _write_parquet(df: pd.DataFrame, out_dir: Path, chunk_id: int) -> Path:
    """Write one chunk to its own parquet file inside ``out_dir``.

    The file is named ``chunk_<chunk_id zero-padded to 7 digits>_<uuid4 hex>.parquet``.
    Zero-padding makes a lexicographic sort of the names follow chunk order.
    The random suffix keeps two writes with the same ``chunk_id`` from
    clobbering each other.

    The data is first written under a temporary name that does not match
    ``chunk_*.parquet``, then renamed into place. A write that fails or is
    interrupted therefore never leaves a truncated file that a later glob over
    ``chunk_*.parquet`` would pick up.

    Args:
        df: Data to write; the index is not stored.
        out_dir: Target directory (``Path`` or ``str``); created if missing.
        chunk_id: Chunk sequence number embedded in the file name.

    Returns:
        Path of the written parquet file.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    stem = f"chunk_{chunk_id:07d}_{uuid.uuid4().hex}"
    final_path = out_dir / f"{stem}.parquet"
    # Same directory, so os.replace is a rename on the same filesystem.
    # The ".tmp" suffix keeps the file out of the "chunk_*.parquet" glob.
    tmp_path = out_dir / f".{stem}.parquet.tmp"
    try:
        df.to_parquet(tmp_path, index=False)
        os.replace(tmp_path, final_path)
    except BaseException:
        # Don't leave partial temp files behind; re-raise the original error.
        tmp_path.unlink(missing_ok=True)
        raise
    return final_path


def _process_and_write(chunk: pd.DataFrame, chunk_id: int, config) -> Path:
    """Transform a single raw chunk and persist the result as a parquet part.

    This is the unit of work that ``run_pipeline`` submits to its process pool.
    Every transform option comes from ``config``. The output goes to
    ``config.PARQUET_DIR`` via ``_write_parquet``.

    Args:
        chunk: Raw rows as read from the input CSV.
        chunk_id: Sequence number of this chunk, used in the output file name.
        config: Object exposing REQUIRED_COLUMNS, DTYPES, ENABLE_FEATURES,
            DROP_DUPLICATES, DROP_NA_ROWS and PARQUET_DIR.

    Returns:
        The path returned by ``_write_parquet`` for this chunk.
    """
    transform_options = dict(
        required_cols=config.REQUIRED_COLUMNS,
        dtypes=config.DTYPES,
        enable_features=config.ENABLE_FEATURES,
        drop_duplicates=config.DROP_DUPLICATES,
        drop_na_rows=config.DROP_NA_ROWS,
    )
    transformed = transform_chunk(chunk, **transform_options)
    destination = config.PARQUET_DIR
    return _write_parquet(transformed, destination, chunk_id)


def extract_chunks(input_file: Path, chunk_size: int, dtypes=None):
    """Stream ``input_file`` as CSV, at most ``chunk_size`` rows per DataFrame.

    The chunked reader is used as a context manager. Its file handle is
    therefore closed even when the consumer stops early (generator closed,
    exception raised in the caller's loop), not only after the last chunk.

    Args:
        input_file: Path (or anything ``pd.read_csv`` accepts) of the CSV file.
        chunk_size: Maximum number of rows per yielded chunk.
        dtypes: Optional mapping forwarded to ``pd.read_csv`` as ``dtype``.

    Yields:
        ``(chunk_index, chunk)`` tuples, with ``chunk_index`` counting from 0.
    """
    # low_memory=False: pandas documents that the low-memory parser may infer
    # mixed types within a column; disabling it avoids that.
    with pd.read_csv(
        input_file,
        chunksize=chunk_size,
        dtype=dtypes,
        low_memory=False,
    ) as reader:
        for i, chunk in enumerate(reader):
            yield i, chunk


def load_merged(parquet_dir: Path, merged_parquet_path: Path) -> Path:
    """Concatenate every ``chunk_*.parquet`` file in ``parquet_dir`` into one file.

    Parts are read in sorted file-name order and concatenated with a fresh
    0..n-1 index; the index is not written.

    Notes:
        - All parts are loaded into memory at once before writing, so peak
          memory is roughly the size of the full dataset.
        - Every matching file is included, including parts left by earlier
          runs. Chunk file names carry a random suffix, so old parts are never
          overwritten. Clear ``parquet_dir`` between runs to avoid duplicates.

    Args:
        parquet_dir: Directory holding the chunk parts (``Path`` or ``str``).
        merged_parquet_path: Output file (``Path`` or ``str``). Its parent
            directory is created if missing.

    Returns:
        ``merged_parquet_path``, exactly as passed in.

    Raises:
        FileNotFoundError: if no ``chunk_*.parquet`` file exists in ``parquet_dir``.
    """
    source_dir = Path(parquet_dir)
    out_path = Path(merged_parquet_path)

    parts = sorted(source_dir.glob("chunk_*.parquet"))
    if not parts:
        raise FileNotFoundError(f"No parquet chunks found in {parquet_dir}")

    merged = pd.concat([pd.read_parquet(p) for p in parts], ignore_index=True)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    merged.to_parquet(out_path, index=False)
    return merged_parquet_path


def run_pipeline(config):
    """Run the chunked CSV -> parquet pipeline and return simple run metrics.

    Steps:
      1. Read ``config.INPUT_FILE`` in ``config.CHUNK_SIZE``-row chunks.
      2. Transform and write each chunk to ``config.PARQUET_DIR`` in a process
         pool with ``config.MAX_WORKERS`` workers.
      3. If ``config.WRITE_MERGED_PARQUET`` is truthy, concatenate all parts
         into ``config.MERGED_PARQUET``.

    Only a bounded number of chunks (twice the worker count) is submitted at
    any time. Memory therefore stays proportional to the chunk size rather
    than to the size of the input file.

    Args:
        config: Object exposing the upper-case settings read in this function
            and in ``_process_and_write``.

    Returns:
        dict with ``chunks_written``, ``merged_output`` (str or None),
        ``elapsed_seconds`` and system-wide CPU / memory percentages sampled at
        the start and end of the run.

    Raises:
        The first exception raised by a chunk task (re-raised by
        ``Future.result``), and ``FileNotFoundError`` from ``load_merged`` when
        merging is enabled but no parts exist.
    """
    start_time = time.perf_counter()
    os.makedirs(config.OUTPUT_DIR, exist_ok=True)

    # Baseline system metrics. NOTE: psutil documents that the first
    # non-blocking cpu_percent() call in a process returns a meaningless 0.0.
    # The end-of-run call measures utilisation since this call.
    cpu_start = psutil.cpu_percent(interval=None)
    mem_start = psutil.virtual_memory().percent

    # Submitting every chunk up front would keep all of them referenced by
    # pending futures, so peak memory would grow with the whole input file.
    # Keep at most `max_in_flight` tasks outstanding instead.
    workers = config.MAX_WORKERS or os.cpu_count() or 1
    max_in_flight = 2 * workers

    written_files = []
    with ProcessPoolExecutor(max_workers=config.MAX_WORKERS) as executor:
        in_flight = set()
        for chunk_id, chunk in extract_chunks(config.INPUT_FILE, config.CHUNK_SIZE, dtypes=config.DTYPES):
            if len(in_flight) >= max_in_flight:
                # Block until at least one task finishes before reading more input.
                done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
                written_files.extend(fut.result() for fut in done)
            in_flight.add(executor.submit(_process_and_write, chunk, chunk_id, config))

        for fut in as_completed(in_flight):
            written_files.append(fut.result())

    merged_path = None
    if config.WRITE_MERGED_PARQUET:
        # NOTE(review): load_merged merges every chunk_*.parquet in PARQUET_DIR,
        # including parts left by previous runs (names carry a random suffix).
        # Clear PARQUET_DIR between runs, or the merged file will hold duplicates.
        merged_path = load_merged(config.PARQUET_DIR, config.MERGED_PARQUET)

    end_time = time.perf_counter()
    cpu_end = psutil.cpu_percent(interval=None)
    mem_end = psutil.virtual_memory().percent

    metrics = {
        "chunks_written": len(written_files),
        "merged_output": str(merged_path) if merged_path else None,
        "elapsed_seconds": round(end_time - start_time, 3),
        "cpu_percent_start": cpu_start,
        "cpu_percent_end": cpu_end,
        "mem_percent_start": mem_start,
        "mem_percent_end": mem_end,
    }
    return metrics