88 changes: 79 additions & 9 deletions docs/global-aggregation.md
@@ -1,33 +1,103 @@
# Global Aggregation

## Overview

**Global aggregation** refers to running an aggregation query **without using streaming windows** such as `TUMBLE`, `HOP`, or `SESSION`.

Unlike windowed aggregations, global aggregation does not slice unbounded streaming data into time-based windows. Instead, it treats the entire unbounded stream as a **single global window**. Because of this, Timeplus currently cannot recycle in-memory aggregation state or results by timestamp for global aggregation.

With global aggregation:
- The query continuously updates aggregation results over all incoming data.
- Users don’t need to worry about **late events**, since there are no time windows to close.

## Syntax

```sql
SELECT <column_name1>, <column_name2>, <aggr_function>
FROM <stream_name>
[WHERE <condition>]
GROUP BY <column_name1>, <column_name2>, ...
EMIT PERIODIC <n><UNIT>
```

The `EMIT PERIODIC <n><UNIT>` clause tells Timeplus to periodically emit aggregation results.
- `<n>` must be an integer greater than 0.
- `<UNIT>` can be one of:
  - `ms` (milliseconds)
  - `s` (seconds)
  - `m` (minutes)
  - `h` (hours)
  - `d` (days)

**Examples:**
```sql
SELECT device, count(*)
FROM device_utils
WHERE cpu_usage > 99
GROUP BY device
EMIT PERIODIC 5s
```

In this example:
- As in [Streaming Tail](/query-syntax#streaming-tailing), the query continuously monitors new events in the stream `device_utils`.
- It filters rows where `cpu_usage > 99`.
- An **incremental count aggregation** is maintained per `device`.
- Every **5 seconds**, the current aggregation result is emitted to clients.
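
The behaviour described above can be sketched as a small simulation. This is a toy Python model, not Timeplus code: the function name `global_count`, the tuple layout of the events, and the choice to emit a snapshot at every interval boundary (even when nothing changed) are all assumptions made for illustration.

```python
from collections import defaultdict

def global_count(events, emit_interval=5.0, threshold=99):
    """Simulate incremental per-device counting with periodic emits.

    `events` is an iterable of (arrival_time_seconds, device, cpu_usage),
    ordered by arrival time. Returns a list of (emit_time, snapshot) pairs.
    All names here are hypothetical; this is not the engine's implementation.
    """
    counts = defaultdict(int)   # running state; never reset between emits
    emits = []
    next_emit = emit_interval
    for arrival, device, cpu_usage in events:
        # Fire every emit that was due before this event arrived.
        while arrival >= next_emit:
            emits.append((next_emit, dict(counts)))
            next_emit += emit_interval
        if cpu_usage > threshold:   # mirrors WHERE cpu_usage > 99
            counts[device] += 1     # incremental update, no rescan of history
    emits.append((next_emit, dict(counts)))  # one last emit after the final event
    return emits

events = [
    (1.0, "dev1", 99.5),
    (2.0, "dev2", 50.0),
    (4.0, "dev1", 99.9),
    (6.0, "dev2", 99.7),
    (11.0, "dev1", 100.0),
]
for emit_time, snapshot in global_count(events):
    print(emit_time, snapshot)
```

Each emitted snapshot contains the totals since the query started, not just the changes in the last interval, which is what distinguishes a global aggregation from a tumbling window of the same length.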

## TTL of Aggregation Keys

Global aggregation does not automatically garbage-collect intermediate aggregation states after each emit.
If the grouping keys grow monotonically over time (for example, when timestamps are part of the key), the number of states, and with it memory usage, keeps growing without bound.

To address this challenge, you can use a **hybrid hash table** for aggregation states:
- Hot keys are kept in memory.
- Cold keys are spilled to disk using an LRU-like algorithm.
- Combined with a **TTL for keys**, this approach provides the best of both worlds:
  - Handles very late events.
  - Prevents unbounded memory growth.
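
To make the idea concrete, here is a minimal Python sketch of a two-tier state table with LRU spilling and a TTL. It is an illustration under stated assumptions, not the Timeplus implementation: the class `HybridStateTable`, its methods, the use of a plain dict as the "disk" tier, and the sum-style state are all hypothetical.

```python
from collections import OrderedDict

class HybridStateTable:
    """Toy two-tier state store: hot keys in memory, cold keys 'on disk'."""

    def __init__(self, max_hot_keys, ttl_sec):
        self.max_hot_keys = max_hot_keys
        self.ttl_sec = ttl_sec
        self.hot = OrderedDict()   # key -> (state, last_update), in LRU order
        self.cold = {}             # stand-in for the on-disk tier

    def update(self, key, value, now):
        # Restore the key's state from whichever tier currently holds it.
        if key in self.hot:
            state, _ = self.hot.pop(key)
        elif key in self.cold:
            state, _ = self.cold.pop(key)
        else:
            state = 0
        self.hot[key] = (state + value, now)   # most recently used goes last
        # Spill least recently used keys once the hot tier is over capacity.
        while len(self.hot) > self.max_hot_keys:
            old_key, entry = self.hot.popitem(last=False)
            self.cold[old_key] = entry

    def expire(self, now):
        # Assumption: only the cold tier is garbage-collected by TTL.
        stale = [k for k, (_, ts) in self.cold.items() if now - ts > self.ttl_sec]
        for k in stale:
            del self.cold[k]

table = HybridStateTable(max_hot_keys=2, ttl_sec=3600)
table.update("a", 1, now=0)
table.update("b", 1, now=10)
table.update("c", 1, now=20)   # "a" is spilled to the cold tier
table.update("a", 5, now=30)   # late event: "a" is restored, "b" is spilled
table.expire(now=3700)         # "b" was last updated at 10, so it exceeds the TTL
print(list(table.hot), table.cold)
```

The late update to `"a"` still lands on its earlier state because the key was spilled rather than dropped, while `"b"` is eventually removed once it has been idle for longer than the TTL.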

**Example:**

```sql
CREATE STREAM device_utils(
    location string,
    device string,
    cpu float32
) SETTINGS shards=3;

SELECT
    to_start_of_interval(_tp_time, 5m) AS bucket_window_start,
    location,
    device,
    min(cpu),
    max(cpu),
    avg(cpu)
FROM device_utils
SHUFFLE BY location
GROUP BY bucket_window_start, location, device
EMIT ON UPDATE WITH BATCH 1s
SETTINGS
    num_target_shards=8,
    default_hash_table='hybrid',
    max_hot_keys=100000,
    aggregate_state_ttl_sec=3600;
```

- This query performs a **global aggregation** to calculate CPU metrics in **5-minute buckets per location and device**.
- The grouping key includes `bucket_window_start`, which increases monotonically with time.
- The hybrid hash table is enabled via `default_hash_table='hybrid'`:
  - It keeps up to 100,000 hot keys in memory per substream (`max_hot_keys=100000`).
  - Cold keys are spilled to disk automatically.
- The TTL is set to 3600 seconds (`aggregate_state_ttl_sec=3600`):
  - Keys not updated for an hour are garbage-collected from disk.
  - This prevents infinite state accumulation.
- Data shuffling is enabled (`SHUFFLE BY location`) for better **parallelism and memory efficiency**.
  - See [Data Shuffle](/shuffle-data) for more details.
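
To see why a time-bucketed grouping key keeps producing new states, the following Python sketch floors timestamps to 5-minute boundaries and counts the distinct keys. It assumes that `to_start_of_interval(_tp_time, 5m)` floors a timestamp to the start of its 5-minute interval; the helper `start_of_interval` and the sample values `loc1` and `dev1` are hypothetical.

```python
from datetime import datetime, timedelta, timezone

def start_of_interval(ts, minutes=5):
    """Floor a timezone-aware datetime to the start of its N-minute bucket."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    width = timedelta(minutes=minutes)
    # timedelta // timedelta yields the whole number of buckets since the epoch.
    return epoch + ((ts - epoch) // width) * width

# One device reporting once a minute for an hour.
start = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
keys = {
    (start_of_interval(start + timedelta(minutes=i)), "loc1", "dev1")
    for i in range(60)
}
print(len(keys))  # 12 distinct group keys after one hour, for a single device
```

Every device adds a new key every 5 minutes, so without a TTL the number of aggregation states grows linearly with the time the query has been running.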

## Emit Policies

Global aggregation supports several emit policies that control when intermediate results are pushed out.

### EMIT PERIODIC {#emit_periodic}

`PERIODIC <n><UNIT>` tells Timeplus to emit the aggregation results periodically. `UNIT` can be `ms` (millisecond), `s` (second), `m` (minute), `h` (hour), or `d` (day). `<n>` must be an integer greater than 0.