Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 118 additions & 76 deletions docs/tumble-aggregation.md
Original file line number Diff line number Diff line change
@@ -1,141 +1,183 @@
# Tumble Window Aggregation {#tumble_window}
# Tumble Window Aggregation

## Overview
Tumble slices the unbounded data into different windows according to its parameters. Internally, Timeplus observes the data streaming and automatically decides when to close a sliced window and emit the final results for that window.

A **tumbling window** divides an unbounded data stream into **fixed-size, non-overlapping intervals** based on event time or processing time.

Each event belongs to **exactly one** window. This is useful for periodic aggregations such as per-minute averages, hourly counts, or daily summaries.

Unlike **hopping** or **session** windows, tumbling windows do not overlap — once a window closes, a new one immediately starts.
This makes them simple, deterministic, and ideal for producing periodic reports or metrics.

## Syntax

```sql
SELECT <column_name1>, <column_name2>, <aggr_function>
FROM tumble(<stream_name>, [<timestamp_column>], <tumble_window_size>, [<time_zone>])
SELECT <grouping-keys>, <aggr_functions>
FROM tumble(<stream-name>, [<timestamp-column>], <window-size>])
[WHERE clause]
GROUP BY [window_start | window_end], ...
EMIT <window_emit_policy>
GROUP BY [window_start | window_end], <other-group-keys> ...
EMIT <emit-policy>
SETTINGS <key1>=<value1>, <key2>=<value2>, ...
```

Tumble window means a fixed non-overlapped time window. Here is one example for a 5 seconds tumble window:
### Parameters

- `<stream-name>` : the source stream the tumble window applied to. **Required**
- `<timestamp-column>` : the event timestamp column which is used to calculate window starts / ends and internal watermark. You can use `now()` or `now64(3)` to enable processing time tumble window. Default is `_tp_time` if absent. **Optional**
- `<window-size>` : tumble window interval size. Supported interval units are listed below. **Required**
- `s` : second
- `m` : miniute
- `h` : hour
- `d` : day
- `w` : week

### Example

```
["2020-01-01 00:00:00", "2020-01-01 00:00:05]
["2020-01-01 00:00:05", "2020-01-01 00:00:10]
["2020-01-01 00:00:10", "2020-01-01 00:00:15]
...
CREATE STREAM device_metrics (
device_id string,
cpu_usage float,
event_time datetime64(3)
);

SELECT
window_start,
window_end,
device_id,
avg(cpu_usage) AS avg_cpu
FROM tumble(device_metrics, event_time, 5s)
GROUP BY
window_start,
device_id
EMIT AFTER WINDOW CLOSE;
```

`tumble` window in Timeplus is left closed and right open `[)` meaning it includes all events which have timestamps **greater or equal** to the **lower bound** of the window, but **less** than the **upper bound** of the window.
**Explanation**:
- Events are grouped into **5-second, non-overlapping windows** based on their `event_time`.
- Each `device_id`’s events are aggregated independently within each window.
- When a window closes, the system emits one aggregated result per device with the computed `avg_cpu`.

`tumble` in the above SQL spec is a table function whose core responsibility is assigning tumble window to each event in a streaming way. The `tumble` table function will generate 2 new columns: `window_start, window_end` which correspond to the low and high bounds of a tumble window.
**Example timeline**:

`tumble` table function accepts 4 parameters: `<timestamp_column>` and `<time-zone>` are optional, the others are mandatory.
| Window | Time Range | Events Included |
| :----: | :------------------ | :----------------------------- |
| W1 | 00:00:00 → 00:00:05 | Events in [00:00:00, 00:00:05) |
| W2 | 00:00:05 → 00:00:10 | Events in [00:00:05, 00:00:10) |
| W3 | 00:00:10 → 00:00:15 | Events in [00:00:10, 00:00:15) |

When the `<timestamp_column>` parameter is omitted from the query, the stream's default event timestamp column which is `_tp_time` will be used.
![TumbleWindow](/img/tumble-window.png)

When the `<time_zone>` parameter is omitted the system's default timezone will be used. `<time_zone>` is a string type parameter, for example `UTC`.
Each event falls into exactly one window, ensuring deterministic aggregation and predictable output intervals.

`<tumble_window_size>` is an interval parameter: `<n><UNIT>` where `<UNIT>` supports `s`, `m`, `h`, `d`, `w`.
It doesn't yet support `M`, `q`, `y`. For example, `tumble(my_stream, 5s)`.
## Emit Policies

More concrete examples:
Emit policies define **when** Timeplus should output results from **time-windowed** aggregations such as **tumble**, **hop**, **session** and **global-windowed** aggregation.

```sql
SELECT device, max(cpu_usage)
FROM tumble(device_utils, 5s)
GROUP BY device, window_end
```
These policies control whether results are emitted **only after the window closes**, **after a delay** to honor late events, or **incrementally during the window**.

### `EMIT AFTER WINDOW CLOSE`

The above example SQL continuously aggregates max cpu usage per device per tumble window for the stream `devices_utils`. Every time a window is closed, Timeplus Proton emits the aggregation results.
This is the **default behavior** for all time window aggregations. Timeplus emits the aggregated results once the window closes.

Let's change `tumble(stream, 5s)` to `tumble(stream, timestmap, 5s)` :
**Example**:

```sql
SELECT device, max(cpu_usage)
FROM tumble(devices, timestamp, 5s)
GROUP BY device, window_end
EMIT AFTER WINDOW CLOSE WITH DELAY 2s;
SELECT window_start, device, max(cpu_usage)
FROM tumble(device_utils, 5s)
GROUP BY window_start, device;
```

