Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-32356: idempotent pause_/resume_reading; new is_reading method. #4914

Merged
merged 2 commits into from
Dec 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Doc/library/asyncio-protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,31 @@ ReadTransport

Interface for read-only transports.

.. method:: is_reading()

Return ``True`` if the transport is receiving new data.

.. versionadded:: 3.7

.. method:: pause_reading()

Pause the receiving end of the transport. No data will be passed to
the protocol's :meth:`data_received` method until :meth:`resume_reading`
is called.

.. versionchanged:: 3.7
The method is idempotent, i.e. it can be called when the
transport is already paused or closed.

.. method:: resume_reading()

Resume the receiving end. The protocol's :meth:`data_received` method
will be called once again if some data is available for reading.

.. versionchanged:: 3.7
The method is idempotent, i.e. it can be called when the
transport is already reading.


WriteTransport
--------------
Expand Down
15 changes: 7 additions & 8 deletions Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,20 @@ def __init__(self, loop, sock, protocol, waiter=None,
self._paused = False
self._loop.call_soon(self._loop_reading)

def is_reading(self):
return not self._paused and not self._closing

def pause_reading(self):
if self._closing:
raise RuntimeError('Cannot pause_reading() when closing')
if self._paused:
raise RuntimeError('Already paused')
if self._closing or self._paused:
return
self._paused = True
if self._loop.get_debug():
logger.debug("%r pauses reading", self)

def resume_reading(self):
if not self._paused:
raise RuntimeError('Not paused')
self._paused = False
if self._closing:
if self._closing or not self._paused:
return
self._paused = False
self._loop.call_soon(self._loop_reading, self._read_fut)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
Expand Down
15 changes: 7 additions & 8 deletions Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,22 +702,21 @@ def __init__(self, loop, sock, protocol, waiter=None,
self._loop.call_soon(futures._set_result_unless_cancelled,
waiter, None)

def is_reading(self):
return not self._paused and not self._closing

def pause_reading(self):
if self._closing:
raise RuntimeError('Cannot pause_reading() when closing')
if self._paused:
raise RuntimeError('Already paused')
if self._closing or self._paused:
return
self._paused = True
self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)

def resume_reading(self):
if not self._paused:
raise RuntimeError('Not paused')
self._paused = False
if self._closing:
if self._closing or not self._paused:
return
self._paused = False
self._loop._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
Expand Down
6 changes: 6 additions & 0 deletions Lib/asyncio/sslproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,12 @@ def __del__(self):
source=self)
self.close()

def is_reading(self):
tr = self._ssl_protocol._transport
if tr is None:
raise RuntimeError('SSL transport has not been initialized yet')
return tr.is_reading()

def pause_reading(self):
"""Pause the receiving end.

Expand Down
4 changes: 4 additions & 0 deletions Lib/asyncio/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ def get_protocol(self):
class ReadTransport(BaseTransport):
"""Interface for read-only transports."""

def is_reading(self):
"""Return True if the transport is receiving."""
raise NotImplementedError

def pause_reading(self):
"""Pause the receiving end.

Expand Down
10 changes: 10 additions & 0 deletions Lib/test/test_asyncio/test_proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,26 +334,36 @@ def test_pause_resume_reading(self):
f = asyncio.Future(loop=self.loop)
f.set_result(msg)
futures.append(f)

self.loop._proactor.recv.side_effect = futures
self.loop._run_once()
self.assertFalse(tr._paused)
self.assertTrue(tr.is_reading())
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data1')
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data2')

tr.pause_reading()
tr.pause_reading()
self.assertTrue(tr._paused)
self.assertFalse(tr.is_reading())
for i in range(10):
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data2')

tr.resume_reading()
tr.resume_reading()
self.assertFalse(tr._paused)
self.assertTrue(tr.is_reading())
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data3')
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data4')
tr.close()

self.assertFalse(tr.is_reading())


def pause_writing_transport(self, high):
tr = self.socket_transport()
Expand Down
28 changes: 25 additions & 3 deletions Lib/test/test_asyncio/test_selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,23 @@ def test_make_ssl_transport(self):
with test_utils.disable_logger():
transport = self.loop._make_ssl_transport(
m, asyncio.Protocol(), m, waiter)

with self.assertRaisesRegex(RuntimeError,
r'SSL transport.*not.*initialized'):
transport.is_reading()

# execute the handshake while the logger is disabled
# to ignore SSL handshake failure
test_utils.run_briefly(self.loop)

self.assertTrue(transport.is_reading())
transport.pause_reading()
transport.pause_reading()
self.assertFalse(transport.is_reading())
transport.resume_reading()
transport.resume_reading()
self.assertTrue(transport.is_reading())

# Sanity check
class_name = transport.__class__.__name__
self.assertIn("ssl", class_name.lower())
Expand Down Expand Up @@ -894,15 +907,24 @@ def test_pause_resume_reading(self):
tr = self.socket_transport()
test_utils.run_briefly(self.loop)
self.assertFalse(tr._paused)
self.assertTrue(tr.is_reading())
self.loop.assert_reader(7, tr._read_ready)

tr.pause_reading()
tr.pause_reading()
self.assertTrue(tr._paused)
self.assertFalse(7 in self.loop.readers)
self.assertFalse(tr.is_reading())
self.loop.assert_no_reader(7)

tr.resume_reading()
tr.resume_reading()
self.assertFalse(tr._paused)
self.assertTrue(tr.is_reading())
self.loop.assert_reader(7, tr._read_ready)
with self.assertRaises(RuntimeError):
tr.resume_reading()

tr.close()
self.assertFalse(tr.is_reading())
self.loop.assert_no_reader(7)

def test_read_ready(self):
transport = self.socket_transport()
Expand Down
17 changes: 12 additions & 5 deletions Lib/test/test_asyncio/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,19 @@ def _remove_reader(self, fd):
return False

def assert_reader(self, fd, callback, *args):
assert fd in self.readers, 'fd {} is not registered'.format(fd)
if fd not in self.readers:
raise AssertionError(f'fd {fd} is not registered')
handle = self.readers[fd]
assert handle._callback == callback, '{!r} != {!r}'.format(
handle._callback, callback)
assert handle._args == args, '{!r} != {!r}'.format(
handle._args, args)
if handle._callback != callback:
raise AssertionError(
f'unexpected callback: {handle._callback} != {callback}')
if handle._args != args:
raise AssertionError(
f'unexpected callback args: {handle._args} != {args}')

def assert_no_reader(self, fd):
if fd in self.readers:
raise AssertionError(f'fd {fd} is registered')

def _add_writer(self, fd, callback, *args):
self.writers[fd] = events.Handle(callback, args, self)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
asyncio.transport.resume_reading() and pause_reading() are now idempotent.
New transport.is_reading() method is added.