> Reminder: Use the `rapids-sk` kernel to run this notebook.

# Accelerate ML with GPUs

This notebook demonstrates 20–50× speedups by migrating CPU workflows (pandas, scikit-learn) to the GPU (cuDF, cuML) on the Avazu click-through-rate (CTR) dataset, with minimal code changes.

**Objectives:**
- Compare CPU vs GPU performance on ETL and ML tasks
- Measure speedups for read, ETL, fit, predict stages
- Verify model parity (GPU AUC within ±0.5% of the CPU AUC; logloss is not computed in this notebook)
- Demonstrate minimal migration effort (≤5 lines changed)

## Setup and Configuration

In [None]:
import os
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import zipfile
import gzip

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score

import cudf
_ = cudf.Series([1,2,3]).sum() # warmup
from cudf.api.types import is_string_dtype
from cuml.ensemble import RandomForestClassifier as cuRF
from cuml.metrics import roc_auc_score as cuml_roc_auc_score
from cuml.model_selection import train_test_split as cuml_train_test_split

import cupy as cp
import rmm.mr as mr
from rmm.allocators.cupy import rmm_cupy_allocator
from cuml import set_global_output_type

from utils.timing import set_cpu_threads, run_timed

# Set reproducible seed (NumPy's global RNG). Note that the splits and random
# forests below pass random_state=42 explicitly rather than relying on this.
np.random.seed(123)

# Configure CPU threads for fair comparison.
# NOTE(review): set_cpu_threads lives in utils.timing; its exact mechanism is not
# visible here. cpu_pipeline fits RandomForestClassifier with n_jobs=-1, so confirm
# this cap actually bounds that fit.
set_cpu_threads(8)

In [None]:
# Use CUDA async allocator (fast, no manual pool sizing): make a
# CudaAsyncMemoryResource RMM's current device resource, and route CuPy
# allocations through RMM as well.
mr.set_current_device_resource(mr.CudaAsyncMemoryResource())
cp.cuda.set_allocator(rmm_cupy_allocator)
# Ask cuML estimators to return cuDF objects, so results stay as cuDF on the device.
set_global_output_type("cudf")

# Disable RAPIDS initialization
# NOTE(review): cudf is imported and exercised (warmup sum) in the setup cell
# above, so setting RAPIDS_NO_INITIALIZE at this point presumably cannot affect
# this process's initialization; it only changes os.environ for anything started
# later. Confirm intent, or move it before the RAPIDS imports.
os.environ.setdefault("RAPIDS_NO_INITIALIZE", "1")

## Data Setup

In [None]:
import shutil

USE_SAMPLE = True
# Paths are resolved against the working directory, expected to be classification/.
extract_dir = os.path.join(os.getcwd(), "data", "avazu")
parquet_path = os.path.join(os.getcwd(), "data", "avazu", "avazu_train.parquet")

os.makedirs(extract_dir, exist_ok=True)

if USE_SAMPLE:
    # 50k-row sample zip in classification/data/.
    FILE = os.path.join(os.getcwd(), "data", "avazu-ctr-50k.zip")
    with zipfile.ZipFile(FILE, 'r') as zip_ref:
        csv_members = [f for f in zip_ref.namelist() if f.endswith('.csv')]
        if not csv_members:
            raise FileNotFoundError(f"No .csv member found in {FILE}")
        csv_file = csv_members[0]
        # extract() returns the path it actually wrote; use it instead of rebuilding it.
        extracted_csv = zip_ref.extract(csv_file, extract_dir)
    print(f"Extracted SAMPLE data - {os.path.getsize(extracted_csv) / (1024 ** 3):.2f} GB")
else:
    # Full gzip-compressed CSV in classification/data/.
    FILE = os.path.join(os.getcwd(), "data", "avazu-ctr.gz")
    extracted_csv = os.path.join(extract_dir, "avazu-ctr.csv")
    with gzip.open(FILE, 'rb') as f_in, open(extracted_csv, 'wb') as f_out:
        # Stream-decompress in 1 MiB chunks so the full file never sits in memory.
        shutil.copyfileobj(f_in, f_out, length=1024 * 1024)
    print(f"Extracted FULL data - {os.path.getsize(extracted_csv) / (1024 ** 3):.2f} GB")

def csv_to_parquet(src_csv, dst_parquet, chunksize=500_000):
    """Stream a CSV file into a single Parquet file, one pandas chunk at a time.

    Each chunk of ``chunksize`` rows is read with ``pd.read_csv`` and converted
    with ``pa.Table.from_pandas(preserve_index=False)``. The first chunk's schema
    defines the file schema. Later chunks whose independently inferred schema
    differs (e.g. a column that is int64 in one chunk but float64 in another) are
    cast to that schema; an impossible cast raises from pyarrow.

    Args:
        src_csv: Path of the CSV file to read.
        dst_parquet: Path of the Parquet file to (over)write.
        chunksize: Rows per chunk; bounds peak host memory.

    Nothing is written if the CSV yields no chunks. The writer is always closed,
    even if a chunk fails, so the file handle is not leaked.
    """
    writer = None
    try:
        with pd.read_csv(src_csv, chunksize=chunksize) as reader:
            for chunk in reader:
                table = pa.Table.from_pandas(chunk, preserve_index=False)
                if writer is None:
                    writer = pq.ParquetWriter(dst_parquet, table.schema)
                elif not table.schema.equals(writer.schema, check_metadata=False):
                    # pandas infers dtypes per chunk; align to the file schema.
                    table = table.cast(writer.schema)
                writer.write_table(table)
    finally:
        if writer is not None:
            writer.close()

csv_to_parquet(extracted_csv, parquet_path)
print(f"Converted CSV to Parquet - {parquet_path}")
# Row/column counts come from the Parquet footer; no need to load the whole file.
parquet_meta = pq.read_metadata(parquet_path)
print(f"Shape: {(parquet_meta.num_rows, parquet_meta.num_columns)}")
os.remove(extracted_csv)

## CPU Pipeline - pandas + scikit-learn

In [None]:
def cpu_pipeline():
    """ETL + ML pipeline on the CPU (pandas + scikit-learn), timing every stage.

    Stages:
    1. load the Parquet file, skipping 'id';
    2. preprocess;
    3. build features and label;
    4. 80/20 train/test split;
    5. fit a RandomForestClassifier;
    6. take column 1 of ``predict_proba`` on the test set;
    7. score ROC AUC.

    Each stage runs through ``run_timed(..., use_gpu=False)``, whose result is
    unpacked as ``(value, elapsed)``.

    Returns:
        dict with 'X_train', 'X_test', 'y_train', 'y_test', the fitted 'model',
        test-set 'predictions', the 'auc' score, and 'times' (per-stage timings
        keyed load/preprocess/features/split/fit/predict/auc, plus 'total').
    """
    def load_selected():
        # Mirror the GPU loader: drop the unused 'id' column at read time, so
        # both devices read the same columns and the load timings are comparable.
        keep = [name for name in pq.read_schema(parquet_path).names if name != 'id']
        return pd.read_parquet(parquet_path, columns=keep)

    # 1. Load data
    df_cpu, load_time = run_timed(
        "CPU: Load Parquet with pandas (selected columns)",
        load_selected,
        use_gpu=False
    )

    # 2. Basic preprocessing
    df_processed, preprocess_time = run_timed(
        "CPU: Preprocess data (handle missing, encode categories)",
        lambda: preprocess_avazu_data(df_cpu),
        use_gpu=False
    )

    # 3. Feature engineering
    (X_cpu, y_cpu), features_time = run_timed(
        "CPU: Feature engineering",
        lambda: engineer_features(df_processed),
        use_gpu=False
    )

    # 4. Train/test split
    (X_train_cpu, X_test_cpu, y_train_cpu, y_test_cpu), split_time = run_timed(
        "CPU: Train/test split",
        lambda: train_test_split(X_cpu, y_cpu, test_size=0.2, random_state=42),
        use_gpu=False
    )

    # 5. Fit Random Forest Classifier
    rf_model, rf_time = run_timed(
        "CPU: Fit Random Forest",
        lambda: RandomForestClassifier(
            n_estimators=100,
            max_depth=16,            # align with GPU
            max_features="sqrt",     # align with GPU
            bootstrap=True,          # align with GPU
            criterion="gini",        # explicit; matches cuML default
            random_state=42,
            n_jobs=-1                # CPU parallelism; no effect on model definition
        ).fit(X_train_cpu, y_train_cpu),
        use_gpu=False
    )

    # 6. Predict probabilities (column 1 of predict_proba)
    y_pred_cpu, pred_time = run_timed(
        "CPU: Predict probabilities",
        lambda: rf_model.predict_proba(X_test_cpu)[:, 1],
        use_gpu=False
    )

    # 7. Calculate AUC
    auc_cpu, auc_time = run_timed(
        "CPU: Calculate AUC",
        lambda: roc_auc_score(y_test_cpu, y_pred_cpu),
        use_gpu=False
    )

    return {
        'X_train': X_train_cpu,
        'X_test': X_test_cpu,
        'y_train': y_train_cpu,
        'y_test': y_test_cpu,
        'model': rf_model,
        'predictions': y_pred_cpu,
        'auc': auc_cpu,
        'times': {
            'load': load_time,
            'preprocess': preprocess_time,
            'features': features_time,
            'split': split_time,
            'fit': rf_time,
            'predict': pred_time,
            'auc': auc_time,
            'total': load_time + preprocess_time + features_time + split_time + rf_time + pred_time + auc_time
        }
    }

# Helper functions for preprocessing
def preprocess_avazu_data(df):
    """Basic preprocessing for the Avazu dataset (pandas).

    Fills every missing value with the string 'missing', then replaces each
    string-typed column (object or pandas string dtype), other than 'click'
    and 'id', with its ``pd.Categorical`` integer codes. Categories are sorted,
    so codes follow the sorted order of the distinct values.

    Args:
        df: Input DataFrame. It is not modified.

    Returns:
        A new DataFrame with string feature columns integer-encoded.
    """
    # fillna returns a new frame, so the caller's df is never mutated and no
    # explicit copy is needed.
    df_clean = df.fillna('missing')
    categorical_cols = [col for col in df_clean.columns if col not in ['click', 'id']]
    for col in categorical_cols:
        dtype = df_clean[col].dtype
        # Also accept pandas' dedicated string dtype (the default for text in
        # newer pandas), matching the GPU path's is_string_dtype check.
        if pd.api.types.is_object_dtype(dtype) or pd.api.types.is_string_dtype(dtype):
            df_clean[col] = pd.Categorical(df_clean[col]).codes
    return df_clean

def engineer_features(df):
    """Split a preprocessed frame into a feature matrix and a label vector.

    Features are every column except 'click' and 'id', in their original order.
    The label is 'click' when that column exists; otherwise the frame's first
    column is used instead.

    Returns:
        (X, y) tuple.
    """
    excluded = {'click', 'id'}
    X = df[[name for name in df.columns if name not in excluded]]
    if 'click' in df.columns:
        y = df['click']
    else:
        # NOTE(review): fallback label is simply column 0; confirm this is intended.
        y = df.iloc[:, 0]
    return X, y

# Run the CPU baseline end to end; cpu_results (splits, model, predictions, AUC
# and per-stage times) is consumed by the comparison cell below.
print("🖥️  Running CPU Pipeline...")
cpu_results = cpu_pipeline()

## GPU Pipeline - cuDF + cuML

