153 changes: 153 additions & 0 deletions src/content/docs/guides/routing/fan-out-with-subpipelines.mdx
@@ -0,0 +1,153 @@
---
title: Fan out with subpipelines
---

This guide shows you how to fan out an event stream into subpipelines with
<Op>each</Op> and <Op>group</Op>. You'll learn when to spawn one subpipeline per
event, when to keep one subpipeline per key, and how these operators differ
from fixed fan-out operators like <Op>fork</Op>, <Op>parallel</Op>, and
<Op>load_balance</Op>.

## Choose a fan-out pattern

Tenzir has several operators that send events into subpipelines. Choose the
operator based on how many subpipelines you need and how events should flow into
them:

| Operator | Subpipelines | Event flow | Use case |
| --------------------- | ------------------------------------- | ------------------------------------------------------------------ | ------------------------------------------------------------- |
| <Op>fork</Op> | One fixed side branch | Every event goes to the main pipeline and the side branch | Archive or publish a copy while continuing processing |
| <Op>parallel</Op> | A fixed number of workers | Each event goes to one worker running the same subpipeline | Speed up CPU-heavy or I/O-heavy work |
| <Op>load_balance</Op> | One branch per configured target | Each event goes to one target | Distribute load across equivalent sinks |
| <Op>each</Op> | One fresh subpipeline per input event | The input event is available as `$this`; it is not passed as input | Run a per-event job, such as a lookup or export |
| <Op>group</Op> | One subpipeline per key | Matching events are passed to the same keyed subpipeline | Keep per-tenant, per-host, or per-session processing isolated |

Use regular transformations when every event can flow through the same linear
pipeline. Use subpipeline fan-out when the pipeline structure itself depends on
each event or key.
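
For contrast, a fixed fan-out operator such as <Op>fork</Op> wires its branches once, independent of the data. A minimal sketch (topic name and file path are illustrative) that archives a copy of every event while the main pipeline continues:

```tql
from {src_ip: "10.0.0.5", action: "login"}
fork {
  to "copy.json"
}
publish "events"
```

No matter how many events or keys arrive, there is exactly one side branch. With `each` and `group`, by contrast, the number of subpipelines follows the data.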

## Run one subpipeline per event

Use <Op>each</Op> when every input event describes a job to run. The nested
pipeline must start with a source because `each` does not pass the input event
into the subpipeline. Instead, it binds the current event record to `$this`.

The following pipeline treats incoming cases as lookup requests. Each case
queries the same historical dataset for matching source IPs and annotates the
matches with the case ID:

```tql
from {case_id: "C-1", ip: "10.0.0.5"},
{case_id: "C-2", ip: "10.0.0.7"}
each parallel=4 {
from {ts: 2024-01-01T10:00:00, src_ip: "10.0.0.5", action: "login"},
{ts: 2024-01-01T10:02:00, src_ip: "10.0.0.8", action: "scan"},
{ts: 2024-01-01T10:05:00, src_ip: "10.0.0.7", action: "download"}
where src_ip == $this.ip
case_id = $this.case_id
}
sort case_id, ts
```

```tql
{
ts: 2024-01-01T10:00:00.000000,
src_ip: "10.0.0.5",
action: "login",
case_id: "C-1",
}
{
ts: 2024-01-01T10:05:00.000000,
src_ip: "10.0.0.7",
action: "download",
case_id: "C-2",
}
```

The `parallel` option limits how many per-event jobs run at the same time. When
more events arrive, `each` queues them and applies back pressure upstream until
a running subpipeline finishes. Keep this value bounded for external APIs,
expensive exports, or destinations with rate limits.
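
To run per-event jobs strictly one at a time, set `parallel=1`. The following sketch (file paths are illustrative) exports each case sequentially:

```tql
from {case_id: "C-1"}, {case_id: "C-2"}
each parallel=1 {
  from {case_id: $this.case_id, status: "exported"}
  to_file f"/tmp/tenzir/export-{$this.case_id}.json" {
    write_ndjson
  }
}
```

Because each subpipeline ends with a sink, `each` as a whole acts as a sink here.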

## Keep one subpipeline per key

Use <Op>group</Op> when events with the same key must go through the same
subpipeline. Unlike `each`, the nested pipeline receives input: Tenzir sends all
matching events for a key to that key's subpipeline. The key is available as
`$group` inside the subpipeline.

The following pipeline keeps tenant streams separate and computes a summary per
tenant:

```tql
from {tenant: "alpha", bytes: 120},
{tenant: "beta", bytes: 90},
{tenant: "alpha", bytes: 80}
group tenant {
summarize events=count(), bytes=sum(bytes)
tenant = $group
}
sort tenant
```

```tql
{
events: 2,
bytes: 200,
tenant: "alpha",
}
{
events: 1,
bytes: 90,
tenant: "beta",
}
```

For a pure aggregation, <Op>summarize</Op> is usually shorter. Use `group` when
the per-key subpipeline does more than aggregate, such as keeping state,
applying a keyed transformation, or writing to a key-specific sink.
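
For example, keeping only the first event per key is a keyed transformation that a single aggregation cannot express. A minimal sketch, assuming <Op>head</Op> behaves inside the keyed subpipeline as it does in a linear pipeline:

```tql
from {tenant: "alpha", message: "login"},
     {tenant: "beta", message: "scan"},
     {tenant: "alpha", message: "logout"}
group tenant {
  head 1
}
```

Each tenant's subpipeline forwards its first event and discards the rest.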

## Write separate outputs per key

A common `group` pattern is to write each tenant, host, or sensor to its own
file. The subpipeline ends with a sink, so `group` itself becomes a sink:

```tql
from {tenant: "alpha", message: "login"},
{tenant: "beta", message: "scan"},
{tenant: "alpha", message: "logout"}
group tenant {
to_file f"/tmp/tenzir/{$group}.json" {
write_ndjson
}
}
```

This creates one subpipeline per tenant and writes matching events to that
subpipeline's file.

## Avoid common mistakes

- Don't use `each` for ordinary per-event transformations. Use regular TQL
statements or <Op>parallel</Op> when every event follows the same processing
steps.
- Don't use `group` only to calculate grouped totals. Use <Op>summarize</Op>
unless you need a full subpipeline per key.
- Don't leave `each` unbounded for external systems. Set `parallel` to match
the concurrency that the downstream service can handle.
- Remember that `each` subpipelines must start with a source, while `group`
subpipelines receive the grouped input stream.
- Neither `each` nor `group` can use subpipelines that produce bytes as output.
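
The first point above is worth a sketch: when every event receives the same treatment, a regular statement in the main pipeline does the job without any subpipeline (the derived field is illustrative):

```tql
from {tenant: "alpha", bytes: 120},
     {tenant: "beta", bytes: 90}
// A plain assignment transforms every event in place; no `each` needed.
kilobytes = bytes / 1000
```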

## See Also

- <Op>each</Op>
- <Op>group</Op>
- <Op>fork</Op>
- <Op>parallel</Op>
- <Op>load_balance</Op>
- <Op>publish</Op>
- <Op>subscribe</Op>
- <Op>summarize</Op>
- <Guide>routing/split-and-merge-streams</Guide>
- <Guide>routing/load-balance-pipelines</Guide>
@@ -164,6 +164,7 @@ blocking production pipelines.
## See also

- <Guide>routing/send-to-destinations</Guide>
- <Guide>routing/fan-out-with-subpipelines</Guide>
- <Guide>routing/load-balance-pipelines</Guide>
- <Op>publish</Op>
- <Op>subscribe</Op>
24 changes: 24 additions & 0 deletions src/content/docs/reference/operators.mdx
@@ -207,6 +207,10 @@ operators:
description: 'Discards all incoming events.'
example: 'discard'
path: 'reference/operators/discard'
- name: 'each'
description: 'Spawns a subpipeline for every incoming event, with the event bound to `$this`.'
example: 'each { from $this }'
path: 'reference/operators/each'
- name: 'every'
description: 'Runs a pipeline periodically at a fixed interval.'
example: 'every 10s { summarize sum(amount) }'
@@ -215,6 +219,10 @@
description: 'Executes a subpipeline with a copy of the input.'
example: 'fork { to "copy.json" }'
path: 'reference/operators/fork'
- name: 'group'
description: 'Routes events with the same key through the same subpipeline.'
example: 'group tenant { summarize count() }'
path: 'reference/operators/group'
- name: 'load_balance'
description: 'Routes the data to one of multiple subpipelines.'
example: 'load_balance $over { publish $over }'
@@ -1358,6 +1366,14 @@ discard

</ReferenceCard>

<ReferenceCard title="each" description="Spawns a subpipeline for every incoming event, with the event bound to `$this`." href="/reference/operators/each">

```tql
each { from $this }
```

</ReferenceCard>

<ReferenceCard title="every" description="Runs a pipeline periodically at a fixed interval." href="/reference/operators/every">

```tql
@@ -1374,6 +1390,14 @@ fork { to "copy.json" }

</ReferenceCard>

<ReferenceCard title="group" description="Routes events with the same key through the same subpipeline." href="/reference/operators/group">

```tql
group tenant { summarize count() }
```

</ReferenceCard>

<ReferenceCard title="load_balance" description="Routes the data to one of multiple subpipelines." href="/reference/operators/load_balance">

```tql
1 change: 1 addition & 0 deletions src/content/docs/reference/operators/cron.mdx
@@ -66,4 +66,5 @@ publish "api"

## See Also

- <Op>each</Op>
- <Op>every</Op>
100 changes: 100 additions & 0 deletions src/content/docs/reference/operators/each.mdx
@@ -0,0 +1,100 @@
---
title: each
category: Flow Control
example: 'each { from $this }'
---

Spawns a subpipeline for every incoming event, with the event bound to `$this`.

```tql
each [parallel=int] { … }
```

## Description

The `each` operator runs a fresh subpipeline for every incoming event. The
record of the current event is bound to `$this` inside the subpipeline, so the
subpipeline can parametrize its behavior on a per-event basis.

The subpipeline takes no input from `each`. It either emits events—which are
forwarded as the operator's output—or ends with a sink, in which case `each`
itself becomes a sink. The subpipeline must not produce bytes.

Use `each` for per-event jobs, such as running a lookup, export, or sink whose
source depends on the incoming event. For keyed streams that should keep one
subpipeline alive per key, use <Op>group</Op> instead.

### `parallel = int (optional)`

The maximum number of subpipelines that may run concurrently. Must be at least
`1`. Excess events queue, and their subpipelines start as soon as a slot frees
up.

Defaults to `10`.

### `{ … }`

The subpipeline to spawn for each event. Must start with a source.

Inside the subpipeline, `$this` refers to the record of the current input
event.

## Examples

### Run a lookup per event

Use fields from the input event to parametrize a source subpipeline. This
example treats the input as investigation cases and searches a historical event
set for matching source IPs:

```tql
from {case_id: "C-1", ip: "10.0.0.5"},
{case_id: "C-2", ip: "10.0.0.7"}
each parallel=4 {
from {ts: 2024-01-01T10:00:00, src_ip: "10.0.0.5", action: "login"},
{ts: 2024-01-01T10:02:00, src_ip: "10.0.0.8", action: "scan"},
{ts: 2024-01-01T10:05:00, src_ip: "10.0.0.7", action: "download"}
where src_ip == $this.ip
case_id = $this.case_id
}
sort case_id, ts
```

```tql
{
ts: 2024-01-01T10:00:00.000000,
src_ip: "10.0.0.5",
action: "login",
case_id: "C-1",
}
{
ts: 2024-01-01T10:05:00.000000,
src_ip: "10.0.0.7",
action: "download",
case_id: "C-2",
}
```

### Run a per-event sink

When the subpipeline ends with a sink, `each` itself becomes a sink. Use this
to write a separate output file per tenant in the input:

```tql
from {tenant: "alpha"}, {tenant: "beta"}
each {
from {tenant: $this.tenant, status: "ok"},
{tenant: $this.tenant, status: "fail"}
to_file f"/tmp/tenzir/{$this.tenant}.json" {
write_ndjson
}
}
```

## See Also

- <Op>cron</Op>
- <Op>every</Op>
- <Op>fork</Op>
- <Op>group</Op>
- <Op>parallel</Op>
- <Guide>routing/fan-out-with-subpipelines</Guide>
1 change: 1 addition & 0 deletions src/content/docs/reference/operators/every.mdx
@@ -51,6 +51,7 @@ publish "threat-feed"
## See Also

- <Op>cron</Op>
- <Op>each</Op>
- <Guide>transformation/work-with-time</Guide>
- <Guide>enrichment/work-with-lookup-tables</Guide>
- <Tutorial>write-a-package</Tutorial>