Skip to content

Commit

Permalink
Drop python3.8, python3.9 support, bump anyio to 4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
tkukushkin committed Oct 18, 2023
1 parent 4da9549 commit 17f498a
Show file tree
Hide file tree
Showing 23 changed files with 215 additions and 157 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10']
python-version: ['3.10', '3.11', '3.12']
include:
- python-version: '3.8'
tox_python_version: '38'
- python-version: '3.9'
tox_python_version: '39'
- python-version: '3.10'
tox_python_version: '310'
- python-version: '3.11'
tox_python_version: '311'
- python-version: '3.12'
tox_python_version: '312'
env:
TOXENV: tests_py${{ matrix.tox_python_version }}

Expand Down
6 changes: 0 additions & 6 deletions pyproject.toml

This file was deleted.

4 changes: 1 addition & 3 deletions requirements/lint.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
black
flake8
isort
ruff
mypy
pyright
70 changes: 70 additions & 0 deletions ruff.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
line-length = 120
src = ["src", "."]
select = [
"F", # Pyflakes
"E", # Pycodestyle Error
"W", # PycodeStyle Warning
"I", # Isort
"N", # pep8-naming
"UP", # pyupgrade
"YTT", # flake8-2020
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"DTZ", # flake8-datetimez
"T10", # flake8-debugger
"ISC", # flake8-implicit-str-concat
"G", # flake8-logging-format
"PIE", # flake8-pie
"T20", # flake8-print
"PYI", # flake8-pyi
"Q", # flake8-quotes
"RSE", # flake8-raise
"RET", # flake8-return
"SIM", # flake8-simplify
"TID", # flake8-tidy-imports
"PTH", # flake8-use-pathlib
"TD", # flake8-todos
"PGH", # pygrep-hooks
"PL", # Pylint
"TRY", # tryceratops
"RUF", # Ruff-specific rules
]
ignore = [
"B005", # strip-with-multi-characters
"B905", # zip-without-explicit-strict
"N818", # error-suffix-on-exception-name
"Q003", # avoidable-escaped-quote
"SIM108", # if-else-block-instead-of-if-exp
"SIM212", # if-expr-with-twisted-arms
"PTH123", # builtin-open
"TD002", # missing-todo-author
"PLR0911", # too-many-return-statements
"PLR0912", # too-many-branches
"PLR0913", # too-many-arguments
"PLR0915", # too-many-statements
"PLR2004", # magic-value-comparison
"PLW2901", # redefined-loop-name
"SIM300", # yoda-conditions
"TID252", # relative-imports
"PGH004", # blanket-noqa
"TRY002", # raise-vanilla-class
"TRY003", # raise-vanilla-args
"TRY200", # reraise-no-cause
"RUF001", # ambiguous-unicode-character-string
"RUF002", # ambiguous-unicode-character-docstring
"RUF003", # ambiguous-unicode-character-comment
"RUF005", # collection-literal-concatenation
"RUF006", # asyncio-dangling-task
"RUF012", # mutable-class-default
"RET504", # unnecessary-assign
]
target-version = "py310"

[per-file-ignores]
"tests*/**/*.py" = [
"N", # pep8-naming
]

[isort]
combine-as-imports = true
no-lines-before = ["local-folder"]
6 changes: 0 additions & 6 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
[bdist_wheel]
universal = 1

[tool:pytest]
asyncio_mode = auto
markers =
Expand All @@ -12,6 +9,3 @@ exclude_lines =
pragma: no cover
raise NotImplementedError
pass

[flake8]
max-line-length = 120
17 changes: 7 additions & 10 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
import os
from pathlib import Path

from setuptools import find_packages, setup

this_directory = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(this_directory, "README.md"), encoding="utf-8") as f:
long_description = f.read()

