Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/streams/example_with_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
Event,
Message,
OffsetSpecification,
StreamOptions,
StreamConsumerOptions,
StreamSpecification,
)

Expand Down Expand Up @@ -104,8 +104,8 @@ def main() -> None:
message_handler=MyMessageHandler(),
# can be first, last, next or an offset long
# you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
stream_filter_options=StreamOptions(
offset_specification=OffsetSpecification.first, filters=["banana"]
stream_consumer_options=StreamConsumerOptions(
offset_specification=OffsetSpecification.first
),
)
print(
Expand Down
4 changes: 2 additions & 2 deletions rabbitmq_amqp_python_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
OAuth2Options,
OffsetSpecification,
RecoveryConfiguration,
StreamOptions,
StreamConsumerOptions,
)
from .environment import Environment
from .exceptions import (
Expand Down Expand Up @@ -85,7 +85,7 @@
"CurrentUserStore",
"PKCS12Store",
"ConnectionClosed",
"StreamOptions",
"StreamConsumerOptions",
"OffsetSpecification",
"OutcomeState",
"Environment",
Expand Down
12 changes: 6 additions & 6 deletions rabbitmq_amqp_python_client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from .entities import (
OAuth2Options,
RecoveryConfiguration,
StreamOptions,
StreamConsumerOptions,
)
from .exceptions import (
ArgumentOutOfRangeException,
Expand Down Expand Up @@ -363,7 +363,7 @@ def publisher(self, destination: str = "") -> Publisher:
ArgumentOutOfRangeException: If destination address format is invalid
"""
if destination != "":
if validate_address(destination) is False:
if not validate_address(destination):
raise ArgumentOutOfRangeException(
"destination address must start with /queues or /exchanges"
)
Expand All @@ -376,7 +376,7 @@ def consumer(
self,
destination: str,
message_handler: Optional[MessagingHandler] = None,
stream_filter_options: Optional[StreamOptions] = None,
stream_consumer_options: Optional[StreamConsumerOptions] = None,
credit: Optional[int] = None,
) -> Consumer:
"""
Expand All @@ -385,7 +385,7 @@ def consumer(
Args:
destination: The address to consume from
message_handler: Optional handler for processing messages
stream_filter_options: Optional configuration for stream consumption
stream_consumer_options: Optional configuration for stream consumption
credit: Optional credit value for flow control

Returns:
Expand All @@ -394,12 +394,12 @@ def consumer(
Raises:
ArgumentOutOfRangeException: If destination address format is invalid
"""
if validate_address(destination) is False:
if not validate_address(destination):
raise ArgumentOutOfRangeException(
"destination address must start with /queues or /exchanges"
)
consumer = Consumer(
self._conn, destination, message_handler, stream_filter_options, credit
self._conn, destination, message_handler, stream_consumer_options, credit
)
self._consumers.append(consumer)
return consumer
Expand Down
6 changes: 3 additions & 3 deletions rabbitmq_amqp_python_client/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Literal, Optional, Union, cast

from .amqp_consumer_handler import AMQPMessagingHandler
from .entities import StreamOptions
from .entities import StreamConsumerOptions
from .options import (
ReceiverOptionUnsettled,
ReceiverOptionUnsettledWithFilters,
Expand All @@ -29,7 +29,7 @@ class Consumer:
_conn (BlockingConnection): The underlying connection to RabbitMQ
_addr (str): The address to consume from
_handler (Optional[MessagingHandler]): Optional message handling callback
_stream_options (Optional[StreamOptions]): Configuration for stream consumption
_stream_options (Optional[StreamConsumerOptions]): Configuration for stream consumption
_credit (Optional[int]): Flow control credit value
"""

Expand All @@ -38,7 +38,7 @@ def __init__(
conn: BlockingConnection,
addr: str,
handler: Optional[AMQPMessagingHandler] = None,
stream_options: Optional[StreamOptions] = None,
stream_options: Optional[StreamConsumerOptions] = None,
credit: Optional[int] = None,
):
"""
Expand Down
52 changes: 42 additions & 10 deletions rabbitmq_amqp_python_client/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,32 @@ class ExchangeToExchangeBindingSpecification:
binding_key: Optional[str] = None


class StreamOptions:
"""
StreamFilterOptions defines the filtering options for a stream consumer.
for values and match_unfiltered see: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering
"""


class StreamFilterOptions:
values: Optional[list[str]] = None
match_unfiltered: bool = False
application_properties: Optional[dict[str, Any]] = None
sql: str = ""

def __init__(
self,
values: Optional[list[str]] = None,
match_unfiltered: bool = False,
application_properties: Optional[dict[str, Any]] = None,
sql: str = "",
):
self.values = values
self.match_unfiltered = match_unfiltered
self.application_properties = application_properties
self.sql = sql


class StreamConsumerOptions:
"""
Configuration options for stream queues.

Expand All @@ -161,29 +186,36 @@ class StreamOptions:
Args:
offset_specification: Either an OffsetSpecification enum value or
an integer offset
filters: List of filter strings to apply to the stream
filter_options: Filter options for the stream consumer. See StreamFilterOptions
"""

def __init__(
self,
offset_specification: Optional[Union[OffsetSpecification, int]] = None,
filters: Optional[list[str]] = None,
filter_match_unfiltered: bool = False,
filter_options: Optional[StreamFilterOptions] = None,
):

if offset_specification is None and filters is None:
self.streamFilterOptions = filter_options

if offset_specification is None and self.streamFilterOptions is None:
raise ValidationCodeException(
"At least one between offset_specification and filters must be set when setting up filtering"
)
self._filter_set: Dict[symbol, Described] = {}
if offset_specification is not None:
self._offset(offset_specification)

if filters is not None:
self._filter_values(filters)

if filter_match_unfiltered is True:
self._filter_match_unfiltered(filter_match_unfiltered)
if (
self.streamFilterOptions is not None
and self.streamFilterOptions.values is not None
):
self._filter_values(self.streamFilterOptions.values)

if (
self.streamFilterOptions is not None
and self.streamFilterOptions.match_unfiltered
):
self._filter_match_unfiltered(self.streamFilterOptions.match_unfiltered)

def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
"""
Expand Down
4 changes: 2 additions & 2 deletions rabbitmq_amqp_python_client/options.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .entities import StreamOptions
from .entities import StreamConsumerOptions
from .qpid.proton._data import ( # noqa: E402
PropertyDict,
symbol,
Expand Down Expand Up @@ -68,7 +68,7 @@ def test(self, link: Link) -> bool:


class ReceiverOptionUnsettledWithFilters(Filter): # type: ignore
def __init__(self, addr: str, filter_options: StreamOptions):
def __init__(self, addr: str, filter_options: StreamConsumerOptions):
super().__init__(filter_options.filter_set())
self._addr = addr

Expand Down
21 changes: 12 additions & 9 deletions rabbitmq_amqp_python_client/qpid/proton/_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def __init__(
self._msg = pn_message()
self.instructions = None
self.annotations = None
self.properties = None
self.application_properties = None
self.body = body
self.inferred = inferred

Expand Down Expand Up @@ -149,7 +149,7 @@ def _check_property_keys(self) -> None:
# We cannot make changes to the dict while iterating, so we
# must save and make the changes afterwards
changed_keys = []
for k in self.properties.keys():
for k in self.application_properties.keys():
if isinstance(k, str):
# strings and their subclasses
if type(k) is symbol or type(k) is char:
Expand All @@ -169,9 +169,12 @@ def _check_property_keys(self) -> None:
)
# Make the key changes
for old_key, new_key in changed_keys:
self.properties[new_key] = self.properties.pop(old_key)
self.application_properties[new_key] = self.application_properties.pop(
old_key
)

def _pre_encode(self) -> None:

inst = Data(pn_message_instructions(self._msg))
ann = Data(pn_message_annotations(self._msg))
props = Data(pn_message_properties(self._msg))
Expand All @@ -184,9 +187,9 @@ def _pre_encode(self) -> None:
if self.annotations is not None:
ann.put_object(self.annotations)
props.clear()
if self.properties is not None:
if self.application_properties is not None:
self._check_property_keys()
props.put_object(self.properties)
props.put_object(self.application_properties)
body.clear()
if self.body is not None:
body.put_object(self.body)
Expand All @@ -206,9 +209,9 @@ def _post_decode(self) -> None:
else:
self.annotations = None
if props.next():
self.properties = props.get_object()
self.application_properties = props.get_object()
else:
self.properties = None
self.application_properties = None
if body.next():
self.body = body.get_object()
else:
Expand All @@ -222,7 +225,7 @@ def clear(self) -> None:
pn_message_clear(self._msg)
self.instructions = None
self.annotations = None
self.properties = None
self.application_properties = None
self.body = None

@property
Expand Down Expand Up @@ -641,7 +644,7 @@ def __repr__(self) -> str:
"reply_to_group_id",
"instructions",
"annotations",
"properties",
"application_properties",
"body",
):
value = getattr(self, attr)
Expand Down
Loading