Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
name: Release python package

on:
push:
tags:
- "*"
release:
types:
- released

jobs:
deploy:
runs-on: ubuntu-latest
permissions:
id-token: write
steps:
- uses: actions/checkout@v2
- name: Install poetry
run: pipx install poetry
- name: Set up Python
uses: actions/setup-python@v4
- uses: actions/checkout@v5
with:
python-version: "3.11"
- name: Install deps
run: poetry install
- name: Set verison
run: poetry version "${{ github.ref_name }}"
persist-credentials: false
- uses: astral-sh/setup-uv@v7
with:
enable-cache: false
python-version: "3.12"
version: "latest"
- run: uv version "${GITHUB_REF_NAME}"
- run: uv build
- name: Release package
env:
POETRY_PYPI_TOKEN_PYPI: ${{ secrets.PYPI_TOKEN }}
run: poetry publish --build
UV_PUBLISH_TOKEN: ${{ secrets.PYPI_API_TOKEN }}
run: uv publish
60 changes: 36 additions & 24 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,29 +1,39 @@
name: Testing taskiq-aio-pika

on: [push, pull_request]
on:
pull_request:
paths-ignore:
- '*.md'
push:
paths-ignore:
- '*.md'

permissions:
actions: read
contents: read
pull-requests: read

jobs:
lint:
strategy:
matrix:
cmd:
- black
- ruff
- mypy
cmd: ["black", "ruff", "mypy"]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install poetry
run: pipx install poetry
- name: Set up Python
uses: actions/setup-python@v4
- uses: actions/checkout@v5
with:
persist-credentials: false
- id: setup-uv
uses: astral-sh/setup-uv@v7
with:
python-version: "3.11"
cache: "poetry"
enable-cache: true
cache-suffix: 3.11
version: "latest"
python-version: 3.11
- name: Install deps
run: poetry install
run: uv sync --all-extras
- name: Run lint check
run: poetry run pre-commit run -a ${{ matrix.cmd }}
run: uv run pre-commit run -a ${{ matrix.cmd }}
pytest:
services:
rabbit:
Expand All @@ -41,18 +51,20 @@ jobs:
- 5672:5672
strategy:
matrix:
py_version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
py_version: ["3.10", "3.11", "3.12", "3.13"]
runs-on: "ubuntu-latest"
steps:
- uses: actions/checkout@v2
- name: Install poetry
run: pipx install poetry
- name: Set up Python
uses: actions/setup-python@v4
- uses: actions/checkout@v5
with:
persist-credentials: false
- id: setup-uv
uses: astral-sh/setup-uv@v7
with:
python-version: "${{ matrix.py_version }}"
cache: "poetry"
enable-cache: true
cache-suffix: ${{ matrix.py_version }}
version: "latest"
python-version: ${{ matrix.py_version }}
- name: Install deps
run: poetry install
run: uv sync --all-extras
- name: Run pytest check
run: poetry run pytest -vv -n auto --cov="taskiq_aio_pika" .
run: uv run pytest -vv -n auto --cov="taskiq_aio_pika" .
25 changes: 19 additions & 6 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,42 @@
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.4.0
rev: v6.0.0
hooks:
- id: check-ast
- id: trailing-whitespace
- id: check-toml
- id: end-of-file-fixer

- repo: https://github.com/asottile/add-trailing-comma
rev: v2.1.0
rev: v4.0.0
hooks:
- id: add-trailing-comma

- repo: https://github.com/crate-ci/typos
rev: v1.38.1
hooks:
- id: typos

- repo: https://github.com/Yelp/detect-secrets
rev: v1.5.0
hooks:
- id: detect-secrets
args: [
'--exclude-lines', 'amqp://guest:guest@localhost:5672',
]

- repo: local
hooks:
- id: black
name: Format with Black
entry: poetry run black
entry: uv run black
language: system
types: [python]

- id: ruff
name: Run ruff lints
entry: poetry run ruff
entry: uv run ruff
language: system
pass_filenames: false
types: [python]
Expand All @@ -36,10 +49,10 @@ repos:

- id: mypy
name: Validate types with MyPy
entry: poetry run mypy
entry: uv run mypy
language: system
types: [python]
pass_filenames: false
types: [python]
args:
- ./taskiq_aio_pika
- ./tests
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2022-2024 Pavel Kirilin
Copyright (c) 2022-2025 Pavel Kirilin

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
63 changes: 28 additions & 35 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
# AioPika broker for taskiq

