Non-awaited coroutines in an IsolatedAsyncioTestCase result in a RuntimeWarning #90726
I am unit testing a TCP proxy module using coroutines. This is the coroutine I am using to do the forwarding, allowing the writer stream to drain while the rest of the coroutines proceed:

```python
async def forward_stream(reader: StreamReader, writer: StreamWriter, event: asyncio.Event, source: str):
    writer_drain = writer.drain()
    while not event.is_set():
        try:
            data = await asyncio.wait_for(reader.read(1024), 1)
        except asyncio.TimeoutError:
            continue
        if not data:
            event.set()
            break
        # parse the data
        if reading := parse(data):
            # wait for the previous write to finish, and forward the data
            # to the other end, processing the data in between
            await writer_drain
            writer.write(data)
            writer_drain = writer.drain()
```
In my unit tests, I have the following (`EnergyAgentProxy` is the wrapper calling the coroutine in the module that creates the proxy):

```python
class TestConnections(IsolatedAsyncioTestCase):
    async def asyncSetUp(self) -> None:
        self.proxy = asyncio.create_task(EnergyAgentProxy(self.proxy_port, self.server_port, self.upstream_port))
```

The problem is: when running these tests, I am getting the following error:

So... to me, it looks like when the tasks are cancelled I am getting this warning because the last `await writer_drain` in `forward_stream` is not executed, but I cannot confirm that. Am I doing something wrong? Is there any way I can prevent this warning from showing up in my tests?
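For context, the warning in question is the one CPython emits when a coroutine object is created but garbage-collected without ever being awaited. A minimal reproduction, independent of the proxy code (the `drain` name here is just an illustrative stand-in for `writer.drain()`):

```python
import asyncio
import gc
import warnings

async def drain():
    """Stand-in for writer.drain(): any coroutine works."""

async def main():
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        coro = drain()  # coroutine object created here...
        del coro        # ...but never awaited; its finalizer warns
        gc.collect()
    return [str(w.message) for w in caught]

messages = asyncio.run(main())
print(messages)  # e.g. ["coroutine 'drain' was never awaited"]
```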
It seems that adding an `await asyncio.sleep(1)` in `asyncTearDown`, i.e.:

```python
class TestConnections(IsolatedAsyncioTestCase):
    async def asyncSetUp(self) -> None:
        self.proxy = asyncio.create_task(EnergyAgentProxy(self.proxy_port, self.server_port, self.upstream_port))

    async def asyncTearDown(self) -> None:
        await asyncio.sleep(1)
```

is enough to "hide the problem under the carpet"... but that feels wrong.
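A more deterministic teardown than sleeping is to cancel the background task and await it until it actually finishes. A sketch of that pattern, with a hypothetical `fake_proxy` standing in for `EnergyAgentProxy` (whose code is not shown here):

```python
import asyncio
import unittest
from unittest import IsolatedAsyncioTestCase

async def fake_proxy():
    # Hypothetical stand-in for EnergyAgentProxy: runs until cancelled.
    await asyncio.Event().wait()

class TestConnections(IsolatedAsyncioTestCase):
    async def asyncSetUp(self) -> None:
        self.proxy = asyncio.create_task(fake_proxy())

    async def asyncTearDown(self) -> None:
        # Cancel the task and wait for it to finish, instead of sleeping
        # and hoping pending awaitables get cleaned up in time.
        self.proxy.cancel()
        try:
            await self.proxy
        except asyncio.CancelledError:
            pass

    async def test_proxy_is_running(self) -> None:
        self.assertFalse(self.proxy.done())

if __name__ == "__main__":
    unittest.main()
```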
Your code has at least one concurrency problem. Let's look back at the `forward_stream()` function:

```python
async def forward_stream(reader: StreamReader, writer: StreamWriter, event: asyncio.Event, source: str):
    writer_drain = writer.drain()  # <--- awaitable is created here
    while not event.is_set():
        try:
            data = await asyncio.wait_for(reader.read(1024), 1)  # <-- CancelledError can be raised here; the stack unwinds and writer_drain is never awaited
        except asyncio.TimeoutError:
            continue
        ...  # the rest is not important for this case
```

To solve the problem, you should create `writer_drain` *right before awaiting it*, not before another `await` call.
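The failure mode described above can be reproduced in isolation: cancel a task while it is parked on a different `await`, and the awaitable it stashed earlier is never consumed. The names below are illustrative, not taken from the proxy code:

```python
import asyncio
import warnings

async def worker():
    pending = asyncio.sleep(0)  # awaitable created early, like writer.drain()
    await asyncio.sleep(3600)   # CancelledError is raised at this await...
    await pending               # ...so this line never runs

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)      # let worker start and block
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    asyncio.run(main())

print(any("never awaited" in str(w.message) for w in caught))  # True
```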
Hi Andrew, thank you for your answer. I am experimenting with coroutines, as I am pretty new to them. My idea was to let the writer drain while other packets were being read, and thus I wait for `writer_drain` right before calling `writer.write` again. Isn't that the correct way to overlap the reads and the writes? If I modify my initial code to look like:

```python
async def forward_stream(reader: StreamReader, writer: StreamWriter, event: asyncio.Event, source: str):
    writer_drain = writer.drain()
    while not event.is_set():
        try:
            data = await asyncio.wait_for(reader.read(1024), 1)
        except asyncio.TimeoutError:
            continue
        except asyncio.CancelledError:
            event.set()
            break
        ...  # the rest is not important for this case
```

so that in case the task is cancelled, `writer_drain` will be awaited outside of the loop, it works, at the cost of introducing code specific to testing purposes (which feels wrong). In "production", the workflow of this code will be to lose the connection, break out of the loop, and wait for the writer stream to finish, but I am not introducing any method allowing me to cancel the streams once the script is running. In the same way leaked tasks are "swallowed", which I have tested and works, shouldn't these cases also be handled by the `tearDownClass` method of `IsolatedAsyncioTestCase`?
Your version works but can be simplified. Just use

without grabbing the drainer early.
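The snippet referred to here did not survive the migration, but given the hint ("without grabbing the drainer early"), the simplification is presumably to write and then drain in one place. A sketch under that assumption, with `parse` stubbed out since its definition is not shown, and duck-typed stand-ins for the streams so the loop can be exercised:

```python
import asyncio

def parse(data: bytes):
    # Stub: the real parse() is defined elsewhere in the proxy module.
    return data

async def forward_stream(reader, writer, event: asyncio.Event, source: str):
    while not event.is_set():
        try:
            data = await asyncio.wait_for(reader.read(1024), 1)
        except asyncio.TimeoutError:
            continue
        if not data:
            event.set()
            break
        if parse(data):
            writer.write(data)
            await writer.drain()  # drain exactly where it is awaited: nothing is left pending on cancellation

# Duck-typed stand-ins for StreamReader/StreamWriter, for a quick check.
class FakeReader:
    def __init__(self, chunks):
        self.chunks = list(chunks)

    async def read(self, n):
        return self.chunks.pop(0) if self.chunks else b""

class FakeWriter:
    def __init__(self):
        self.buf = b""

    def write(self, data):
        self.buf += data

    async def drain(self):
        pass

async def demo():
    event = asyncio.Event()
    writer = FakeWriter()
    await forward_stream(FakeReader([b"hello", b"world"]), writer, event, "test")
    return writer.buf

print(asyncio.run(demo()))  # b'helloworld'
```

This does give up the read/write overlap the original version aimed for, but `StreamWriter.drain()` returns immediately unless the transport's buffer is above its high-water mark, so in practice little is lost.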
You are absolutely correct. Thank you very much!

You are welcome!