Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
322 changes: 164 additions & 158 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ readme = "README.md"
python = "^3.9"
python-qpid-proton = "^0.39.0"
typing-extensions = "^4.13.0"
packaging = "^23.0"


[tool.poetry.group.dev.dependencies]
Expand Down
77 changes: 77 additions & 0 deletions rabbitmq_amqp_python_client/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)

import typing_extensions
from packaging import version

from .address_helper import validate_address
from .consumer import Consumer
Expand Down Expand Up @@ -166,6 +167,82 @@ def _create_connection(self) -> None:
password=self._password,
)

self._validate_server_properties()

def _validate_server_properties(self) -> None:
"""
Validate the server properties returned in the connection handshake.

Checks that the server is RabbitMQ and the version is >= 4.0.0.

Raises:
ValidationCodeException: If server is not RabbitMQ or version < 4.0.0
"""
if self._conn is None or self._conn.conn is None:
raise ValidationCodeException("Connection not established")

remote_props = self._conn.conn.remote_properties
if remote_props is None:
raise ValidationCodeException("No remote properties received from server")

# Check if server is RabbitMQ
product = remote_props.get("product")
if product != "RabbitMQ":
raise ValidationCodeException(
f"Connection to non-RabbitMQ server detected. "
f"Expected 'RabbitMQ', got '{product}'"
)

# Check server version is >= 4.0.0
server_version = remote_props.get("version")
if server_version is None:
raise ValidationCodeException("Server version not provided")

try:
if version.parse(str(server_version)) < version.parse("4.0.0"):
raise ValidationCodeException(
f"The AMQP client library requires RabbitMQ 4.0.0 or higher. "
f"Server version: {server_version}"
)
except Exception as e:
raise ValidationCodeException(
f"Failed to parse server version '{server_version}': {e}"
)

logger.debug(f"Connected to RabbitMQ server version {server_version}")

def _is_server_version_gte_4_2_0(self) -> bool:
"""
Check if the server version is greater than or equal to 4.2.0.

This is an internal method that can be used to conditionally enable
features that require RabbitMQ 4.2.0 or higher.

Returns:
bool: True if server version >= 4.2.0, False otherwise

Raises:
ValidationCodeException: If connection is not established or
remote properties are not available
"""
if self._conn is None or self._conn.conn is None:
raise ValidationCodeException("Connection not established")

remote_props = self._conn.conn.remote_properties
if remote_props is None:
raise ValidationCodeException("No remote properties received from server")

server_version = remote_props.get("version")
if server_version is None:
raise ValidationCodeException("Server version not provided")

try:
return version.parse(str(server_version)) >= version.parse("4.2.0")
except Exception as e:
raise ValidationCodeException(
f"Failed to parse server version '{server_version}': {e}"
)

def dial(self) -> None:
"""
Establish a connection to the AMQP server.
Expand Down
2 changes: 1 addition & 1 deletion rabbitmq_amqp_python_client/qpid/proton/_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ def remote_desired_capabilities(self):
return c and SymbolList(c)

@property
def remote_properties(self):
def remote_properties(self) -> Optional[Data]:
"""
The properties specified by the remote peer for this connection.

Expand Down
5 changes: 5 additions & 0 deletions rabbitmq_amqp_python_client/qpid/proton/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,11 @@ def __init__(
lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
msg="Opening connection",
)
self.wait(
lambda: (self.conn.state & Endpoint.REMOTE_ACTIVE),
timeout=10,
msg="Connection opened",
)

except ConnectionException:
if self.conn is not None:
Expand Down
Loading