Same as the above delayed tumble window aggregation, except in this query, user specifies a **specific time column** `timestamp` for tumble windowing.
This query continuously computes the **maximum CPU usage** per device in every 5-second tumble window from the stream `device_utils`.
Each time a window closes (as determined by the internal watermark), Timeplus emits the results once.

The example below is so called processing time processing which uses wall clock time to assign windows. Timeplus internally processes `now/now64` in a streaming way.
:::info
A watermark is an internal timestamp that advances monotonically per stream, determining when a window can be safely closed.
:::

```sql
SELECT device, max(cpu_usage)
FROM tumble(devices, now64(3, 'UTC'), 5s)
GROUP BY device, window_end
EMIT AFTER WINDOW CLOSE WITH DELAY 2s;
```
### `EMIT AFTER WINDOW CLOSE WITH DELAY`

## Emit Policies

### EMIT AFTER WINDOW CLOSE {#emit_after}
Adds a **delay period** before emitting window results, allowing **late events** to be included.

You can omit `EMIT AFTER WINDOW CLOSE`, since this is the default behavior for time window aggregations. For example:
**Example**:

```sql
SELECT device, max(cpu_usage)
SELECT window_start, device, max(cpu_usage)
FROM tumble(device_utils, 5s)
GROUP BY device, window_end
GROUP BY window_start, device
EMIT AFTER WINDOW CLOSE WITH DELAY 2s;
```

The above example SQL continuously aggregates max cpu usage per device per tumble window for the stream `devices_utils`. Every time a window is closed, Timeplus Proton emits the aggregation results. How to determine the window should be closed? This is done by [Watermark](/understanding-watermark), which is an internal timestamp. It is guaranteed to be increased monotonically per stream query.
This query aggregates the **maximum CPU usage** for each device per 5-second tumble window, then waits 2 additional seconds after the window closes before emitting results. This helps capture any late-arriving events that fall within the window period.

### EMIT AFTER WINDOW CLOSE WITH DELAY {#emit_after_with_delay}
### `EMIT TIMEOUT`

Example:
In some streaming scenarios, the **last tumble window** might remain open because no new events arrive to advance the watermark (i.e., the event time progress).
The **`EMIT TIMEOUT`** clause helps forcefully close such idle windows after a specified period of inactivity.

**Example**:

```sql
SELECT device, max(cpu_usage)
SELECT window_start, device, max(cpu_usage)
FROM tumble(device_utils, 5s)
GROUP BY device, widnow_end
EMIT AFTER WINDOW CLOSE WITH DELAY 2s;
GROUP BY window_start, device
EMIT TIMEOUT 3s;
```

The above example SQL continuously aggregates max cpu usage per device per tumble window for the stream `device_utils`. Every time a window is closed, Timeplus Proton waits for another 2 seconds and then emits the aggregation results.
In this example:

- The query continuously aggregates the maximum CPU usage per device in **5-second tumble windows**.
- If **no new events** arrive for **3 seconds**, Timeplus will **force-close** the most recent window and emit the final results.
- Once the window is closed, the **internal watermark** (event time) advances as well.
- Any **late events** that belong to this now-closed window will be discarded.

### `EMIT ON UPDATE`

### EMIT ON UPDATE {#emit_on_update}
Emits **intermediate aggregation updates** whenever the results change within an open window.
This is useful for near real-time visibility into evolving metrics.

You can apply `EMIT ON UPDATE` in time windows, such as tumble/hop/session, with `GROUP BY` keys. For example:
**Example**:

```sql
SELECT
window_start, cid, count() AS cnt
window_start, device, max(cpu_usage)
FROM
tumble(car_live_data, 5s)
WHERE
cid IN ('c00033', 'c00022')
tumble(device_utils, 5s)
GROUP BY
window_start, cid
EMIT ON UPDATE
window_start, device
EMIT ON UPDATE;
```

During the 5 second tumble window, even the window is not closed, as long as the aggregation value(`cnt`) for the same `cid` is different , the results will be emitted.
Here, during each 5-second window, the system emits updates whenever there are new events flowing into the open window, even before the window closes.

### EMIT ON UPDATE WITH DELAY {#emit_on_update_with_delay}
### `EMIT ON UPDATE WITH BATCH`

Adding the `WITH DELAY` to `EMIT ON UPDATE` will allow late event for the window aggregation.
Combines **periodic emission** with **update-based** triggers.
Timeplus checks the intermediate aggregation results at regular intervals and emits them if they have changed which can significally improve the emit efficiency and throughput compared with `EMIT ON UPDATE`.

**Example**:

```sql
SELECT
window_start, cid, count() AS cnt
window_start, device, max(cpu_usage)
FROM
tumble(car_live_data, 5s)
WHERE
cid IN ('c00033', 'c00022')
tumble(device_utils, 5s)
GROUP BY
window_start, cid
EMIT ON UPDATE WITH DELAY 2s
window_start, device
EMIT ON UPDATE WITH BATCH 1s;
```

### EMIT ON UPDATE WITH BATCH {#emit_on_update_with_batch}
### `EMIT ON UPDATE WITH DELAY`

Similar to **`EMIT ON UPDATE`**, but includes a delay to allow late events before emitting incremental updates.

**Example**:

You can combine `EMIT PERIODIC` and `EMIT ON UPDATE` together. In this case, even the window is not closed, Timeplus will check the intermediate aggregation result at the specified interval and emit rows if the result is changed.
```sql
SELECT
window_start, cid, count() AS cnt
window_start, device, max(cpu_usage)
FROM
tumble(car_live_data, 5s)
WHERE
cid IN ('c00033', 'c00022')
tumble(device_utils, 5s)
GROUP BY
window_start, cid
EMIT ON UPDATE WITH BATCH 2s
window_start, device
EMIT ON UPDATE WITH DELAY 2s;
```
Binary file added static/img/tumble-window.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.