Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.11] gh-91227: Ignore ERROR_PORT_UNREACHABLE in proactor recvfrom() (GH-32011) #117210

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions Lib/asyncio/windows_events.py
Expand Up @@ -513,6 +513,10 @@ def finish_recv(trans, key, ov):
try:
return ov.getresult()
except OSError as exc:
# WSARecvFrom will report ERROR_PORT_UNREACHABLE when the same
# socket is used to send to an address that is not listening.
if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE:
return b'', None
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
_overlapped.ERROR_OPERATION_ABORTED):
raise ConnectionResetError(*exc.args)
Expand All @@ -533,6 +537,10 @@ def finish_recv(trans, key, ov):
try:
return ov.getresult()
except OSError as exc:
# WSARecvFrom will report ERROR_PORT_UNREACHABLE when the same
# socket is used to send to an address that is not listening.
if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE:
return 0, None
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
_overlapped.ERROR_OPERATION_ABORTED):
raise ConnectionResetError(*exc.args)
Expand Down
74 changes: 74 additions & 0 deletions Lib/test/test_asyncio/test_events.py
Expand Up @@ -1352,6 +1352,80 @@ def test_create_datagram_endpoint_sock(self):
tr.close()
self.loop.run_until_complete(pr.done)

def test_datagram_send_to_non_listening_address(self):
# see:
# https://github.com/python/cpython/issues/91227
# https://github.com/python/cpython/issues/88906
# https://bugs.python.org/issue47071
# https://bugs.python.org/issue44743
# The Proactor event loop would fail to receive datagram messages after
# sending a message to an address that wasn't listening.
loop = self.loop

class Protocol(asyncio.DatagramProtocol):

_received_datagram = None

def datagram_received(self, data, addr):
self._received_datagram.set_result(data)

async def wait_for_datagram_received(self):
self._received_datagram = loop.create_future()
result = await asyncio.wait_for(self._received_datagram, 10)
self._received_datagram = None
return result

def create_socket():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(False)
sock.bind(('127.0.0.1', 0))
return sock

socket_1 = create_socket()
transport_1, protocol_1 = loop.run_until_complete(
loop.create_datagram_endpoint(Protocol, sock=socket_1)
)
addr_1 = socket_1.getsockname()

socket_2 = create_socket()
transport_2, protocol_2 = loop.run_until_complete(
loop.create_datagram_endpoint(Protocol, sock=socket_2)
)
addr_2 = socket_2.getsockname()

# creating and immediately closing this to try to get an address that
# is not listening
socket_3 = create_socket()
transport_3, protocol_3 = loop.run_until_complete(
loop.create_datagram_endpoint(Protocol, sock=socket_3)
)
addr_3 = socket_3.getsockname()
transport_3.abort()

transport_1.sendto(b'a', addr=addr_2)
self.assertEqual(loop.run_until_complete(
protocol_2.wait_for_datagram_received()
), b'a')

transport_2.sendto(b'b', addr=addr_1)
self.assertEqual(loop.run_until_complete(
protocol_1.wait_for_datagram_received()
), b'b')

# this should send to an address that isn't listening
transport_1.sendto(b'c', addr=addr_3)
loop.run_until_complete(asyncio.sleep(0))

# transport 1 should still be able to receive messages after sending to
# an address that wasn't listening
transport_2.sendto(b'd', addr=addr_1)
self.assertEqual(loop.run_until_complete(
protocol_1.wait_for_datagram_received()
), b'd')

transport_1.close()
transport_2.close()

def test_internal_fds(self):
loop = self.create_event_loop()
if not isinstance(loop, selector_events.BaseSelectorEventLoop):
Expand Down
81 changes: 81 additions & 0 deletions Lib/test/test_asyncio/test_sock_lowlevel.py
Expand Up @@ -555,12 +555,93 @@ class SelectEventLoopTests(BaseSockTestsMixin,
def create_event_loop(self):
return asyncio.SelectorEventLoop()


class ProactorEventLoopTests(BaseSockTestsMixin,
test_utils.TestCase):

def create_event_loop(self):
return asyncio.ProactorEventLoop()


async def _basetest_datagram_send_to_non_listening_address(self,
recvfrom):
# see:
# https://github.com/python/cpython/issues/91227
# https://github.com/python/cpython/issues/88906
# https://bugs.python.org/issue47071
# https://bugs.python.org/issue44743
# The Proactor event loop would fail to receive datagram messages
# after sending a message to an address that wasn't listening.

def create_socket():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(False)
sock.bind(('127.0.0.1', 0))
return sock

socket_1 = create_socket()
addr_1 = socket_1.getsockname()

socket_2 = create_socket()
addr_2 = socket_2.getsockname()

# creating and immediately closing this to try to get an address
# that is not listening
socket_3 = create_socket()
addr_3 = socket_3.getsockname()
socket_3.shutdown(socket.SHUT_RDWR)
socket_3.close()

socket_1_recv_task = self.loop.create_task(recvfrom(socket_1))
socket_2_recv_task = self.loop.create_task(recvfrom(socket_2))
await asyncio.sleep(0)

await self.loop.sock_sendto(socket_1, b'a', addr_2)
self.assertEqual(await socket_2_recv_task, b'a')

await self.loop.sock_sendto(socket_2, b'b', addr_1)
self.assertEqual(await socket_1_recv_task, b'b')
socket_1_recv_task = self.loop.create_task(recvfrom(socket_1))
await asyncio.sleep(0)

# this should send to an address that isn't listening
await self.loop.sock_sendto(socket_1, b'c', addr_3)
self.assertEqual(await socket_1_recv_task, b'')
socket_1_recv_task = self.loop.create_task(recvfrom(socket_1))
await asyncio.sleep(0)

# socket 1 should still be able to receive messages after sending
# to an address that wasn't listening
socket_2.sendto(b'd', addr_1)
self.assertEqual(await socket_1_recv_task, b'd')

socket_1.shutdown(socket.SHUT_RDWR)
socket_1.close()
socket_2.shutdown(socket.SHUT_RDWR)
socket_2.close()


def test_datagram_send_to_non_listening_address_recvfrom(self):
async def recvfrom(socket):
data, _ = await self.loop.sock_recvfrom(socket, 4096)
return data

self.loop.run_until_complete(
self._basetest_datagram_send_to_non_listening_address(
recvfrom))


def test_datagram_send_to_non_listening_address_recvfrom_into(self):
async def recvfrom_into(socket):
buf = bytearray(4096)
length, _ = await self.loop.sock_recvfrom_into(socket, buf,
4096)
return buf[:length]

self.loop.run_until_complete(
self._basetest_datagram_send_to_non_listening_address(
recvfrom_into))

else:
import selectors

Expand Down
@@ -0,0 +1 @@
Fix the asyncio ProactorEventLoop implementation so that sending a datagram to an address that is not listening does not prevent receiving any more datagrams.
1 change: 1 addition & 0 deletions Modules/overlapped.c
Expand Up @@ -2092,6 +2092,7 @@ overlapped_exec(PyObject *module)
WINAPI_CONSTANT(F_DWORD, ERROR_OPERATION_ABORTED);
WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
WINAPI_CONSTANT(F_DWORD, ERROR_PIPE_BUSY);
WINAPI_CONSTANT(F_DWORD, ERROR_PORT_UNREACHABLE);
WINAPI_CONSTANT(F_DWORD, INFINITE);
WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE);
WINAPI_CONSTANT(F_HANDLE, NULL);
Expand Down