<a href="https://colab.research.google.com/github/mesejo/public-letsql/blob/feat%2Fadd-colab-notebook/notebooks/pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In this tutorial, we'll build an end-to-end machine learning pipeline using xorq expressions to predict the number of comments a Hacker News story will receive based on its title. The pipeline fetches live data, processes text, trains a model, and makes predictions, all expressed as a single composable expression.

## Why Use Expression-Based ML Pipelines?

Traditional ML pipelines typically involve multiple disjointed steps that require manual orchestration, leading to complex codebases and difficult-to-maintain systems. An expression-based approach using xorq offers significant advantages:

- **Multi-engine optimization:** Execute different parts of your pipeline on the most appropriate backends (PostgreSQL, DuckDB, DataFusion)
- **Smart caching:** Automatically cache intermediate results to avoid redundant computations
- **Lazy evaluation:** Define the entire pipeline before execution, enabling optimization across the full workflow

Unlike traditional ETL or ML pipeline frameworks that focus on tasks and scheduling, xorq treats data transformations as first-class expressions, making your pipelines more readable, maintainable, and efficient.
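To make the lazy-evaluation and caching ideas above concrete, here is a toy sketch in plain Python. It is not how xorq is implemented; the `LazyExpr` class and its `pipe`/`execute` methods are hypothetical names invented for this illustration. The point is only that building the pipeline does no work, and that a cached node runs once even when two downstream steps depend on it.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class LazyExpr:
    """Toy deferred computation: stores a function and runs it only on execute()."""

    fn: Callable[[], list]
    cache: bool = False
    calls: int = 0  # how many times fn actually ran
    _cached: Optional[list] = field(default=None, repr=False)

    def pipe(self, step, cache=False):
        # Build a new node that depends on this one; nothing runs yet.
        return LazyExpr(lambda: step(self.execute()), cache=cache)

    def execute(self):
        # Reuse the stored result if this node is cached and has already run.
        if self.cache and self._cached is not None:
            return self._cached
        self.calls += 1
        result = self.fn()
        if self.cache:
            self._cached = result
        return result


# Define the whole pipeline first: one cached source, two downstream steps.
source = LazyExpr(lambda: ["Show HN: a tool", "Ask HN: why?"], cache=True)
lengths = source.pipe(lambda titles: [len(t) for t in titles])
upper = source.pipe(lambda titles: [t.upper() for t in titles])

assert source.calls == 0  # defining the pipeline executed nothing

title_lengths = lengths.execute()
upper_titles = upper.execute()
print(title_lengths, upper_titles, source.calls)
```

Both downstream steps read from `source`, but its function runs only once because the first result is kept.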

First, let's install `xorq` with the `examples` extra:

In [1]:
!pip install 'xorq[examples]'

Now let's import the required libraries and set up our environment to fetch the latest stories from Hacker News:

In [2]:
import pandas as pd
import toolz
import xgboost as xgb
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import mean_absolute_error

import xorq as xo
import xorq.vendor.ibis.expr.datatypes as dt
from xorq.caching import ParquetStorage
from xorq.common.utils.defer_utils import deferred_read_parquet
from xorq.common.utils.import_utils import import_python
from xorq.expr.ml import (
    deferred_fit_predict,
    deferred_fit_transform_series_sklearn,
    train_test_splits,
)

# Import helper module with UDFs
m = import_python(xo.options.pins.get_path("hackernews_lib"))

# Set up connection and storage for caching
con = xo.connect()
storage = ParquetStorage(source=con)

# Load the input data, then split it 90%/10% into train/test sets
train_expr, test_expr = (
    deferred_read_parquet(
        con,
        xo.options.pins.get_path("hn-fetcher-input-small.parquet"),
        "fetcher-input",
    )
    .pipe(m.do_hackernews_fetcher_udxf, inner_name="inner-named-flight-udxf")
    .pipe(
        train_test_splits,
        unique_key="id",
        test_sizes=(0.9, 0.1),
        random_seed=0,
    )
)

Using `deferred_read_parquet` allows us to load the data without immediate execution, keeping everything as expressions that can be optimized.
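The split above is keyed on a unique column (`unique_key="id"`) with a fixed `random_seed`, which suggests that each row's partition is a deterministic function of its key. One common way to get that property is to hash the key into a bucket. The sketch below shows that technique in plain Python; it is an assumption about the general approach, not xorq's actual implementation, and `split_bucket` and `num_buckets` are hypothetical names.

```python
import hashlib


def split_bucket(key, sizes=(0.9, 0.1), seed=0, num_buckets=10_000):
    """Return the index of the partition that `key` falls into."""
    # Hash the seed together with the key so a different seed reshuffles rows.
    digest = hashlib.sha256(f"{seed}:{key}".encode()).hexdigest()
    position = (int(digest, 16) % num_buckets) / num_buckets  # value in [0, 1)
    upper = 0.0
    for index, size in enumerate(sizes):
        upper += size
        if position < upper:
            return index
    return len(sizes) - 1  # guard against floating-point round-off


# Hypothetical story ids, used only to demonstrate the split.
story_ids = range(1, 10_001)
parts = [split_bucket(i) for i in story_ids]
train_ids = [i for i, p in zip(story_ids, parts) if p == 0]
test_ids = [i for i, p in zip(story_ids, parts) if p == 1]
print(len(train_ids), len(test_ids))
```

Because the assignment depends only on the key and the seed, a row lands in the same partition on every run, and no row can appear in both sets.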

## Text Vectorization and Model Training

To process the story titles, we use TF-IDF vectorization with a deferred approach:


In [4]:
transform_col = "title"
features = (transformed_col,) = (f"{transform_col}_transformed",)
target = "descendants"
target_predicted = f"{target}_predicted"

# Create deferred transformer for TF-IDF
deferred_fit_transform_tfidf = deferred_fit_transform_series_sklearn(
    col=transform_col,
    cls=TfidfVectorizer,
    return_type=dt.Array(dt.float64),
)
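The `return_type=dt.Array(dt.float64)` above reflects what TF-IDF produces: each title becomes one fixed-length array of floats, with one slot per vocabulary term. The plain-Python sketch below illustrates the idea on two made-up titles. It uses the smoothed IDF formula and L2 normalization that, as far as I know, match scikit-learn's documented defaults, but the whitespace tokenizer is a simplification, and `fit_tfidf` and `transform_tfidf` are hypothetical helpers rather than part of xorq or scikit-learn.

```python
import math
from collections import Counter


def fit_tfidf(titles):
    """Learn the vocabulary and a smoothed IDF weight per term."""
    docs = [t.lower().split() for t in titles]  # simplified tokenizer
    vocab = sorted({w for d in docs for w in d})
    n = len(docs)
    doc_freq = Counter(w for d in docs for w in set(d))
    idf = {w: math.log((1 + n) / (1 + doc_freq[w])) + 1 for w in vocab}
    return vocab, idf


def transform_tfidf(title, vocab, idf):
    """Turn one title into an L2-normalized vector aligned with `vocab`."""
    counts = Counter(title.lower().split())  # terms outside vocab are ignored
    raw = [counts[w] * idf[w] for w in vocab]
    norm = math.sqrt(sum(x * x for x in raw)) or 1.0  # avoid dividing by zero
    return [x / norm for x in raw]


# Made-up titles for illustration only.
titles = ["Show HN: my new database", "Ask HN: which database do you use"]
vocab, idf = fit_tfidf(titles)
vector = transform_tfidf(titles[0], vocab, idf)
print(len(vocab), len(vector))
```

Terms that appear in every title (here `database`) get the lowest weight, while terms unique to one title get a higher one.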

For our XGBoost model, we define custom functions for training and prediction:

In [5]:
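@toolz.curry
def fit_xgboost_model(feature_df, target_series, seed=0):
    """Train an XGBoost regressor on the TF-IDF feature column."""
    xgb_r = xgb.XGBRegressor(
        objective="reg:squarederror",
        eval_metric=mean_absolute_error,
        n_estimators=20,
        seed=seed,
    )
    # feature_df holds a single column of arrays; expand it to one column per TF-IDF term
    X = pd.DataFrame(feature_df.squeeze().tolist())
    xgb_r.fit(X, target_series)
    return xgb_r

@toolz.curry
def predict_xgboost_model(model, df):
    """Predict one value per row of the TF-IDF feature column."""
    return model.predict(df.squeeze().tolist())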

# Create deferred predictor using our functions
deferred_fit_predict_xgb = deferred_fit_predict(
    target=target,
    features=list(features),
    fit=fit_xgboost_model,
    predict=predict_xgboost_model,
    return_type=dt.float32,
)
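The two functions passed as `fit` and `predict` follow a simple contract: `fit` takes features and a target and returns a model, and `predict` takes that model plus features and returns one value per row. The sketch below shows the same contract with a trivial stand-in model so it runs without XGBoost. The names `fit_mean_model`, `predict_mean_model`, and the `shrink` parameter are hypothetical, and `functools.partial` stands in for the `toolz.curry` decorator to show how an extra argument can be fixed ahead of time.

```python
from functools import partial
from statistics import mean


def fit_mean_model(feature_rows, targets, shrink=0.0):
    """Stand-in for a real trainer: 'learn' a shrunken mean of the targets."""
    return {"prediction": (1.0 - shrink) * mean(targets)}


def predict_mean_model(model, feature_rows):
    """Return one prediction per input row, as a real predictor would."""
    return [model["prediction"] for _ in feature_rows]


# Fix the extra hyperparameter first, then call with data later.
fit = partial(fit_mean_model, shrink=0.5)

# Made-up feature rows and targets for illustration only.
model = fit([[0.1, 0.9], [0.7, 0.3]], [10, 30])
preds = predict_mean_model(model, [[0.5, 0.5], [0.2, 0.8], [0.0, 1.0]])
print(preds)
```

Any pair of functions with these shapes could be swapped in, which is what keeps the model choice independent of the rest of the pipeline.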


## Building the Pipeline Expression

Finally, we compose everything into a single expression that works as a pipeline:

In [7]:
# Fit and transform with TF-IDF
(deferred_tfidf_model, tfidf_udaf, deferred_tfidf_transform) = (
    deferred_fit_transform_tfidf(
        train_expr,
        storage=storage,
    )
)

# Apply the transformation to training data
train_tfidf_transformed = train_expr.mutate(
    **{transformed_col: deferred_tfidf_transform.on_expr}
)

# Fit XGBoost and get prediction function
(deferred_xgb_model, xgb_udaf, deferred_xgb_predict) = deferred_fit_predict_xgb(
    train_tfidf_transformed,
    storage=storage,
)

# Apply transformation and prediction to test data
test_xgb_predicted = (
    test_expr.mutate(**{transformed_col: deferred_tfidf_transform.on_expr})
    .into_backend(xo.connect())
    .mutate(**{target_predicted: deferred_xgb_predict.on_expr})
)

## Executing and Evaluating the Pipeline

To run the pipeline and get predictions:


In [None]:
test_xgb_predicted.execute()
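Once the pipeline has run, the predictions can be scored against the true comment counts. The sketch below computes the mean absolute error by hand on a few made-up rows that use the `descendants` and `descendants_predicted` column names from this tutorial. The values are invented purely for illustration and are not results from the pipeline, and `mean_abs_error` is a hypothetical helper.

```python
def mean_abs_error(rows, target="descendants", predicted="descendants_predicted"):
    """Average absolute difference between the true and predicted values."""
    errors = [abs(row[target] - row[predicted]) for row in rows]
    return sum(errors) / len(errors)


# Invented rows for illustration; real rows would come from the executed expression.
rows = [
    {"descendants": 12, "descendants_predicted": 9.5},
    {"descendants": 0, "descendants_predicted": 3.0},
    {"descendants": 45, "descendants_predicted": 40.0},
]

mae = mean_abs_error(rows)
print(mae)
```

With the executed result in hand, the same metric should be obtainable by passing the two columns to scikit-learn's `mean_absolute_error(y_true, y_pred)`, which this notebook already imports.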