-
Notifications
You must be signed in to change notification settings - Fork 12
/
test_message.py
31 lines (26 loc) · 956 Bytes
/
test_message.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from aio_pika import IncomingMessage
from aiormq.types import DeliveredMessage
from pamqp import ContentHeader
from pamqp.specification import Basic
from amqpdispatcher.message import Message
def test_incoming_message_to_message():
raw_message = IncomingMessage(
message=DeliveredMessage(
delivery=Basic.Deliver(
consumer_tag="ctag",
delivery_tag="dtag",
redelivered=True,
exchange="exc",
routing_key="rkey",
),
header=ContentHeader(),
body=b"",
channel=None,
)
)
message = Message(raw_message=raw_message)
assert message.delivery_info["consumer_tag"], "ctag"
assert message.delivery_info["delivery_tag"], "dtag"
assert message.delivery_info["redelivered"], True
assert message.delivery_info["exchange"], "exc"
assert message.delivery_info["routing_key"], "rkey"