Skip to content

Commit

Permalink
fix: add QueueClosedError
Browse files Browse the repository at this point in the history
  • Loading branch information
phi-friday committed Aug 6, 2023
1 parent d939b88 commit 0f4129b
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 10 deletions.
18 changes: 16 additions & 2 deletions src/async_wrapper/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"QueueError",
"QueueEmptyError",
"QueueFullError",
"QueueClosedError",
"QueueBrokenError",
]

Expand Down Expand Up @@ -39,8 +40,21 @@ class QueueFullError(QueueError):
"""


class QueueClosedError(QueueError):
"""Error that occurs when attempting to get from or put into a closed queue.
This error is different from QueueBrokenError.
:exc:`QueueBrokenError` is an unintended error.
:exc:`QueueClosedError` is an error deliberately raised.
"""


class QueueBrokenError(QueueError):
"""Exception raised when attempting to operate on a closed queue.
"""Error that occurs when trying to get from or put into a closed queue.
This error is different from QueueClosedError.
:exc:`QueueClosedError` is an error deliberately raised.
This exception is raised when trying to get or put an item into a closed queue.
:exc:`QueueBrokenError` is an unintended error.
"""
25 changes: 20 additions & 5 deletions src/async_wrapper/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
from anyio import WouldBlock, create_memory_object_stream, create_task_group, fail_after
from anyio.streams.memory import BrokenResourceError, ClosedResourceError, EndOfStream

from async_wrapper.exception import QueueBrokenError, QueueEmptyError, QueueFullError
from async_wrapper.exception import (
QueueBrokenError,
QueueClosedError,
QueueEmptyError,
QueueFullError,
)

if sys.version_info < (3, 11): # pragma: no cover
from exceptiongroup import ExceptionGroup # type: ignore
Expand Down Expand Up @@ -293,9 +298,9 @@ def clone(self, *, putter: bool = False, getter: bool = False) -> Queue[ValueT]:
def _clone(self, *, putter: bool, getter: bool) -> Queue[ValueT]:
"""create clone of this queue"""
if self._closed:
raise QueueBrokenError("the queue is already closed")
raise QueueClosedError("the queue is already closed")
if not putter and not getter:
raise ValueError("putter and getter are None.")
raise RuntimeError("putter and getter are None.")
_putter = self._putter.clone() if putter else self._putter
_getter = self._getter.clone() if getter else self._getter
new = Queue(stream=(_putter, _getter))
Expand Down Expand Up @@ -343,7 +348,12 @@ def __aiter__(self) -> Self:
async def __anext__(self) -> ValueT:
try:
return await self.aget()
except (EndOfStream, QueueEmptyError, QueueBrokenError) as exc:
except (
EndOfStream,
QueueEmptyError,
QueueBrokenError,
QueueClosedError,
) as exc:
raise StopAsyncIteration from exc

def __iter__(self) -> Self:
Expand All @@ -352,7 +362,12 @@ def __iter__(self) -> Self:
def __next__(self) -> ValueT:
try:
return self.get()
except (EndOfStream, QueueEmptyError, QueueBrokenError) as exc:
except (
EndOfStream,
QueueEmptyError,
QueueBrokenError,
QueueClosedError,
) as exc:
raise StopIteration from exc

def __len__(self) -> int:
Expand Down
6 changes: 3 additions & 3 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from anyio import CancelScope, create_task_group, fail_after, wait_all_tasks_blocked

from async_wrapper import Queue, create_queue
from async_wrapper.exception import QueueBrokenError
from async_wrapper.exception import QueueBrokenError, QueueClosedError


def test_invalid_max_buffer() -> None:
Expand Down Expand Up @@ -146,7 +146,7 @@ async def test_clone() -> None:
async def test_clone_closed() -> None:
queue: Queue[str] = create_queue(1)
await queue.aclose()
pytest.raises(QueueBrokenError, queue.clone)
pytest.raises(QueueClosedError, queue.clone)


@pytest.mark.anyio()
Expand Down Expand Up @@ -322,7 +322,7 @@ async def test_get(q: Queue[Any]) -> None:
@pytest.mark.anyio()
def test_queue_clone_uset():
queue: Queue[Any] = create_queue(1)
with pytest.raises(ValueError, match="putter and getter are None."):
with pytest.raises(RuntimeError, match="putter and getter are None."):
queue.clone()


Expand Down

0 comments on commit 0f4129b

Please sign in to comment.