Skip to content

fix: Avoid UDF call in map streaming from starving eventloop#349

Open
BulkBeing wants to merge 2 commits intomainfrom
mapstreaming-eventloop-starvation
Open

fix: Avoid UDF call in map streaming from starving eventloop#349
BulkBeing wants to merge 2 commits intomainfrom
mapstreaming-eventloop-starvation

Conversation

@BulkBeing
Copy link
Copy Markdown
Contributor

@BulkBeing BulkBeing commented Apr 4, 2026

Fixes #342

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@codecov
Copy link
Copy Markdown

codecov bot commented Apr 4, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 92.65%. Comparing base (952b302) to head (a4d354f).

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #349      +/-   ##
==========================================
- Coverage   92.68%   92.65%   -0.03%     
==========================================
  Files          67       67              
  Lines        3513     3514       +1     
  Branches      229      229              
==========================================
  Hits         3256     3256              
  Misses        193      193              
- Partials       64       65       +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Comment on lines +141 to +142
# The starvation can happen if the UDF code yields messages using regular
# for-loop (non async). See the sample code in https://github.com/numaproj/numaflow-python/issues/342
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is a bug on how the user is writing the for loop, right? The should use async forbut using for. Is my understanding correct?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If they make a blocking call in the middle, the event loop should get blocked out as well.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though @BulkBeing when we were using the generator pattern in the previous version, we don't see this behavior because we are yielding right away? Instead of storing and yielding in different methods?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay we were not doing parallel invocations at that point, so only one message was getting processed regardless

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BulkBeing can we try to put the user code in an async loop and test once?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, in the previous version (0.9.0) where it was working for the user, we were not using async queue.

Python logs something like this as well if the task was stuck without yielding (the code I used yields 1 value every second for 20 seconds):

2026-04-03 10:49:50,256 asyncio WARNING Executing <Task finished name='Task-75' coro=<AsyncMapStreamServicer._invoke_map_stream() done, defined at /app/.venv/lib/python3.13/site-packages/pynumaflow/mapstreamer/servicer/async_servicer.py:118> result=None created at /usr/local/lib/python3.13/asyncio/tasks.py:410> took 20.143 seconds

Let me try converting it to async for loop.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It works fine (without asyncio.sleep(0)) if the UDF uses async generator (async for with yield):

async for msg in processor_monitor.get_messages():
        yielded += 1
        log.info("Yielding message %d", yielded)
        yield msg

Note that it results in more changes like changing get_messages to async, using an asyncio.AsyncQueue to communicate between background thread and main thread, passing event loop to background thread and using the queue like this: self.loop.call_soon_threadsafe(self.queue.put_nowait, msg) (AsyncQueue isn't thread safe).
In short, with get_messages(), there is real await points like msg = await self.queue.get().

@vigith vigith marked this pull request as ready for review April 5, 2026 18:46
Copy link
Copy Markdown
Member

@vigith vigith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because users tend to use for loop instead of async for.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Regression?] MapStreamAsyncServer not streaming

3 participants