Async reduce UDF returns Runtime error for asynchronous generator #63

@kohlisid

Describe the bug
When the async reduce UDF runs a function with high I/O delay, or receives a high input message throughput, the pod appears to crash with the following error. In my case, the UDF makes an HTTP request to a server and sends the response to the sink.

future: <Task finished name='Task-25619' coro=<<async_generator_athrow without __name__>()> exception=RuntimeError('aclose(): asynchronous generator is already running')>
RuntimeError: aclose(): asynchronous generator is already running.

This is reproducible. Depending on the input RPU and the server response time, the error may occur from the start of processing or only after some time.
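
For reference, the same error message can be produced in plain asyncio, without pynumaflow, by calling `aclose()` on an async generator while another task is still suspended inside it. The sketch below only illustrates that mechanism. Whether this is what happens inside the library has not been confirmed, and `slow_source` and `reproduce` are hypothetical names, not part of the attached UDF.

```python
import asyncio


async def slow_source():
    """Hypothetical stand-in for a generator that waits on slow I/O."""
    while True:
        await asyncio.sleep(0.2)  # simulated HTTP latency
        yield "response"


async def reproduce() -> str:
    gen = slow_source()

    async def consume():
        # This task is suspended inside the generator while it sleeps.
        return await gen.__anext__()

    consumer = asyncio.create_task(consume())
    await asyncio.sleep(0)  # give the consumer a chance to enter the generator

    try:
        # Closing from a different task while the generator is mid-iteration.
        await gen.aclose()
        message = "no error"
    except RuntimeError as exc:
        message = str(exc)

    # Clean up the consumer so the event loop shuts down quietly.
    consumer.cancel()
    try:
        await consumer
    except asyncio.CancelledError:
        pass
    return message


if __name__ == "__main__":
    print(asyncio.run(reproduce()))
```

With a longer I/O delay or more concurrent messages, the window in which the generator is suspended grows, which would be consistent with the error appearing more readily under the conditions described above.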

To Reproduce

Steps to reproduce using the attached files:

  1. Change line 28 of the attached UDF code to point to any web server, and change the response on line 15 accordingly.
  2. Build the image and create a new pipeline using the YAML file.
  3. Run with a high RPU value (>100) or a high response delay from the server; the issue is observed under either condition.
  4. Monitor the UDF vertex logs for the error message.
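
If no slow web server is at hand for step 1, a small stub can simulate the high response delay. This is a minimal sketch using only the Python standard library. It is not part of the attached files, and `make_server`, the 2-second delay, the `ok` response body, and port 8080 are all assumptions to adjust as needed.

```python
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


def make_server(delay_seconds, host="127.0.0.1", port=0):
    """Build an HTTP server that answers every GET after a fixed delay."""

    class SlowHandler(BaseHTTPRequestHandler):
        def do_GET(self):
            time.sleep(delay_seconds)  # simulated slow backend
            body = b"ok"
            self.send_response(200)
            self.send_header("Content-Type", "text/plain")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format, *args):
            pass  # keep the console quiet under load

    # port=0 asks the OS for any free port.
    return ThreadingHTTPServer((host, port), SlowHandler)


if __name__ == "__main__":
    # Hypothetical values: listen on all interfaces, delay each reply by 2 s.
    server = make_server(2.0, host="0.0.0.0", port=8080)
    server.serve_forever()
```

The server has to be reachable from the UDF pod, so the address used on line 28 of the UDF depends on where the stub is deployed.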

Expected behavior
Messages should be processed, or the UDF should terminate gracefully with proper exception handling.
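
As an illustration of what graceful termination could look like in plain asyncio, the sketch below lets a single task own the async generator and stops it by cancelling that task instead of calling `aclose()` from outside. This is one possible pattern under stated assumptions, not the pynumaflow implementation; `slow_source`, `drain`, and `shutdown_gracefully` are hypothetical names.

```python
import asyncio


async def slow_source():
    """Hypothetical generator that waits on slow I/O between items."""
    while True:
        await asyncio.sleep(0.01)  # simulated I/O delay
        yield "response"


async def drain(gen, out):
    """Sole owner of the generator: iterates it and closes it on exit."""
    try:
        async for item in gen:
            out.append(item)
    finally:
        # By now the generator is no longer mid-iteration, so this is safe.
        await gen.aclose()


async def shutdown_gracefully():
    out = []
    worker = asyncio.create_task(drain(slow_source(), out))
    await asyncio.sleep(0.1)  # let some items flow through

    # Stop by cancelling the owning task, then wait for it to finish.
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return out


if __name__ == "__main__":
    print(len(asyncio.run(shutdown_gracefully())), "items processed")
```

The design choice here is that cancellation travels through the task that is iterating, so the generator is never closed by one task while another is suspended inside it.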

Screenshots

Screenshot 2023-03-06 at 9 40 40 AM

Screenshot 2023-03-06 at 9 42 10 AM

Environment:

  • pynumaflow = "~0.3.2"

Additional context
Reference files
aclose.zip


Message from the maintainers:

Impacted by this bug? Give it a 👍. We often sort issues this way to know what to prioritize.

Labels

bug (Something isn't working)