Skip to content

feat: RabbitMQ Connector#133

Merged
chrisk314 merged 111 commits intomainfrom
feat/rabbitmq-connector
Jun 25, 2025
Merged

feat: RabbitMQ Connector#133
chrisk314 merged 111 commits intomainfrom
feat/rabbitmq-connector

Conversation

@chrisk314
Copy link
Contributor

@chrisk314 chrisk314 commented Apr 30, 2025

Summary

This PR introduces a RabbitMQConnector class for exchanging data between components via the RabbitMQ AMQP message broker. This connector will provide a robust messaging system for large scale distributed model runs.

Changes

  • Adds RabbitMQConnector and RabbitMQChannel classes for data exchange via RabbitMQ.
  • Adds aio-piko dependency for asynchronously interacting with RabbitMQ.
  • Refactors channel related tests splitting into unit tests and integration tests for Channels which require backing services such as RabbitMQ.
  • Sets timeout of 5 minutes in the Github Actions steps for unit and integration tests.
  • Pins Ray due to hanging issue in Ray 2.47.0 [Core] Ray 2.47 regression: All tasks hang when using uv ray-project/ray#53848

TODO

  • Namespace exchanges/queues in RabbitMQ by plugboard job id.
  • Cleanup of queues after model execution.
  • Try to reproduce the aio-pika basic get timeout sometimes not being respected and raise github issue

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @chrisk314, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

Summary of Changes

This pull request introduces a RabbitMQConnector class to facilitate data exchange between components using RabbitMQ, an AMQP message broker. This connector aims to provide a robust messaging system suitable for large-scale distributed model runs. The changes include adding the RabbitMQConnector and RabbitMQChannel classes, incorporating the aio-pika dependency for asynchronous RabbitMQ interactions, and refactoring channel-related tests into unit and integration tests.

Highlights

  • RabbitMQ Connector Implementation: Adds RabbitMQConnector and RabbitMQChannel classes for data exchange via RabbitMQ.
  • Dependency Addition: Introduces aio-pika as a dependency for asynchronous RabbitMQ interaction.
  • Test Refactoring: Refactors channel tests, separating them into unit and integration tests for channels requiring backing services like RabbitMQ.
  • Docker Compose Updates: Updates the compose.yaml file to include RabbitMQ service configuration, network settings, and volume definitions.
  • DI Container Configuration: Configures the dependency injection container to provide a RabbitMQ connection resource.

Changelog

Click here to see the changelog
  • compose.yaml
    • Adds a name: plugboard to the compose file.
    • Configures the RabbitMQ service, including port mappings (5672:5672, 15672:15672), volume mounting (rabbitmq-data:/var/lib/rabbitmq), network attachment (main), and environment variables for default user and password.
    • Updates volume and network names from plugboard-* to *.
    • Adds default user and password for RabbitMQ.
  • plugboard/connector/init.py
    • Imports and exposes RabbitMQChannel and RabbitMQConnector in the plugboard.connector module.
  • plugboard/connector/rabbitmq_channel.py
    • Introduces RabbitMQChannel and RabbitMQConnector classes for RabbitMQ integration.
    • Implements send and recv methods for message exchange.
    • Includes logic for declaring exchanges and queues.
    • Uses aio-pika library for asynchronous communication with RabbitMQ.
  • plugboard/utils/di.py
    • Adds aio_pika import.
    • Introduces a resource for managing the RabbitMQ connection within the dependency injection container.
    • Adds a function _rabbitmq_conn to create and manage a robust RabbitMQ connection.
  • plugboard/utils/settings.py
    • Adds typing import.
    • Introduces _RabbitMQSettings class to encapsulate RabbitMQ-specific settings, including the RabbitMQ URL.
    • Adds rabbitmq settings to the main Settings class.
  • pyproject.toml
    • Adds aio-pika as a project dependency.
  • tests/integration/test_channel.py
    • Creates integration tests for channels, including RabbitMQ and ZMQ.
    • Uses pytest_cases to parameterize tests with different connector types.
    • Includes fixtures for RabbitMQConnector and ZMQConnector.
  • tests/integration/test_connector_pubsub.py
    • Creates integration tests for pubsub mode connectors, including RabbitMQ and ZMQ.
    • Uses pytest_cases to parameterize tests with different connector types.
    • Includes tests for single and multiple publishers and topics.
  • tests/unit/test_channel.py
    • Adds typing and os imports.
    • Adds ZMQConnector import.
    • Adds fixtures for ZMQConnector and connector classes.
    • Refactors tests to use fixtures for connector classes.
    • Splits channel tests into unit and integration tests.
    • Removes connector close call.
  • tests/unit/test_connector_pubsub.py
    • Adds typing and os imports.
    • Adds ZMQConnector import.
    • Adds fixtures for ZMQConnector and connector classes.
    • Refactors tests to use fixtures for connector classes.
    • Splits channel tests into unit and integration tests.
  • uv.lock
    • Adds aio-pika and its dependencies to the uv.lock file.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.