setup(
name="aioamqp-consumer-best",
version="2.3.0",
python_requires="~=3.8",
python_requires=">=3.10",
url="https://github.com/tkukushkin/aioamqp-consumer-best",
author="Timofey Kukushkin",
author_email="tima@kukushkin.me",
description="Consumer utility for AMQP",
long_description=long_description,
long_description=(Path(__file__).parent / "README.md").read_text(encoding="utf-8"),
long_description_content_type="text/markdown",
license="MIT",
packages=find_packages("src"),
package_dir={"": "src"},
include_package_data=True,
install_requires=[
"aioamqp",
"anyio>=3.4",
"anyio>=4",
"exceptiongroup>=1.1.3",
],
classifiers=[
"Development Status :: 4 - Beta",
Expand All @@ -32,9 +29,9 @@
"Operating System :: OS Independent",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Libraries :: Python Modules",
"Typing :: Typed",
],
Expand Down
13 changes: 7 additions & 6 deletions src/aioamqp_consumer_best/_connect.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import asyncio
from asyncio import Future
from collections.abc import AsyncGenerator, Mapping
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, Mapping, Optional, Tuple
from typing import Any

import aioamqp
from aioamqp import AioamqpException, AmqpProtocol
Expand All @@ -14,16 +15,16 @@
async def connect(
connection_params: ConnectionParams,
*,
heartbeat_interval: Optional[int] = 60,
client_properties: Optional[Mapping[str, Any]] = None,
) -> AsyncGenerator[Tuple[asyncio.Transport, aioamqp.AmqpProtocol, "Future[None]"], None]:
heartbeat_interval: int | None = 60,
client_properties: Mapping[str, Any] | None = None,
) -> AsyncGenerator[tuple[asyncio.Transport, aioamqp.AmqpProtocol, Future[None]], None]:
client_properties = client_properties or {}

kwargs: Dict[str, Any] = {}
kwargs: dict[str, Any] = {}
if heartbeat_interval is not None:
kwargs["heartbeat"] = heartbeat_interval

connection_error_future: "Future[None]" = Future()
connection_error_future: Future[None] = Future()

def on_error(exception: AioamqpException) -> None:
if not connection_error_future.done():
Expand Down
7 changes: 4 additions & 3 deletions src/aioamqp_consumer_best/_helpers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
from typing import AsyncIterator, TypeVar
from collections.abc import AsyncIterator
from typing import TypeVar

T = TypeVar("T")
_T = TypeVar("_T")


async def queue_to_iterator(queue: "asyncio.Queue[T]") -> AsyncIterator[T]:
async def queue_to_iterator(queue: asyncio.Queue[_T]) -> AsyncIterator[_T]:
while True:
yield await queue.get()
2 changes: 1 addition & 1 deletion src/aioamqp_consumer_best/_load_balancing_policy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import abc
from collections.abc import Iterable
from itertools import cycle
from typing import Iterable

from aioamqp_consumer_best.records import ConnectionParams

Expand Down
81 changes: 41 additions & 40 deletions src/aioamqp_consumer_best/base_middlewares.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,75 @@
import asyncio
from datetime import datetime
from typing import AsyncIterator, Awaitable, Callable, Generic, List, Optional, TypeVar
import time
from collections.abc import AsyncIterator, Awaitable, Callable
from typing import Generic, TypeVar

T = TypeVar("T")
U = TypeVar("U")
V = TypeVar("V")
_T = TypeVar("_T")
_U = TypeVar("_U")
_V = TypeVar("_V")


class Middleware(Generic[T, U]):
async def __call__(self, inp: AsyncIterator[T]) -> AsyncIterator[U]: # pragma: no cover
class Middleware(Generic[_T, _U]):
async def __call__(self, inp: AsyncIterator[_T]) -> AsyncIterator[_U]: # pragma: no cover
# ensure function to be generator
empty_list: List[U] = []
empty_list: list[_U] = []
for x in empty_list:
yield x
raise NotImplementedError

def __or__(self, other: "Middleware[U, V]") -> "_Composition[T, U, V]":
def __or__(self, other: "Middleware[_U, _V]") -> "_Composition[_T, _U, _V]":
return _Composition(first=self, second=other)

@staticmethod
def from_callable(func: Callable[[AsyncIterator[T]], AsyncIterator[U]]) -> "_FromCallable[T, U]":
def from_callable(func: Callable[[AsyncIterator[_T]], AsyncIterator[_U]]) -> "_FromCallable[_T, _U]":
return _FromCallable(func)


class _Composition(Middleware[T, V], Generic[T, U, V]):
first: Middleware[T, U]
second: Middleware[U, V]
class _Composition(Middleware[_T, _V], Generic[_T, _U, _V]):
first: Middleware[_T, _U]
second: Middleware[_U, _V]

def __init__(
self,
first: Middleware[T, U],
second: Middleware[U, V],
first: Middleware[_T, _U],
second: Middleware[_U, _V],
) -> None:
self.first = first
self.second = second

async def __call__(self, inp: AsyncIterator[T]) -> AsyncIterator[V]:
async def __call__(self, inp: AsyncIterator[_T]) -> AsyncIterator[_V]:
async for item in self.second(self.first(inp)):
yield item


