diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 8ab420d5bd719d..96e61f7c3a4e9f 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -783,6 +783,8 @@ def __init__(self, loop, sock, protocol, extra=None, server=None): self._buffer = self._buffer_factory() self._conn_lost = 0 # Set when call to connection_lost scheduled. self._closing = False # Set when close() called. + self._paused = False # Set when pause_reading() called + if self._server is not None: self._server._attach() loop._transports[self._sock_fd] = self @@ -828,6 +830,25 @@ def get_protocol(self): def is_closing(self): return self._closing + def is_reading(self): + return not self.is_closing() and not self._paused + + def pause_reading(self): + if not self.is_reading(): + return + self._paused = True + self._loop._remove_reader(self._sock_fd) + if self._loop.get_debug(): + logger.debug("%r pauses reading", self) + + def resume_reading(self): + if self._closing or not self._paused: + return + self._paused = False + self._add_reader(self._sock_fd, self._read_ready) + if self._loop.get_debug(): + logger.debug("%r resumes reading", self) + def close(self): if self._closing: return @@ -887,9 +908,8 @@ def get_write_buffer_size(self): return len(self._buffer) def _add_reader(self, fd, callback, *args): - if self._closing: + if not self.is_reading(): return - self._loop._add_reader(fd, callback, *args) @@ -904,7 +924,6 @@ def __init__(self, loop, sock, protocol, waiter=None, self._read_ready_cb = None super().__init__(loop, sock, protocol, extra, server) self._eof = False - self._paused = False self._empty_waiter = None # Disable the Nagle algorithm -- small writes will be @@ -929,25 +948,6 @@ def set_protocol(self, protocol): super().set_protocol(protocol) - def is_reading(self): - return not self._paused and not self._closing - - def pause_reading(self): - if self._closing or self._paused: - return - self._paused = True - self._loop._remove_reader(self._sock_fd) - if self._loop.get_debug(): - logger.debug("%r pauses reading", self) - - def resume_reading(self): - if self._closing or not self._paused: - return - self._paused = False - self._add_reader(self._sock_fd, self._read_ready) - if self._loop.get_debug(): - logger.debug("%r resumes reading", self) - def _read_ready(self): self._read_ready_cb() diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 0495f332f3127a..ac4519acc4307b 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -482,13 +482,21 @@ def __init__(self, loop, pipe, protocol, waiter=None, extra=None): self._loop.call_soon(self._protocol.connection_made, self) # only start reading when connection_made() has been called - self._loop.call_soon(self._loop._add_reader, + self._loop.call_soon(self._add_reader, self._fileno, self._read_ready) if waiter is not None: # only wake up the waiter when connection_made() has been called self._loop.call_soon(futures._set_result_unless_cancelled, waiter, None) + def _add_reader(self, fd, callback): + if not self.is_reading(): + return + self._loop._add_reader(fd, callback) + + def is_reading(self): + return not self._paused and not self._closing + def __repr__(self): info = [self.__class__.__name__] if self._pipe is None: @@ -529,7 +537,7 @@ def _read_ready(self): self._loop.call_soon(self._call_connection_lost, None) def pause_reading(self): - if self._closing or self._paused: + if not self.is_reading(): return self._paused = True self._loop._remove_reader(self._fileno) diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index ae30185cef776a..44fc7706abee1d 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -444,6 +444,19 @@ def monkey(): self.assertFalse(tr.is_reading()) + def test_pause_reading_connection_made(self): + tr = self.socket_transport() + self.protocol.connection_made.side_effect = lambda _: tr.pause_reading() + test_utils.run_briefly(self.loop) + self.assertFalse(tr.is_reading()) + self.loop.assert_no_reader(7) + + tr.resume_reading() + self.assertTrue(tr.is_reading()) + + tr.close() + self.assertFalse(tr.is_reading()) + def pause_writing_transport(self, high): tr = self.socket_transport() diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 22dcfb23083522..fcab26278235e0 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -534,6 +534,22 @@ def test_pause_resume_reading(self): self.assertFalse(tr.is_reading()) self.loop.assert_no_reader(7) + def test_pause_reading_connection_made(self): + tr = self.socket_transport() + self.protocol.connection_made.side_effect = lambda _: tr.pause_reading() + test_utils.run_briefly(self.loop) + self.assertFalse(tr.is_reading()) + self.loop.assert_no_reader(7) + + tr.resume_reading() + self.assertTrue(tr.is_reading()) + self.loop.assert_reader(7, tr._read_ready) + + tr.close() + self.assertFalse(tr.is_reading()) + self.loop.assert_no_reader(7) + + def test_read_eof_received_error(self): transport = self.socket_transport() transport.close = mock.Mock() diff --git a/Misc/NEWS.d/next/Core and Builtins/2019-12-01-12-58-31.bpo-31821.1FNmwk.rst b/Misc/NEWS.d/next/Core and Builtins/2019-12-01-12-58-31.bpo-31821.1FNmwk.rst new file mode 100644 index 00000000000000..13c054fdd68276 --- /dev/null +++ b/Misc/NEWS.d/next/Core and Builtins/2019-12-01-12-58-31.bpo-31821.1FNmwk.rst @@ -0,0 +1 @@ +Fix :func:`!pause_reading` to work when called from :func:`!connection_made` in :mod:`asyncio`.