Merged
188 changes: 122 additions & 66 deletions docs/hop-aggregation.md
@@ -1,120 +1,176 @@
# Hop Window Aggregation {#hop_window}
# Hop Window Aggregation

## Overview

Like [Tumble](#tumble), Hop also slices the unbounded streaming data into smaller windows, and it has an additional sliding step.
A **hop window** (also known as a **sliding window**) is a type of time-based window that allows data to be grouped into **overlapping segments**.

Each hop window is defined by two parameters:
- **Window size** – the total duration of each window.
- **Hop interval** – how often a new window starts.

Because hop windows can overlap, a single event can belong to multiple windows. This is useful when you want to generate **smoother, more continuous aggregations** (e.g., moving averages or rolling counts).

For example, with a 10-minute window size and a 5-minute hop interval, a new window starts every 5 minutes and spans 10 minutes of data — meaning there are always **two overlapping windows** active at any given time.
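The overlap can be checked with a few lines of arithmetic. The following is a minimal sketch, not Timeplus code: it assumes windows are half-open `[start, end)` intervals whose starts are aligned to multiples of the hop interval, and `hop_windows` is a hypothetical helper name.

```python
def hop_windows(event_ts: int, window_size: int, hop_interval: int) -> list[tuple[int, int]]:
    """Return every [start, end) window that contains event_ts (all values in seconds).

    Assumption: window starts are aligned to multiples of hop_interval.
    """
    # Latest window start at or before the event.
    start = event_ts - (event_ts % hop_interval)
    windows = []
    # Walk back one hop at a time while the window still covers the event.
    while start > event_ts - window_size:
        windows.append((start, start + window_size))
        start -= hop_interval
    return sorted(windows)


# An event at minute 7 with a 10-minute window and a 5-minute hop.
print(hop_windows(7 * 60, 10 * 60, 5 * 60))
```

Under these assumptions the event lands in two windows, `[0, 600)` and `[300, 900)`. When the hop interval equals the window size, the same function returns exactly one window, which matches tumble behavior.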


## Syntax

```sql
SELECT <column_name1>, <column_name2>, <aggr_function>
FROM hop(<stream_name>, [<timestamp_column>], <hop_slide_size>, [hop_windows_size], [<time_zone>])
SELECT <grouping-keys>, <aggr-functions>
FROM hop(<stream-name>, [<timestamp-column>], <hop-interval>, <window-size>)
[WHERE clause]
GROUP BY [<window_start | window_end>], ...
EMIT <window_emit_policy>
SETTINGS <key1>=<value1>, <key2>=<value2>, ...
GROUP BY [<window_start | window_end>], <other-group-keys> ...
EMIT <emit-policy>
```

Hop window is a more generalized window compared to tumble window. Hop window has an additional
parameter called `<hop_slide_size>` which means window progresses this slide size every time. There are 3 cases:
### Parameters

1. `<hop_slide_size>` is less than `<hop_window_size>`. Hop windows have overlaps meaning an event can fall into several hop windows.
2. `<hop_slide_size>` is equal to `<hop_window_size>`. Degenerated to a tumble window.
3. `<hop_slide_size>` is greater than `<hop_window_size>`. Windows has a gap in between. Usually not useful, hence not supported so far.
- `<stream-name>` : the source stream the hop window applies to. **Required**
- `<timestamp-column>` : the event timestamp column used to calculate window starts / ends and the internal watermark. Pass `now()` or `now64(3)` to get a processing-time hop window instead. Defaults to `_tp_time` if absent. **Optional**
- `<hop-interval>` : how frequently new windows start (must be less than or equal to the window size). Supported interval units are listed below. **Required**
- `s` : second
- `m` : minute
- `h` : hour
- `d` : day
- `w` : week
- `<window-size>` : hop window interval size. Supported interval units are listed below. **Required**
- `s` : second
- `m` : minute
- `h` : hour
- `d` : day
- `w` : week
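
The unit suffixes and the "hop interval must not exceed window size" rule can be modeled client-side before a query is submitted. This is only an illustrative sketch: `to_seconds` and `check_hop` are hypothetical helpers, not part of Timeplus, and the parsing covers only the `<number><unit>` form used in this document.

```python
# Seconds per supported unit, as listed in the parameter descriptions.
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}


def to_seconds(interval: str) -> int:
    """Convert an interval literal such as '4s' or '10m' to seconds."""
    value, unit = interval[:-1], interval[-1:]
    if unit not in UNIT_SECONDS or not value.isdigit():
        raise ValueError(f"unsupported interval: {interval!r}")
    return int(value) * UNIT_SECONDS[unit]


def check_hop(hop_interval: str, window_size: str) -> tuple[int, int]:
    """Return (hop, size) in seconds, rejecting a hop larger than the window."""
    hop, size = to_seconds(hop_interval), to_seconds(window_size)
    if hop > size:
        raise ValueError("hop interval must be less than or equal to the window size")
    return hop, size


print(check_hop("4s", "10s"))
```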

Please note, at this point, you need to use the same time unit in `<hop_slide_size>` and `<hop_window_size>`, for example `hop(device_utils, 1s, 60s)` instead of `hop(device_utils, 1s, 1m)`.
### Example

Here is one hop window example which has 2 seconds slide and 5 seconds hop window.
The following query calculates the maximum CPU usage of each device in **10-second hop windows** that start every **4 seconds**.

```sql
CREATE STREAM device_metrics (
device string,
cpu_usage float,
event_time datetime64(3)
);

SELECT
window_start,
window_end,
device,
max(cpu_usage)
FROM hop(device_metrics, event_time, 4s, 10s)
GROUP BY window_start, window_end, device;
```
["2020-01-01 00:00:00", "2020-01-01 00:00:05]
["2020-01-01 00:00:02", "2020-01-01 00:00:07]
["2020-01-01 00:00:04", "2020-01-01 00:00:09]
["2020-01-01 00:00:06", "2020-01-01 00:00:11]
...
```

Except that the hop window can have overlaps, other semantics are identical to the tumble window.
This allows you to track metrics like CPU usage in a rolling fashion, providing near-real-time insight into recent activity rather than discrete, non-overlapping periods.

## Emit Policies

Emit policies define **when** Timeplus should output results from **time-windowed** aggregations such as **tumble**, **hop**, **session** and **global-windowed** aggregation.

These policies control whether results are emitted **only after the window closes**, **after a delay** to honor late events, or **incrementally during the window**.

### `EMIT AFTER WINDOW CLOSE`

This is the **default behavior** for all time window aggregations. Timeplus emits the aggregated results once the window closes.

**Example**:

```sql
SELECT device, max(cpu_usage)
FROM hop(device_utils, 2s, 5s)
GROUP BY device, window_end
EMIT AFTER WINDOW CLOSE;
SELECT window_start, device, max(cpu_usage)
FROM hop(device_metrics, 4s, 10s)
GROUP BY window_start, device;
```

The above example SQL continuously aggregates max cpu usage per device per hop window for stream `device_utils`. Every time a window is closed, Timeplus emits the aggregation results.

## Emit Policies
This query continuously computes the **maximum CPU usage** per device over 10-second hop windows on the stream `device_metrics`, with a new window starting every 4 seconds.
Each time a window closes (as determined by the internal watermark), Timeplus emits the results once.

:::info
A watermark is an internal timestamp that advances monotonically per stream, determining when a window can be safely closed.
:::
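
As a rough mental model of watermark-driven closing, consider the sketch below. It assumes the watermark is simply the largest event time seen so far and that a window closes once the watermark reaches its end; the actual watermark logic in Timeplus may differ, and both function names are hypothetical.

```python
def advance_watermark(watermark: float, event_time: float) -> float:
    """The watermark never moves backwards, even when an older event arrives."""
    return max(watermark, event_time)


def split_closed(windows, watermark):
    """Split (start, end) windows into those closed by the watermark and those still open."""
    closed = [w for w in windows if w[1] <= watermark]
    still_open = [w for w in windows if w[1] > watermark]
    return closed, still_open


watermark = 0.0
for event_time in [3.0, 11.0, 9.0]:  # the last event arrives out of order
    watermark = advance_watermark(watermark, event_time)

# 10-second windows starting every 4 seconds.
print(split_closed([(0, 10), (4, 14), (8, 18)], watermark))
```

With the watermark at 11 seconds, only the window ending at 10 is closed; the two overlapping windows that end later stay open.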

### EMIT AFTER WINDOW CLOSE {#emit_after}
### `EMIT AFTER WINDOW CLOSE WITH DELAY`

You can omit `EMIT AFTER WINDOW CLOSE`, since this is the default behavior for time window aggregations. For example:
Adds a **delay period** before emitting window results, allowing **late events** to be included.

**Example**:

```sql
SELECT device, max(cpu_usage)
FROM tumble(device_utils, 5s)
GROUP BY device, window_end
SELECT window_start, device, max(cpu_usage)
FROM hop(device_metrics, 4s, 10s)
GROUP BY window_start, device
EMIT AFTER WINDOW CLOSE WITH DELAY 2s;
```

The above example SQL continuously aggregates max cpu usage per device per tumble window for the stream `devices_utils`. Every time a window is closed, Timeplus Proton emits the aggregation results. How to determine the window should be closed? This is done by [Watermark](/understanding-watermark), which is an internal timestamp. It is guaranteed to be increased monotonically per stream query.
This query aggregates the **maximum CPU usage** for each device per 10-second hop window, then waits 2 additional seconds after the window closes before emitting results. This helps capture any late-arriving events that fall within the window period.
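
The effect of the delay on late events can be sketched as follows. This model assumes the delay is measured against the watermark (event time) rather than wall-clock time, which the text above does not specify, and `still_accepts` is a hypothetical name.

```python
def still_accepts(window, event_time, watermark, delay=0.0):
    """Return True if an event can still be counted in the (start, end) window.

    Assumption: results are emitted once the watermark reaches end + delay.
    """
    start, end = window
    in_range = start <= event_time < end
    not_yet_emitted = watermark < end + delay
    return in_range and not_yet_emitted


window = (0, 10)
# The watermark is already at 11s when an event stamped 9s shows up.
print(still_accepts(window, event_time=9, watermark=11, delay=2))  # counted
print(still_accepts(window, event_time=9, watermark=11, delay=0))  # too late
```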

### `EMIT TIMEOUT`

### EMIT AFTER WINDOW CLOSE WITH DELAY {#emit_after_with_delay}
In some streaming scenarios, the **last hop windows** might remain open because no new events arrive to advance the watermark (i.e., the event time progress).
The **`EMIT TIMEOUT`** clause helps forcefully close such idle windows after a specified period of inactivity.

Example:
**Example**:

```sql
SELECT device, max(cpu_usage)
FROM tumble(device_utils, 5s)
GROUP BY device, widnow_end
EMIT AFTER WINDOW CLOSE WITH DELAY 2s;
SELECT window_start, device, max(cpu_usage)
FROM hop(device_metrics, 4s, 10s)
GROUP BY window_start, device
EMIT TIMEOUT 3s;
```

The above example SQL continuously aggregates max cpu usage per device per tumble window for the stream `device_utils`. Every time a window is closed, Timeplus Proton waits for another 2 seconds and then emits the aggregation results.
In this example:

- The query continuously aggregates the maximum CPU usage per device in **10-second hop windows**.
- If **no new events** arrive for **3 seconds**, Timeplus will **force-close** the most recent windows and emit the final results.
- Once the window is closed, the **internal watermark** (event time) advances as well.
- Any **late events** that belong to this now-closed window will be discarded.
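
The idle check described in the list above can be pictured as a small timer. This is a sketch under assumptions: arrival times are passed in explicitly so the logic stays deterministic, the comparison is treated as inclusive, and `IdleCloser` is a hypothetical name rather than a Timeplus component.

```python
class IdleCloser:
    """Track the last arrival time and report when the idle timeout has elapsed."""

    def __init__(self, timeout: float):
        self.timeout = timeout
        self.last_arrival = None  # no event seen yet

    def on_event(self, now: float) -> None:
        # Every arriving event resets the idle period.
        self.last_arrival = now

    def should_force_close(self, now: float) -> bool:
        if self.last_arrival is None:
            return False
        return now - self.last_arrival >= self.timeout


closer = IdleCloser(timeout=3.0)
closer.on_event(now=100.0)
print(closer.should_force_close(now=102.0))  # still within the timeout
print(closer.should_force_close(now=103.0))  # idle for 3 seconds
```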

### `EMIT ON UPDATE`

### EMIT ON UPDATE {#emit_on_update}
Emits **intermediate aggregation updates** whenever the results change within an open window.
This is useful for near real-time visibility into evolving metrics.

You can apply `EMIT ON UPDATE` in time windows, such as tumble/hop/session, with `GROUP BY` keys. For example:
**Example**:

```sql
SELECT
window_start, cid, count() AS cnt
window_start, device, max(cpu_usage)
FROM
tumble(car_live_data, 5s)
WHERE
cid IN ('c00033', 'c00022')
hop(device_metrics, 4s, 10s)
GROUP BY
window_start, cid
EMIT ON UPDATE
window_start, device
EMIT ON UPDATE;
```

During the 5 second tumble window, even the window is not closed, as long as the aggregation value(`cnt`) for the same `cid` is different , the results will be emitted.
Here, during each 10-second window, the system emits updates whenever there are new events flowing into the open window, even before the window closes.
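
One way to picture update-based emission is the generator below. It models the "emit when the result changes" reading from the start of this section, using a running `max` per `(window_start, device)` key; whether an event that leaves the result unchanged also triggers an emit is not modeled here. `on_update_max` is a hypothetical name.

```python
def on_update_max(events):
    """Yield (window_start, device, max_cpu) each time the running max changes.

    events: iterable of (window_start, device, cpu_usage) tuples.
    """
    current = {}
    for window_start, device, cpu in events:
        key = (window_start, device)
        if key not in current or cpu > current[key]:
            current[key] = cpu
            yield (window_start, device, cpu)


events = [(0, "a", 10.0), (0, "a", 5.0), (0, "a", 20.0), (0, "b", 7.0)]
for row in on_update_max(events):
    print(row)
```

The second event does not raise the maximum for device `a`, so this model emits three rows for four input events.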

### EMIT ON UPDATE WITH DELAY {#emit_on_update_with_delay}
### `EMIT ON UPDATE WITH BATCH`

Adding the `WITH DELAY` to `EMIT ON UPDATE` will allow late event for the window aggregation.
Combines **periodic emission** with **update-based** triggers.
Timeplus checks the intermediate aggregation results at regular intervals and emits them only if they have changed, which can significantly improve emit efficiency and throughput compared with `EMIT ON UPDATE`.

**Example**:

```sql
SELECT
window_start, cid, count() AS cnt
window_start, device, max(cpu_usage)
FROM
tumble(car_live_data, 5s)
WHERE
cid IN ('c00033', 'c00022')
hop(device_metrics, 4s, 10s)
GROUP BY
window_start, cid
EMIT ON UPDATE WITH DELAY 2s
window_start, device
EMIT ON UPDATE WITH BATCH 1s;
```
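
The difference from plain `EMIT ON UPDATE` is that several changes within one interval collapse into a single emitted row. A minimal sketch, assuming `flush()` is invoked once per batch interval by some external timer; `BatchedEmitter` is a hypothetical name and not a Timeplus API.

```python
class BatchedEmitter:
    """Keep a running max per key and emit only keys that changed since the last flush."""

    def __init__(self):
        self.current = {}
        self.last_emitted = {}

    def update(self, key, value):
        # Maintain the running maximum for this (window_start, device) key.
        self.current[key] = max(value, self.current.get(key, value))

    def flush(self):
        # Called once per batch interval: collect keys whose value changed.
        changed = {k: v for k, v in self.current.items() if self.last_emitted.get(k) != v}
        self.last_emitted.update(changed)
        return changed


emitter = BatchedEmitter()
emitter.update((0, "a"), 10.0)
emitter.update((0, "a"), 20.0)  # two updates within the same interval
print(emitter.flush())          # one row carrying the latest max
print(emitter.flush())          # nothing changed since the last flush
```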

### EMIT ON UPDATE WITH BATCH {#emit_on_update_with_batch}
### `EMIT ON UPDATE WITH DELAY`

Similar to **`EMIT ON UPDATE`**, but includes a delay to allow late events before emitting incremental updates.

**Example**:

You can combine `EMIT PERIODIC` and `EMIT ON UPDATE` together. In this case, even the window is not closed, Timeplus will check the intermediate aggregation result at the specified interval and emit rows if the result is changed.
```sql
SELECT
window_start, cid, count() AS cnt
window_start, device, max(cpu_usage)
FROM
tumble(car_live_data, 5s)
WHERE
cid IN ('c00033', 'c00022')
hop(device_metrics, 4s, 10s)
GROUP BY
window_start, cid
EMIT ON UPDATE WITH BATCH 2s
window_start, device
EMIT ON UPDATE WITH DELAY 2s;
```
27 changes: 13 additions & 14 deletions docs/tumble-aggregation.md
@@ -12,17 +12,16 @@ This makes them simple, deterministic, and ideal for producing periodic reports
## Syntax

```sql
SELECT <grouping-keys>, <aggr_functions>
SELECT <grouping-keys>, <aggr-functions>
FROM tumble(<stream-name>, [<timestamp-column>], <window-size>)
[WHERE clause]
GROUP BY [window_start | window_end], <other-group-keys> ...
EMIT <emit-policy>
SETTINGS <key1>=<value1>, <key2>=<value2>, ...
```

### Parameters

- `<stream-name>` : the source stream the tumble window applied to. **Required**
- `<stream-name>` : the source stream the tumble window applies to. **Required**
- `<timestamp-column>` : the event timestamp column which is used to calculate window starts / ends and internal watermark. You can use `now()` or `now64(3)` to enable processing time tumble window. Default is `_tp_time` if absent. **Optional**
- `<window-size>` : tumble window interval size. Supported interval units are listed below. **Required**
- `s` : second
@@ -35,26 +34,26 @@ SETTINGS <key1>=<value1>, <key2>=<value2>, ...

```
CREATE STREAM device_metrics (
device_id string,
device string,
cpu_usage float,
event_time datetime64(3)
);

SELECT
window_start,
window_end,
device_id,
device,
avg(cpu_usage) AS avg_cpu
FROM tumble(device_metrics, event_time, 5s)
GROUP BY
window_start,
device_id
device
EMIT AFTER WINDOW CLOSE;
```

**Explanation**:
- Events are grouped into **5-second, non-overlapping windows** based on their `event_time`.
- Each `device_id`’s events are aggregated independently within each window.
- Each `device`’s events are aggregated independently within each window.
- When a window closes, the system emits one aggregated result per device with the computed `avg_cpu`.

**Example timeline**:
@@ -83,11 +82,11 @@ This is the **default behavior** for all time window aggregations. Timeplus emit

```sql
SELECT window_start, device, max(cpu_usage)
FROM tumble(device_utils, 5s)
FROM tumble(device_metrics, 5s)
GROUP BY window_start, device;
```

This query continuously computes the **maximum CPU usage** per device in every 5-second tumble window from the stream `device_utils`.
This query continuously computes the **maximum CPU usage** per device in every 5-second tumble window from the stream `device_metrics`.
Each time a window closes (as determined by the internal watermark), Timeplus emits the results once.

:::info
@@ -102,7 +101,7 @@ Adds a **delay period** before emitting window results, allowing **late events**

```sql
SELECT window_start, device, max(cpu_usage)
FROM tumble(device_utils, 5s)
FROM tumble(device_metrics, 5s)
GROUP BY window_start, device
EMIT AFTER WINDOW CLOSE WITH DELAY 2s;
```
@@ -118,7 +117,7 @@ The **`EMIT TIMEOUT`** clause helps forcefully close such idle windows after a s

```sql
SELECT window_start, device, max(cpu_usage)
FROM tumble(device_utils, 5s)
FROM tumble(device_metrics, 5s)
GROUP BY window_start, device
EMIT TIMEOUT 3s;
```
@@ -141,7 +140,7 @@ This is useful for near real-time visibility into evolving metrics.
SELECT
window_start, device, max(cpu_usage)
FROM
tumble(device_utils, 5s)
tumble(device_metrics, 5s)
GROUP BY
window_start, device
EMIT ON UPDATE;
@@ -160,7 +159,7 @@ Timeplus checks the intermediate aggregation results at regular intervals and em
SELECT
window_start, device, max(cpu_usage)
FROM
tumble(device_utils, 5s)
tumble(device_metrics, 5s)
GROUP BY
window_start, device
EMIT ON UPDATE WITH BATCH 1s;
@@ -176,7 +175,7 @@ Similar to **`EMIT ON UPDATE`**, but includes a delay to allow late events befor
SELECT
window_start, device, max(cpu_usage)
FROM
tumble(device_utils, 5s)
tumble(device_metrics, 5s)
GROUP BY
window_start, device
EMIT ON UPDATE WITH DELAY 2s;