Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 90 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,97 @@ This library is in early stages of development. It is meant to be used with Rabb

## Getting Started

An example is provide in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with:
An example is provided in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with:

poetry run python ./examples/getting_started/main.py

### Creating a connection

A connection to the RabbitMQ AMQP 1.0 server can be established using the Connection object.

For example:

```python
connection = Connection("amqp://guest:guest@localhost:5672/")
connection.dial()
```

### Managing resources

Once we have a Connection object we can get a Management object in order to submit to the server management operations
(es: declare/delete queues and exchanges, purging queues, binding/unbinding objects ecc...)

For example (this code is declaring an exchange and a queue:

```python
management = connection.management()

print("declaring exchange and queue")
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))

management.declare_queue(
QuorumQueueSpecification(name=queue_name)
)
```

### Publishing messages

Once we have a Connection object we can get a Publisher object in order to send messages to the server (to an exchange or queue)

For example:

```python
addr_queue = AddressHelper.queue_address(queue_name)
publisher = connection.publisher(addr)

# publish messages
for i in range(messages_to_publish):
publisher.publish(Message(body="test"))

publisher.close()
```

### Consuming messages

Once we have a Connection object we can get a Consumer object in order to consumer messages from the server (queue).

Messages are received through a callback

For example:

Create a class which extends AMQPMessagingHandler which defines at minimum the on_consumer method, that will receive the
messages consumed:

```python
class MyMessageHandler(AMQPMessagingHandler):

def __init__(self):
super().__init__()
self._count = 0

def on_message(self, event: Event):
print("received message: " + str(event.message.body))

# accepting
self.delivery_context.accept(event)
```

Then from connection get a consumer object:

```python
addr_queue = AddressHelper.queue_address(queue_name)
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())

try:
consumer.run()
except KeyboardInterrupt:
pass

consumer.close()
```

The consumer will run indefinitively waiting for messages to arrive.




2 changes: 1 addition & 1 deletion examples/getting_started/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def __init__(self):
self._count = 0

def on_message(self, event: Event):
print("received message: " + str(event.message.annotations))
print("received message: " + str(event.message.body))

# accepting
self.delivery_context.accept(event)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "rabbitmq-amqp-python-client"
version = "0.1.0"
version = "0.1.0-alpha.0"
description = "Python RabbitMQ client for AMQP 1.0 protocol"
authors = ["RabbitMQ team"]
license = "Apache-2.0 license"
Expand Down
6 changes: 6 additions & 0 deletions rabbitmq_amqp_python_client/address_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,9 @@ def binding_path_with_exchange_queue(
+ ";args="
)
return binding_path_wth_exchange_queue_key


def validate_address(address: str) -> bool:
if address.startswith("/queues") or address.startswith("/exchanges"):
return True
return False
10 changes: 10 additions & 0 deletions rabbitmq_amqp_python_client/connection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import logging
from typing import Optional

from .address_helper import validate_address
from .consumer import Consumer
from .exceptions import ArgumentOutOfRangeException
from .management import Management
from .publisher import Publisher
from .qpid.proton._handlers import MessagingHandler
Expand Down Expand Up @@ -35,11 +37,19 @@ def close(self) -> None:
self._conn.close()

def publisher(self, destination: str) -> Publisher:
if validate_address(destination) is False:
raise ArgumentOutOfRangeException(
"destination address must start with /queues or /exchanges"
)
publisher = Publisher(self._conn, destination)
return publisher

def consumer(
self, destination: str, handler: Optional[MessagingHandler] = None
) -> Consumer:
if validate_address(destination) is False:
raise ArgumentOutOfRangeException(
"destination address must start with /queues or /exchanges"
)
consumer = Consumer(self._conn, destination, handler)
return consumer
3 changes: 3 additions & 0 deletions rabbitmq_amqp_python_client/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,18 @@ def close(self) -> None:
self._receiver.close()

def run(self) -> None:
logger.debug("Running the consumer: starting to consume")
if self._receiver is not None:
self._receiver.container.run()

def stop(self) -> None:
logger.debug("Stopping the consumer: starting to consume")
if self._receiver is not None:
self._receiver.container.stop_events()
self._receiver.container.stop()

def _create_receiver(self, addr: str) -> BlockingReceiver:
logger.debug("Creating the receiver")
return self._conn.create_receiver(
addr, options=ReceiverOptionUnsettled(addr), handler=self._handler
)
2 changes: 1 addition & 1 deletion rabbitmq_amqp_python_client/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
def validate_annotations(annotations: []) -> bool: # type: ignore
validated = True
for annotation in annotations:
if len(annotation) > 0 and annotation[:2] == "x-":
if annotation.startswith("x-"):
pass
else:
validated = False
Expand Down
18 changes: 18 additions & 0 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,24 @@ def test_consumer_sync_queue_accept(connection: Connection) -> None:
assert consumed > 0


def test_consumer_invalid_destination(connection: Connection) -> None:

queue_name = "test-queue-sync-invalid-accept"
raised = False
consumer = None
try:
consumer = connection.consumer("/invalid-destination/" + queue_name)
except ArgumentOutOfRangeException:
raised = True
except Exception:
raised = False

if consumer is not None:
consumer.close()

assert raised is True


def test_consumer_async_queue_accept(connection: Connection) -> None:

messages_to_send = 1000
Expand Down
22 changes: 22 additions & 0 deletions tests/test_publisher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from rabbitmq_amqp_python_client import (
AddressHelper,
ArgumentOutOfRangeException,
BindingSpecification,
Connection,
ExchangeSpecification,
Expand Down Expand Up @@ -31,6 +32,27 @@ def test_publish_queue(connection: Connection) -> None:
assert raised is False


def test_publish_to_invalid_destination(connection: Connection) -> None:

queue_name = "test-queue"

raised = False

publisher = None
try:
publisher = connection.publisher("/invalid-destination/" + queue_name)
publisher.publish(Message(body="test"))
except ArgumentOutOfRangeException:
raised = True
except Exception:
raised = False

if publisher is not None:
publisher.close()

assert raised is True


def test_publish_exchange(connection: Connection) -> None:

exchange_name = "test-exchange"
Expand Down