Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions Lib/asyncio/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,29 @@ def _maybe_resume_transport(self):
self._paused = False
self._transport.resume_reading()

def _consume_buffer(self, n=None):
"""Take *n* bytes from the buffer (all bytes when *n* is ``None``).

Returns a :class:`bytes` object. Uses ``bytearray.take_bytes()``
when possible, but falls back to a non-mutating replacement when
a ``BufferError`` is raised due to active memoryview exports.
"""
try:
if n is None:
return self._buffer.take_bytes()
return self._buffer.take_bytes(n)
except BufferError:
# A memoryview held by a caller (e.g. an async database driver)
# prevents in-place resize. Fall back to a copy-and-replace
# strategy that does not mutate the exported object.
if n is None:
data = bytes(self._buffer)
self._buffer = bytearray()
else:
data = bytes(self._buffer[:n])
self._buffer = self._buffer[n:]
return data

def feed_eof(self):
self._eof = True
self._wakeup_waiter()
Expand Down Expand Up @@ -562,9 +585,9 @@ async def readline(self):
return e.partial
except exceptions.LimitOverrunError as e:
if self._buffer.startswith(sep, e.consumed):
del self._buffer[:e.consumed + seplen]
self._consume_buffer(e.consumed + seplen)
else:
self._buffer.clear()
self._consume_buffer()
self._maybe_resume_transport()
raise ValueError(e.args[0])
return line
Expand Down Expand Up @@ -667,7 +690,7 @@ async def readuntil(self, separator=b'\n'):
# adds data which makes separator be found. That's why we check for
# EOF *after* inspecting the buffer.
if self._eof:
chunk = self._buffer.take_bytes()
chunk = self._consume_buffer()
raise exceptions.IncompleteReadError(chunk, None)

# _wait_for_data() will resume reading if stream was paused.
Expand All @@ -677,7 +700,7 @@ async def readuntil(self, separator=b'\n'):
raise exceptions.LimitOverrunError(
'Separator is found, but chunk is longer than limit', match_start)

chunk = self._buffer.take_bytes(match_end)
chunk = self._consume_buffer(match_end)
self._maybe_resume_transport()
return chunk

Expand Down Expand Up @@ -723,7 +746,7 @@ async def read(self, n=-1):
await self._wait_for_data('read')

# This will work right even if buffer is less than n bytes
data = self._buffer.take_bytes(min(len(self._buffer), n))
data = self._consume_buffer(min(len(self._buffer), n))

self._maybe_resume_transport()
return data
Expand Down Expand Up @@ -754,12 +777,12 @@ async def readexactly(self, n):

while len(self._buffer) < n:
if self._eof:
incomplete = self._buffer.take_bytes()
incomplete = self._consume_buffer()
raise exceptions.IncompleteReadError(incomplete, n)

await self._wait_for_data('readexactly')

data = self._buffer.take_bytes(n)
data = self._consume_buffer(n)
self._maybe_resume_transport()
return data

Expand Down
88 changes: 88 additions & 0 deletions Lib/test/test_asyncio/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,94 @@ async def main():
main_coro = main()
asyncio.run(main_coro)

# --- BufferError regression tests (gh-146379) ---
# StreamReader must not raise BufferError when a memoryview is held
# on the internal buffer, e.g. by an async database driver.

def test_readexactly_with_active_memoryview(self):
"""readexactly must succeed even when a memoryview is active."""
async def go():
reader = asyncio.StreamReader()
reader.feed_data(b"AABBCCDD")
mv = memoryview(reader._buffer)
data = await reader.readexactly(4)
self.assertEqual(data, b"AABB")
self.assertEqual(bytes(reader._buffer), b"CCDD")
mv.release()
asyncio.run(go())

def test_read_with_active_memoryview(self):
"""read(n) must succeed even when a memoryview is active."""
async def go():
reader = asyncio.StreamReader()
reader.feed_data(b"HELLO")
mv = memoryview(reader._buffer)
data = await reader.read(3)
self.assertEqual(data, b"HEL")
self.assertEqual(bytes(reader._buffer), b"LO")
mv.release()
asyncio.run(go())

def test_readline_with_active_memoryview(self):
"""readline must succeed even when a memoryview is active."""
async def go():
reader = asyncio.StreamReader()
reader.feed_data(b"line1\nline2\n")
mv = memoryview(reader._buffer)
data = await reader.readline()
self.assertEqual(data, b"line1\n")
mv.release()
asyncio.run(go())

def test_readuntil_with_active_memoryview(self):
"""readuntil must succeed even when a memoryview is active."""
async def go():
reader = asyncio.StreamReader()
reader.feed_data(b"payload|rest")
mv = memoryview(reader._buffer)
data = await reader.readuntil(b"|")
self.assertEqual(data, b"payload|")
self.assertEqual(bytes(reader._buffer), b"rest")
mv.release()
asyncio.run(go())

def test_readexactly_eof_with_active_memoryview(self):
"""readexactly at EOF must not raise BufferError."""
async def go():
reader = asyncio.StreamReader()
reader.feed_data(b"short")
reader.feed_eof()
mv = memoryview(reader._buffer)
with self.assertRaises(asyncio.IncompleteReadError) as cm:
await reader.readexactly(100)
self.assertEqual(cm.exception.partial, b"short")
mv.release()
asyncio.run(go())

def test_readuntil_eof_with_active_memoryview(self):
"""readuntil at EOF must not raise BufferError."""
async def go():
reader = asyncio.StreamReader()
reader.feed_data(b"no separator")
reader.feed_eof()
mv = memoryview(reader._buffer)
with self.assertRaises(asyncio.IncompleteReadError) as cm:
await reader.readuntil(b"|")
self.assertEqual(cm.exception.partial, b"no separator")
mv.release()
asyncio.run(go())

def test_readline_limit_overrun_with_active_memoryview(self):
"""readline over limit with active memoryview must not raise BufferError."""
async def go():
reader = asyncio.StreamReader(limit=5)
reader.feed_data(b"x" * 6 + b"\n")
mv = memoryview(reader._buffer)
with self.assertRaises(ValueError):
await reader.readline()
mv.release()
asyncio.run(go())


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Fix :class:`asyncio.StreamReader` raising :exc:`BufferError` when a
:class:`memoryview` is held on the internal buffer. The ``read``,
``readline``, ``readuntil``, and ``readexactly`` methods now fall back
to non-mutating buffer replacement when in-place resize is not possible.
Loading