Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/smithy-core/src/smithy_core/aio/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
import asyncio
from asyncio import iscoroutinefunction
from collections import deque
from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable
from io import BytesIO
from typing import Any, Self, cast
Expand Down Expand Up @@ -297,10 +298,9 @@ def __init__(
Calls to ``write`` will block until the number of chunks is less than this
number. Default is 16.
"""
self._data = deque[bytes]()
if intial_data is not None:
self._data = [intial_data]
else:
self._data = []
self._data.append(intial_data)

if max_buffered_chunks < 1:
raise ValueError(
Expand Down Expand Up @@ -419,7 +419,7 @@ async def __anext__(self) -> bytes:

# Pop the next chunk of data from the buffer, then notify any waiting
# coroutines, returning immediately after.
result = self._data.pop()
result = self._data.popleft()
self._data_condition.notify()
return result

Expand Down
7 changes: 4 additions & 3 deletions packages/smithy-core/tests/unit/aio/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,13 @@ async def test_provider_reads_written_data() -> None:
# Start the read task in the background.
read_task = asyncio.create_task(drain_provider(provider, result))
await provider.write(b"foo")
await provider.write(b"bar")

# Wait for the buffer to drain. At that point all the data should
# be read, but the read task won't actually be complete yet
# because it's still waiting on future data.
await provider.flush()
assert result == [b"foo"]
assert result == [b"foo", b"bar"]
assert not read_task.done()

# Now actually close the provider, which will let the read task
Expand All @@ -373,7 +374,7 @@ async def test_provider_reads_written_data() -> None:
await read_task

# The result should not have changed
assert result == [b"foo"]
assert result == [b"foo", b"bar"]


async def test_close_stops_writes() -> None:
Expand All @@ -393,7 +394,7 @@ async def test_close_without_flush_deletes_buffered_data() -> None:
# We weren't able to read data, which is what we want. But here we dig into
# the internals to be sure that the buffer is clear and no data is haning
# around.
assert provider._data == [] # type: ignore
assert len(provider._data) == 0 # type: ignore


async def test_only_max_chunks_buffered() -> None:
Expand Down