Skip to content

fix: Validate message fields before protobuf encoding for better error messages#333

Merged
BulkBeing merged 7 commits intomainfrom
validate-message-fields
Mar 12, 2026
Merged

fix: Validate message fields before protobuf encoding for better error messages#333
BulkBeing merged 7 commits intomainfrom
validate-message-fields

Conversation

@BulkBeing
Copy link
Contributor

@BulkBeing BulkBeing commented Mar 12, 2026

#328 (comment)

Shutdown due to UDF failure:

CRITICAL:pynumaflow._constants:UDFError, re-raising the error
Traceback (most recent call last):
  File "/app/.venv/lib/python3.10/site-packages/pynumaflow/reducestreamer/servicer/task_manager.py", line 197, in __invoke_reduce
    _ = await new_instance(keys, request_iterator, output, md)
  File "/app/main.py", line 63, in handler
    await output.put(ReduceMessage(str(count).encode(), keys=[keys], tags=["out"]))
  File "/app/.venv/lib/python3.10/site-packages/pynumaflow/reducestreamer/_dtypes.py", line 274, in __init__
    _validate_message_fields(value, keys, tags)
  File "/app/.venv/lib/python3.10/site-packages/pynumaflow/_validate.py", line 11, in _validate_message_fields
    raise TypeError(f"Message 'keys' must be a list of strings, got {keys!r}")
TypeError: Message 'keys' must be a list of strings, got [['key-1']]
INFO:__main__:Window closed, total count=7
2026-03-12 03:51:13 CRITICAL UDFError, re-raising the error
Traceback (most recent call last):
  File "/app/.venv/lib/python3.10/site-packages/pynumaflow/reducestreamer/servicer/task_manager.py", line 197, in __invoke_reduce
    _ = await new_instance(keys, request_iterator, output, md)
  File "/app/main.py", line 63, in handler
    await output.put(ReduceMessage(str(count).encode(), keys=[keys], tags=["out"]))
  File "/app/.venv/lib/python3.10/site-packages/pynumaflow/reducestreamer/_dtypes.py", line 274, in __init__
    _validate_message_fields(value, keys, tags)
  File "/app/.venv/lib/python3.10/site-packages/pynumaflow/_validate.py", line 11, in _validate_message_fields
    raise TypeError(f"Message 'keys' must be a list of strings, got {keys!r}")
TypeError: Message 'keys' must be a list of strings, got [['key-2']]
CRITICAL:pynumaflow._constants:UDFError, re-raising the error
Traceback (most recent call last):
  File "/app/.venv/lib/python3.10/site-packages/pynumaflow/reducestreamer/servicer/task_manager.py", line 197, in __invoke_reduce
    _ = await new_instance(keys, request_iterator, output, md)
  File "/app/main.py", line 63, in handler
    await output.put(ReduceMessage(str(count).encode(), keys=[keys], tags=["out"]))
  File "/app/.venv/lib/python3.10/site-packages/pynumaflow/reducestreamer/_dtypes.py", line 274, in __init__
    _validate_message_fields(value, keys, tags)
  File "/app/.venv/lib/python3.10/site-packages/pynumaflow/_validate.py", line 11, in _validate_message_fields
    raise TypeError(f"Message 'keys' must be a list of strings, got {keys!r}")
TypeError: Message 'keys' must be a list of strings, got [['key-2']]
2026-03-12 03:51:13 INFO     Stopping event loop...
2026-03-12 03:51:13 INFO     Event loop stopped
INFO:pynumaflow._constants:Stopping event loop...
INFO:pynumaflow._constants:Event loop stopped
INFO:aiorun:Entering shutdown phase.
INFO:aiorun:Executing provided shutdown_callback.
2026-03-12 03:51:13 INFO     Received graceful shutdown signal, shutting down ReduceStream server
INFO:pynumaflow._constants:Received graceful shutdown signal, shutting down ReduceStream server
INFO:aiorun:Cancelling pending tasks.
INFO:aiorun:Running pending tasks till complete
2026-03-12 03:51:13 INFO     ReduceStream post-processing cancelled, returning cleanly
INFO:pynumaflow._constants:ReduceStream post-processing cancelled, returning cleanly
INFO:pynumaflow._constants:ReduceStream post-processing cancelled, returning cleanly
2026-03-12 03:51:13 INFO     ReduceStream post-processing cancelled, returning cleanly
INFO:pynumaflow._constants:ReduceStream post-processing cancelled, returning cleanly
2026-03-12 03:51:13 INFO     ReduceStream post-processing cancelled, returning cleanly
2026-03-12 03:51:13 INFO     ReduceStream post-processing cancelled, returning cleanly
INFO:pynumaflow._constants:ReduceStream post-processing cancelled, returning cleanly
INFO:pynumaflow._constants:ReduceStream post-processing cancelled, returning cleanly
2026-03-12 03:51:13 INFO     ReduceStream post-processing cancelled, returning cleanly
INFO:pynumaflow._constants:ReduceStream post-processing cancelled, returning cleanly
2026-03-12 03:51:13 INFO     ReduceStream post-processing cancelled, returning cleanly
INFO:aiorun:Waiting for executor shutdown.
INFO:aiorun:Shutting down async generators
INFO:aiorun:Closing the loop.
INFO:aiorun:Leaving. Bye!
2026-03-12 03:51:13 CRITICAL Server exiting due to UDF error: Message 'keys' must be a list of strings, got [['key-10']]
CRITICAL:pynumaflow._constants:Server exiting due to UDF error: Message 'keys' must be a list of strings, got [['key-10']]

