-
Notifications
You must be signed in to change notification settings - Fork 83
Add proactive window timeout functionality #923
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
tomas-quix
wants to merge
11
commits into
main
Choose a base branch
from
fix-window-timeout-implementation
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
…tion flaws - Update timeout value in test from 5ms to 5000ms for more realistic testing - Add timeout_ms parameter to tumbling window definitions and API - Implement basic timeout functionality that tracks window creation time - Add test demonstrating timeout works when new messages trigger expiration - Add failing test proving timeouts don't work proactively without new messages Implementation Issues: 1. Timeouts are only reactive (triggered by new messages) not proactive 2. Uses creation time instead of last activity time for timeout calculation 3. No background mechanism to expire windows during quiet periods 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Implements proactive expiration of time-based windows, allowing timeouts to be triggered even when no new messages are arriving. This ensures that windows are closed based on wall clock time, addressing scenarios where idle windows would otherwise remain open indefinitely. Adds a mechanism to periodically check for and expire timed-out windows, improving resource management and data accuracy.
Implements the ability to proactively expire windows based on a timeout, rather than only reactively when new messages arrive. This involves tracking active window keys and periodically checking for expired windows, triggering their processing through the dataframe pipeline. Enables configuration of window timeout checking interval and a flag to enable/disable the feature.
Postpones the implementation of the proactive timeout expiration mechanism, as the reactive timeout mechanism is functioning correctly. The full implementation requires partition-specific state stores, coordination with the window processing pipeline, and proper transaction management, which will be addressed in a future enhancement.
Adds proactive mechanism to expire window timeouts. This change introduces a background task that periodically checks for expired windows based on key inactivity. It ensures that timeouts are handled even if no new messages are received for a particular key, preventing potential memory leaks and ensuring data consistency. A cleanup mechanism is implemented to remove stale keys that have been inactive for a defined duration.
Improves debugging of timeout processing by adding logging statements. Logs entry to and active keys within the `_expire_dataframe_timeouts` function. Also logs when no streams are found for a topic, skipping timeout processing.
Adds comprehensive integration and end-to-end tests for the Application-level window timeout feature. These tests verify the complete timeout flow, including window registration, key tracking, timeout checking, window expiration, and result processing. Also includes a test to verify timeout checking can be disabled.
Enhances logging within the window timeout processing logic. This improves debugging and provides greater insight into: - Stream and topic processing - Window definitions and store names - Store transactions - Key timeout checks - Expired window results - Window activity and expiration status
Refactors timeout processing for time windows to improve efficiency and reduce logging noise. The changes streamline the logic for expiring timed-out windows, optimizing the scanning range and reducing redundant checks. It also moves the window registration logic to the window definition to ensure it consistently runs alongside window construction.
Ensures that window timeout events are processed with the correct timestamp and empty headers, preventing potential errors in downstream processing logic. It also removes the size parameter to the synthetic offset.
Removes keys from active tracking when all their associated windows expire. This prevents unnecessary processing and logging of expired windows, improving performance and reducing log clutter.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Summary
This PR implements comprehensive proactive window timeout functionality that allows windows to expire based on wall-clock time even when no new messages arrive.
Key Features
Changes Made
Core Implementation
Application timeout infrastructure (
quixstreams/app.py
)enable_window_timeout_checking
andwindow_timeout_check_interval
configurationtrack_window_key()
anduntrack_window_key()
_check_window_timeouts()
method integrated into consumer polling loopWindow timeout methods (
quixstreams/dataframe/windows/time_based.py
)expire_timeouts_for_key()
for proactive timeout checkingProcessing integration (
quixstreams/dataframe/windows/base.py
,quixstreams/processing/context.py
)Window registration (
quixstreams/dataframe/windows/definitions.py
,quixstreams/dataframe/dataframe.py
)Fixes
'DataFrameRegistry' object has no attribute 'items'
errorTest Plan
Usage Example
Compatibility
Files Changed:
The PR includes changes to the following files:
quixstreams/app.py
- Main timeout infrastructure and Application integrationquixstreams/dataframe/windows/time_based.py
- Proactive expiration methodsquixstreams/dataframe/windows/base.py
- Key tracking hooksquixstreams/processing/context.py
- Callback infrastructurequixstreams/dataframe/dataframe.py
- Window registration integrationquixstreams/dataframe/windows/definitions.py
- Window creation and registrationtests/test_quixstreams/test_dataframe/test_windows/test_tumbling.py
- Comprehensive tests