
Cannot Process Windowed Sessions from Kafka - Crash with "key missing in output table" on streaming retractions #232

@deuscapturus

Description


Bug Description

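Session windows (pw.temporal.session()) crash with KeyError: 'key missing in output table' when Kafka messages that belong to the same session arrive in separate minibatches (streaming epochs). The later minibatch forces the engine to retract and re-emit the session window, and the crash happens during that retraction.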
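Some background on why a later minibatch causes a retraction: a session window groups events whose consecutive timestamps are closer than `max_gap`. An event arriving later can therefore extend an existing session, or even bridge two sessions into one, which invalidates rows already emitted. The pure-Python sketch below illustrates only that grouping rule. It is not Pathway's implementation, the function name `assign_sessions` is hypothetical, and it assumes the gap comparison is strict (`<`).

```python
from datetime import datetime, timedelta


def assign_sessions(timestamps: list[datetime], max_gap: timedelta) -> list[list[datetime]]:
    """Hypothetical helper: group timestamps into sessions.

    Consecutive events (after sorting) that are less than max_gap apart
    share a session; a larger gap starts a new one.
    """
    sessions: list[list[datetime]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < max_gap:
            sessions[-1].append(ts)  # close to the previous event: extend the session
        else:
            sessions.append([ts])  # first event or gap too large: open a new session
    return sessions


t0 = datetime(2024, 1, 1, 12, 0, 0)
gap = timedelta(seconds=2)
# Two events 3 s apart form two separate sessions...
print(len(assign_sessions([t0, t0 + timedelta(seconds=3)], gap)))
# ...but a later event at +1.5 s bridges them into a single session.
print(len(assign_sessions([t0, t0 + timedelta(seconds=3), t0 + timedelta(seconds=1.5)], gap)))
```

In a streaming setting, the second case means both previously emitted session rows have to be retracted and replaced by one merged row.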

Environment

  • Pathway version: 0.30.1 (also reproduced on 0.29.0 and 0.28.0)
  • Python version: 3.12
  • OS: Fedora Linux

Error Message

KeyError: 'key missing in output table: ^EB0V6Z1XTG3XXAPBR0F1K8573G'
Occurred here:
    Line: windowed = parsed.windowby(
    File: poc_sessions.py:55

Minimal Reproducible Example

⚠️ Requires Kafka running on localhost:9092

1. POC Application (poc_sessions.py)

"""
Minimal POC to reproduce Pathway session window crash.

Crash: KeyError 'key missing in output table' when using session windows with Kafka.
The bug appears to be in the Rust engine's session window implementation when fed by the Kafka connector.
"""

from datetime import timedelta
import pathway as pw


def main():
    # Kafka settings
    kafka_settings = {
        "bootstrap.servers": "localhost:9092",
        "group.id": "poc-sessions-minimal",
        "auto.offset.reset": "earliest",
    }

    # Read from Kafka
    raw_messages = pw.io.kafka.read(
        kafka_settings,
        topic="test.poc.sessions",
        format="raw",
        autocommit_duration_ms=1000,
    )

    # Parse timestamp from JSON
    @pw.udf
    def extract_timestamp(data: bytes) -> float:
        import json
        return json.loads(data).get("timestamp", 0.0)
    
    parsed = raw_messages.select(
        ts_float=extract_timestamp(pw.this.data),
    ).select(
        ts=pw.this.ts_float.dt.utc_from_timestamp(unit="s"),
    )

    # Session window - crashes here
    windowed = parsed.windowby(
        parsed.ts,
        window=pw.temporal.session(max_gap=timedelta(seconds=2)),
    ).reduce(
        count=pw.reducers.count(),
    )

    # Write to null sink
    pw.io.null.write(windowed)
    
    print("Starting computation (will crash)...")
    pw.run()


if __name__ == "__main__":
    main()

2. Test Script (test_poc_sessions.py)

#!/usr/bin/env python3
"""
Produce test messages to trigger session window crash.

Sends 3 rapid messages that should merge into one session window.
This triggers a retraction in Pathway's Rust engine that causes the crash.
"""

import json
import time
from confluent_kafka import Producer


def main():
    producer = Producer({"bootstrap.servers": "localhost:9092"})
    topic = "test.poc.sessions"
    key = b"test-key-1"
    
    print(f"Sending 3 messages to {topic}...")
    
    # Send 3 rapid messages (100ms apart)
    for i in range(3):
        msg = json.dumps({"test": f"msg-{i+1}", "timestamp": time.time()})
        producer.produce(topic, key=key, value=msg.encode())
        print(f"  {i+1}. {msg}")
        time.sleep(0.1)
    
    producer.flush()
    print("\n✓ Messages sent. Run poc_sessions.py to trigger crash.")


if __name__ == "__main__":
    main()

3. Steps to Reproduce

python3 test_poc_sessions.py && python3 poc_sessions.py

4. Expected vs Actual Behavior

Expected: Session windows should handle messages arriving in separate minibatches, extending the window when new messages arrive within max_gap.

Actual: Crashes with KeyError: 'key missing in output table' when trying to retract and update the session window.

Traceback (most recent call last):
  File "/home/pathway-container/src/iris_pathway_batcher/poc_sessions.py", line 57, in <module>
    main()
  File "/home/pathway-container/src/iris_pathway_batcher/poc_sessions.py", line 53, in main
    pw.run()
  File "/home/pathway-container/.venv/lib64/python3.12/site-packages/pathway/internals/runtime_type_check.py", line 19, in with_type_validation
    return beartype.beartype(f)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<@beartype(pathway.internals.run.run) at 0x7f151776bd80>", line 162, in run
  File "/home/pathway-container/.venv/lib64/python3.12/site-packages/pathway/internals/run.py", line 62, in run
    ).run_outputs()
      ^^^^^^^^^^^^^
  File "/home/pathway-container/.venv/lib64/python3.12/site-packages/pathway/internals/graph_runner/__init__.py", line 123, in run_outputs
    self.run_nodes(self._graph.global_scope.output_nodes, after_build=after_build)
  File "/home/pathway-container/.venv/lib64/python3.12/site-packages/pathway/internals/graph_runner/__init__.py", line 99, in run_nodes
    self._run(all_nodes, after_build=after_build)
  File "/home/pathway-container/.venv/lib64/python3.12/site-packages/pathway/internals/graph_runner/__init__.py", line 257, in _run
    return run_with_event_loop(event_loop)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/pathway-container/.venv/lib64/python3.12/site-packages/pathway/internals/graph_runner/__init__.py", line 238, in run_with_event_loop
    raise error from None
KeyError: 'key missing in output table: ^EB0V6Z1XTG3XXAPBR0F1K8573G'
Occurred here:
    Line: windowed = parsed.windowby(
    File: /home/pathway-container/src/iris_pathway_batcher/poc_sessions.py:42
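The behaviour described under "Expected" can be modelled as a changelog: when a later minibatch extends a session, the previously emitted row for the old session is retracted (diff -1) and a row for the extended session is inserted (diff +1), and every retraction refers to a row that is actually present in the output. The sketch below simulates this in plain Python for the `count` reducer used in `poc_sessions.py`. The `SessionChangelog` class, its row layout, and the strict `<` gap comparison are hypothetical illustrations, not Pathway internals.

```python
from datetime import datetime, timedelta

Row = tuple[datetime, datetime, int]  # (session start, session end, count)


class SessionChangelog:
    """Hypothetical model: recompute sessions per minibatch, emit (row, diff) pairs."""

    def __init__(self, max_gap: timedelta) -> None:
        self.max_gap = max_gap
        self.events: list[datetime] = []
        self.output: set[Row] = set()  # rows currently in the "output table"

    def _sessions(self) -> set[Row]:
        rows: set[Row] = set()
        start = end = None
        count = 0
        for ts in sorted(self.events):
            if end is not None and ts - end < self.max_gap:
                end, count = ts, count + 1  # extend the current session
            else:
                if end is not None:
                    rows.add((start, end, count))  # close the previous session
                start, end, count = ts, ts, 1
        if end is not None:
            rows.add((start, end, count))
        return rows

    def push_batch(self, batch: list[datetime]) -> list[tuple[Row, int]]:
        self.events.extend(batch)
        new_rows = self._sessions()
        # Retractions only ever target rows that are present in self.output.
        changes = [(row, -1) for row in sorted(self.output - new_rows)]
        changes += [(row, +1) for row in sorted(new_rows - self.output)]
        self.output = new_rows
        return changes


t0 = datetime(2024, 1, 1, 12, 0, 0)
log = SessionChangelog(max_gap=timedelta(seconds=2))
print(log.push_batch([t0]))  # first minibatch: one insertion
print(log.push_batch([t0 + timedelta(milliseconds=100)]))  # retract old row, insert extended row
```

Under this model, the second minibatch from the reproduction (a message 100 ms after the first) produces one retraction followed by one insertion, rather than an error.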
