# Interactive Data Science with xorq Caching

This notebook demonstrates how xorq's input-addressed caching accelerates iterative
data science workflows. We build an ML pipeline on the bank marketing dataset and
show what happens when you tweak configurations between runs:

- **Without caching**: every change rebuilds the entire pipeline from scratch
- **With caching**: only the stages whose inputs changed are recomputed

Run each cell in order and watch the timings.

In [1]:
import shutil
import time
from pathlib import Path

from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline as SklearnPipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

import xorq.api as xo
from xorq.caching import ParquetCache
from xorq.common.utils.defer_utils import deferred_read_csv
from xorq.expr.ml import train_test_splits
from xorq.expr.ml.pipeline_lib import Pipeline

CACHE_DIR = Path("interactive-notebook-cache")
TARGET = "deposit"

# Clean slate
if CACHE_DIR.exists():
    shutil.rmtree(CACHE_DIR)
CACHE_DIR.mkdir(exist_ok=True)

# Persistent connection and cache — shared across all cells
con = xo.connect()
cache = ParquetCache.from_kwargs(
    source=con,
    relative_path=str(CACHE_DIR),
    base_path=Path(".").absolute(),
)

timings = []

In [2]:
def make_preprocessor(numeric_features, categorical_features):
    return ColumnTransformer([
        ("num", SklearnPipeline([
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]), numeric_features),
        ("cat", SklearnPipeline([
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
        ]), categorical_features),
    ])


def build_pipeline(classifier, numeric_features, categorical_features, train, test):
    """Build a cached prediction expression for the given config."""
    all_features = numeric_features + categorical_features
    preprocessor = make_preprocessor(numeric_features, categorical_features)
    sklearn_pipeline = SklearnPipeline([
        ("preprocessor", preprocessor),
        ("classifier", classifier),
    ])
    xorq_pipeline = Pipeline.from_instance(sklearn_pipeline)
    fitted = xorq_pipeline.fit(
        train, features=tuple(all_features), target=TARGET, cache=cache
    )
    return fitted.predict(test).cache(cache=cache)


def execute_and_report(label, expr):
    """Execute an expression, print timing and metrics."""
    t0 = time.perf_counter()
    df = expr.execute()
    elapsed = time.perf_counter() - t0

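    y_true, y_pred = df[TARGET], df["predicted"]
    acc = accuracy_score(y_true, y_pred)
    # y_pred holds hard 0/1 labels, not probabilities, so this AUC is
    # equivalent to balanced accuracy rather than a ranking-based score.
    auc = roc_auc_score(y_true, y_pred)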
    timings.append((label, elapsed))

    # Heuristic label only: any run under 0.5 s is assumed to be a cache hit.
    cached = "  CACHED" if elapsed < 0.5 else ""
    print(f"  {elapsed:5.2f}s{cached}   Accuracy: {acc:.4f}   ROC AUC: {auc:.4f}")
    return df
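One caveat about the metrics printed by `execute_and_report`: the `predicted` column holds hard 0/1 class labels, and `roc_auc_score` receives those labels instead of probabilities. With only two distinct score values, ROC AUC reduces to balanced accuracy (the mean of the true-positive and true-negative rates). The stdlib-only sketch below is a stand-in, not scikit-learn's implementation. It computes AUC from its pairwise (Mann-Whitney) definition on made-up labels and checks the result against balanced accuracy:

```python
from itertools import product


def roc_auc_pairwise(y_true, scores):
    """ROC AUC as P(random positive outscores random negative), ties = 1/2."""
    pos = [s for t, s in zip(y_true, scores) if t == 1]
    neg = [s for t, s in zip(y_true, scores) if t == 0]
    wins = 0.0
    for p, n in product(pos, neg):
        if p > n:
            wins += 1.0
        elif p == n:
            wins += 0.5
    return wins / (len(pos) * len(neg))


def balanced_accuracy(y_true, y_pred):
    """Mean of the true-positive rate and the true-negative rate."""
    tp = sum(t == 1 and p == 1 for t, p in zip(y_true, y_pred))
    tn = sum(t == 0 and p == 0 for t, p in zip(y_true, y_pred))
    pos = sum(t == 1 for t in y_true)
    neg = len(y_true) - pos
    return (tp / pos + tn / neg) / 2


# Made-up labels: TPR = 3/4, TNR = 2/4.
y_true = [1, 1, 1, 0, 0, 0, 0, 1]
y_pred = [1, 0, 1, 0, 1, 1, 0, 1]
print(roc_auc_pairwise(y_true, y_pred), balanced_accuracy(y_true, y_pred))  # 0.625 0.625
```

A ranking-based AUC would need predicted probabilities; whether the xorq pipeline exposes them is not shown in this notebook.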

## Pre-defined configurations

The classifiers and feature sets are defined once here. Each step below
combines one classifier with one feature set.

In [3]:
# --- Classifiers ---
gb_clf   = GradientBoostingClassifier(n_estimators=200, random_state=42)
rf_clf   = RandomForestClassifier(n_estimators=200, random_state=42)
knn_clf  = KNeighborsClassifier(n_neighbors=5)

# --- Feature sets ---
num_full  = ["age", "balance", "day", "duration", "campaign", "pdays", "previous"]
cat_full  = ["job", "marital", "education", "default", "housing", "loan", "contact", "month", "poutcome"]

num_small = ["age", "balance", "duration"]
cat_small = ["job", "marital", "education"]

---
## Load data (cached)

Read the CSV and encode the target column as an integer (`"yes"` → 1, anything
else → 0). This expression is cached, so every pipeline below shares the same
materialized data.

In [4]:
data_expr = (
    deferred_read_csv(path=xo.options.pins.get_path("bank-marketing"), con=con)
    .mutate(**{TARGET: (xo._[TARGET] == "yes").cast("int")})
    .cache(cache=cache)
)

train, test = data_expr.pipe(
    train_test_splits, test_sizes=[0.7, 0.3], random_seed=42
)

print(f"  Data ready: {data_expr.count().execute()} rows")

  Data ready: 11162 rows
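Cache reuse across cells also depends on a reproducible split. With a fixed `random_seed`, `train` and `test` are the same expressions on every run, so everything built on them keeps the same cache keys. The sketch below illustrates the idea with a hypothetical hash-based splitter. `split_rows` is illustrative only and is not how `train_test_splits` is implemented; in this scheme each row's split depends only on the seed and its row id:

```python
import hashlib


def split_bucket(row_id, seed, fractions):
    """Map (seed, row_id) to a split index deterministically.

    Hypothetical scheme: hash the pair to a float in [0, 1) and pick the
    split whose cumulative fraction covers it.
    """
    digest = hashlib.sha256(f"{seed}:{row_id}".encode()).digest()
    u = int.from_bytes(digest[:8], "big") / 2**64
    cumulative = 0.0
    for index, fraction in enumerate(fractions):
        cumulative += fraction
        if u < cumulative:
            return index
    return len(fractions) - 1  # guard against float rounding near 1.0


def split_rows(row_ids, seed, fractions):
    """Group row ids into one list per split."""
    splits = [[] for _ in fractions]
    for row_id in row_ids:
        splits[split_bucket(row_id, seed, fractions)].append(row_id)
    return splits


rows = range(11162)  # same row count as the bank-marketing data
split_a = split_rows(rows, seed=42, fractions=[0.7, 0.3])
split_b = split_rows(rows, seed=42, fractions=[0.7, 0.3])
print(split_a == split_b)  # True: same seed, same assignment
print(len(split_a[0]), len(split_a[1]))  # close to a 70/30 split
```

Changing the seed reshuffles every row. Any cached stage downstream of the split would then get a new key.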


---
## Step 1: First run — GradientBoosting + full features (cold cache)

The row count in the previous cell already materialized the cached data, so this
run reads it back; preprocessing, model training, and prediction all run from scratch.

In [5]:
gb_full = build_pipeline(gb_clf, num_full, cat_full, train, test)
_ = execute_and_report("1. Cold cache (GB full)", gb_full)

   2.17s   Accuracy: 0.8503   ROC AUC: 0.8508


---
## Step 2: Re-run the same pipeline — warm cache

Same expression, same data. xorq recognizes the identical input hash
and returns the cached Parquet result.

In [6]:
_ = execute_and_report("2. Warm cache (GB full)", gb_full)

   0.12s  CACHED   Accuracy: 0.8503   ROC AUC: 0.8508
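`ParquetCache` writes its results as files under `CACHE_DIR`, so a hit amounts to finding a stored file for the expression's key. The sketch below shows that lookup pattern using a hypothetical `ToyFileCache`. It writes JSON files named by a SHA-256 of a plain-text expression description; xorq writes Parquet and derives its keys from the expression graph itself:

```python
import hashlib
import json
import tempfile
from pathlib import Path


class ToyFileCache:
    """Hypothetical cache: one JSON file per key (xorq uses Parquet)."""

    def __init__(self, directory):
        self.directory = Path(directory)

    def get_or_compute(self, description, compute):
        key = hashlib.sha256(description.encode()).hexdigest()
        path = self.directory / f"{key}.json"
        if path.exists():  # hit: read the stored result back
            return json.loads(path.read_text()), True
        result = compute()  # miss: compute, then persist under the key
        path.write_text(json.dumps(result))
        return result, False


calls = {"n": 0}


def expensive_predict():
    calls["n"] += 1  # count how often the real work runs
    return {"predicted": [1, 0, 1]}  # placeholder output


with tempfile.TemporaryDirectory() as tmp:
    toy_cache = ToyFileCache(tmp)
    desc = "predict <- fit(GradientBoosting) <- read_csv(bank-marketing)"
    _, hit1 = toy_cache.get_or_compute(desc, expensive_predict)
    _, hit2 = toy_cache.get_or_compute(desc, expensive_predict)

print(hit1, hit2, calls["n"])  # False True 1
```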


---
## Step 3: The old way — rebuild WITHOUT caching

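What does every iteration look like without xorq? We build the same pipeline
with caching left out entirely (no `.cache()` on the data or the predictions,
no `cache=` argument to `fit`), so every stage recomputes on every execution.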

In [7]:
# Build WITHOUT cache for comparison
data_nocache = (
    deferred_read_csv(path=xo.options.pins.get_path("bank-marketing"), con=con)
    .mutate(**{TARGET: (xo._[TARGET] == "yes").cast("int")})
)
train_nc, test_nc = data_nocache.pipe(
    train_test_splits, test_sizes=[0.7, 0.3], random_seed=42
)
all_f = num_full + cat_full
preprocessor = make_preprocessor(num_full, cat_full)
sk = SklearnPipeline([("preprocessor", preprocessor), ("classifier", gb_clf)])
p = Pipeline.from_instance(sk)
f = p.fit(train_nc, features=tuple(all_f), target=TARGET)  # no cache!
pred_nc = f.predict(test_nc)  # no cache!

_ = execute_and_report("3. No cache (GB full)", pred_nc)

   1.96s   Accuracy: 0.8503   ROC AUC: 0.8508


---
## Step 4: Swap classifier — RandomForest

Change the classifier from GradientBoosting to RandomForest.
The data loading is still cached from earlier, so only the model
fitting and prediction run from scratch.

In [8]:
rf_full = build_pipeline(rf_clf, num_full, cat_full, train, test)
_ = execute_and_report("4. New clf (RF full)", rf_full)

   1.67s   Accuracy: 0.8506   ROC AUC: 0.8515


---
## Step 5: Re-run RandomForest — cached

In [9]:
_ = execute_and_report("5. Warm cache (RF full)", rf_full)

   0.12s  CACHED   Accuracy: 0.8506   ROC AUC: 0.8515


---
## Step 6: Try KNN

KNN's fit step only stores the training rows, so it is far cheaper to train
than GradientBoosting; most of its cost moves to prediction. With the data
already cached, the total time drops significantly.

In [10]:
knn_full = build_pipeline(knn_clf, num_full, cat_full, train, test)
_ = execute_and_report("6. New clf (KNN full)", knn_full)

   0.77s   Accuracy: 0.8017   ROC AUC: 0.8000
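KNN's timing profile follows from how the algorithm works. Fitting only stores the training rows, and prediction computes a distance from each query row to every stored row. The pure-Python sketch below shows that structure. `TinyKNN` is hypothetical; scikit-learn's `KNeighborsClassifier` can use tree-based neighbor search and is far faster:

```python
import math
from collections import Counter


class TinyKNN:
    """Minimal k-nearest-neighbours classifier (Euclidean, majority vote)."""

    def __init__(self, n_neighbors=5):
        self.n_neighbors = n_neighbors

    def fit(self, X, y):
        # "Training" just stores the rows; nothing is optimised.
        self.X_, self.y_ = list(X), list(y)
        return self

    def predict(self, X):
        preds = []
        for x in X:
            # The real work: a distance to every stored training row.
            dists = sorted(
                (math.dist(x, xi), yi) for xi, yi in zip(self.X_, self.y_)
            )
            votes = Counter(label for _, label in dists[: self.n_neighbors])
            preds.append(votes.most_common(1)[0][0])
        return preds


X_toy = [(0, 0), (0, 1), (1, 0), (5, 5), (5, 6), (6, 5)]
y_toy = [0, 0, 0, 1, 1, 1]
knn = TinyKNN(n_neighbors=3).fit(X_toy, y_toy)
print(knn.predict([(0.5, 0.5), (5.5, 5.5)]))  # [0, 1]
```

Because prediction scans the stored rows, KNN's cost grows with the training-set size at prediction time rather than at fit time.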


---
## Step 7: Shrink the feature set

Use 3 numeric + 3 categorical features instead of 7 + 9.
The data loading expression is unchanged, so that cache entry is reused. The
preprocessor and classifier depend on the feature list, so both are refit.

In [11]:
rf_small = build_pipeline(rf_clf, num_small, cat_small, train, test)
_ = execute_and_report("7. Fewer features (RF small)", rf_small)

   1.42s   Accuracy: 0.7556   ROC AUC: 0.7555


---
## Step 8: Re-run the small-feature pipeline — cached

In [12]:
_ = execute_and_report("8. Warm cache (RF small)", rf_small)

   0.11s  CACHED   Accuracy: 0.7556   ROC AUC: 0.7555


---
## Step 9: Go back to GradientBoosting — still cached from Step 1!

In [13]:
_ = execute_and_report("9. Revisit GB (still cached)", gb_full)

   0.12s  CACHED   Accuracy: 0.8503   ROC AUC: 0.8508
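Step 9 hits because the cache is a keyed store rather than a record of the last run. Each configuration has its own entry, and as far as this notebook shows, later experiments do not evict earlier ones. Python's `functools.lru_cache` offers a loose in-memory analogy. It is not how `ParquetCache` works, and it only keeps entries for the life of the process. With `maxsize=1`, the return to GradientBoosting misses, while an unbounded cache hits:

```python
from functools import lru_cache

computed = []  # every config that was actually (re)computed


def fit_config(config):
    computed.append(config)  # stand-in for an expensive fit + predict
    return f"predictions for {config}"


# A single-slot cache only remembers the most recent result.
single_slot = lru_cache(maxsize=1)(fit_config)
# An unbounded cache keeps one entry per distinct key.
keyed_store = lru_cache(maxsize=None)(fit_config)

# Mirrors Steps 1, 4, 6, 7 and 9 of this notebook.
session = ["GB full", "RF full", "KNN full", "RF small", "GB full"]

for cfg in session:
    single_slot(cfg)
print(single_slot.cache_info().hits)  # 0: "GB full" was evicted long ago

computed.clear()
for cfg in session:
    keyed_store(cfg)
print(keyed_store.cache_info().hits)  # 1: revisiting "GB full" is a hit
print(computed)  # ['GB full', 'RF full', 'KNN full', 'RF small']
```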


---
## Summary

In [14]:
print(f"{'Step':<40} {'Time':>7}")
print("=" * 49)
for label, t in timings:
    bar = '\u2588' * int(t * 20)
    tag = " CACHED" if t < 0.5 else ""
    print(f"{label:<40} {t:>5.2f}s  {bar}{tag}")
print()
cold = timings[0][1]
warm = timings[1][1]
print(f"Speedup (identical re-run): {cold / warm:.0f}x")

Step                                        Time
1. Cold cache (GB full)                   2.17s  ███████████████████████████████████████████
2. Warm cache (GB full)                   0.12s  ██ CACHED
3. No cache (GB full)                     1.96s  ███████████████████████████████████████
4. New clf (RF full)                      1.67s  █████████████████████████████████
5. Warm cache (RF full)                   0.12s  ██ CACHED
6. New clf (KNN full)                     0.77s  ███████████████
7. Fewer features (RF small)              1.42s  ████████████████████████████
8. Warm cache (RF small)                  0.11s  ██ CACHED
9. Revisit GB (still cached)              0.12s  ██ CACHED

Speedup (identical re-run): 18x
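The printed speedup compares only Steps 1 and 2. The same cold/warm ratio can be worked out for every configuration that was re-run, using the rounded timings from the summary table. These are single wall-clock measurements, so expect the ratios to vary between runs:

```python
# (first run, warm re-run) in seconds, as printed in the summary table.
runs = {
    "GB full": (2.17, 0.12),
    "RF full": (1.67, 0.12),
    "RF small": (1.42, 0.11),
}

for name, (cold_s, warm_s) in runs.items():
    # Ratio of the first run's time to the cached re-run's time.
    print(f"{name:<9} {cold_s / warm_s:4.1f}x")  # roughly 18x, 14x and 13x
```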


---
## How it works

xorq's `ParquetCache` is **input-addressed**: the cache key is a deterministic
hash of the expression graph. Two expressions with identical logic and identical
upstream data always produce the same hash — and hit the same cache entry.
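Here is a toy model of input-addressed keys. Each node's key hashes its own operation and parameters together with its inputs' keys, so two independently built but identical graphs get the same key. This is an illustrative sketch: the `Node` class is hypothetical, and xorq's actual hashing may differ (for example, in what it includes for a CSV source):

```python
import hashlib
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    """Hypothetical expression node: operation, parameters, inputs."""
    op: str
    params: tuple = ()
    inputs: tuple = ()

    def key(self):
        # Hash this node's own definition plus every input's key, so the
        # key changes whenever anything upstream changes.
        h = hashlib.sha256()
        h.update(repr((self.op, self.params)).encode())
        for parent in self.inputs:
            h.update(parent.key().encode())
        return h.hexdigest()


source = Node("read_csv", (("path", "bank-marketing.csv"),))
encode = Node("mutate", (("deposit", "deposit == 'yes'"),), (source,))

# Built again from scratch: same logic, same upstream -> same key.
rebuilt = Node("mutate", (("deposit", "deposit == 'yes'"),),
               (Node("read_csv", (("path", "bank-marketing.csv"),)),))
print(encode.key() == rebuilt.key())  # True

# A different upstream path changes the downstream key as well.
other = Node("mutate", (("deposit", "deposit == 'yes'"),),
             (Node("read_csv", (("path", "other.csv"),)),))
print(encode.key() == other.key())  # False
```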

```python
# This .cache() call creates a cache boundary.
# Its hash captures the CSV path, the .mutate() logic, and all upstream deps.
data = deferred_read_csv(...).mutate(...).cache(cache=cache)

# The final prediction is also cached.
predicted = fitted.predict(test).cache(cache=cache)

# First .execute() computes and caches. Second .execute() loads from Parquet.
predicted.execute()  # slow: computes, then writes Parquet
predicted.execute()  # fast: reads the cached Parquet file
```

When you change a downstream step (like the classifier), all upstream cached
stages are unaffected. When you change an upstream step (like adding a new
feature to the data expression), its hash changes and it recomputes —
along with everything downstream.

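This is the same principle behind build systems like Make and Bazel:
**only rebuild what changed.** Make detects changes from file timestamps;
Bazel, like xorq, keys its cache on hashes of the inputs.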

In [15]:
if CACHE_DIR.exists():
    shutil.rmtree(CACHE_DIR)
    print("Cache cleaned up.")

Cache cleaned up.