Clean shutdown ("kubectl delete pod")

INFO:__main__:datum #3: event_time=2026-03-12 03:46:51.124054, watermark=2026-03-12 03:46:49.123000
INFO:__main__:datum #4: event_time=2026-03-12 03:46:51.124054, watermark=2026-03-12 03:46:49.123000
INFO:__main__:datum #10: event_time=2026-03-12 03:46:51.124054, watermark=2026-03-12 03:46:49.123000
INFO:__main__:datum #13: event_time=2026-03-12 03:46:51.124054, watermark=2026-03-12 03:46:49.123000

INFO:__main__:Window closed, total count=1
INFO:__main__:Window closed, total count=1
INFO:__main__:Window closed, total count=1
INFO:__main__:Window closed, total count=3
INFO:pynumaflow._constants:Shutdown signal received, stopping server gracefully...
2026-03-12 03:47:10 INFO     Shutdown signal received, stopping server gracefully...
INFO:__main__:Window closed, total count=29
INFO:__main__:Window closed, total count=25
INFO:__main__:Window closed, total count=3

INFO:pynumaflow._constants:Stopping event loop...
2026-03-12 03:47:10 INFO     Stopping event loop...
2026-03-12 03:47:10 INFO     Event loop stopped
INFO:pynumaflow._constants:Event loop stopped
INFO:aiorun:Entering shutdown phase.
INFO:aiorun:Executing provided shutdown_callback.
2026-03-12 03:47:10 INFO     Received graceful shutdown signal, shutting down ReduceStream server
INFO:pynumaflow._constants:Received graceful shutdown signal, shutting down ReduceStream server
INFO:aiorun:Cancelling pending tasks.
INFO:aiorun:Running pending tasks till complete
2026-03-12 03:47:10 INFO     ReduceStream post-processing cancelled, returning cleanly
INFO:pynumaflow._constants:ReduceStream post-processing cancelled, returning cleanly
INFO:pynumaflow._constants:ReduceStream post-processing cancelled, returning cleanly
2026-03-12 03:47:10 INFO     ReduceStream post-processing cancelled, returning cleanly
2026-03-12 03:47:10 INFO     ReduceStream post-processing cancelled, returning cleanly
INFO:pynumaflow._constants:ReduceStream post-processing cancelled, returning cleanly
INFO:pynumaflow._constants:ReduceStream post-processing cancelled, returning cleanly
2026-03-12 03:47:10 INFO     ReduceStream post-processing cancelled, returning cleanly
INFO:pynumaflow._constants:ReduceStream post-processing cancelled, returning cleanly
2026-03-12 03:47:10 INFO     ReduceStream post-processing cancelled, returning cleanly
2026-03-12 03:47:10 INFO     ReduceStream post-processing cancelled, returning cleanly
INFO:pynumaflow._constants:ReduceStream post-processing cancelled, returning cleanly
INFO:pynumaflow._constants:ReduceStream post-processing cancelled, returning cleanly
2026-03-12 03:47:10 INFO     ReduceStream post-processing cancelled, returning cleanly
INFO:aiorun:Waiting for executor shutdown.
INFO:aiorun:Shutting down async generators
INFO:aiorun:Closing the loop.
INFO:aiorun:Leaving. Bye!

…sages

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Branch-Creation-Time: 2026-03-12T00:42:31+0000

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@codecov
Copy link

codecov bot commented Mar 12, 2026

Codecov Report

❌ Patch coverage is 94.11765% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 94.43%. Comparing base (b37f25f) to head (a471525).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...pynumaflow/reducestreamer/servicer/task_manager.py 83.33% 2 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #333      +/-   ##
==========================================
+ Coverage   94.10%   94.43%   +0.33%     
==========================================
  Files          66       67       +1     
  Lines        3137     3182      +45     
  Branches      168      179      +11     
==========================================
+ Hits         2952     3005      +53     
+ Misses        151      142       -9     
- Partials       34       35       +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@BulkBeing BulkBeing force-pushed the validate-message-fields branch from f9402ca to 7035915 Compare March 12, 2026 00:54
@vigith vigith linked an issue Mar 12, 2026 that may be closed by this pull request
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@BulkBeing BulkBeing marked this pull request as ready for review March 12, 2026 04:37
@BulkBeing BulkBeing merged commit 5b458ad into main Mar 12, 2026
12 checks passed
@BulkBeing BulkBeing deleted the validate-message-fields branch March 12, 2026 05:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

UDFs do not seem to propagate Watermarks

2 participants