diff --git a/src/content/docs/guides/routing/fan-out-with-subpipelines.mdx b/src/content/docs/guides/routing/fan-out-with-subpipelines.mdx new file mode 100644 index 000000000..4c8c34d07 --- /dev/null +++ b/src/content/docs/guides/routing/fan-out-with-subpipelines.mdx @@ -0,0 +1,153 @@ +--- +title: Fan out with subpipelines +--- + +This guide shows you how to fan out an event stream into subpipelines with +each and group. You'll learn when to spawn one subpipeline per +event, when to keep one subpipeline per key, and how these operators differ +from fixed fan-out operators like fork, parallel, and +load_balance. + +## Choose a fan-out pattern + +Tenzir has several operators that send events into subpipelines. Choose the +operator based on how many subpipelines you need and how events should flow into +them: + +| Operator | Subpipelines | Event flow | Use case | +| --------------------- | ------------------------------------- | ------------------------------------------------------------------ | ------------------------------------------------------------- | +| fork | One fixed side branch | Every event goes to the main pipeline and the side branch | Archive or publish a copy while continuing processing | +| parallel | A fixed number of workers | Each event goes to one worker running the same subpipeline | Speed up CPU-heavy or I/O-heavy work | +| load_balance | One branch per configured target | Each event goes to one target | Distribute load across equivalent sinks | +| each | One fresh subpipeline per input event | The input event is available as `$this`; it is not passed as input | Run a per-event job, such as a lookup or export | +| group | One subpipeline per key | Matching events are passed to the same keyed subpipeline | Keep per-tenant, per-host, or per-session processing isolated | + +Use regular transformations when every event can flow through the same linear +pipeline. Use subpipeline fan-out when the pipeline structure itself depends on +each event or key. + +## Run one subpipeline per event + +Use each when every input event describes a job to run. The nested +pipeline must start with a source because `each` does not pass the input event +into the subpipeline. Instead, it binds the current event record to `$this`. + +The following pipeline treats incoming cases as lookup requests. Each case +queries the same historical dataset for matching source IPs and annotates the +matches with the case ID: + +```tql +from {case_id: "C-1", ip: "10.0.0.5"}, + {case_id: "C-2", ip: "10.0.0.7"} +each parallel=4 { + from {ts: 2024-01-01T10:00:00, src_ip: "10.0.0.5", action: "login"}, + {ts: 2024-01-01T10:02:00, src_ip: "10.0.0.8", action: "scan"}, + {ts: 2024-01-01T10:05:00, src_ip: "10.0.0.7", action: "download"} + where src_ip == $this.ip + case_id = $this.case_id +} +sort case_id, ts +``` + +```tql +{ + ts: 2024-01-01T10:00:00.000000, + src_ip: "10.0.0.5", + action: "login", + case_id: "C-1", +} +{ + ts: 2024-01-01T10:05:00.000000, + src_ip: "10.0.0.7", + action: "download", + case_id: "C-2", +} +``` + +The `parallel` option limits how many per-event jobs run at the same time. When +more events arrive, `each` queues them and applies back pressure upstream until +a running subpipeline finishes. Keep this value bounded for external APIs, +expensive exports, or destinations with rate limits. + +## Keep one subpipeline per key + +Use group when events with the same key must go through the same +subpipeline. Unlike `each`, the nested pipeline receives input: Tenzir sends all +matching events for a key to that key's subpipeline. The key is available as +`$group` inside the subpipeline. + +The following pipeline keeps tenant streams separate and computes a summary per +tenant: + +```tql +from {tenant: "alpha", bytes: 120}, + {tenant: "beta", bytes: 90}, + {tenant: "alpha", bytes: 80} +group tenant { + summarize events=count(), bytes=sum(bytes) + tenant = $group +} +sort tenant +``` + +```tql +{ + events: 2, + bytes: 200, + tenant: "alpha", +} +{ + events: 1, + bytes: 90, + tenant: "beta", +} +``` + +For a pure aggregation, summarize is usually shorter. Use `group` when +the per-key subpipeline does more than aggregate, such as keeping state, +applying a keyed transformation, or writing to a key-specific sink. + +## Write separate outputs per key + +A common `group` pattern is to write each tenant, host, or sensor to its own +file. The subpipeline ends with a sink, so `group` itself becomes a sink: + +```tql +from {tenant: "alpha", message: "login"}, + {tenant: "beta", message: "scan"}, + {tenant: "alpha", message: "logout"} +group tenant { + to_file f"/tmp/tenzir/{$group}.json" { + write_ndjson + } +} +``` + +This creates one subpipeline per tenant and writes matching events to that +subpipeline's file. + +## Avoid common mistakes + +- Don't use `each` for ordinary per-event transformations. Use regular TQL + statements or parallel when every event follows the same processing + steps. +- Don't use `group` only to calculate grouped totals. Use summarize + unless you need a full subpipeline per key. +- Don't leave `each` unbounded for external systems. Set `parallel` to match + the concurrency that the downstream service can handle. +- Remember that `each` subpipelines must start with a source, while `group` + subpipelines receive the grouped input stream. +- Neither `each` nor `group` can use subpipelines that produce bytes as output. + +## See Also + +- each +- group +- fork +- parallel +- load_balance +- publish +- subscribe +- summarize +- routing/split-and-merge-streams +- routing/load-balance-pipelines diff --git a/src/content/docs/guides/routing/split-and-merge-streams.mdx b/src/content/docs/guides/routing/split-and-merge-streams.mdx index f09a90be4..8d0a7830b 100644 --- a/src/content/docs/guides/routing/split-and-merge-streams.mdx +++ b/src/content/docs/guides/routing/split-and-merge-streams.mdx @@ -164,6 +164,7 @@ blocking production pipelines. ## See also - routing/send-to-destinations +- routing/fan-out-with-subpipelines - routing/load-balance-pipelines - publish - subscribe diff --git a/src/content/docs/reference/operators.mdx b/src/content/docs/reference/operators.mdx index 3144666a8..adc584fe5 100644 --- a/src/content/docs/reference/operators.mdx +++ b/src/content/docs/reference/operators.mdx @@ -207,6 +207,10 @@ operators: description: 'Discards all incoming events.' example: 'discard' path: 'reference/operators/discard' + - name: 'each' + description: 'Spawns a subpipeline for every incoming event, with the event bound to `$this`.' + example: 'each { from $this }' + path: 'reference/operators/each' - name: 'every' description: 'Runs a pipeline periodically at a fixed interval.' example: 'every 10s { summarize sum(amount) }' @@ -215,6 +219,10 @@ operators: description: 'Executes a subpipeline with a copy of the input.' example: 'fork { to "copy.json" }' path: 'reference/operators/fork' + - name: 'group' + description: 'Routes events with the same key through the same subpipeline.' + example: 'group tenant { summarize count() }' + path: 'reference/operators/group' - name: 'load_balance' description: 'Routes the data to one of multiple subpipelines.' example: 'load_balance $over { publish $over }' @@ -1358,6 +1366,14 @@ discard + + +```tql +each { from $this } +``` + + + ```tql @@ -1374,6 +1390,14 @@ fork { to "copy.json" } + + +```tql +group tenant { summarize count() } +``` + + + ```tql diff --git a/src/content/docs/reference/operators/cron.mdx b/src/content/docs/reference/operators/cron.mdx index 93596b8c5..e8ed01e08 100644 --- a/src/content/docs/reference/operators/cron.mdx +++ b/src/content/docs/reference/operators/cron.mdx @@ -66,4 +66,5 @@ publish "api" ## See Also +- each - every diff --git a/src/content/docs/reference/operators/each.mdx b/src/content/docs/reference/operators/each.mdx new file mode 100644 index 000000000..986765ff8 --- /dev/null +++ b/src/content/docs/reference/operators/each.mdx @@ -0,0 +1,100 @@ +--- +title: each +category: Flow Control +example: 'each { from $this }' +--- + +Spawns a subpipeline for every incoming event, with the event bound to `$this`. + +```tql +each [parallel=int] { … } +``` + +## Description + +The `each` operator runs a fresh subpipeline for every incoming event. The +record of the current event is bound to `$this` inside the subpipeline, so the +subpipeline can parametrize its behavior on a per-event basis. + +The subpipeline takes no input from `each`. It either emits events—which are +forwarded as the operator's output—or ends with a sink, in which case `each` +itself becomes a sink. The subpipeline must not produce bytes. + +Use `each` for per-event jobs, such as running a lookup, export, or sink whose +source depends on the incoming event. For keyed streams that should keep one +subpipeline alive per key, use group instead. + +### `parallel = int (optional)` + +The maximum number of subpipelines that may run concurrently. Must be at least +`1`. Excess events queue and start as soon as a slot frees. + +Defaults to `10`. + +### `{ … }` + +The subpipeline to spawn for each event. Must start with a source. + +Inside the subpipeline, `$this` refers to the record of the current input +event. + +## Examples + +### Run a lookup per event + +Use fields from the input event to parametrize a source subpipeline. This +example treats the input as investigation cases and searches a historical event +set for matching source IPs: + +```tql +from {case_id: "C-1", ip: "10.0.0.5"}, + {case_id: "C-2", ip: "10.0.0.7"} +each parallel=4 { + from {ts: 2024-01-01T10:00:00, src_ip: "10.0.0.5", action: "login"}, + {ts: 2024-01-01T10:02:00, src_ip: "10.0.0.8", action: "scan"}, + {ts: 2024-01-01T10:05:00, src_ip: "10.0.0.7", action: "download"} + where src_ip == $this.ip + case_id = $this.case_id +} +sort case_id, ts +``` + +```tql +{ + ts: 2024-01-01T10:00:00.000000, + src_ip: "10.0.0.5", + action: "login", + case_id: "C-1", +} +{ + ts: 2024-01-01T10:05:00.000000, + src_ip: "10.0.0.7", + action: "download", + case_id: "C-2", +} +``` + +### Run a per-event sink + +When the subpipeline ends with a sink, `each` itself becomes a sink. Use this +to write a separate output file per tenant in the input: + +```tql +from {tenant: "alpha"}, {tenant: "beta"} +each { + from {tenant: $this.tenant, status: "ok"}, + {tenant: $this.tenant, status: "fail"} + to_file f"/tmp/tenzir/{$this.tenant}.json" { + write_ndjson + } +} +``` + +## See Also + +- cron +- every +- fork +- group +- parallel +- routing/fan-out-with-subpipelines diff --git a/src/content/docs/reference/operators/every.mdx b/src/content/docs/reference/operators/every.mdx index 90257f9ae..74485061e 100644 --- a/src/content/docs/reference/operators/every.mdx +++ b/src/content/docs/reference/operators/every.mdx @@ -51,6 +51,7 @@ publish "threat-feed" ## See Also - cron +- each - transformation/work-with-time - enrichment/work-with-lookup-tables - write-a-package diff --git a/src/content/docs/reference/operators/group.mdx b/src/content/docs/reference/operators/group.mdx new file mode 100644 index 000000000..13c9a05e6 --- /dev/null +++ b/src/content/docs/reference/operators/group.mdx @@ -0,0 +1,87 @@ +--- +title: group +category: Flow Control +example: 'group tenant { summarize count() }' +--- + +Routes events with the same key through the same subpipeline. + +```tql +group over:expr { … } +``` + +## Description + +The `group` operator evaluates `over` for every incoming event and creates one +subpipeline for every distinct key. Events with the same key are sent to the +same subpipeline. Inside the subpipeline, `$group` refers to the key for that +subpipeline. + +The subpipeline receives grouped events as input. It either emits events—which +are forwarded as the operator's output—or ends with a sink, in which case +`group` itself becomes a sink. The subpipeline must not produce bytes. + +Use `group` when you need a full keyed subpipeline, such as a per-tenant sink or +a per-session stateful transformation. For grouped aggregations only, use +summarize instead. + +### `over: expr` + +The expression that computes the group key for every incoming event. + +### `{ … }` + +The subpipeline to run for every distinct key. The subpipeline receives the +matching events as input. + +Inside the subpipeline, `$group` refers to the current key. + +## Examples + +### Summarize each tenant independently + +```tql +from {tenant: "alpha", bytes: 120}, + {tenant: "beta", bytes: 90}, + {tenant: "alpha", bytes: 80} +group tenant { + summarize events=count(), bytes=sum(bytes) + tenant = $group +} +sort tenant +``` + +```tql +{ + events: 2, + bytes: 200, + tenant: "alpha", +} +{ + events: 1, + bytes: 90, + tenant: "beta", +} +``` + +### Write a file per tenant + +```tql +from {tenant: "alpha", message: "login"}, + {tenant: "beta", message: "scan"}, + {tenant: "alpha", message: "logout"} +group tenant { + to_file f"/tmp/tenzir/{$group}.json" { + write_ndjson + } +} +``` + +## See Also + +- each +- fork +- load_balance +- parallel +- summarize +- routing/fan-out-with-subpipelines diff --git a/src/content/docs/reference/operators/load_balance.mdx b/src/content/docs/reference/operators/load_balance.mdx index b13403a4f..082548c56 100644 --- a/src/content/docs/reference/operators/load_balance.mdx +++ b/src/content/docs/reference/operators/load_balance.mdx @@ -74,5 +74,7 @@ load_balance $cfg { ## See Also - fork +- group - publish +- routing/fan-out-with-subpipelines - routing/load-balance-pipelines diff --git a/src/content/docs/reference/operators/parallel.mdx b/src/content/docs/reference/operators/parallel.mdx index cb1588734..a9c468556 100644 --- a/src/content/docs/reference/operators/parallel.mdx +++ b/src/content/docs/reference/operators/parallel.mdx @@ -102,4 +102,7 @@ parallel 4 { ## See Also +- each +- group - load_balance +- routing/fan-out-with-subpipelines diff --git a/src/content/docs/reference/operators/summarize.mdx b/src/content/docs/reference/operators/summarize.mdx index 3b8719dff..b3fccc9c6 100644 --- a/src/content/docs/reference/operators/summarize.mdx +++ b/src/content/docs/reference/operators/summarize.mdx @@ -171,6 +171,7 @@ summarize count(this), severity, options={frequency: 10s, mode: "update"} ## See Also +- group - rare - top - sum diff --git a/src/sidebar.ts b/src/sidebar.ts index f695f91cb..51d842dbe 100644 --- a/src/sidebar.ts +++ b/src/sidebar.ts @@ -153,6 +153,7 @@ export const guides = [ "guides/routing/send-to-destinations", "guides/routing/expose-data-as-server", "guides/routing/split-and-merge-streams", + "guides/routing/fan-out-with-subpipelines", "guides/routing/load-balance-pipelines", ], },