From 3c6685d952271c4ff858a11f7514e2bb13b3cd5b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 14:59:27 +0200 Subject: [PATCH 1/5] implement application properties filter complete: https://github.com/rabbitmq/rabbitmq-amqp-python-client/issues/42 Signed-off-by: Gabriele Santomaggio --- .../example_streams_with_filters.py | 10 +++++++++- rabbitmq_amqp_python_client/entities.py | 19 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/examples/streams_with_filters/example_streams_with_filters.py b/examples/streams_with_filters/example_streams_with_filters.py index e95982d..e9721ac 100644 --- a/examples/streams_with_filters/example_streams_with_filters.py +++ b/examples/streams_with_filters/example_streams_with_filters.py @@ -29,11 +29,13 @@ def __init__(self): def on_amqp_message(self, event: Event): # only messages with banana filters and with subject yellow + # and application property from = italy get received self._count = self._count + 1 logger.info( - "Received message: {}, subject {}.[Total Consumed: {}]".format( + "Received message: {}, subject {} application properties {} .[Total Consumed: {}]".format( Converter.bytes_to_string(event.message.body), event.message.subject, + event.message.application_properties, self._count, ) ) @@ -88,6 +90,7 @@ def main() -> None: addr_queue, message_handler=MyMessageHandler(), # the consumer will only receive messages with filter value banana and subject yellow + # and application property from = italy stream_consumer_options=StreamConsumerOptions( offset_specification=OffsetSpecification.first, filter_options=StreamFilterOptions( @@ -95,6 +98,7 @@ def main() -> None: message_properties=MessageProperties( subject="yellow", ), + application_properties={"from": "italy"} ), ), ) @@ -108,11 +112,13 @@ def main() -> None: # publish with a filter of apple for i in range(MESSAGES_TO_PUBLISH): color = "green" if i % 2 == 0 else "yellow" + from_value = "italy" if i % 3 == 0 else "spain" publisher.publish( Message( Converter.string_to_bytes(body="apple: " + str(i)), annotations={"x-stream-filter-value": "apple"}, subject=color, + application_properties={"from": from_value}, ) ) @@ -121,11 +127,13 @@ def main() -> None: # publish with a filter of banana for i in range(MESSAGES_TO_PUBLISH): color = "green" if i % 2 == 0 else "yellow" + from_value = "italy" if i % 3 == 0 else "spain" publisher.publish( Message( body=Converter.string_to_bytes("banana: " + str(i)), annotations={"x-stream-filter-value": "banana"}, subject=color, + application_properties={"from": from_value}, ) ) diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index db6d13f..3e8b5a7 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -11,6 +11,7 @@ STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec" STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered" AMQP_PROPERTIES_FILTER = "amqp:properties-filter" +AMQP_APPLICATION_PROPERTIES_FILTER = "amqp:application-properties-filter" @dataclass @@ -252,6 +253,11 @@ def __init__( if filter_options is not None and filter_options.message_properties is not None: self._filter_message_properties(filter_options.message_properties) + if ( + filter_options is not None + and filter_options.application_properties is not None + ): + self._filter_application_properties(filter_options.application_properties) def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: """ @@ -316,6 +322,19 @@ def _filter_message_properties( symbol(AMQP_PROPERTIES_FILTER), filter_prop ) + def _filter_application_properties( + self, application_properties: Optional[dict[str, Any]] + ) -> None: + app_prop = {} + if application_properties is not None: + for key, value in application_properties.items(): + app_prop[key] = value + + if len(app_prop) > 0: + self._filter_set[symbol(AMQP_APPLICATION_PROPERTIES_FILTER)] = ( + Described(symbol(AMQP_APPLICATION_PROPERTIES_FILTER), app_prop) + ) + def filter_set(self) -> Dict[symbol, Described]: """ Get the current filter set configuration. From 9fb4b41e83ca2efc1941edc0dc4146bacfc8617c Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 15:02:28 +0200 Subject: [PATCH 2/5] formatting Signed-off-by: Gabriele Santomaggio --- examples/streams_with_filters/example_streams_with_filters.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/streams_with_filters/example_streams_with_filters.py b/examples/streams_with_filters/example_streams_with_filters.py index e9721ac..9f5c2dc 100644 --- a/examples/streams_with_filters/example_streams_with_filters.py +++ b/examples/streams_with_filters/example_streams_with_filters.py @@ -98,7 +98,7 @@ def main() -> None: message_properties=MessageProperties( subject="yellow", ), - application_properties={"from": "italy"} + application_properties={"from": "italy"}, ), ), ) From 3ddc58c3ebff2feb1ad994cb2eabfd8efc4c1010 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 15:20:08 +0200 Subject: [PATCH 3/5] test Signed-off-by: Gabriele Santomaggio --- tests/test_streams.py | 140 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/tests/test_streams.py b/tests/test_streams.py index dce875c..ffddc56 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -1,3 +1,5 @@ +import time + from rabbitmq_amqp_python_client import ( AddressHelper, AMQPMessagingHandler, @@ -471,3 +473,141 @@ def test_stream_filter_message_properties( consumer.close() management.delete_queue(stream_name) + + +class MyMessageHandlerApplicationPropertiesFilter(AMQPMessagingHandler): + def __init__( + self, + ): + super().__init__() + + def on_message(self, event: Event): + self.delivery_context.accept(event) + assert event.message.application_properties == {"key": "value_17"} + raise ConsumerTestException("consumed") + + +def test_stream_filter_application_properties( + connection: Connection, environment: Environment +) -> None: + consumer = None + stream_name = "test_stream_application_message_properties" + messages_to_send = 30 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # consume and then publish + try: + connection_consumer = environment.connection() + connection_consumer.dial() + consumer = connection_consumer.consumer( + addr_queue, + message_handler=MyMessageHandlerApplicationPropertiesFilter(), + stream_consumer_options=StreamConsumerOptions( + filter_options=StreamFilterOptions( + application_properties={"key": "value_17"}, + ) + ), + ) + publisher = connection.publisher(addr_queue) + for i in range(messages_to_send): + msg = Message( + body=Converter.string_to_bytes("hello_{}".format(i)), + application_properties={"key": "value_{}".format(i)}, + ) + publisher.publish(msg) + + publisher.close() + + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + if consumer is not None: + consumer.close() + + management.delete_queue(stream_name) + + +class MyMessageHandlerMixingDifferentFilters(AMQPMessagingHandler): + def __init__( + self, + ): + super().__init__() + + def on_message(self, event: Event): + self.delivery_context.accept(event) + assert event.message.annotations == {"x-stream-filter-value": "my_value"} + assert event.message.application_properties == {"key": "value_17"} + assert event.message.subject == "important_15" + assert event.message.body == Converter.string_to_bytes("the_right_one") + raise ConsumerTestException("consumed") + + +def test_stream_filter_mixing_different( + connection: Connection, environment: Environment +) -> None: + consumer = None + stream_name = "test_stream_filter_mixing_different" + messages_to_send = 30 + + queue_specification = StreamSpecification( + name=stream_name, + ) + management = connection.management() + management.declare_queue(queue_specification) + + addr_queue = AddressHelper.queue_address(stream_name) + + # consume and then publish + try: + connection_consumer = environment.connection() + connection_consumer.dial() + consumer = connection_consumer.consumer( + addr_queue, + message_handler=MyMessageHandlerApplicationPropertiesFilter(), + stream_consumer_options=StreamConsumerOptions( + filter_options=StreamFilterOptions( + values=["my_value"], + application_properties={"key": "value_17"}, + message_properties=MessageProperties(subject="important_15"), + ) + ), + ) + publisher = connection.publisher(addr_queue) + # all these messages will be filtered out + for i in range(messages_to_send): + msg = Message( + body=Converter.string_to_bytes("hello_{}".format(i)), + ) + publisher.publish(msg) + + time.sleep( + 0.5 + ) # wait a bit to ensure messages are published in different chunks + msg = Message( + body=Converter.string_to_bytes("the_right_one"), + annotations={"x-stream-filter-value": "my_value"}, + application_properties={"key": "value_17"}, + subject="important_15", + ) + publisher.publish(msg) + + publisher.close() + + consumer.run() + # ack to terminate the consumer + except ConsumerTestException: + pass + + if consumer is not None: + consumer.close() + + management.delete_queue(stream_name) From 1f259496f11df2e70876d32110026394ce6e57b6 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 15:59:29 +0200 Subject: [PATCH 4/5] Update tests/test_streams.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/test_streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_streams.py b/tests/test_streams.py index ffddc56..260a166 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -572,7 +572,7 @@ def test_stream_filter_mixing_different( connection_consumer.dial() consumer = connection_consumer.consumer( addr_queue, - message_handler=MyMessageHandlerApplicationPropertiesFilter(), + message_handler=MyMessageHandlerMixingDifferentFilters(), stream_consumer_options=StreamConsumerOptions( filter_options=StreamFilterOptions( values=["my_value"], From 4d9c16d9a41bf918fcfe3ca04ade19dee67c2529 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Sep 2025 16:23:13 +0200 Subject: [PATCH 5/5] fix test Signed-off-by: Gabriele Santomaggio --- tests/test_streams.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/tests/test_streams.py b/tests/test_streams.py index 260a166..a3b49f2 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -544,10 +544,10 @@ def __init__( def on_message(self, event: Event): self.delivery_context.accept(event) - assert event.message.annotations == {"x-stream-filter-value": "my_value"} - assert event.message.application_properties == {"key": "value_17"} - assert event.message.subject == "important_15" - assert event.message.body == Converter.string_to_bytes("the_right_one") + assert event.message.annotations["x-stream-filter-value"] == "the_value_filter" + assert event.message.application_properties == {"key": "app_value_9999"} + assert event.message.subject == "important_9999" + assert event.message.body == Converter.string_to_bytes("the_right_one_9999") raise ConsumerTestException("consumed") @@ -562,6 +562,7 @@ def test_stream_filter_mixing_different( name=stream_name, ) management = connection.management() + management.delete_queue(stream_name) management.declare_queue(queue_specification) addr_queue = AddressHelper.queue_address(stream_name) @@ -575,9 +576,9 @@ def test_stream_filter_mixing_different( message_handler=MyMessageHandlerMixingDifferentFilters(), stream_consumer_options=StreamConsumerOptions( filter_options=StreamFilterOptions( - values=["my_value"], - application_properties={"key": "value_17"}, - message_properties=MessageProperties(subject="important_15"), + values=["the_value_filter"], + application_properties={"key": "app_value_9999"}, + message_properties=MessageProperties(subject="important_9999"), ) ), ) @@ -589,14 +590,12 @@ def test_stream_filter_mixing_different( ) publisher.publish(msg) - time.sleep( - 0.5 - ) # wait a bit to ensure messages are published in different chunks + time.sleep(1) # wait a bit to ensure messages are published in different chunks msg = Message( - body=Converter.string_to_bytes("the_right_one"), - annotations={"x-stream-filter-value": "my_value"}, - application_properties={"key": "value_17"}, - subject="important_15", + body=Converter.string_to_bytes("the_right_one_9999"), + annotations={"x-stream-filter-value": "the_value_filter"}, + application_properties={"key": "app_value_9999"}, + subject="important_9999", ) publisher.publish(msg)