In [None]:
def gpu_pipeline():
    """ML pipeline on the GPU (cuDF + cuML), timing every stage.

    Mirrors ``cpu_pipeline`` stage for stage while keeping data on the device:
    1. load Parquet without 'id';
    2. preprocess;
    3. build features and label;
    4. device-side 80/20 split;
    5. fit cuML's RandomForestClassifier (``cuRF``);
    6. take the class-1 probability column;
    7. score ROC AUC with cuML.

    Each stage runs through ``run_timed(..., use_gpu=True)``, whose result is
    unpacked as ``(value, elapsed)``.

    Returns:
        dict with "X_train", "X_test", "y_train", "y_test", the fitted "model",
        test-set "predictions", the "auc" score, and "times" (per-stage timings
        keyed load/preprocess/features/split/fit/predict/auc, plus "total").
    """
    times = {}

    # 1) Load: the reader drops 'id' before it reaches the device
    df_gpu, times["load"] = run_timed(
        "GPU: Load Parquet with cuDF (selected columns)",
        lambda: read_parquet_gpu_selected(parquet_path),
        use_gpu=True
    )

    # 2) Preprocess: fill missing values, integer-encode string columns
    df_ready, times["preprocess"] = run_timed(
        "GPU: Preprocess data (missing, encode categories)",
        lambda: preprocess_avazu_data_gpu(df_ready_input),
        use_gpu=True
    ) if (df_ready_input := df_gpu) is not None else (None, None)

    # 3) Features and label
    (X_all, y_all), times["features"] = run_timed(
        "GPU: Feature engineering",
        lambda: engineer_features_gpu(df_ready),
        use_gpu=True
    )

    # 4) 80/20 split, performed on the device by cuML
    (X_train, X_test, y_train, y_test), times["split"] = run_timed(
        "GPU: Train/test split",
        lambda: cuml_train_test_split(X_all, y_all, test_size=0.2, random_state=42),
        use_gpu=True
    )

    # 5) Random forest. Settings are tuned for T4 throughput; trade parity vs. speed here.
    # NOTE(review): cuML documents that random_state does not fully guarantee
    # identical results (multiple streams add nondeterminism) - confirm for this version.
    forest_kwargs = {
        "n_estimators": 100,      # more trees: better accuracy, slower fit
        "max_depth": 16,          # T4 sweet spot; 12 is faster but may reduce AUC
        "max_features": "sqrt",   # same as sklearn's classifier default
        "bootstrap": True,        # False is faster but changes the model
        "n_bins": 64,             # <=128 bins speeds up split finding
        "n_streams": 8,           # 4-8 on a T4; 8 saturates the device
        "random_state": 42,
    }
    model, times["fit"] = run_timed(
        "GPU: Fit Random Forest",
        lambda: cuRF(**forest_kwargs).fit(X_train, y_train),
        use_gpu=True
    )

    # 6) Class-1 probability, kept on the device
    def positive_class_proba():
        proba = model.predict_proba(X_test)
        # cuDF output is a DataFrame (positional column access); arrays slice directly
        if hasattr(proba, "iloc"):
            return proba.iloc[:, 1]
        return proba[:, 1]

    y_pred, times["predict"] = run_timed(
        "GPU: Predict probabilities",
        positive_class_proba,
        use_gpu=True
    )

    # 7) AUC computed by cuML on the device (no to_pandas round-trip)
    auc, times["auc"] = run_timed(
        "GPU: Calculate AUC",
        lambda: cuml_roc_auc_score(y_test, y_pred),
        use_gpu=True
    )

    times["total"] = (
        times["load"] + times["preprocess"] + times["features"] + times["split"]
        + times["fit"] + times["predict"] + times["auc"]
    )

    return {
        "X_train": X_train,
        "X_test": X_test,
        "y_train": y_train,
        "y_test": y_test,
        "model": model,
        "predictions": y_pred,
        "auc": auc,
        "times": times,
    }

# Helper functions for GPU preprocessing
def read_parquet_gpu_selected(path):
    """Load a Parquet file into a cuDF DataFrame without its 'id' column.

    The column list comes from the file schema (read on the host by pyarrow),
    so only the needed columns are read, which cuts IO and device memory.
    """
    wanted = [field.name for field in pq.read_schema(path) if field.name != "id"]
    return cudf.read_parquet(path, columns=wanted)

def preprocess_avazu_data_gpu(df):
    """Basic preprocessing for the Avazu dataset using cuDF.

    Fills missing values with the string "missing", then replaces every
    string-typed column other than "click" and "id" with int32 integer codes
    computed on the device. Codes follow sorted category order, so they match
    the CPU path's ``pd.Categorical(...).codes`` encoding (needed for model parity).

    Args:
        df: Input cudf.DataFrame. It is not modified.

    Returns:
        A new cudf.DataFrame with string feature columns integer-encoded.
    """
    # fillna returns a new frame, so the caller's df stays untouched without a copy.
    df = df.fillna("missing")
    str_cols = [
        c for c in df.columns
        if c not in ("click", "id") and is_string_dtype(df[c])
    ]
    for c in str_cols:
        # sort=True -> codes index the sorted uniques, same ordering as pd.Categorical.
        codes, _ = cudf.factorize(df[c], sort=True)
        df[c] = codes.astype("int32")
    return df

def engineer_features_gpu(df):
    """Split a preprocessed cuDF frame into features and label.

    Features are all columns except "click" and "id", in original order, with
    dtypes left as-is (int32 codes are fine for cuML). The label is "click" if
    present; otherwise the first column is used.

    Returns:
        (X, y) tuple.
    """
    label = "click"
    keep = [name for name in df.columns if name != label and name != "id"]
    X = df[keep]
    y = df[label] if label in df.columns else df.iloc[:, 0]
    return X, y