[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/taskiq-aio-pika?style=for-the-badge)](https://pypi.org/project/taskiq-aio-pika/)
[![PyPI](https://img.shields.io/pypi/v/taskiq-aio-pika?style=for-the-badge)](https://pypi.org/project/taskiq-aio-pika/)
[![PyPI - Downloads](https://img.shields.io/pypi/dm/taskiq-aio-pika?style=for-the-badge)](https://pypistats.org/packages/taskiq-aio-pika)

This library provides you with aio-pika broker for taskiq.

Usage:
Features:
- Supports delayed messages using dead-letter queues or RabbitMQ delayed message exchange plugin.
- Supports message priorities.

Usage example:

```python
from taskiq_aio_pika import AioPikaBroker

Expand All @@ -14,19 +23,13 @@ async def test() -> None:

```

## Non-obvious things

You can send delayed messages and set priorities to messages using labels.

## Delays

### **Default retries**
### Default delays

To send delayed message, you have to specify
delay label. You can do it with `task` decorator,
or by using kicker.
In this type of delay we are using additional queue with `expiration` parameter and after with time message will be deleted from `delay` queue and sent to the main taskiq queue.
For example:
To send delayed message, you have to specify delay label. You can do it with `task` decorator, or by using kicker.

In this type of delay we are using additional queue with `expiration` parameter. After declared time message will be deleted from `delay` queue and sent to the main queue. For example:

```python
broker = AioPikaBroker()
Expand All @@ -42,7 +45,7 @@ async def main():
await delayed_task.kiq()

# This message is going to be received after the delay in 4 seconds.
# Since we overriden the `delay` label using kicker.
# Since we overridden the `delay` label using kicker.
await delayed_task.kicker().with_labels(delay=4).kiq()

# This message is going to be send immediately. Since we deleted the label.
Expand All @@ -52,16 +55,13 @@ async def main():
# have to wait delay period before message is going to be sent.
```

### **Retries with `rabbitmq-delayed-message-exchange` plugin**
### Delays with `rabbitmq-delayed-message-exchange` plugin

To send delayed message you can install `rabbitmq-delayed-message-exchange`
plugin https://github.com/rabbitmq/rabbitmq-delayed-message-exchange.
First of all please make sure that your RabbitMQ server has [rabbitmq-delayed-message-exchange plugin](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange) installed.

And you need to configure you broker.
There is `delayed_message_exchange_plugin` `AioPikaBroker` parameter and it must be `True` to turn on delayed message functionality.
Also you need to configure you broker by passing `delayed_message_exchange_plugin=True` to broker.

The delay plugin can handle tasks with different delay times well, and the delay based on dead letter queue is suitable for tasks with the same delay time.
For example:
This plugin can handle tasks with different delay times well, and the delay based on dead letter queue is suitable for tasks with the same delay time. For example:

```python
broker = AioPikaBroker(
Expand All @@ -79,22 +79,18 @@ async def main():
await delayed_task.kiq()

# This message is going to be received after the delay in 4 seconds.
# Since we overriden the `delay` label using kicker.
# Since we overridden the `delay` label using kicker.
await delayed_task.kicker().with_labels(delay=4).kiq()
```

## Priorities

You can define priorities for messages using `priority` label.
Messages with higher priorities are delivered faster.
But to use priorities you need to define `max_priority` of the main queue, by passing `max_priority` parameter in broker's init.
This parameter sets maximum priority for the queue and
declares it as the prority queue.
You can define priorities for messages using `priority` label. Messages with higher priorities are delivered faster.
But to use priorities you need to define `max_priority` of the main queue, by passing `max_priority` parameter in broker's init. This parameter sets maximum priority for the queue and declares it as the priority queue.

Before doing so please read the [documentation](https://www.rabbitmq.com/priority.html#behaviour) about what
downsides you get by using prioritized queues.


```python
broker = AioPikaBroker(max_priority=10)

Expand All @@ -113,12 +109,12 @@ async def main():

# This message is going to have priority 0.
await prio_task.kicker().with_labels(priority=None).kiq()

```

## Configuration

AioPikaBroker parameters:

* `url` - url to rabbitmq. If None, "amqp://guest:guest@localhost:5672" is used.
* `result_backend` - custom result backend.
* `task_id_generator` - custom task_id genertaor.
Expand All @@ -128,22 +124,19 @@ AioPikaBroker parameters:
* `routing_key` - that used to bind that queue to the exchange.
* `declare_exchange` - whether you want to declare new exchange if it doesn't exist.
* `max_priority` - maximum priority for messages.
* `delay_queue_name` - custom delay queue name.
This queue is used to deliver messages with delays.
* `delay_queue_name` - custom delay queue name. This queue is used to deliver messages with delays.
* `dead_letter_queue_name` - custom dead letter queue name.
This queue is used to receive negatively acknowleged messages from the main queue.
This queue is used to receive negatively acknowledged messages from the main queue.
* `qos` - number of messages that worker can prefetch.
* `declare_queues` - whether you want to declare queues even on
client side. May be useful for message persistance.
* `declare_queues` - whether you want to declare queues even on client side. May be useful for message persistence.
* `declare_queues_kwargs` - see [Custom Queue Arguments](#custom-queue-arguments) for more details.

## Custom Queue Arguments

You can pass custom arguments to the underlying RabbitMQ queue declaration by using the `declare_queues_kwargs` parameter of `AioPikaBroker`. If you want to set specific queue arguments (such as RabbitMQ extensions or custom behaviors), provide them in the `arguments` dictionary inside `declare_queues_kwargs`.

These arguments will be merged with the default arguments used by the broker (such as dead-lettering and priority settings).

**Example:**
These arguments will be merged with the default arguments used by the broker
(such as dead-lettering and priority settings). If there are any conflicts, the values you provide will take precedence over the broker's defaults. Example:

```python
broker = AioPikaBroker(
Expand Down
17 changes: 17 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
services:
rabbitmq:
container_name: taskiq_aio_pika_rabbitmq
image: heidiks/rabbitmq-delayed-message-exchange:latest
environment:
RABBITMQ_DEFAULT_USER: "guest"
RABBITMQ_DEFAULT_PASS: "guest"
RABBITMQ_DEFAULT_VHOST: "/"
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "check_running", "-q"]
interval: 10s
timeout: 5s
retries: 8
ports:
- "5672:5672"
- "15672:15672"
- "61613:61613"
Loading