Skip to content

Add proactive window timeout functionality #923

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from

Conversation

tomas-quix
Copy link
Contributor

@tomas-quix tomas-quix commented Jun 6, 2025

Summary

This PR implements comprehensive proactive window timeout functionality that allows windows to expire based on wall-clock time even when no new messages arrive.

Key Features

  • Proactive Expiration: Windows can expire based on wall-clock time without requiring new messages
  • Application Integration: Timeout checking integrated into the consumer polling loop
  • Key Tracking: Efficient tracking of active window keys and their last activity times
  • Memory Safety: Automatic cleanup of expired keys to prevent memory leaks
  • Configurable: Enable/disable timeout checking and adjust check intervals

Changes Made

Core Implementation

  • Application timeout infrastructure (quixstreams/app.py)

    • Added enable_window_timeout_checking and window_timeout_check_interval configuration
    • Implemented key tracking system with track_window_key() and untrack_window_key()
    • Added _check_window_timeouts() method integrated into consumer polling loop
    • Added automatic cleanup of stale keys to prevent memory leaks
  • Window timeout methods (quixstreams/dataframe/windows/time_based.py)

    • Enhanced expire_timeouts_for_key() for proactive timeout checking
    • Improved timeout logic to use last activity time instead of creation time
    • Added proper window state cleanup when timeouts occur
  • Processing integration (quixstreams/dataframe/windows/base.py, quixstreams/processing/context.py)

    • Added callback infrastructure for window registration and key tracking
    • Hooked into window processing pipeline to automatically track active keys
  • Window registration (quixstreams/dataframe/windows/definitions.py, quixstreams/dataframe/dataframe.py)

    • Fixed window registration to happen after aggregation when windows have proper names
    • Added automatic registration of timeout-enabled windows

Fixes

  • Fixed AttributeError: Resolved 'DataFrameRegistry' object has no attribute 'items' error
  • Proper timeout semantics: Changed from creation time to last activity time for timeout calculations
  • Window name handling: Fixed registration timing so windows have proper names for store access
  • Result processing: Fixed MessageContext creation and window result processing

Test Plan

  • Unit tests for Application timeout configuration and key tracking
  • Integration tests for timeout checking logic and window expiration
  • End-to-end testing with real applications
  • Memory leak prevention testing with stale key cleanup
  • Multiple topic and partition scenarios

Usage Example

from quixstreams import Application

# Create application with timeout checking enabled
app = Application(
    broker_address="localhost:9092",
    consumer_group="my-group",
    enable_window_timeout_checking=True,  # Enable timeout checking (default: True)
    window_timeout_check_interval=5.0,    # Check every 5 seconds (default: 5.0)
)

# Create a window with 30-second timeout
topic = app.topic("input-topic")
sdf = app.dataframe(topic)
sdf = sdf.tumbling_window(
    duration_ms=60000,      # 1-minute windows
    timeout_ms=30000        # Expire after 30 seconds of inactivity
).sum()

# Windows will now expire proactively after 30 seconds even without new messages
app.run()

Compatibility

  • ✅ Fully backward compatible - existing code works unchanged
  • ✅ Timeout functionality is opt-in via timeout_ms parameter
  • ✅ No performance impact when timeouts are not used
  • ✅ Configurable timeout checking can be disabled if needed

Files Changed:

The PR includes changes to the following files:

  • quixstreams/app.py - Main timeout infrastructure and Application integration
  • quixstreams/dataframe/windows/time_based.py - Proactive expiration methods
  • quixstreams/dataframe/windows/base.py - Key tracking hooks
  • quixstreams/processing/context.py - Callback infrastructure
  • quixstreams/dataframe/dataframe.py - Window registration integration
  • quixstreams/dataframe/windows/definitions.py - Window creation and registration
  • tests/test_quixstreams/test_dataframe/test_windows/test_tumbling.py - Comprehensive tests

tomas-quix and others added 11 commits June 6, 2025 09:03
…tion flaws

- Update timeout value in test from 5ms to 5000ms for more realistic testing
- Add timeout_ms parameter to tumbling window definitions and API
- Implement basic timeout functionality that tracks window creation time
- Add test demonstrating timeout works when new messages trigger expiration
- Add failing test proving timeouts don't work proactively without new messages

Implementation Issues:
1. Timeouts are only reactive (triggered by new messages) not proactive
2. Uses creation time instead of last activity time for timeout calculation
3. No background mechanism to expire windows during quiet periods

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Implements proactive expiration of time-based windows, allowing timeouts to be triggered even when no new messages are arriving.

This ensures that windows are closed based on wall clock time, addressing scenarios where idle windows would otherwise remain open indefinitely.

Adds a mechanism to periodically check for and expire timed-out windows, improving resource management and data accuracy.
Implements the ability to proactively expire windows based on a timeout,
rather than only reactively when new messages arrive.

This involves tracking active window keys and periodically checking for
expired windows, triggering their processing through the dataframe pipeline.

Enables configuration of window timeout checking interval and a flag to
enable/disable the feature.
Postpones the implementation of the proactive timeout
expiration mechanism, as the reactive timeout mechanism
is functioning correctly. The full implementation
requires partition-specific state stores, coordination
with the window processing pipeline, and proper
transaction management, which will be addressed in a
future enhancement.
Adds proactive mechanism to expire window timeouts.

This change introduces a background task that periodically checks
for expired windows based on key inactivity. It ensures that
timeouts are handled even if no new messages are received for a
particular key, preventing potential memory leaks and ensuring data
consistency. A cleanup mechanism is implemented to remove stale keys
that have been inactive for a defined duration.
Improves debugging of timeout processing by adding logging statements.

Logs entry to and active keys within the `_expire_dataframe_timeouts` function.
Also logs when no streams are found for a topic, skipping timeout processing.
Adds comprehensive integration and end-to-end tests for the
Application-level window timeout feature.

These tests verify the complete timeout flow, including window
registration, key tracking, timeout checking, window expiration,
and result processing. Also includes a test to verify timeout
checking can be disabled.
Enhances logging within the window timeout processing logic.

This improves debugging and provides greater insight into:
- Stream and topic processing
- Window definitions and store names
- Store transactions
- Key timeout checks
- Expired window results
- Window activity and expiration status
Refactors timeout processing for time windows to improve efficiency and reduce logging noise.

The changes streamline the logic for expiring timed-out windows,
optimizing the scanning range and reducing redundant checks.
It also moves the window registration logic to the window definition to ensure it consistently runs alongside window construction.
Ensures that window timeout events are processed with the
correct timestamp and empty headers, preventing potential
errors in downstream processing logic. It also removes the
size parameter to the synthetic offset.
Removes keys from active tracking when all their associated windows expire.
This prevents unnecessary processing and logging of expired windows,
improving performance and reducing log clutter.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant