diff --git a/README.md b/README.md index 98e0029..0fae617 100644 --- a/README.md +++ b/README.md @@ -11,8 +11,97 @@ This library is in early stages of development. It is meant to be used with Rabb ## Getting Started -An example is provide in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with: +An example is provided in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with: poetry run python ./examples/getting_started/main.py +### Creating a connection + +A connection to the RabbitMQ AMQP 1.0 server can be established using the Connection object. + +For example: + +```python + connection = Connection("amqp://guest:guest@localhost:5672/") + connection.dial() +``` + +### Managing resources + +Once we have a Connection object we can get a Management object in order to submit to the server management operations +(es: declare/delete queues and exchanges, purging queues, binding/unbinding objects ecc...) + +For example (this code is declaring an exchange and a queue: + +```python + management = connection.management() + + print("declaring exchange and queue") + management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) + + management.declare_queue( + QuorumQueueSpecification(name=queue_name) + ) +``` + +### Publishing messages + +Once we have a Connection object we can get a Publisher object in order to send messages to the server (to an exchange or queue) + +For example: + +```python + addr_queue = AddressHelper.queue_address(queue_name) + publisher = connection.publisher(addr) + + # publish messages + for i in range(messages_to_publish): + publisher.publish(Message(body="test")) + + publisher.close() +``` + +### Consuming messages + +Once we have a Connection object we can get a Consumer object in order to consumer messages from the server (queue). + +Messages are received through a callback + +For example: + +Create a class which extends AMQPMessagingHandler which defines at minimum the on_consumer method, that will receive the +messages consumed: + +```python +class MyMessageHandler(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._count = 0 + + def on_message(self, event: Event): + print("received message: " + str(event.message.body)) + + # accepting + self.delivery_context.accept(event) +``` + +Then from connection get a consumer object: + +```python + addr_queue = AddressHelper.queue_address(queue_name) + consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) + + try: + consumer.run() + except KeyboardInterrupt: + pass + + consumer.close() +``` + +The consumer will run indefinitively waiting for messages to arrive. + + + diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py index df51e3c..0bb4398 100644 --- a/examples/getting_started/main.py +++ b/examples/getting_started/main.py @@ -20,7 +20,7 @@ def __init__(self): self._count = 0 def on_message(self, event: Event): - print("received message: " + str(event.message.annotations)) + print("received message: " + str(event.message.body)) # accepting self.delivery_context.accept(event) diff --git a/pyproject.toml b/pyproject.toml index 13df5ed..7a68560 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "rabbitmq-amqp-python-client" -version = "0.1.0" +version = "0.1.0-alpha.0" description = "Python RabbitMQ client for AMQP 1.0 protocol" authors = ["RabbitMQ team"] license = "Apache-2.0 license" diff --git a/rabbitmq_amqp_python_client/address_helper.py b/rabbitmq_amqp_python_client/address_helper.py index 9abba3e..1721f19 100644 --- a/rabbitmq_amqp_python_client/address_helper.py +++ b/rabbitmq_amqp_python_client/address_helper.py @@ -72,3 +72,9 @@ def binding_path_with_exchange_queue( + ";args=" ) return binding_path_wth_exchange_queue_key + + +def validate_address(address: str) -> bool: + if address.startswith("/queues") or address.startswith("/exchanges"): + return True + return False diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 45589dd..7c63883 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -1,7 +1,9 @@ import logging from typing import Optional +from .address_helper import validate_address from .consumer import Consumer +from .exceptions import ArgumentOutOfRangeException from .management import Management from .publisher import Publisher from .qpid.proton._handlers import MessagingHandler @@ -35,11 +37,19 @@ def close(self) -> None: self._conn.close() def publisher(self, destination: str) -> Publisher: + if validate_address(destination) is False: + raise ArgumentOutOfRangeException( + "destination address must start with /queues or /exchanges" + ) publisher = Publisher(self._conn, destination) return publisher def consumer( self, destination: str, handler: Optional[MessagingHandler] = None ) -> Consumer: + if validate_address(destination) is False: + raise ArgumentOutOfRangeException( + "destination address must start with /queues or /exchanges" + ) consumer = Consumer(self._conn, destination, handler) return consumer diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 3ea9b94..18b2d71 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -40,15 +40,18 @@ def close(self) -> None: self._receiver.close() def run(self) -> None: + logger.debug("Running the consumer: starting to consume") if self._receiver is not None: self._receiver.container.run() def stop(self) -> None: + logger.debug("Stopping the consumer: starting to consume") if self._receiver is not None: self._receiver.container.stop_events() self._receiver.container.stop() def _create_receiver(self, addr: str) -> BlockingReceiver: + logger.debug("Creating the receiver") return self._conn.create_receiver( addr, options=ReceiverOptionUnsettled(addr), handler=self._handler ) diff --git a/rabbitmq_amqp_python_client/utils.py b/rabbitmq_amqp_python_client/utils.py index 1804eb3..cbc1a35 100644 --- a/rabbitmq_amqp_python_client/utils.py +++ b/rabbitmq_amqp_python_client/utils.py @@ -1,7 +1,7 @@ def validate_annotations(annotations: []) -> bool: # type: ignore validated = True for annotation in annotations: - if len(annotation) > 0 and annotation[:2] == "x-": + if annotation.startswith("x-"): pass else: validated = False diff --git a/tests/test_consumer.py b/tests/test_consumer.py index ff276a7..683c437 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -53,6 +53,24 @@ def test_consumer_sync_queue_accept(connection: Connection) -> None: assert consumed > 0 +def test_consumer_invalid_destination(connection: Connection) -> None: + + queue_name = "test-queue-sync-invalid-accept" + raised = False + consumer = None + try: + consumer = connection.consumer("/invalid-destination/" + queue_name) + except ArgumentOutOfRangeException: + raised = True + except Exception: + raised = False + + if consumer is not None: + consumer.close() + + assert raised is True + + def test_consumer_async_queue_accept(connection: Connection) -> None: messages_to_send = 1000 diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 0cc2a57..57d7196 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -1,5 +1,6 @@ from rabbitmq_amqp_python_client import ( AddressHelper, + ArgumentOutOfRangeException, BindingSpecification, Connection, ExchangeSpecification, @@ -31,6 +32,27 @@ def test_publish_queue(connection: Connection) -> None: assert raised is False +def test_publish_to_invalid_destination(connection: Connection) -> None: + + queue_name = "test-queue" + + raised = False + + publisher = None + try: + publisher = connection.publisher("/invalid-destination/" + queue_name) + publisher.publish(Message(body="test")) + except ArgumentOutOfRangeException: + raised = True + except Exception: + raised = False + + if publisher is not None: + publisher.close() + + assert raised is True + + def test_publish_exchange(connection: Connection) -> None: exchange_name = "test-exchange"