From af1c05cb342346d339cac28141bae78b1bda6549 Mon Sep 17 00:00:00 2001 From: Ken Chen Date: Wed, 1 Oct 2025 09:14:48 -0700 Subject: [PATCH] refine global aggregation --- docs/global-aggregation.md | 88 ++++++++++++++++++++++++++++++++++---- 1 file changed, 79 insertions(+), 9 deletions(-) diff --git a/docs/global-aggregation.md b/docs/global-aggregation.md index f67e9a66..2a3bc07b 100644 --- a/docs/global-aggregation.md +++ b/docs/global-aggregation.md @@ -1,33 +1,103 @@ # Global Aggregation -## Overview +## Overview -In Timeplus, we define global aggregation as an aggregation query without using streaming windows like tumble, hop. Unlike streaming window aggregation, global streaming aggregation doesn't slice -the unbound streaming data into windows according to timestamp, instead it processes the unbounded streaming data as one huge big global window. Due to this property, Timeplus for now can't -recycle in-memory aggregation states / results according to timestamp for global aggregation. +**Global aggregation** refers to running an aggregation query **without using streaming windows** such as `TUMBLE`, `HOP`, or `SESSION`. + +Unlike windowed aggregations, global aggregation does not slice unbounded streaming data into time-based windows. Instead, it treats the entire unbounded stream as a **single global window**. + +With global aggregation: +- The query continuously updates aggregation results over all incoming data. +- Users don’t need to worry about **late events**, since there are no time windows to close. + +## Syntax ```sql SELECT , , FROM -[WHERE clause] -EMIT PERIODIC [] +[WHERE ] +GROUP BY col1, col2, ... +EMIT PERIODIC ``` -`PERIODIC ` tells Timeplus to emit the aggregation periodically. `UNIT` can be ms(millisecond), s(second), m(minute),h(hour),d(day).`` shall be an integer greater than 0. +The `EMIT PERIODIC ` clause tells Timeplus to periodically emit aggregation results. +- `` must be an integer greater than 0. +- `` can be one of: -Examples +- ms (milliseconds) +- s (seconds) +- m (minutes) +- h (hours) +**Examples:** ```sql SELECT device, count(*) FROM device_utils WHERE cpu_usage > 99 +GROUP BY device EMIT PERIODIC 5s ``` -Like in [Streaming Tail](/query-syntax#streaming-tailing), Timeplus continuously monitors new events in the stream `device_utils`, does the filtering and then continuously does **incremental** count aggregation. Whenever the specified delay interval is up, project the current aggregation result to clients. +In this example: +- The query continuously monitors new events in the stream `device_utils`. +- It filters rows where cpu_usage > 99. +- An **incremental count aggregation** is maintained per `device`. +- Every **5 seconds**, the current aggregation result is emitted to clients. + +## TTL of Aggregation Keys + +Global aggregation does not automatically garbage-collect intermediate aggregation states after each emit. +If the grouping keys grow monotonically over time (for example, when timestamps are part of the key), memory usage can eventually **blow up**. + +To address this challenge, you can use a **hybrid hash table** for aggregation states: +- Hot keys are kept in memory. +- Cold keys are spilled to disk using an LRU-like algorithm. +- Combined with a **TTL for keys**, this approach provides the best of both worlds: + - Handles very late events. + - Prevents unbounded memory growth. + +**Example:** + +```sql +CREATE STREAM device_utils( + location string, + device string, + cpu float32 +) SETTINGS shards=3; + +SELECT + to_start_of_interval(_tp_time, 5m) AS bucket_window_start, + location, + device, + min(cpu), + max(cpu), + avg(cpu) +FROM device_utils +SHUFFLE BY location +GROUP BY bucket_window_start, location, device +EMIT ON UPDATE WITH BATCH 1s +SETTINGS + num_target_shards=8, + default_hash_table='hybrid', + max_hot_keys=100000, + aggregate_state_ttl_sec=3600; +``` + +- This query performs a **global aggregation** to calculate CPU metrics in **5-minute buckets per device**. +- The grouping key includes `bucket_window_start`, which increases monotonically with time. +- The hybrid hash table is enabled via `default_hash_table='hybrid'`. + - Keeps up to `100,000` hot keys in memory per substream. + - Cold keys are spilled to disk automatically. +- The TTL is set to `3600` seconds (`aggregate_state_ttl_sec=3600`): + - Keys not updated for an hour are garbage-collected from disk. + - Prevents infinite state accumulation. +- Data shuffling is enabled (SHUFFLE BY location) for better **parallelism and memory efficiency**. + - See [Data Shuffle](/shuffle-data) for more details. ## Emit Policies +Global aggregation supports different `emit policies` to control when you like to get the intermidiate results pushing out. + ### EMIT PERIODIC {#emit_periodic} `PERIODIC ` tells Timeplus to emit the aggregation periodically. `UNIT` can be ms(millisecond), s(second), m(minute),h(hour),d(day).`` shall be an integer greater than 0.