diff --git a/docs/global-aggregation.md b/docs/global-aggregation.md index 85057acd..7fdc4858 100644 --- a/docs/global-aggregation.md +++ b/docs/global-aggregation.md @@ -1,35 +1,27 @@ # Global Aggregation -## Overview +## Overview -**Global aggregation** refers to running an aggregation query **without using streaming windows** such as `TUMBLE`, `HOP`, or `SESSION`. +**Global aggregation** refers to running an aggregation query **without using time-based windows** such as [tumble](/tumble-aggregation), [hop](/hop-aggregation), or [session](/session-aggregation). -Unlike windowed aggregations, global aggregation does not slice unbounded streaming data into time-based windows. Instead, it treats the entire unbounded stream as a **single global window**. +Unlike windowed aggregations that slice unbounded streams into discrete windows, **global aggregation** treats the entire stream as **a single continuous window**. -With global aggregation: -- The query continuously updates aggregation results over all incoming data. -- Users don’t need to worry about **late events**, since there are no time windows to close. +With global aggregation: +- The query continuously updates aggregation results as new events arrive. +- There is **no concept of window close**, so late events are naturally handled without additional logic. +- It is ideal for tracking long-running (lifetime) metrics such as total counts, averages, or unique users over the entire history of a stream. ## Syntax ```sql -SELECT <col1>, <col2>, <aggregate_function> +SELECT <column>, <aggregate_function> FROM <stream> [WHERE <condition>] -GROUP BY col1, col2, ... -EMIT PERIODIC <n><UNIT> +GROUP BY <col1>, <col2>, ... +EMIT <emit_policy> ``` -The `EMIT PERIODIC <n><UNIT>` clause tells Timeplus to periodically emit aggregation results. -- `<n>` must be an integer greater than 0. 
-- `<UNIT>` can be one of: - -- `ms` (milliseconds) -- `s` (seconds) -- `m` (minutes) -- `h` (hours) - -**Examples:** +**Example**: ```sql SELECT device, count(*) FROM device_utils @@ -38,25 +30,26 @@ GROUP BY device EMIT PERIODIC 5s ``` -In this example: -- The query continuously monitors new events in the stream `device_utils`. -- It filters rows where cpu_usage > 99. -- An **incremental count aggregation** is maintained per `device`. -- Every **5 seconds**, the current aggregation result is emitted to clients. +**Explanation**: +- The query monitors new events from the stream `device_utils`. +- It filters rows where `cpu_usage > 99`. +- An **incremental count** is maintained per device. +- Every **5 seconds**, the latest count per device is emitted to clients. -## TTL of Aggregation Keys +## TTL of Aggregation Keys -Global aggregation does not automatically garbage-collect intermediate aggregation states after each emit. -If the grouping keys grow monotonically over time (for example, when timestamps are part of the key), memory usage can eventually **blow up**. +Global aggregations do not automatically garbage-collect intermediate states after each emission by default. +If the grouping keys increase continuously (for example, by including timestamps), the aggregation state can grow indefinitely. -To address this challenge, you can use a **hybrid hash table** for aggregation states: -- Hot keys are kept in memory. -- Cold keys are spilled to disk using an LRU-like algorithm. -- Combined with a **TTL for keys**, this approach provides the best of both worlds: - - Handles very late events. - - Prevents unbounded memory growth. +To handle this, Timeplus supports a **hybrid hash table** that combines in-memory and on-disk state management: -**Example:** +- **Hot keys** (recently active) are stored in memory. +- **Cold keys** (inactive or rarely updated) are spilled to disk using an LRU-like algorithm. 
+- Combined with **TTL-based cleanup**, this approach has these benefits: + - Support for very late events. + - Controlled memory usage for long-running queries. + +**Example**: ```sql CREATE STREAM device_utils( @@ -83,180 +76,169 @@ SETTINGS aggregate_state_ttl_sec=3600; ``` -- This query performs a **global aggregation** to calculate CPU metrics in **5-minute buckets per device**. -- The grouping key includes `bucket_window_start`, which increases monotonically with time. -- The hybrid hash table is enabled via `default_hash_table='hybrid'`. - - Keeps up to `100,000` hot keys in memory per substream. - - Cold keys are spilled to disk automatically. -- The TTL is set to `3600` seconds (`aggregate_state_ttl_sec=3600`): - - Keys not updated for an hour are garbage-collected from disk. - - Prevents infinite state accumulation. -- Data shuffling is enabled (SHUFFLE BY location) for better **parallelism and memory efficiency**. - - See [Data Shuffle](/shuffle-data) for more details. +**Explanation**: + +- This query performs a **global aggregation** that computes CPU metrics for each device in 5-minute intervals. +- The grouping key includes `bucket_window_start`, `location`, and `device`. The cardinality of the grouping key grows monotonically over time. +- The hybrid hash table manages this monotonically growing state efficiently: + - Up to `100,000` hot keys per substream remain in memory. + - Inactive keys are spilled to disk automatically. +- Aggregation states that stay inactive for 1 hour are cleaned up (`aggregate_state_ttl_sec=3600`), which effectively tolerates events that arrive up to 1 hour late. +- `SHUFFLE BY location` improves **parallelism and memory efficiency**. See [Data Shuffle](/shuffle-data) for more details. 
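The settings described above can be combined in a much smaller query. The following is a minimal, hypothetical sketch (the stream `user_events` and column `user_id` are illustrative names, not from the example above) showing how the hybrid hash table and key TTL fit together:

```sql
-- Minimal sketch: global aggregation with hybrid state and a 1-hour key TTL.
-- `user_events` and `user_id` are illustrative names.
SELECT user_id, count(*) AS cnt
FROM user_events
GROUP BY user_id
EMIT PERIODIC 10s
SETTINGS
    default_hash_table='hybrid',   -- keep hot keys in memory, spill cold keys to disk
    max_hot_keys=100000,           -- cap on in-memory keys per substream
    aggregate_state_ttl_sec=3600;  -- garbage-collect keys inactive for an hour
```

Without `aggregate_state_ttl_sec`, spilled keys would still accumulate on disk indefinitely; the TTL is what bounds the total state size.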
+ +**Internal Pipeline**: -The internal query plan for this hybrid global aggregation looks like: +The internal execution plan for hybrid global aggregation is shown below: ![HybridAggregationPipeline](/img/hybrid-aggregation-pipeline.svg) ## Emit Policies -Global aggregation supports different `emit policies` to control when you like to get the intermidiate results pushing out. +Global aggregation supports multiple **emit policies** that define **when intermediate results** are pushed out. -### EMIT PERIODIC {#emit_periodic} +### `EMIT PERIODIC` -`PERIODIC <n><UNIT>` tells Timeplus to emit the aggregation periodically. `UNIT` can be ms(millisecond), s(second), m(minute),h(hour),d(day).`<n>` shall be an integer greater than 0. +Emits aggregation results periodically **when new events arrive**. +This is the **default** emit policy for global aggregation, with a **default interval of 2 seconds**. -Example: +**Syntax**: ```sql -SELECT device, count(*) -FROM device_utils -WHERE cpu_usage > 99 -EMIT PERIODIC 5s +EMIT PERIODIC <n><UNIT> ``` -For [Global Streaming Aggregation](#global) the default periodic emit interval is `2s`, i.e. 2 seconds. - -You can also apply `EMIT PERIODIC` in time windows, such as tumble/hop/session. - -When you run a tumble window aggregation, by default Timeplus will emit results when the window is closed. So `tumble(stream,5s)` will emit results every 5 seconds, unless there is no event in the window to progress the watermark. +**Parameters**: +- `<n>` — positive integer (interval length) +- `<UNIT>` can be one of: + - `ms` (milliseconds) + - `s` (seconds) + - `m` (minutes) + - `h` (hours) -In some cases, you may want to get aggregation results even the window is not closed, so that you can get timely alerts. For example, the following SQL will run a 5-second tumble window and every 1 second, if the number of event is over 300, a row will be emitted. 
+**Example**: ```sql -SELECT window_start, count() AS cnt -FROM tumble(car_live_data, 5s) -GROUP BY window_start -HAVING cnt > 300 -EMIT PERIODIC 1s +SELECT device, count(*) +FROM device_utils +WHERE cpu_usage > 99 +GROUP BY device +EMIT PERIODIC 5s; ``` -### EMIT PERIODIC REPEAT {#emit_periodic_repeat} +This query emits updated results every 5 seconds if new events are received. -Starting from Timeplus Proton 1.6.2, you can optionally add `REPEAT` to the end of `EMIT PERIODIC `. For global aggregations, by default every 2 seconds, the aggregation result will be emitted. But if there is no new event since last emit, no result will be emitted. With the `REPEAT` at the end of the emit policy, Timeplus will emit results at the fixed interval, even there is no new events since last emit. For example: -```sql -SELECT count() FROM t -EMIT PERIODIC 3s REPEAT -``` +### `EMIT PERIODIC REPEAT` -### EMIT TIMEOUT +For `EMIT PERIODIC`, no results are emitted if there are **no new events** since the last emit. +With the `REPEAT` modifier, Timeplus **emits at a fixed interval**, even when no new data arrives. -You can apply `EMIT TIMEOUT` on global aggregation, e.g. +**Example**: ```sql -SELECT count() FROM t EMIT TIMEOUT 1s; +SELECT device, count(*) +FROM device_utils +WHERE cpu_usage > 99 +GROUP BY device +EMIT PERIODIC 5s REPEAT ``` -It also can be applied to window aggregations and `EMIT AFTER WINDOW CLOSE` is automatically appended, e.g. -```sql -SELECT count() FROM tumble(t,5s) GROUP BY window_start EMIT TIMEOUT 1s; -``` - -### EMIT ON UPDATE {#emit_on_update} +If no new events appear, the last results are still emitted every 5 seconds. + +### `EMIT ON UPDATE` -You can apply `EMIT ON UPDATE` in time windows, such as tumble/hop/session, with `GROUP BY` keys. For example: +Emits intermediate results **immediately** when new events change any aggregation value. This is useful for near real-time visibility into evolving metrics. 
```sql -SELECT - window_start, cid, count() AS cnt -FROM - tumble(car_live_data, 5s) -WHERE - cid IN ('c00033', 'c00022') -GROUP BY - window_start, cid -EMIT ON UPDATE +SELECT device, count(*) +FROM device_utils +WHERE cpu_usage > 99 +GROUP BY device +EMIT ON UPDATE; ``` -During the 5 second tumble window, even the window is not closed, as long as the aggregation value(`cnt`) for the same `cid` is different , the results will be emitted. +Each time new events with `cpu_usage > 99` arrive, updated counts are emitted. -### EMIT ON UPDATE WITH BATCH {#emit_on_update_with_batch} - -You can combine `EMIT PERIODIC` and `EMIT ON UPDATE` together. In this case, even the window is not closed, Timeplus will check the intermediate aggregation result at the specified interval and emit rows if the result is changed. -```sql -SELECT - window_start, cid, count() AS cnt -FROM - tumble(car_live_data, 5s) -WHERE - cid IN ('c00033', 'c00022') -GROUP BY - window_start, cid -EMIT ON UPDATE WITH BATCH 2s -``` +### `EMIT ON UPDATE WITH BATCH` -### EMIT AFTER KEY EXPIRE IDENTIFIED BY .. WITH MAXSPAN .. AND TIMEOUT .. {#emit_after_key_expire} +Combines **periodic emission** with **update-based** triggers. +Timeplus checks the intermediate aggregation results at regular intervals and emits them only if they have changed, which can significantly improve emit efficiency and throughput compared with `EMIT ON UPDATE`. -The syntax is: ```sql -EMIT AFTER KEY EXPIRE [IDENTIFIED BY <col>] WITH [ONLY] MAXSPAN <interval> [AND TIMEOUT <interval>] ``` -Note: -* `EMIT AFTER KEY EXPIRE` will emit results when the keys are expired. This EMIT policy ought to be applied to a global aggregation with a primary key as `GROUP BY`, usually using an ID for multiple tracing events. -* `IDENTIFIED BY col` will calculate the span of the trace, usually you can set `IDENTIFIED BY _tp_time`. 
-* `MAXSPAN interval` to identify whether the span of the related events over a certain interval, for example `MAXSPAN 500ms` to flag those events with same tracing ID but over 0.5 second span. -* `ONLY`: if you add this keyword, then only those events over the `MAXSPAN` will be emitted, other events less than the `MAXSPAN` will be omitted, so that you can focus on those events over the SLA. -* `AND TIMEOUT interval` to avoid waiting for late events for too long. If there is no more events with the same key (e.g. tracing ID) after this interval, Timeplus will close the session for the key and emit results. +This query checks for changes every second and emits results only when updates occur. -It's required to use `SETTINGS default_hash_table='hybrid'` with this emit policy to avoid putting too much data in memory. +### `EMIT AFTER KEY EXPIRE` -Here is an example to get the log streams and only show the events with over 0.5 second as the end-to-end latency. -```sql -WITH grouped AS( - SELECT - trace_id, - min(start_time) AS start_ts, - max(end_time) AS end_ts, - date_diff('ms', start_ts, end_ts) AS span_ms, - group_array(json_encode(span_id, parent_span_id, name, start_time, end_time, attributes)) AS trace_events - FROM otel_traces - GROUP BY trace_id - EMIT AFTER KEY EXPIRE IDENTIFIED BY end_time WITH MAXSPAN 500ms AND TIMEOUT 2s -) -SELECT json_encode(trace_id, start_ts, end_ts, span_ms, trace_events) AS event FROM grouped -SETTINGS default_hash_table='hybrid', max_hot_keys=1000000, allow_independent_shard_processing=true; -``` +Designed for **OpenTelemetry trace analysis** and other similar use cases where you need to track **key lifetimes** across high-cardinality datasets (e.g., trace spans). -### EMIT PER EVENT -This emit policy allows you to emit results for every event in the stream, which can be useful for debugging or monitoring purposes. +This policy emits aggregation results once a key is considered **expired**. 
-For example, if you create a random stream `market_data` and run: -```sql -select count() from market_data -``` -You will get the count of all events in the stream, every 2 seconds by default. Such as 10, 20, 30, etc. +**Syntax**: -If you want to emit results for every event, you can use: ```sql -select count() from market_data emit per event +EMIT AFTER KEY EXPIRE [IDENTIFIED BY ] WITH [ONLY] MAXSPAN [AND TIMEOUT ] ``` -You will get the count of all events in the stream, every time a new event is added to the stream. Such as 1, 2, 3, 4, 5, etc. -This new emit policy is useful for specific use cases where you want to see the results of your query for every event in the stream. It can be particularly useful for debugging or monitoring purposes, as it allows you to see the results of your query in real-time as new events are added to the stream. +**Parameters**: +* `EMIT AFTER KEY EXPIRE` - enables per-key lifetime tracking. +* `IDENTIFIED BY ` - column used to compute span duration (defaults to **_tp_time** if omitted). +* `MAXSPAN ` - maximum allowed span before emission. +* `ONLY` - emit results only if span exceeds MAXSPAN. +* `TIMEOUT ` - forces emission after inactivity to avoid waiting indefinitely. -For high throughput streams, you may want to use this emit policy with caution, as it can generate a lot of output and may impact the performance of your query. +:::info +Currently must be used with `SETTINGS default_hash_table='hybrid'` to prevent excessive memory usage. +::: -There are some limitations for this emit policy: +**Example**: -It does not support parallel processing, so it may not be suitable for high throughput streams. If there are multiple partitions for the Kafka external stream or multiple shards for the Timeplus stream, this emit policy will not work. - -One workaround is to use `SHUFFLE BY` to shuffle the data into one partition or shard, but this may impact the performance of your query. 
For example, you can use: ```sql -select type, count() from github_events shuffle by type group by type emit per event; +WITH grouped AS +( + SELECT + trace_id, + min(start_time) AS start_ts, + max(end_time) AS end_ts, + date_diff('ms', start_ts, end_ts) AS span_ms, + group_array(json_encode(span_id, parent_span_id, name, start_time, end_time, attributes)) AS trace_events + FROM otel_traces + SHUFFLE BY trace_id + GROUP BY trace_id + EMIT AFTER KEY EXPIRE IDENTIFIED BY end_time WITH ONLY MAXSPAN 500ms AND TIMEOUT 2s +) +SELECT json_encode(trace_id, start_ts, end_ts, span_ms, trace_events) AS event +FROM grouped +SETTINGS + default_hash_table='hybrid', + max_hot_keys=1000000; ``` -The other possible workaround if the stream's sharding expression is based on id, for example: -```sql -create stream multi_shards_stream(id int, ...) settings shards=3, sharding_expr='weak_hash32(id)'; -``` -In this case, you can set `allow_independent_shard_processing=true` to process in parallel. +**Explanation**: -```sql -SELECT id, count() FROM multi_shards_stream GROUP BY id EMIT PER EVENT -SETTINGS allow_independent_shard_processing=true; -``` +- Tracks `trace_id` events with start/end times. +- Emits results when: + - The span exceeds `MAXSPAN` (500 ms), or + - No new events arrive for `TIMEOUT` (2 s). +- The `ONLY` modifier ensures only traces exceeding the span threshold (500ms) are emitted. +- Expired keys are garbage-collected after emission. -The other limitation is that it does not support substream processing. For example, the following query will not work: +### `EMIT PER EVENT` + +Emits results for **every incoming event**. +This policy is mainly for debugging or low-volume streams, as it can produce very high output. 
+ +**Example**: ```sql +SELECT count() +FROM market_data +EMIT PER EVENT; ``` +Each new event triggers an immediate emission of the updated count: +`1, 2, 3, 4, 5, …` + +Use this mode carefully in high-throughput environments. diff --git a/docs/hop-aggregation.md b/docs/hop-aggregation.md index ced8b25c..5b2fa68e 100644 --- a/docs/hop-aggregation.md +++ b/docs/hop-aggregation.md @@ -125,7 +125,7 @@ In this example: ### `EMIT ON UPDATE` -Emits **intermediate aggregation updates** whenever the results change within an open window. +Emits **intermediate aggregation updates** whenever new events flow into an open window. This is useful for near real-time visibility into evolving metrics. **Example**: diff --git a/docs/tumble-aggregation.md b/docs/tumble-aggregation.md index 324b88ae..6f6d4f3e 100644 --- a/docs/tumble-aggregation.md +++ b/docs/tumble-aggregation.md @@ -131,7 +131,7 @@ In this example: ### `EMIT ON UPDATE` -Emits **intermediate aggregation updates** whenever the results change within an open window. +Emits **intermediate aggregation updates** whenever new events flow into an open window. This is useful for near real-time visibility into evolving metrics. **Example**: