Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import asyncio
import uuid

import pytest
from taskiq import AsyncBroker, BrokerMessage

from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker


async def get_message(broker: AsyncBroker) -> BrokerMessage: # type: ignore
"""
Get a message from the broker.

:param broker: async message broker.
:return: first message from listen method.
"""
async for message in broker.listen(): # noqa: WPS328
return message


@pytest.fixture
def valid_broker_message() -> BrokerMessage:
"""
Generate valid broker message for tests.

:returns: broker message.
"""
return BrokerMessage(
task_id=uuid.uuid4().hex,
task_name=uuid.uuid4().hex,
message="my_msg",
labels={
"label1": "val1",
},
)


@pytest.mark.anyio
async def test_pub_sub_broker(
valid_broker_message: BrokerMessage,
redis_url: str,
) -> None:
"""
Test that messages are published and read correctly by PubSubBroker.

We create two workers that listen and send a message to them.
Expect both workers to receive the same message we sent.
"""
broker = PubSubBroker(url=redis_url, queue_name=uuid.uuid4().hex)
worker1_task = asyncio.create_task(get_message(broker))
worker2_task = asyncio.create_task(get_message(broker))
await asyncio.sleep(0.3)

await broker.kick(valid_broker_message)
await asyncio.sleep(0.3)

message1 = worker1_task.result()
message2 = worker2_task.result()
assert message1 == valid_broker_message
assert message1 == message2


@pytest.mark.anyio
async def test_list_queue_broker(
valid_broker_message: BrokerMessage,
redis_url: str,
) -> None:
"""
Test that messages are published and read correctly by ListQueueBroker.

We create two workers that listen and send a message to them.
Expect only one worker to receive the same message we sent.
"""
broker = ListQueueBroker(url=redis_url, queue_name=uuid.uuid4().hex)
worker1_task = asyncio.create_task(get_message(broker))
worker2_task = asyncio.create_task(get_message(broker))
await asyncio.sleep(0.3)

await broker.kick(valid_broker_message)
await asyncio.sleep(0.3)

assert worker1_task.done() != worker2_task.done()
message = worker1_task.result() if worker1_task.done() else worker2_task.result()
assert message == valid_broker_message