A message queue's gentle hum,
Data flows, never feeling numb,
From sender to receiver,
A system believer,
Ensuring all tasks overcome.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This PR introduces a RabbitMQConnector class for exchanging data between components via RabbitMQ. The changes look good overall, and the inclusion of integration tests is appreciated. Here are a few suggestions for improvement.

Summary of Findings

  • TODO comment in RabbitMQChannel.recv: The TODO comment in RabbitMQChannel.recv indicates an issue with the timeout not being respected. This should be investigated and addressed, or at least have a more detailed explanation and a link to the issue on GitHub.
  • Asyncio sleep after connection: The asyncio.sleep calls after establishing connections in RabbitMQConnector.connect_send and RabbitMQConnector.connect_recv are potentially unreliable. A more robust mechanism for ensuring connections are established before sending messages should be considered.

Merge Readiness

The pull request introduces significant new functionality with the RabbitMQ connector. While the code appears well-structured and includes integration tests, the TODO comment regarding timeout issues and the potentially unreliable asyncio.sleep calls should be addressed before merging. I am unable to approve this pull request, and recommend that others review and approve this code before merging. I recommend that the pull request not be merged until the high severity issues are addressed.

@chrisk314 chrisk314 marked this pull request as draft April 30, 2025 16:52
@chrisk314 chrisk314 requested a review from Copilot April 30, 2025 16:59
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces a RabbitMQ connector implementation so that components can exchange messages via the RabbitMQ AMQP broker. Key changes include the addition of RabbitMQConnector and RabbitMQChannel classes with supporting dependency injection and settings updates, the integration of aio‑pika as a new dependency, and corresponding updates to tests and CI configurations.

Reviewed Changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated no comments.

Show a summary per file
File Description
tests/unit/test_connector_pubsub.py Updated parameterization and fixture usage for connector tests
tests/unit/test_channel.py Updated multiprocessing fixtures and renamed a test for clarity
tests/integration/test_connector_pubsub.py Integrated RabbitMQConnector into integration tests
tests/integration/test_channel.py Added RabbitMQ support in integration channel tests
pyproject.toml Added aio‑pika dependency
plugboard/utils/settings.py Added RabbitMQ settings via a new _RabbitMQSettings class
plugboard/utils/di.py Introduced a new async resource for RabbitMQ connection management
plugboard/connector/rabbitmq_channel.py Added RabbitMQChannel and RabbitMQConnector implementations
plugboard/connector/init.py Updated exports to include new RabbitMQ channel and connector
compose.yaml Updated service configuration to support RabbitMQ
.github/workflows/lint-test.yaml Updated CI workflow to start RabbitMQ and add appropriate environment
Comments suppressed due to low confidence (2)

plugboard/utils/di.py:46

  • The _rabbitmq_conn async generator yields a RobustConnection and then immediately closes it in the finally block, which may result in the connection being closed before the consumer finishes using it. Consider restructuring the connection lifecycle management (for example, using an async context manager) to ensure that the connection remains open during its intended usage.
async def _rabbitmq_conn(logger: Singleton[structlog.BoundLogger], url: _t.Optional[str] = None) -> _t.AsyncIterator[aio_pika.RobustConnection]:

tests/unit/test_connector_pubsub.py:261

  • [nitpick] The test function 'test_pubsub_channel_multiple_topics' calls the helper '_test_pubsub_channel_multiple_topics_and_publishers', which suggests a mismatch between the test name and its behavior. Consider renaming the test function to accurately reflect that it tests both multiple topics and multiple publishers.
async def test_pubsub_channel_multiple_topics(...):

@codecov
Copy link

codecov bot commented Jun 16, 2025

@chrisk314 chrisk314 marked this pull request as ready for review June 16, 2025 20:31
@chrisk314 chrisk314 requested a review from toby-coleman June 18, 2025 07:55
Copy link
Contributor

@toby-coleman toby-coleman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. Couple of small comments related to documentation.

@chrisk314 chrisk314 merged commit 6aca340 into main Jun 25, 2025
32 of 35 checks passed
@chrisk314 chrisk314 deleted the feat/rabbitmq-connector branch June 25, 2025 10:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants