Skip to content

Commit 1c9c234

Browse files
author
DanielePalaia
committed
preparing for first release
1 parent 80f9f93 commit 1c9c234

File tree

4 files changed

+93
-3
lines changed

4 files changed

+93
-3
lines changed

README.md

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,95 @@ This library is in early stages of development. It is meant to be used with Rabb
1111

1212
## Getting Started
1313

14-
An example is provide in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with:
14+
An example is provided in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with:
1515

1616
poetry run python ./examples/getting_started/main.py
1717

18+
### Creating a connection
19+
20+
A connection to the RabbitMQ AMQP 1.0 server can be established using the Connection object.
21+
22+
For example:
23+
24+
```python
25+
connection = Connection("amqp://guest:guest@localhost:5672/")
26+
connection.dial()
27+
```
28+
29+
### Managing resources
30+
31+
Once we have a Connection object we can get a Management object in order to submit to the server management operations
32+
(es: declare/delete queues and exchanges, purging queues, binding/unbinding objects ecc...)
33+
34+
For example (this code is declaring an exchange and a queue:
35+
36+
```python
37+
management = connection.management()
38+
39+
print("declaring exchange and queue")
40+
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
41+
42+
management.declare_queue(
43+
QuorumQueueSpecification(name=queue_name)
44+
)
45+
```
46+
47+
### Publishing messages
48+
49+
Once we have a Connection object we can get a Publisher object in order to send messages to the server (to an exchange or queue)
50+
51+
For example:
52+
53+
```python
54+
addr_queue = AddressHelper.queue_address(queue_name)
55+
publisher = connection.publisher(addr)
56+
57+
# publish messages
58+
for i in range(messages_to_publish):
59+
publisher.publish(Message(body="test"))
60+
61+
publisher.close()
62+
```
63+
64+
### Consuming messages
65+
66+
Once we have a Connection object we can get a Consumer object in order to consumer messages from the server (queue).
67+
68+
Messages are received through a callback
69+
70+
For example:
71+
72+
Create a class which extends AMQPMessagingHandler which defines at minimum the on_consumer method, that will receive the
73+
messages consumed:
74+
75+
```python
76+
class MyMessageHandler(AMQPMessagingHandler):
77+
78+
def __init__(self):
79+
super().__init__()
80+
self._count = 0
81+
82+
def on_message(self, event: Event):
83+
print("received message: " + str(event.message.body))
84+
85+
# accepting
86+
self.delivery_context.accept(event)
87+
```
88+
89+
Then from connection get a consumer object:
90+
91+
```python
92+
addr_queue = AddressHelper.queue_address(queue_name)
93+
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
94+
95+
try:
96+
consumer.run()
97+
except KeyboardInterrupt:
98+
pass
99+
```
100+
101+
The consumer will run indefinitively waiting for messages to arrive.
102+
103+
104+
18105

examples/getting_started/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def __init__(self):
2020
self._count = 0
2121

2222
def on_message(self, event: Event):
23-
print("received message: " + str(event.message.annotations))
23+
print("received message: " + str(event.message.body))
2424

2525
# accepting
2626
self.delivery_context.accept(event)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "rabbitmq-amqp-python-client"
3-
version = "0.1.0"
3+
version = "0.1.0-alpha.0"
44
description = "Python RabbitMQ client for AMQP 1.0 protocol"
55
authors = ["RabbitMQ team"]
66
license = "Apache-2.0 license"

rabbitmq_amqp_python_client/consumer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,18 @@ def close(self) -> None:
4040
self._receiver.close()
4141

4242
def run(self) -> None:
43+
logger.debug("Running the consumer: starting to consume")
4344
if self._receiver is not None:
4445
self._receiver.container.run()
4546

4647
def stop(self) -> None:
48+
logger.debug("Stopping the consumer: starting to consume")
4749
if self._receiver is not None:
4850
self._receiver.container.stop_events()
4951
self._receiver.container.stop()
5052

5153
def _create_receiver(self, addr: str) -> BlockingReceiver:
54+
logger.debug("Creating the receiver")
5255
return self._conn.create_receiver(
5356
addr, options=ReceiverOptionUnsettled(addr), handler=self._handler
5457
)

0 commit comments

Comments
 (0)