# COLLAGE Pipeline Demo

All logic lives in the `pipeline/` package. Each step follows the
**BaseStep** contract: `ingest → transform → request → validate → end`.

A single `items` list flows through the pipeline. Items are cumulative
dicts — each step's output is merged onto the item via `to_state_dict()`.
Downstream steps read prior outputs via `Model.from_state_dict(item)`.
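
As a rough illustration of that merge-and-read-back cycle, the sketch below uses stdlib dataclasses as a stand-in. The real `pipeline` models may be implemented differently; the `StepOutput` base shown here, its method bodies, and the choice of the class name as the item key are assumptions inferred from the keys that appear in the logs (`Product`, `Room`, `Style`).

```python
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class StepOutput:
    """Hypothetical base class: stores each output under its class name."""

    def to_state_dict(self) -> dict[str, Any]:
        # e.g. Room(name="Dining Room", ...) -> {"Room": {"name": "Dining Room", ...}}
        return {type(self).__name__: asdict(self)}

    @classmethod
    def from_state_dict(cls, item: dict[str, Any]):
        # Rebuild a typed object from the sub-dict stored under the class name.
        return cls(**item[cls.__name__])


@dataclass
class Product(StepOutput):
    sku: str
    name: str


@dataclass
class Room(StepOutput):
    name: str
    reasoning: str


# An item starts with only the Product key...
item = Product(sku="OAK-TBL-001", name="Oakwood Dining Table").to_state_dict()

# ...and each later step merges its output onto a copy of the item.
item = {**item, **Room(name="Dining Room", reasoning="placeholder").to_state_dict()}

print(sorted(item))                     # ['Product', 'Room']
print(Room.from_state_dict(item).name)  # Dining Room
```

Because every output lives under its own key, a later step can read any earlier output without knowing which step produced it.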

```
load_products → room_recommendation → style_recommendation
```

| Step | Reads from item | Adds to item |
|------|----------------|--------------|
| `load_products()` | — | `Product` |
| `RoomRecommendationStep` | `Product` | `Room` |
| `StyleRecommendationStep` | `Product`, `Room` | `Style` |

In [1]:
# My local LLM client uses my own Anthropic API key and makes all requests in "interactive" mode.
# In prod, we will write an LLM client that uses gcloud and makes batch requests instead.
# The BaseLLMClient contract is exactly the same either way.
import os

from dotenv import load_dotenv

load_dotenv()

if not os.getenv("ANTHROPIC_API_KEY"):
    raise EnvironmentError(
        "ANTHROPIC_API_KEY is not set. "
        "Add ANTHROPIC_API_KEY=sk-ant-... to a .env file in the repo root."
    )

print("API key loaded.")

API key loaded.


In [2]:
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s — %(message)s")
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)

from pipeline import (
    InteractiveAnthropicClient,
    RoomRecommendationStep,
    StyleRecommendationStep,
    load_products,
)

---

## Build and Run

In [None]:
llm_client = InteractiveAnthropicClient(model="claude-3-5-haiku-20241022", max_workers=15)

# Current design is barebones, just a linear pipeline.
# In the future, we could add branches, loops, etc. But no reason for that yet.
graph = [
    RoomRecommendationStep(name="room_rec", llm_client=llm_client),
    StyleRecommendationStep(name="style_rec", llm_client=llm_client),
]

In [None]:
# initialize state with a list of products to use
state = load_products()

# run pipeline
for step in graph:
    state.update(step(state))

INFO pipeline.steps.load_products — [load_products] run_id=19c1dbc0-ea07-4de1-826f-00c8d7920b53, 5 products loaded
INFO pipeline.steps.base_step — [room_rec] === starting ===
INFO pipeline.steps.base_step — [room_rec] ingest: 5 items, keys: ['Product']
INFO pipeline.steps.base_step — [room_rec] transform: 5 requests
INFO pipeline.steps.base_step — [room_rec] request: 5 responses
INFO pipeline.steps.base_step — [room_rec] validate: 15 outputs
INFO pipeline.steps.base_step — [room_rec] end: 15 items, keys: ['Product', 'Room']
INFO pipeline.steps.base_step — [room_rec] === done ===
INFO pipeline.steps.base_step — [style_rec] === starting ===
INFO pipeline.steps.base_step — [style_rec] ingest: 15 items, keys: ['Product', 'Room']
INFO pipeline.steps.base_step — [style_rec] transform: 15 requests
INFO pipeline.steps.base_step — [style_rec] request: 15 responses
INFO pipeline.steps.base_step — [style_rec] validate: 44 outputs
INFO pipeline.steps.base_step — [style_rec] end: 44 items, keys: ['

In [5]:
# summary of LLM usage
llm_client

InteractiveAnthropicClient(
	model='claude-3-5-haiku-20241022',
	max_workers=15,
	requests=20,
	input_tokens=11435,
	output_tokens=8621,
	total_tokens=20056
)

In [19]:
# examine final state - this is what is passed between each stage
from pipeline import ITEM_DATA, METADATA
itemdata_keys = ", ".join(list(state[ITEM_DATA][0].keys()))

print("FINAL STATE")
print("`state` is just a dict. It has two keys:")
print(f"\t\"{ITEM_DATA}\": {len(state[ITEM_DATA])}x elements, each is: ({itemdata_keys})")
print(f"\t\"{METADATA}\": flat dict of metadata: {state[METADATA]}")

FINAL STATE
`state` is just a dict. It has two keys:
	"item_data": 44x elements, each is: (Product, Room, Style)
	"metadata": flat dict of metadata: {'run_id': '19c1dbc0-ea07-4de1-826f-00c8d7920b53'}


In [8]:
# Examine just one element. It is a unique combination of outputs from each step.
# NOTE that our pipeline started with 5x SKUs, but
#    - RoomRecommendationStep fanned out to 1-3x Rooms per SKU
#    - StyleRecommendationStep fanned out to 1-3x Styles per Room
# so there are at most 5 x 3 x 3 = 45 unique results; this run produced 44

first_item = state[ITEM_DATA][0]
print("first result:")
first_item

first result:


{'Product': {'sku': 'OAK-TBL-001',
  'name': 'Oakwood Dining Table',
  'category': 'furniture',
  'material': 'solid oak',
  'price': 1299.0},
 'Room': {'name': 'Dining Room',
  'reasoning': 'The Oakwood Dining Table is perfectly suited for a dining room as it is specifically designed for dining purposes. Made of solid oak, it offers a classic, timeless look that can anchor the dining space with a high-quality, natural wood aesthetic. Its substantial material and craftsmanship justify the premium price point and make it an ideal centerpiece for formal or casual dining areas.'},
 'Style': {'name': 'Scandinavian Minimalist',
  'color_palette': ['white', 'light gray', 'natural wood tones'],
  'reasoning': "The solid oak dining table perfectly embodies the Scandinavian design philosophy of showcasing natural materials and clean lines. This style emphasizes simplicity, functionality, and the beauty of wood grain. The light, neutral color palette will highlight the oak's warm tones, creating

In [None]:
# although state is a dict, each step accessed it in a nicely structured way - letting us do static type checking
from pipeline.models import Room
room_obj = Room.from_state_dict(first_item)
print(room_obj)
print(room_obj.name)


name='Dining Room' reasoning='The Oakwood Dining Table is perfectly suited for a dining room as it is specifically designed for dining purposes. Made of solid oak, it offers a classic, timeless look that can anchor the dining space with a high-quality, natural wood aesthetic. Its substantial material and craftsmanship justify the premium price point and make it an ideal centerpiece for formal or casual dining areas.'
Dining Room


In [None]:
# state is JSON-serializable out of the box. If we want to save it to disk/gcs to checkpoint, it's trivial
import json
import tempfile

with tempfile.TemporaryDirectory() as tmp_dir:
    state_path = os.path.join(tmp_dir, "state.json")
    with open(state_path, "w") as f:
        json.dump(state, f)
    size = os.path.getsize(state_path)
    size_mb = size / 1024 / 1024
    print(f"state.json: ({size_mb:.3f} MB)")
    # This scales by the number of fanned-out items in state, not by the number of input SKUs.
    print(f"extended to 100k items, this would be {size_mb * 100_000 / len(state[ITEM_DATA]):.3f} MB")

state.json: (0.048 MB)
extended to 100k items, this would be 109.029 MB
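
A checkpoint is only useful if it can be read back. The sketch below round-trips a state-shaped dict through JSON. The `save_state` and `load_state` helper names and the toy `state` contents are illustrative and not part of the `pipeline` package; only the two top-level keys are taken from the output above.

```python
import json
import os
import tempfile


def save_state(state: dict, path: str) -> None:
    """Write the state dict to disk as JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(state, f)


def load_state(path: str) -> dict:
    """Read a state dict back from a JSON checkpoint."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


# Toy stand-in with the same two top-level keys as the real state.
state = {
    "item_data": [{"Product": {"sku": "OAK-TBL-001", "price": 1299.0}}],
    "metadata": {"run_id": "example-run"},
}

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "state.json")
    save_state(state, path)
    restored = load_state(path)

# Plain dicts, lists, strings and floats survive the round trip unchanged.
print(restored == state)  # True
```

The same two helpers could presumably point at any file-like destination; writing to GCS would need a different client and is not shown here.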


---

## Extending the Pipeline

- **Add two new model classes**: one for the expected format of the LLM response, and one for a single result if this step fans out. The final output of the step (e.g. `Room`) must subclass `StepOutput`.
    - For example, a `RoomResponse` contains 1-3 `Room` objects and we fan out, so this step multiplies the number of items by up to 3.
- **Add a new step**: subclass `BaseStep` and implement `transform` and `validate`, using the classes you just wrote.
    - If a step doesn't need an LLM, that's fine: you can override what the `request()` method does.
- **Fan-out in `validate`**: unpack the LLM response into individual `StepOutput` objects, one per unit of work for the next step. Return `(source_item, output)` pairs.
    - `validate` must discard invalid results, but you're welcome to log them somewhere so we can retry later.
- **Typed downstream access**: use `Model.from_state_dict(item)` in `transform` or `validate` to get typed access to any prior step's output. There is no need to access dict keys directly.
- **Disk serialization**: `to_state_dict()` writes `model_dump()` dicts, so state is JSON-serializable out of the box. `BaseStep.ingest()` and `BaseStep.end()` just hand off the raw dict, so callers can serialize or deserialize it themselves if they'd like.
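
The checklist above can be sketched end to end with a toy step. Everything here is a stand-in: the `BaseStep` and `StepOutput` classes below are simplified guesses at the real ones (only the phase order and the `(source_item, output)` pairs come from this notebook), and `MaterialTagStep`, `Tag`, and the method signatures are hypothetical. The example overrides `request()` so no LLM is needed.

```python
from dataclasses import asdict, dataclass
from typing import Any

ITEM_DATA = "item_data"  # key name as printed in the final-state cell


@dataclass
class StepOutput:
    """Simplified stand-in for the real StepOutput base class."""

    def to_state_dict(self) -> dict[str, Any]:
        return {type(self).__name__: asdict(self)}


@dataclass
class Tag(StepOutput):
    """One fanned-out result: a single tag for a product."""

    label: str


class BaseStep:
    """Toy BaseStep: ingest -> transform -> request -> validate -> end."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __call__(self, state: dict) -> dict:
        items = self.ingest(state)
        requests = self.transform(items)
        responses = self.request(requests)
        pairs = self.validate(items, responses)
        return self.end(pairs)

    def ingest(self, state: dict) -> list[dict]:
        return state[ITEM_DATA]

    def end(self, pairs: list[tuple[dict, StepOutput]]) -> dict:
        # Merge each output onto a copy of its source item.
        return {ITEM_DATA: [{**item, **out.to_state_dict()} for item, out in pairs]}

    def transform(self, items):
        raise NotImplementedError

    def request(self, requests):
        raise NotImplementedError

    def validate(self, items, responses):
        raise NotImplementedError


class MaterialTagStep(BaseStep):
    """Hypothetical non-LLM step: splits the material string into tags."""

    def transform(self, items):
        return [item["Product"]["material"] for item in items]

    def request(self, requests):
        # Overridden so no LLM is called; a real step would send prompts here.
        return [{"tags": material.split()} for material in requests]

    def validate(self, items, responses):
        pairs = []
        for item, response in zip(items, responses):
            for label in response.get("tags", []):
                if label:  # discard invalid (empty) results
                    pairs.append((item, Tag(label=label)))
        return pairs


state = {ITEM_DATA: [{"Product": {"sku": "OAK-TBL-001", "material": "solid oak"}}]}
state.update(MaterialTagStep(name="material_tags")(state))
print([item["Tag"]["label"] for item in state[ITEM_DATA]])  # ['solid', 'oak']
```

One input item becomes two output items, each still carrying its original `Product` key, which mirrors how the 5 products in the demo grew to 15 and then 44 items.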