Feature: Interval joins #924

Merged
merged 25 commits into from
Jun 19, 2025
Commits
25 commits
6891a20
Move JoinHow to base module
gwaramadze Jun 3, 2025
b904446
Move OnOverlap to base module
gwaramadze Jun 3, 2025
dcf4663
Use existing topic_manager_topic_factory fixture
gwaramadze Jun 4, 2025
4dedac2
Move join tests to a separate package
gwaramadze Jun 4, 2025
10b2fa9
Move join fixtures to a dedicated module
gwaramadze Jun 4, 2025
445fdea
Rename class JoinAsOf to AsOfJoin
gwaramadze Jun 4, 2025
a80abad
Eliminate the need for a cast call
gwaramadze Jun 4, 2025
03f3bca
Save processing_context to an outer scope var to reuse
gwaramadze Jun 4, 2025
cc7c1d4
Save merger to an outer scope variable
gwaramadze Jun 6, 2025
17945d1
Move append_integer
gwaramadze Jun 9, 2025
b690a16
Move _get_items
gwaramadze Jun 9, 2025
47f54d1
Create base Join
gwaramadze Jun 9, 2025
2b589f3
Refactor joins
gwaramadze Jun 9, 2025
09ccf07
Minor AsOfJoin refactor
gwaramadze Jun 9, 2025
85826b0
Add IntervalJoin
gwaramadze Jun 9, 2025
9a23fa8
Create base Join tests
gwaramadze Jun 10, 2025
570e5f6
Add docstring and documentation section
gwaramadze Jun 16, 2025
9d845e1
Introduce right and outer joins
gwaramadze Jun 17, 2025
5c687f0
Add png
gwaramadze Jun 17, 2025
346183c
Fix rebase mistake
gwaramadze Jun 18, 2025
6ee4e5c
Join subclasses ABC
gwaramadze Jun 19, 2025
cc5a31c
Drop headers
gwaramadze Jun 19, 2025
1d8700c
Add note on performance limitations
gwaramadze Jun 19, 2025
f1e2d8b
Apply suggestions from code review
gwaramadze Jun 19, 2025
5c3b847
Cleanup `how` params type hints
gwaramadze Jun 19, 2025
Binary file added docs/img/join-interval.png
101 changes: 95 additions & 6 deletions docs/joins.md
@@ -69,9 +69,8 @@ Here is a description of the as-of join algorithm:
#### Joining strategies
As-of join supports the following joining strategies:

- - `inner` - emit the output for the left record only when the match is found (default).
- - `left` - emit the output for the left record even without a match.
-
+ - `inner` - emit the output for the left record only when the match is found (default)
+ - `left` - emit the result for each left record even without matches on the right side

#### Merging records together
When the match is found, the two records are merged according to the `on_merge` parameter.
@@ -116,7 +115,6 @@ if __name__ == '__main__':
```



#### State expiration
`StreamingDataFrame.join_asof` stores the right records to the state.
The `grace_ms` parameter regulates the state's lifetime (default - 7 days) to prevent it from growing in size forever.
@@ -132,8 +130,8 @@ Adjust `grace_ms` based on the expected time gap between the left and the right
### Limitations

- Joining dataframes belonging to the same topics (aka "self-join") is not supported.
- - As-of join preserves headers only for the left dataframe.
-   If you need headers of the right side records, consider adding them to the value.
+ - Join types "right" and "outer" are not supported.
+ - As-of join preserves headers only for the left dataframe. If you need headers of the right side records, consider adding them to the value.

### Message ordering between partitions
Streaming joins use [`StreamingDataFrame.concat()`](concatenating.md) under the hood, which means that the application's internal consumer goes into a special "buffered" mode
@@ -199,3 +197,94 @@ if __name__ == '__main__':
- For each record in the dataframe, a user-defined lookup strategy (a subclass of `BaseLookup`) is called with a mapping of field names to field definitions (subclasses of `BaseField`).
- The lookup strategy fetches or computes enrichment data based on the provided key and fields, and updates the record in-place.
- The enrichment can come from external sources such as configuration topics, databases, or in-memory data.

## Interval join

> _New in [3.17.0](https://github.com/quixio/quix-streams/releases/tag/v3.17.0)_

Use `StreamingDataFrame.join_interval()` to join two topics into a new stream where each record is merged with records from the other topic that fall within a specified time interval.

This join is useful for cases where you need to match records that occur within a specific time window of each other, rather than just the latest record (as in as-of join).

![img.png](img/join-interval.png)

### Example

Join records from the topic "measurements" with records from the topic "events" that occur within a 5-minute window before and after each measurement:

```python
from datetime import timedelta

from quixstreams import Application

app = Application(...)

sdf_measurements = app.dataframe(app.topic("measurements"))
sdf_events = app.dataframe(app.topic("events"))

# Join records from the topic "measurements"
# with records from "events" that occur within a 5-minute window
# before and after each measurement
sdf_joined = sdf_measurements.join_interval(
    right=sdf_events,
    how="inner",  # Emit updates only if matches are found
    on_merge="keep-left",  # Prefer the columns from the left dataframe if they overlap
    grace_ms=timedelta(days=7),  # Keep the state for 7 days
    backward_ms=timedelta(minutes=5),  # Look for events up to 5 minutes before
    forward_ms=timedelta(minutes=5),  # Look for events up to 5 minutes after
)

if __name__ == '__main__':
    app.run()
```

### How it works

The interval join algorithm works as follows:

- Records from both sides are stored in the state store
- For each record on the left side:
    - Look for matching records on the right side that fall within the specified time interval
    - If matches are found, merge the records according to the `on_merge` logic
    - For inner joins, only emit if matches are found
    - For left joins, emit even without matches
- For each record on the right side:
    - Look for matching records on the left side that fall within the specified time interval
    - Merge all matching records according to the `on_merge` logic
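
The steps above can be sketched in plain Python. This is a minimal in-memory illustration, not the library's implementation: the function name `interval_join_sketch`, the event tuple layout, and the use of plain dictionaries as the "state store" are all assumptions made for the example. It covers only the `inner` strategy with `keep-left` merging and ignores state expiration.

```python
from collections import defaultdict


def interval_join_sketch(events, backward_ms, forward_ms):
    """Hypothetical sketch of an inner interval join with keep-left merging.

    `events` is an iterable of (side, key, timestamp_ms, value) tuples in
    arrival order, where side is "left" or "right" and value is a dict.
    """
    # Stand-in for the state store: records per side, grouped by message key.
    state = {"left": defaultdict(list), "right": defaultdict(list)}
    outputs = []

    for side, key, ts, value in events:
        # Records from both sides are stored.
        state[side][key].append((ts, value))

        if side == "left":
            # Look for right records inside [ts - backward_ms, ts + forward_ms].
            for right_ts, right_value in state["right"][key]:
                if ts - backward_ms <= right_ts <= ts + forward_ms:
                    # keep-left: left keys win on overlap.
                    outputs.append({**right_value, **value})
        else:
            # A right record matches every stored left record whose
            # interval contains the right record's timestamp.
            for left_ts, left_value in state["left"][key]:
                if left_ts - backward_ms <= ts <= left_ts + forward_ms:
                    outputs.append({**value, **left_value})

    return outputs


events = [
    ("right", "sensor-1", 1_000, {"event": "a"}),
    ("left", "sensor-1", 1_200, {"measurement": 1}),
    ("right", "sensor-1", 1_300, {"event": "b"}),
    ("right", "sensor-1", 5_000, {"event": "c"}),  # outside the interval
]
joined = interval_join_sketch(events, backward_ms=500, forward_ms=500)
```

Because both sides are stored, a match is produced regardless of which record arrives first: the late right record at `1_300` still joins the earlier left record at `1_200`.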

#### Time intervals
The join uses two time intervals to determine matches:

- `backward_ms`: How far back in time to look for matches from the right side
- `forward_ms`: How far forward in time to look for matches from the right side

> **Note:** When both `backward_ms` and `forward_ms` are set to 0 (default), the join will only match records with exactly the same timestamp.

The `grace_ms` parameter controls how long records are kept in the state store, similar to other join types.
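
As a rough illustration of how the two interval parameters bound a match, the check can be written as a single comparison. The helper names `to_ms` and `in_interval` are hypothetical, and the inclusive bounds are an assumption inferred from the note that zero-width intervals match identical timestamps.

```python
from datetime import timedelta
from typing import Union


def to_ms(value: Union[int, timedelta]) -> int:
    """Convert an int (milliseconds) or a timedelta to integer milliseconds."""
    if isinstance(value, timedelta):
        return value // timedelta(milliseconds=1)
    return value


def in_interval(
    left_ts: int,
    right_ts: int,
    backward_ms: Union[int, timedelta] = 0,
    forward_ms: Union[int, timedelta] = 0,
) -> bool:
    """Return True if right_ts lies in [left_ts - backward, left_ts + forward].

    Both bounds are treated as inclusive (an assumption for this sketch).
    """
    return left_ts - to_ms(backward_ms) <= right_ts <= left_ts + to_ms(forward_ms)


# With the defaults, only identical timestamps match.
exact = in_interval(1_000, 1_000)
off_by_one = in_interval(1_000, 1_001)

# A right record exactly 5 minutes before the left record is still in range.
edge = in_interval(600_000, 300_000, backward_ms=timedelta(minutes=5))
```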

#### Joining strategies
Interval join supports the following joining strategies:

- `inner` - emit the output for the left record only when the match is found (default)
- `left` - emit the result for each left record even without matches on the right side
- `right` - emit the result for each right record even without matches on the left side
- `outer` - emit the output for both left and right records even without matches
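
The four strategies differ only in which side's unmatched records are still emitted. A small sketch of that rule follows; the function `emits_unmatched` is a hypothetical helper for illustration, not part of the library.

```python
VALID_HOW = ("inner", "left", "right", "outer")


def emits_unmatched(how: str, side: str) -> bool:
    """Return True if a record from `side` with no match is still emitted.

    `side` is "left" or "right"; `how` is one of the four join strategies.
    """
    if how not in VALID_HOW:
        raise ValueError(f"Unknown join strategy: {how!r}")
    if side not in ("left", "right"):
        raise ValueError(f"Unknown side: {side!r}")
    # "outer" keeps both sides; "left"/"right" keep only their own side;
    # "inner" never emits an unmatched record.
    return how == "outer" or how == side


# Summarize the behavior for every strategy as (left, right) flags.
summary = {
    how: (emits_unmatched(how, "left"), emits_unmatched(how, "right"))
    for how in VALID_HOW
}
```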

#### Merging records
The merging behavior is controlled by the `on_merge` parameter, which works the same way as in other join types:

- `raise` - merge records and raise an exception if keys overlap (default)
- `keep-left` - prefer keys from the left record in case of overlap
- `keep-right` - prefer keys from the right record in case of overlap
- custom callback - use a custom function to merge records

> **Warning:** Custom merge functions must not mutate the input values as this will lead to
> unexpected exceptions or incorrect data in the joined stream. Always return a new object instead.
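
The `on_merge` options can be approximated for dictionary values as shown below. This is a sketch under stated assumptions: `merge_sketch` and `prefixed_merge` are hypothetical names, and raising `ValueError` on overlap is an illustrative choice rather than the exception the library actually raises. The custom callback builds a new dictionary and leaves its inputs untouched, as the warning above requires.

```python
from typing import Any, Callable, Union


def merge_sketch(
    left: dict,
    right: dict,
    on_merge: Union[str, Callable[[Any, Any], Any]] = "raise",
) -> Any:
    """Illustrative merge of two matched records (not the library's code)."""
    if callable(on_merge):
        return on_merge(left, right)
    if on_merge == "raise":
        overlap = left.keys() & right.keys()
        if overlap:
            # The exception type here is an assumption for the example.
            raise ValueError(f"Overlapping keys: {sorted(overlap)}")
        return {**left, **right}
    if on_merge == "keep-left":
        return {**right, **left}  # left values overwrite right ones
    if on_merge == "keep-right":
        return {**left, **right}  # right values overwrite left ones
    raise ValueError(f"Unknown on_merge value: {on_merge!r}")


def prefixed_merge(left: dict, right: dict) -> dict:
    """Custom callback: returns a new dict and never mutates its inputs."""
    merged = {f"left_{k}": v for k, v in left.items()}
    merged.update({f"right_{k}": v for k, v in right.items()})
    return merged


left_record = {"id": 1, "temp": 21.5}
right_record = {"id": 7, "event": "door_open"}
result = merge_sketch(left_record, right_record, on_merge=prefixed_merge)
```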

### Limitations

- Joining dataframes belonging to the same topic (aka "self-join") is not supported.
- `backward_ms` must not be greater than `grace_ms`; otherwise, records may expire from the state before they can be matched, and data will be lost.
- Interval join does not preserve any headers. If you need headers from any side, consider adding them to the value.
- The performance of the interval join depends on the density of the data.
  If both streams have too many matching messages falling within the interval, the performance may drop significantly due to the large number of produced outputs.
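
To get a feel for the density limitation, the number of joined outputs for a single key can be estimated by counting timestamp pairs that fall within the interval. The helper `count_outputs` is a hypothetical brute-force counter for illustration only, assuming inclusive interval bounds.

```python
def count_outputs(left_timestamps, right_timestamps, backward_ms, forward_ms):
    """Count (left, right) pairs whose timestamps fall within the interval."""
    return sum(
        1
        for left_ts in left_timestamps
        for right_ts in right_timestamps
        if left_ts - backward_ms <= right_ts <= left_ts + forward_ms
    )


# One record per second on each side for the same key.
left = [0, 1_000, 2_000]
right = [0, 1_000, 2_000]

narrow = count_outputs(left, right, backward_ms=0, forward_ms=0)
wide = count_outputs(left, right, backward_ms=1_000, forward_ms=1_000)
```

Widening the interval from zero to one second on each side already more than doubles the output count for three records per side; with dense streams and multi-minute intervals the growth approaches the product of the two record counts.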
117 changes: 112 additions & 5 deletions quixstreams/dataframe/dataframe.py
@@ -59,7 +59,7 @@
)
from quixstreams.utils.stream_id import stream_id_from_strings

- from .joins import JoinAsOf, JoinAsOfHow, OnOverlap
+ from .joins import AsOfJoin, AsOfJoinHow, IntervalJoin, IntervalJoinHow, OnOverlap
from .joins.lookups import BaseField, BaseLookup
from .registry import DataFrameRegistry
from .series import StreamingSeries
@@ -1661,7 +1661,7 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
    def join_asof(
        self,
        right: "StreamingDataFrame",
-       how: JoinAsOfHow = "inner",
+       how: AsOfJoinHow = "inner",
        on_merge: Union[OnOverlap, Callable[[Any, Any], Any]] = "raise",
        grace_ms: Union[int, timedelta] = timedelta(days=7),
        name: Optional[str] = None,
@@ -1688,8 +1688,8 @@ def join_asof(
        :param right: a StreamingDataFrame to join with.

        :param how: the join strategy. Can be one of:
-           - "inner" - emits the result when the match on the right side is found for the left record.
-           - "left" - emits the result for each left record even if there is no match on the right side.
+           - "inner" - emit the output for the left record only when the match is found (default)
+           - "left" - emit the result for each left record even without matches on the right side
            Default - `"inner"`.

        :param on_merge: how to merge the matched records together assuming they are dictionaries:
@@ -1698,6 +1698,8 @@ def join_asof(
            - "keep-right" - prefer the keys from the right record
            - callback - a callback in form "(<left>, <right>) -> <new record>" to merge the records manually.
              Use it to customize the merging logic or when one of the records is not a dictionary.
              WARNING: Custom merge functions must not mutate the input values as this will lead to
              inconsistencies in the state store. Always return a new object instead.

        :param grace_ms: how long to keep the right records in the store in event time.
            (the time is taken from the records' timestamps).
@@ -1728,10 +1730,115 @@ def join_asof(
        ```

        """
-       return JoinAsOf(
+       return AsOfJoin(
            how=how, on_merge=on_merge, grace_ms=grace_ms, store_name=name
        ).join(self, right)

    def join_interval(
        self,
        right: "StreamingDataFrame",
        how: IntervalJoinHow = "inner",
        on_merge: Union[OnOverlap, Callable[[Any, Any], Any]] = "raise",
        grace_ms: Union[int, timedelta] = timedelta(days=7),
        name: Optional[str] = None,
        backward_ms: Union[int, timedelta] = 0,
        forward_ms: Union[int, timedelta] = 0,
    ) -> "StreamingDataFrame":
        """
        Join the left dataframe with records from the right dataframe that fall within
        specified time intervals. This join is useful for matching records that occur
        within a specific time window of each other, rather than just the latest record.

        To be joined, the underlying topics of the dataframes must have the same number of partitions
        and use the same partitioner (all keys should be distributed across partitions using the same algorithm).

        Joining dataframes belonging to the same topics (aka "self-join") is not supported.

        Note:
            When both `backward_ms` and `forward_ms` are set to 0 (default), the join will only match
            records with exactly the same timestamp.

        How it works:
            - Records from both sides are stored in the state store
            - For each record on the left side:
                - Look for matching records on the right side that fall within the specified time interval
                - If matches are found, merge the records according to the `on_merge` logic
                - For inner joins, only emit if matches are found
                - For left joins, emit even without matches
            - For each record on the right side:
                - Look for matching records on the left side that fall within the specified time interval
                - Merge all matching records according to the `on_merge` logic

        :param right: a StreamingDataFrame to join with.

        :param how: the join strategy. Can be one of:
            - "inner" - emit the output for the left record only when the match is found (default)
            - "left" - emit the result for each left record even without matches on the right side
            - "right" - emit the result for each right record even without matches on the left side
            - "outer" - emit the output for both left and right records even without matches
            Default - `"inner"`.

        :param on_merge: how to merge the matched records together assuming they are dictionaries:
            - "raise" - fail with an error if the same keys are found in both dictionaries
            - "keep-left" - prefer the keys from the left record
            - "keep-right" - prefer the keys from the right record
            - callback - a callback in form "(<left>, <right>) -> <new record>" to merge the records manually.
              Use it to customize the merging logic or when one of the records is not a dictionary.
              WARNING: Custom merge functions must not mutate the input values as this will lead to
              unexpected exceptions or incorrect data in the joined stream. Always return a new object instead.

        :param grace_ms: how long to keep records in the store in event time.
            (the time is taken from the records' timestamps).
            It can be specified as either an `int` representing milliseconds or as a `timedelta` object.
            The records are expired per key when the new record gets added.
            Default - 7 days.

        :param name: The unique identifier of the underlying state store.
            If not provided, it will be generated based on the underlying topic names.
            Provide a custom name if you need to join the same right dataframe multiple times
            within the application.

        :param backward_ms: How far back in time to look for matches from the right side.
            Can be specified as either an `int` representing milliseconds or as a `timedelta` object.
            Must not be greater than `grace_ms`. Default - 0.

        :param forward_ms: How far forward in time to look for matches from the right side.
            Can be specified as either an `int` representing milliseconds or as a `timedelta` object.
            Default - 0.

        Example:

        ```python
        from datetime import timedelta
        from quixstreams import Application

        app = Application()

        sdf_measurements = app.dataframe(app.topic("measurements"))
        sdf_events = app.dataframe(app.topic("events"))

        # Join records from the topic "measurements"
        # with records from "events" that occur within a 5-minute window
        # before and after each measurement
        sdf_joined = sdf_measurements.join_interval(
            right=sdf_events,
            how="inner",
            on_merge="keep-left",
            grace_ms=timedelta(days=7),
            backward_ms=timedelta(minutes=5),
            forward_ms=timedelta(minutes=5)
        )
        ```
        """
        return IntervalJoin(
            how=how,
            on_merge=on_merge,
            grace_ms=grace_ms,
            store_name=name,
            backward_ms=backward_ms,
            forward_ms=forward_ms,
        ).join(self, right)

    def join_lookup(
        self,
        lookup: BaseLookup,
14 changes: 11 additions & 3 deletions quixstreams/dataframe/joins/__init__.py
@@ -1,3 +1,11 @@
- from .join_asof import JoinAsOf as JoinAsOf
- from .join_asof import JoinAsOfHow as JoinAsOfHow
- from .join_asof import OnOverlap as OnOverlap
+ from .base import OnOverlap
+ from .join_asof import AsOfJoin, AsOfJoinHow
+ from .join_interval import IntervalJoin, IntervalJoinHow

__all__ = (
"AsOfJoin",
"AsOfJoinHow",
"IntervalJoin",
"IntervalJoinHow",
"OnOverlap",
)