# Run the GPU pipeline end to end; gpu_results has the same keys as cpu_results
# and is consumed by the comparison cell below.
print("🚀 Running GPU Pipeline...")
gpu_results = gpu_pipeline()

## Performance Comparison and Analysis

In [None]:
# Comparison: AUC (higher is better), times (lower is better).
# Every timed stage (including load and split) gets its own row, so the
# per-stage rows account for everything that goes into "Total Time (s)".
TIME_METRIC_KEYS = {
    "Load Time (s)": "load",
    "Preprocess Time (s)": "preprocess",
    "Feature Time (s)": "features",
    "Split Time (s)": "split",
    "Fit Time (s)": "fit",
    "Predict Time (s)": "predict",
    "AUC Time (s)": "auc",
    "Total Time (s)": "total",
}


def _as_float(v):
    """Coerce a result value to a host-side Python float; None or non-numeric -> NaN."""
    if v is None:
        return np.nan
    try:
        return float(v)
    except (TypeError, ValueError):
        return np.nan


metrics = ["AUC", *TIME_METRIC_KEYS]
cpu_vals = [_as_float(cpu_results["auc"])] + [
    _as_float(cpu_results["times"][key]) for key in TIME_METRIC_KEYS.values()
]
gpu_vals = [_as_float(gpu_results["auc"])] + [
    _as_float(gpu_results["times"][key]) for key in TIME_METRIC_KEYS.values()
]

rows = []
for metric, cpu, gpu in zip(metrics, cpu_vals, gpu_vals):
    # Values are floats here: NaN propagates naturally; only zero divisors need guarding.
    if metric == "AUC":
        # Accuracy metric: higher is better, report GPU - CPU
        diff = gpu - cpu
        pct_change = (gpu - cpu) / abs(cpu) * 100 if cpu != 0 else np.nan
        speedup = np.nan  # not meaningful for accuracy
    else:
        # Time metrics: lower is better, report time saved and CPU/GPU ratio
        diff = cpu - gpu
        speedup = cpu / gpu if gpu != 0 else np.nan
        pct_change = (1 - gpu / cpu) * 100 if cpu != 0 else np.nan
    rows.append({
        "Metric": metric,
        "CPU": cpu,
        "GPU": gpu,
        "Diff": diff,
        "Speedup (CPU/GPU)": speedup,
        "% Change/Improvement": pct_change
    })

results_table = pd.DataFrame(rows)

# Formatting: round to two decimals; add % sign to percent column
def fmt_float(v):
    """Format a number with two decimals.

    Returns "" for None, for a float that is NaN or infinite, or when the value
    cannot be formatted as a float at all.
    """
    if v is None:
        return ""
    if isinstance(v, float) and not np.isfinite(v):
        return ""
    try:
        return f"{v:.2f}"
    except Exception:
        return ""

def fmt_pct(v):
    """Format a percentage with two decimals and a trailing "%".

    Returns "" for None, for a float that is NaN or infinite (consistent with
    fmt_float, instead of rendering "inf%"), or when the value cannot be
    formatted as a float.
    """
    if v is None or (isinstance(v, float) and not np.isfinite(v)):
        return ""
    try:
        return f"{v:.2f}%"
    except (TypeError, ValueError):
        # Non-numeric cell: leave it blank rather than break the table.
        return ""

# Render a formatted copy of results_table; the numeric table stays untouched.
display_table = results_table.copy()
for col in ["CPU", "GPU", "Diff", "Speedup (CPU/GPU)"]:
    display_table[col] = display_table[col].apply(fmt_float)
display_table["% Change/Improvement"] = display_table["% Change/Improvement"].apply(fmt_pct)

print(display_table.to_string(index=False))

# Objective check: the GPU model's AUC should be within ±0.5% of the CPU model's.
AUC_PARITY_TOL_PCT = 0.5
auc_pct = float(
    results_table.loc[results_table["Metric"] == "AUC", "% Change/Improvement"].iloc[0]
)
if np.isnan(auc_pct):
    print(f"AUC parity (±{AUC_PARITY_TOL_PCT}%): n/a")
else:
    verdict = "PASS" if abs(auc_pct) <= AUC_PARITY_TOL_PCT else "FAIL"
    print(f"AUC parity (±{AUC_PARITY_TOL_PCT}%): {verdict} ({auc_pct:+.3f}%)")