Skip to content

Commit

Permalink
Issue #23198: Reactor asyncio.StreamReader
Browse files Browse the repository at this point in the history
- Add a new _wakeup_waiter() method
- Replace _create_waiter() method with a _wait_for_data() coroutine function
- Use the value None instead of True or False to wake up the waiter
  • Loading branch information
vstinner committed Jan 13, 2015
1 parent 231b404 commit c2c12e4
Showing 1 changed file with 22 additions and 25 deletions.
47 changes: 22 additions & 25 deletions Lib/asyncio/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
else:
self._loop = loop
self._buffer = bytearray()
self._eof = False # Whether we're done.
self._waiter = None # A future.
self._eof = False # Whether we're done.
self._waiter = None # A future used by _wait_for_data()
self._exception = None
self._transport = None
self._paused = False
Expand All @@ -331,6 +331,14 @@ def set_exception(self, exc):
if not waiter.cancelled():
waiter.set_exception(exc)

def _wakeup_waiter(self):
"""Wakeup read() or readline() function waiting for data or EOF."""
waiter = self._waiter
if waiter is not None:
self._waiter = None
if not waiter.cancelled():
waiter.set_result(None)

def set_transport(self, transport):
assert self._transport is None, 'Transport already set'
self._transport = transport
Expand All @@ -342,11 +350,7 @@ def _maybe_resume_transport(self):

def feed_eof(self):
self._eof = True
waiter = self._waiter
if waiter is not None:
self._waiter = None
if not waiter.cancelled():
waiter.set_result(True)
self._wakeup_waiter()

def at_eof(self):
"""Return True if the buffer is empty and 'feed_eof' was called."""
Expand All @@ -359,12 +363,7 @@ def feed_data(self, data):
return

self._buffer.extend(data)

waiter = self._waiter
if waiter is not None:
self._waiter = None
if not waiter.cancelled():
waiter.set_result(False)
self._wakeup_waiter()

if (self._transport is not None and
not self._paused and
Expand All @@ -379,15 +378,21 @@ def feed_data(self, data):
else:
self._paused = True

def _create_waiter(self, func_name):
def _wait_for_data(self, func_name):
"""Wait until feed_data() or feed_eof() is called."""
# StreamReader uses a future to link the protocol feed_data() method
# to a read coroutine. Running two read coroutines at the same time
# would have an unexpected behaviour. It would not possible to know
# which coroutine would get the next data.
if self._waiter is not None:
raise RuntimeError('%s() called while another coroutine is '
'already waiting for incoming data' % func_name)
return futures.Future(loop=self._loop)

self._waiter = futures.Future(loop=self._loop)
try:
yield from self._waiter
finally:
self._waiter = None

@coroutine
def readline(self):
Expand Down Expand Up @@ -417,11 +422,7 @@ def readline(self):
break

if not_enough:
self._waiter = self._create_waiter('readline')
try:
yield from self._waiter
finally:
self._waiter = None
yield from self._wait_for_data('readline')

self._maybe_resume_transport()
return bytes(line)
Expand All @@ -448,11 +449,7 @@ def read(self, n=-1):
return b''.join(blocks)
else:
if not self._buffer and not self._eof:
self._waiter = self._create_waiter('read')
try:
yield from self._waiter
finally:
self._waiter = None
yield from self._wait_for_data('read')

if n < 0 or len(self._buffer) <= n:
data = bytes(self._buffer)
Expand Down

0 comments on commit c2c12e4

Please sign in to comment.