# Conditional Step Execution (`run_if` / `skip_if`)

Accrue pipelines often need to skip a step for some rows, depending on the row's own data or on what earlier steps produced:
- "Only run credit check for US companies"
- "Skip deep research if the classifier said 'irrelevant'"
- "Only call the expensive LLM step for high-priority leads"

Both `LLMStep` and `FunctionStep` accept `run_if` and `skip_if` predicates.
These are evaluated **per-row** before the step executes.

- `run_if`: Step runs only when the predicate returns `True`
- `skip_if`: Step is skipped when the predicate returns `True`
- They are **mutually exclusive**: set one or the other on a step, never both
- Skipped rows get `None` for every output field (or the field spec's `default`, where one is defined)
- Skipped rows **never touch the cache** and never call `step.run()`

**Predicate signature:** `(row: dict, prior_results: dict) -> bool`
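The rules above can be modeled in a few lines of plain Python. This is a minimal sketch of the documented per-row semantics, not Accrue's implementation. `resolve_row` and its `defaults` mapping are hypothetical names, and the sketch assumes at most one predicate is set, since Accrue rejects steps that set both.

```python
from typing import Callable, Optional

# (row, prior_results) -> bool, matching the predicate signature above
Predicate = Callable[[dict, dict], bool]


def resolve_row(
    row: dict,
    prior: dict,
    fields: list,
    step_fn: Callable[[dict, dict], dict],
    run_if: Optional[Predicate] = None,
    skip_if: Optional[Predicate] = None,
    defaults: Optional[dict] = None,
) -> tuple:
    """Hypothetical per-row gate. Returns (output values, skipped flag)."""
    if run_if is not None:
        skipped = not run_if(row, prior)
    elif skip_if is not None:
        skipped = bool(skip_if(row, prior))
    else:
        skipped = False
    if skipped:
        # Skipped rows never call the step: each field gets its default, else None.
        defaults = defaults or {}
        return {f: defaults.get(f) for f in fields}, True
    return step_fn(row, prior), False


out, skipped = resolve_row(
    {"country": "DE"}, {}, ["credit_score", "note"],
    step_fn=lambda row, prior: {"credit_score": "AAA", "note": "ok"},
    run_if=lambda row, prior: row["country"] == "US",
    defaults={"note": "n/a"},
)
print(out, skipped)  # {'credit_score': None, 'note': 'n/a'} True
```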

> All examples below use `FunctionStep` so they run without an API key.
> `LLMStep` works identically — just add `run_if`/`skip_if` to the constructor.

In [None]:
from accrue import Pipeline, FunctionStep, LLMStep, EnrichmentConfig, EnrichmentHooks

## 1. Basic `run_if` — filter rows by data

The simplest case: only run a step for rows matching a condition.
Rows that don't match get `None` for all output fields.

In [None]:
data = [
    {"company": "Acme Corp", "country": "US"},
    {"company": "Beta GmbH", "country": "DE"},
    {"company": "Gamma Inc", "country": "US"},
    {"company": "Delta Ltd", "country": "UK"},
]

pipeline = Pipeline([
    FunctionStep(
        "us_check",
        fn=lambda ctx: {"credit_score": f"{ctx.row['company']}: AAA"},
        fields=["credit_score"],
        run_if=lambda row, prior: row["country"] == "US",
    )
])

config = EnrichmentConfig(enable_progress_bar=False)
result = await pipeline.run_async(data, config)

for row in result.data:
    status = row['credit_score'] or '(skipped)'
    print(f"  {row['company']:12s} [{row['country']}] -> {status}")
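Because this `run_if` predicate only reads `row`, you can call it yourself on the input data to preview which rows the step will execute before spending anything. A minimal plain-Python sketch; `preview` is a hypothetical helper, and passing an empty dict as `prior_results` is only valid for predicates that never read it.

```python
def preview(rows, predicate):
    """Split rows into (will_run, will_skip) using a row-only run_if predicate."""
    will_run, will_skip = [], []
    for row in rows:
        # Empty prior_results: only correct if the predicate ignores it.
        if predicate(row, {}):
            will_run.append(row)
        else:
            will_skip.append(row)
    return will_run, will_skip


data = [
    {"company": "Acme Corp", "country": "US"},
    {"company": "Beta GmbH", "country": "DE"},
    {"company": "Gamma Inc", "country": "US"},
    {"company": "Delta Ltd", "country": "UK"},
]
us_only = lambda row, prior: row["country"] == "US"
will_run, will_skip = preview(data, us_only)
print([r["company"] for r in will_run])   # ['Acme Corp', 'Gamma Inc']
print([r["company"] for r in will_skip])  # ['Beta GmbH', 'Delta Ltd']
```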

## 2. `skip_if` — the inverse

`skip_if` is the opposite of `run_if`: the step is **skipped** when the predicate returns `True`.
Use whichever reads more naturally for your use case.

In [None]:
pipeline = Pipeline([
    FunctionStep(
        "non_us_enrichment",
        fn=lambda ctx: {"local_reg": f"{ctx.row['country']} regulations apply"},
        fields=["local_reg"],
        skip_if=lambda row, prior: row["country"] == "US",
    )
])

result = await pipeline.run_async(data, config)

for row in result.data:
    status = row['local_reg'] or '(skipped — US company)'
    print(f"  {row['company']:12s} [{row['country']}] -> {status}")
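Any synchronous `skip_if` predicate can be turned into the equivalent `run_if` by negating it, and vice versa. The hypothetical `negate` helper below (not part of Accrue) makes that explicit. It only works for synchronous predicates: an `async def` predicate returns a coroutine object, which is always truthy, so `not` would give the wrong answer.

```python
def negate(predicate):
    """Turn a synchronous skip_if predicate into the equivalent run_if (or vice versa)."""
    def negated(row, prior):
        return not predicate(row, prior)
    return negated


is_us = lambda row, prior: row["country"] == "US"
run_if_non_us = negate(is_us)  # selects the same rows as skip_if=is_us would run

rows = [{"country": c} for c in ("US", "DE", "US", "UK")]
print([run_if_non_us(r, {}) for r in rows])  # [False, True, False, True]
```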

## 3. Predicates using `prior_results`

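Because the predicate receives `(row, prior_results)`, a downstream step can branch
on what upstream steps produced. This is the **conditional pipeline** pattern:
classify first, then run expensive steps only for the rows that need them.
Below, `deep_research` declares `depends_on=["classify"]` and its predicate reads the `tier` field that `classify` wrote.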

In [None]:
leads = [
    {"company": "Stripe",      "annual_revenue": 15_000_000_000},
    {"company": "Joe's Deli",  "annual_revenue": 500_000},
    {"company": "Notion",      "annual_revenue": 1_000_000_000},
    {"company": "Corner Shop", "annual_revenue": 200_000},
]

pipeline = Pipeline([
    # Step 1: classify every lead (cheap / fast)
    FunctionStep(
        "classify",
        fn=lambda ctx: {
            "tier": "enterprise" if ctx.row["annual_revenue"] > 10_000_000 else "smb"
        },
        fields=["tier"],
    ),
    # Step 2: only run deep research for enterprise leads
    FunctionStep(
        "deep_research",
        fn=lambda ctx: {"research": f"Deep analysis of {ctx.row['company']}"},
        fields=["research"],
        depends_on=["classify"],
        run_if=lambda row, prior: prior.get("tier") == "enterprise",
    ),
])

result = await pipeline.run_async(leads, config)

for row in result.data:
    research = row['research'] or '(skipped — SMB)'
    print(f"  {row['company']:12s} tier={row['tier']:10s} -> {research}")
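Conceptually, each row carries a growing dict of upstream outputs that later predicates read. The toy executor below sketches that idea under simplifying assumptions: steps run in list order, one row at a time, and `prior` is a flat dict of field values (as these examples suggest). `run_sequential` is a hypothetical name, and this is not how Accrue actually schedules steps.

```python
def run_sequential(rows, steps):
    """Toy executor: steps is a list of (fn, fields, run_if) tuples run in order."""
    results = []
    for row in rows:
        prior = {}  # outputs of earlier steps for this row
        for fn, fields, run_if in steps:
            if run_if is None or run_if(row, prior):
                prior.update(fn(row, prior))
            else:
                prior.update({f: None for f in fields})  # skipped: None values
        results.append({**row, **prior})
    return results


steps = [
    (lambda row, prior: {"tier": "enterprise" if row["annual_revenue"] > 10_000_000 else "smb"},
     ["tier"], None),
    (lambda row, prior: {"research": f"Deep analysis of {row['company']}"},
     ["research"], lambda row, prior: prior.get("tier") == "enterprise"),
]
out = run_sequential(
    [{"company": "Stripe", "annual_revenue": 15_000_000_000},
     {"company": "Joe's Deli", "annual_revenue": 500_000}],
    steps,
)
for r in out:
    print(r["company"], r["tier"], r["research"])
# Stripe enterprise Deep analysis of Stripe
# Joe's Deli smb None
```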

## 4. Skipped rows flow to downstream steps

Skipped rows produce `None` (or field defaults) as their output values.
Downstream steps see these values in `prior_results`, so you can chain
conditional logic across multiple steps, as long as downstream code handles `None`.

In [None]:
pipeline = Pipeline([
    FunctionStep(
        "classify",
        fn=lambda ctx: {"label": "relevant"},
        fields=["label"],
        run_if=lambda row, prior: row["score"] > 5,
    ),
    # This step runs for ALL rows — it sees label=None for skipped rows
    FunctionStep(
        "summarize",
        fn=lambda ctx: {
            "summary": f"label={ctx.prior_results.get('label')}, score={ctx.row['score']}"
        },
        fields=["summary"],
        depends_on=["classify"],
    ),
])

result = await pipeline.run_async(
    [{"score": 10}, {"score": 2}, {"score": 8}], config
)

for row in result.data:
    print(f"  score={row['score']:2d}  label={str(row['label']):>10s}  summary={row['summary']}")
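Because skipped fields arrive as `None`, a downstream predicate that compares them directly can fail: in Python 3, `None > 0.5` raises `TypeError`. A minimal sketch of a guarded predicate, assuming a hypothetical upstream field `confidence` that is `None` whenever that upstream step was skipped:

```python
def high_confidence(row, prior):
    """run_if for a downstream step: a skipped (None) upstream value means 'don't run'."""
    confidence = prior.get("confidence")
    return confidence is not None and confidence > 0.5


def naive_high_confidence(row, prior):
    return prior.get("confidence") > 0.5  # raises TypeError when the value is None


print(high_confidence({}, {"confidence": 0.9}))   # True
print(high_confidence({}, {"confidence": None}))  # False
try:
    naive_high_confidence({}, {"confidence": None})
except TypeError as exc:
    print("naive predicate failed:", exc)
```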

## 5. `rows_skipped` in usage stats

`StepUsage` includes a `rows_skipped` counter. Together with `cache_hits`, it shows
how many rows were skipped, served from cache, or actually executed.

In [None]:
pipeline = Pipeline([
    FunctionStep(
        "expensive_step",
        fn=lambda ctx: {"result": "done"},
        fields=["result"],
        run_if=lambda row, prior: row["priority"] == "high",
    )
])

data = [
    {"id": 1, "priority": "high"},
    {"id": 2, "priority": "low"},
    {"id": 3, "priority": "low"},
    {"id": 4, "priority": "high"},
    {"id": 5, "priority": "low"},
]

result = await pipeline.run_async(data, config)

usage = result.cost.steps["expensive_step"]
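# Rows that were neither skipped nor served from cache actually ran the step
rows_executed = len(data) - usage.rows_skipped - usage.cache_hits
print(f"Total rows:    {len(data)}")
print(f"rows_skipped:  {usage.rows_skipped}")
print(f"cache_hits:    {usage.cache_hits}")
print(f"rows_executed: {rows_executed}")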
print(f"\nSkipping saved {usage.rows_skipped}/{len(data)} step executions ({usage.rows_skipped/len(data):.0%})")
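The counters can be read as a partition of the step's rows. The sketch below is a toy model, not Accrue's `StepUsage`: `ToyUsage` is hypothetical, and it assumes every row lands in exactly one bucket (skipped, cache hit, or executed), ignoring cases such as failed rows.

```python
from dataclasses import dataclass


@dataclass
class ToyUsage:
    """Hypothetical stand-in for per-step usage counters."""
    rows_skipped: int = 0
    cache_hits: int = 0
    rows_executed: int = 0

    def record(self, skipped: bool, cached: bool = False) -> None:
        # Each row lands in exactly one bucket.
        if skipped:
            self.rows_skipped += 1
        elif cached:
            self.cache_hits += 1
        else:
            self.rows_executed += 1

    @property
    def total(self) -> int:
        return self.rows_skipped + self.cache_hits + self.rows_executed


usage = ToyUsage()
for priority in ["high", "low", "low", "high", "low"]:
    usage.record(skipped=(priority != "high"))
print(usage)  # ToyUsage(rows_skipped=3, cache_hits=0, rows_executed=2)
print(f"skipped {usage.rows_skipped / usage.total:.0%} of rows")  # skipped 60% of rows
```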

## 6. Hooks: `skipped` flag on `RowCompleteEvent`

`RowCompleteEvent` carries a `skipped: bool` flag, so observability hooks
can tell skipped rows apart from executed ones.

In [None]:
from accrue.core.hooks import RowCompleteEvent

events: list[RowCompleteEvent] = []

pipeline = Pipeline([
    FunctionStep(
        "check",
        fn=lambda ctx: {"status": "checked"},
        fields=["status"],
        run_if=lambda row, prior: row["country"] == "US",
    )
])

hooks = EnrichmentHooks(on_row_complete=lambda e: events.append(e))
await pipeline.run_async(
    [{"country": "US"}, {"country": "UK"}, {"country": "US"}],
    config,
    hooks=hooks,
)

for e in sorted(events, key=lambda e: e.row_index):
    print(f"  Row {e.row_index}: skipped={e.skipped}, values={e.values}")
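A hook can also aggregate the `skipped` flag instead of printing each event, for example to report a skip ratio at the end of a run. The sketch uses a hypothetical `FakeRowCompleteEvent` dataclass with only the `row_index`, `skipped`, and `values` attributes seen above, so it runs without Accrue. With the real library you would presumably pass `counter.on_row_complete` as `EnrichmentHooks(on_row_complete=...)`.

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class FakeRowCompleteEvent:
    """Hypothetical stand-in with only the attributes used in the cell above."""
    row_index: int
    skipped: bool
    values: dict = field(default_factory=dict)


class SkipCounter:
    """Hook target that tallies skipped vs executed rows."""

    def __init__(self) -> None:
        self.counts = Counter()

    def on_row_complete(self, event) -> None:
        self.counts["skipped" if event.skipped else "executed"] += 1


counter = SkipCounter()
for i, country in enumerate(["US", "UK", "US"]):
    counter.on_row_complete(FakeRowCompleteEvent(row_index=i, skipped=country != "US"))
print(dict(counter.counts))  # {'executed': 2, 'skipped': 1}
```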

## 7. Caching interaction

Skipped rows **never create cache entries**: the predicate is re-evaluated on every run,
so there is nothing to cache. As long as the predicate is deterministic, the same inputs
always produce the same skip decision. Non-skipped rows are cached normally.

In [None]:
import tempfile

cache_dir = tempfile.mkdtemp()
call_count = 0

def counting_fn(ctx):
    global call_count
    call_count += 1
    return {"result": f"processed {ctx.row['name']}"}

pipeline = Pipeline([
    FunctionStep(
        "process",
        fn=counting_fn,
        fields=["result"],
        run_if=lambda row, prior: row["active"],
    )
])

cache_config = EnrichmentConfig(
    enable_caching=True,
    cache_dir=cache_dir,
    enable_progress_bar=False,
)

rows = [
    {"name": "Alice", "active": True},
    {"name": "Bob",   "active": False},
    {"name": "Carol", "active": True},
]

# First run: only active rows execute
call_count = 0
r1 = await pipeline.run_async(rows, cache_config)
u1 = r1.cost.steps["process"]
print(f"Run 1: fn called {call_count}x, cache_hits={u1.cache_hits}, "
      f"cache_misses={u1.cache_misses}, rows_skipped={u1.rows_skipped}")

# Second run: active rows served from cache, inactive still skipped
call_count = 0
r2 = await pipeline.run_async(rows, cache_config)
u2 = r2.cost.steps["process"]
print(f"Run 2: fn called {call_count}x, cache_hits={u2.cache_hits}, "
      f"cache_misses={u2.cache_misses}, rows_skipped={u2.rows_skipped}")

# Cleanup
import shutil
shutil.rmtree(cache_dir, ignore_errors=True)
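The ordering that makes this work is: evaluate the predicate first, and consult the cache only for rows that will run. Below is a dict-based toy model of that ordering. Accrue's real cache keys and storage are not shown in this notebook, so `cache_key`, `run_step`, and the JSON-based key are hypothetical choices.

```python
import json

cache: dict = {}
calls = 0


def process(row):
    global calls
    calls += 1
    return {"result": f"processed {row['name']}"}


def cache_key(row):
    # Hypothetical key: the row serialized as JSON with sorted keys.
    return json.dumps(row, sort_keys=True)


def run_step(rows, run_if):
    stats = {"skipped": 0, "hits": 0, "misses": 0}
    out = []
    for row in rows:
        if not run_if(row, {}):
            stats["skipped"] += 1      # skip decided before any cache access
            out.append({"result": None})
            continue
        key = cache_key(row)
        if key in cache:
            stats["hits"] += 1
        else:
            stats["misses"] += 1
            cache[key] = process(row)  # only executed rows create entries
        out.append(cache[key])
    return out, stats


rows = [{"name": "Alice", "active": True}, {"name": "Bob", "active": False},
        {"name": "Carol", "active": True}]
active = lambda row, prior: row["active"]
_, s1 = run_step(rows, active)
_, s2 = run_step(rows, active)
print(s1, s2, calls, len(cache))
# {'skipped': 1, 'hits': 0, 'misses': 2} {'skipped': 1, 'hits': 2, 'misses': 0} 2 2
```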

## 8. Mutual exclusivity validation

Setting both `run_if` and `skip_if` on the same step raises `PipelineError` at construction time.

In [None]:
from accrue.core.exceptions import PipelineError

try:
    FunctionStep(
        "bad",
        fn=lambda ctx: {},
        fields=["f"],
        run_if=lambda row, prior: True,
        skip_if=lambda row, prior: False,
    )
except PipelineError as e:
    print(f"Caught at construction time (as expected):\n  {e}")
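Validating in the constructor means the mistake surfaces when the pipeline is defined, not on the first row. A minimal sketch of that fail-fast pattern in a hypothetical `GatedStep` class, raising `ValueError` as a stand-in for Accrue's `PipelineError`:

```python
from typing import Callable, Optional

Predicate = Callable[[dict, dict], bool]


class GatedStep:
    """Hypothetical step that validates its predicates when it is constructed."""

    def __init__(
        self,
        name: str,
        run_if: Optional[Predicate] = None,
        skip_if: Optional[Predicate] = None,
    ) -> None:
        # Fail fast: reject the conflicting configuration before any row is processed.
        if run_if is not None and skip_if is not None:
            raise ValueError(f"Step {name!r}: set run_if or skip_if, not both")
        self.name = name
        self.run_if = run_if
        self.skip_if = skip_if


ok = GatedStep("fine", run_if=lambda row, prior: True)
try:
    GatedStep("bad", run_if=lambda row, prior: True, skip_if=lambda row, prior: False)
except ValueError as exc:
    print(exc)  # Step 'bad': set run_if or skip_if, not both
```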

## 9. Async predicates

Both sync and async predicates are supported. Async predicates are awaited automatically.

In [None]:
import asyncio

async def check_eligibility(row, prior):
    """Simulate an async eligibility check (e.g. database lookup)."""
    await asyncio.sleep(0)  # simulate I/O
    return row.get("eligible", False)

pipeline = Pipeline([
    FunctionStep(
        "enroll",
        fn=lambda ctx: {"enrolled": True},
        fields=["enrolled"],
        run_if=check_eligibility,
    )
])

result = await pipeline.run_async(
    [{"name": "Alice", "eligible": True}, {"name": "Bob", "eligible": False}],
    config,
)

for row in result.data:
    print(f"  {row['name']}: enrolled={row['enrolled']}")
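One common way to accept both kinds of predicate is to call it and await the result only if it is awaitable. This sketch shows that technique with a hypothetical `evaluate_predicate` helper; it is not necessarily how Accrue does it internally.

```python
import asyncio
import inspect


async def evaluate_predicate(predicate, row: dict, prior: dict) -> bool:
    """Call a sync or async predicate and return its result as a bool."""
    result = predicate(row, prior)
    if inspect.isawaitable(result):
        # An `async def` predicate returns a coroutine; await it to get the value.
        result = await result
    return bool(result)


async def is_eligible(row, prior):
    await asyncio.sleep(0)  # stand-in for real I/O
    return row.get("eligible", False)


def is_adult(row, prior):
    return row.get("age", 0) >= 18


async def main():
    row = {"eligible": True, "age": 16}
    async_result = await evaluate_predicate(is_eligible, row, {})
    sync_result = await evaluate_predicate(is_adult, row, {})
    return async_result, sync_result


# In a notebook that already has a running event loop, use `await main()` instead.
results = asyncio.run(main())
print(results)  # (True, False)
```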

## 10. Real-world pattern: classify → conditional deep enrichment

A common pipeline pattern: a cheap classifier decides which rows get expensive enrichment.
This saves API calls and cost for rows that don't need them.

In [None]:
companies = [
    {"company": "Stripe",       "sector": "fintech",      "country": "US"},
    {"company": "Klarna",       "sector": "fintech",      "country": "SE"},
    {"company": "Notion",       "sector": "productivity", "country": "US"},
    {"company": "Spotify",      "sector": "media",        "country": "SE"},
    {"company": "Canva",        "sector": "design",       "country": "AU"},
]

pipeline = Pipeline([
    # Step 1: tag every row (cheap, runs for all)
    FunctionStep(
        "tag",
        fn=lambda ctx: {"is_target": ctx.row["sector"] == "fintech" and ctx.row["country"] == "US"},
        fields=["is_target"],
    ),
    # Step 2: only enrich target companies
    FunctionStep(
        "enrich",
        fn=lambda ctx: {
            "analysis": f"{ctx.row['company']}: US fintech deep-dive complete",
            "risk_score": 42,
        },
        fields=["analysis", "risk_score"],
        depends_on=["tag"],
        run_if=lambda row, prior: prior.get("is_target") is True,
    ),
])

result = await pipeline.run_async(companies, config)

print(f"{'Company':12s} {'Sector':14s} {'Country':8s} {'Target':8s} Analysis")
print("-" * 75)
for row in result.data:
    analysis = row.get('analysis') or '(skipped)'
    print(f"{row['company']:12s} {row['sector']:14s} {row['country']:8s} "
          f"{str(row['is_target']):8s} {analysis}")

usage = result.cost.steps.get("enrich")
if usage:
    print(f"\nEnrich step: {usage.rows_skipped} of {len(companies)} rows skipped")
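The savings from gating are easy to estimate up front: with `n` rows, `k` of which pass the gate, the gated pipeline pays for `n` tag calls plus `k` enrich calls instead of `n` of each. A sketch with made-up per-row costs; the dollar figures and the helper names `gated_cost` / `ungated_cost` are placeholders, not Accrue or provider prices.

```python
def gated_cost(n_rows: int, n_pass: int, tag_cost: float, enrich_cost: float) -> float:
    """Cost when every row is tagged but only n_pass rows are enriched."""
    return n_rows * tag_cost + n_pass * enrich_cost


def ungated_cost(n_rows: int, tag_cost: float, enrich_cost: float) -> float:
    """Cost when every row gets both the tag and the enrich step."""
    return n_rows * (tag_cost + enrich_cost)


companies = [
    {"sector": "fintech", "country": "US"},
    {"sector": "fintech", "country": "SE"},
    {"sector": "productivity", "country": "US"},
    {"sector": "media", "country": "SE"},
    {"sector": "design", "country": "AU"},
]
# Same gate as the `tag` step above: US fintech companies only
n_pass = sum(c["sector"] == "fintech" and c["country"] == "US" for c in companies)
TAG, ENRICH = 0.001, 0.05  # placeholder costs per row, in dollars
print(n_pass)  # 1
print(round(gated_cost(len(companies), n_pass, TAG, ENRICH), 4))  # 0.055
print(round(ungated_cost(len(companies), TAG, ENRICH), 4))       # 0.255
```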