class _FromCallable(Middleware[T, U]):
def __init__(self, func: Callable[[AsyncIterator[T]], AsyncIterator[U]]) -> None:
class _FromCallable(Middleware[_T, _U]):
def __init__(self, func: Callable[[AsyncIterator[_T]], AsyncIterator[_U]]) -> None:
self._func = func

async def __call__(self, inp: AsyncIterator[T]) -> AsyncIterator[U]:
async def __call__(self, inp: AsyncIterator[_T]) -> AsyncIterator[_U]:
async for item in self._func(inp):
yield item


class ToBulks(Middleware[T, List[T]]):
max_bulk_size: Optional[int]
bulk_timeout: Optional[float]
class ToBulks(Middleware[_T, list[_T]]):
max_bulk_size: int | None
bulk_timeout: float | None

def __init__(self, max_bulk_size: Optional[int] = None, bulk_timeout: Optional[float] = None) -> None:
def __init__(self, max_bulk_size: int | None = None, bulk_timeout: float | None = None) -> None:
assert (
max_bulk_size is not None or bulk_timeout is not None
), "`max_bulk_size` or `bulk_timeout` must be specified"
self.max_bulk_size = max_bulk_size
self.bulk_timeout = bulk_timeout

async def __call__(self, inp: AsyncIterator[T]) -> AsyncIterator[List[T]]:
items: List[T] = []
bulk_start: Optional[datetime] = None
async def __call__(self, inp: AsyncIterator[_T]) -> AsyncIterator[list[_T]]:
items: list[_T] = []
bulk_start: float | None = None
nxt = asyncio.ensure_future(inp.__anext__())
try:
while True:
timeout: Optional[float] = None
timeout: float | None = None
if bulk_start is not None and self.bulk_timeout is not None:
timeout = self.bulk_timeout - (datetime.now() - bulk_start).total_seconds()
timeout = self.bulk_timeout - (time.monotonic() - bulk_start)
try:
item = await asyncio.wait_for(asyncio.shield(nxt), timeout)
except asyncio.TimeoutError:
Expand All @@ -79,7 +80,7 @@ async def __call__(self, inp: AsyncIterator[T]) -> AsyncIterator[List[T]]:
except StopAsyncIteration:
break

bulk_start = bulk_start or datetime.now()
bulk_start = bulk_start or time.monotonic()
items.append(item)
if self.max_bulk_size is not None and len(items) == self.max_bulk_size:
yield items
Expand All @@ -92,37 +93,37 @@ async def __call__(self, inp: AsyncIterator[T]) -> AsyncIterator[List[T]]:
yield items


class Filter(Middleware[T, T]):
def __init__(self, predicate: Callable[[T], Awaitable[bool]]) -> None:
class Filter(Middleware[_T, _T]):
def __init__(self, predicate: Callable[[_T], Awaitable[bool]]) -> None:
self._predicate = predicate

async def __call__(self, inp: AsyncIterator[T]) -> AsyncIterator[T]:
async def __call__(self, inp: AsyncIterator[_T]) -> AsyncIterator[_T]:
async for item in inp:
if await self._predicate(item):
yield item


class Map(Middleware[T, U]):
def __init__(self, func: Callable[[T], Awaitable[U]]) -> None:
class Map(Middleware[_T, _U]):
def __init__(self, func: Callable[[_T], Awaitable[_U]]) -> None:
self._func = func

async def __call__(self, inp: AsyncIterator[T]) -> AsyncIterator[U]:
async def __call__(self, inp: AsyncIterator[_T]) -> AsyncIterator[_U]:
async for item in inp:
yield await self._func(item)


class FilterNones(Middleware[Optional[T], T]):
async def __call__(self, inp: AsyncIterator[Optional[T]]) -> AsyncIterator[T]:
class FilterNones(Middleware[_T | None, _T]):
async def __call__(self, inp: AsyncIterator[_T | None]) -> AsyncIterator[_T]:
async for item in inp:
if item:
yield item


class SkipAll(Middleware[T, None]):
async def __call__(self, inp: AsyncIterator[T]) -> AsyncIterator[None]:
class SkipAll(Middleware[_T, None]):
async def __call__(self, inp: AsyncIterator[_T]) -> AsyncIterator[None]:
async for _ in inp:
pass
# ensure function to be generator
empty_list: List[None] = []
empty_list: list[None] = []
for x in empty_list:
yield x # pragma: no cover
Loading

0 comments on commit 17f498a

Please sign in to comment.