
feat: add zmq transport protocol #90

Merged
viraatc merged 2 commits into main from feat/viraatc-zmq-transport
Jan 14, 2026

Collaborator

@viraatc viraatc commented Jan 9, 2026

What does this PR do?

This PR includes the following refactors:

  • accumulator_protocol.py: makes the worker agnostic of the accumulator implementation
  • worker_manager.py: now a separate module (extracted from worker.py)

This PR includes the following ZMQ changes:

  • added a new transport protocol used by worker, worker_manager, and http_client
  • removed the old pyzmq.asyncio-based zmq_utils.py
  • added a new ZMQ transport protocol implementation (a minimal repro of aiozmq: https://github.com/aio-libs/aiozmq)
  • consolidated all tests and usages
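
For readers unfamiliar with the aiozmq-style approach, the sketch below shows the general idea of event-driven receiving: register a file descriptor with the asyncio event loop and drain it in a callback, instead of awaiting a future per message. It is not the code from this PR. It uses a plain Unix datagram socket pair as a stand-in for a ZMQ socket, and `FdReceiver` and `demo` are hypothetical names. A real ZMQ version would register the socket's `zmq.FD` descriptor and would also need to check `zmq.EVENTS`, because that descriptor is edge-triggered.

```python
import asyncio
import socket


class FdReceiver:
    """Hypothetical receiver: the event loop calls _on_readable when the fd has data."""

    def __init__(self, sock: socket.socket) -> None:
        self._sock = sock
        self._sock.setblocking(False)
        self._queue: asyncio.Queue = asyncio.Queue()
        self._loop = asyncio.get_running_loop()
        # Ask the loop to invoke the callback whenever the socket becomes readable.
        self._loop.add_reader(self._sock.fileno(), self._on_readable)

    def _on_readable(self) -> None:
        # Drain everything that is ready so one wakeup can deliver many messages.
        while True:
            try:
                data = self._sock.recv(65536)
            except BlockingIOError:
                return  # nothing left to read right now
            if not data:
                return
            self._queue.put_nowait(data)

    async def recv(self) -> bytes:
        return await self._queue.get()

    def close(self) -> None:
        self._loop.remove_reader(self._sock.fileno())
        self._sock.close()


async def demo() -> list[bytes]:
    # A datagram socket pair keeps message boundaries, loosely like ZMQ frames.
    left, right = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
    receiver = FdReceiver(right)
    for payload in (b"query-1", b"query-2", b"query-3"):
        left.send(payload)
    received = [await receiver.recv() for _ in range(3)]
    receiver.close()
    left.close()
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Draining in the callback means several small messages can be handed over per event-loop wakeup, which is one plausible reason an approach like this helps most at small message sizes.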

The new implementation is roughly on par with the old one for message sizes of 64 KB and above, since serialization/deserialization overhead starts to dominate.
For smaller messages it is much faster, with tighter latencies.

new:

(.venv) $ pytest tests/performance/endpoint_client/transport/test_zmq.py -s --no-cov 2>&1 | grep "chars"
  Query            32 chars (  128 B): issue=  254,200 msg/s    32.5 MB/s, recv=  253,800 msg/s    32.5 MB/s
  QueryResult      32 chars (  113 B): issue=  269,600 msg/s    30.5 MB/s, recv=  269,200 msg/s    30.4 MB/s
  StreamChunk      32 chars (   96 B): issue=  322,400 msg/s    31.0 MB/s, recv=  322,000 msg/s    30.9 MB/s
  Query           128 chars (  224 B): issue=  299,400 msg/s    67.1 MB/s, recv=  299,000 msg/s    67.0 MB/s
  QueryResult     128 chars (  209 B): issue=  272,200 msg/s    56.9 MB/s, recv=  271,800 msg/s    56.8 MB/s
  StreamChunk     128 chars (  192 B): issue=  284,400 msg/s    54.6 MB/s, recv=  284,000 msg/s    54.5 MB/s
  Query           512 chars (  609 B): issue=  236,000 msg/s   143.7 MB/s, recv=  235,600 msg/s   143.5 MB/s
  QueryResult     512 chars (  594 B): issue=  257,600 msg/s   153.0 MB/s, recv=  257,200 msg/s   152.8 MB/s
  StreamChunk     512 chars (  577 B): issue=  284,000 msg/s   163.9 MB/s, recv=  283,600 msg/s   163.6 MB/s
  Query          1024 chars ( 1121 B): issue=  210,400 msg/s   235.9 MB/s, recv=  210,000 msg/s   235.4 MB/s
  QueryResult    1024 chars ( 1106 B): issue=  224,400 msg/s   248.2 MB/s, recv=  224,000 msg/s   247.7 MB/s
  StreamChunk    1024 chars ( 1089 B): issue=  244,800 msg/s   266.6 MB/s, recv=  244,400 msg/s   266.2 MB/s
  Query          4096 chars ( 4193 B): issue=  180,000 msg/s   754.7 MB/s, recv=  179,600 msg/s   753.1 MB/s
  QueryResult    4096 chars ( 4178 B): issue=  189,600 msg/s   792.1 MB/s, recv=  189,200 msg/s   790.5 MB/s
  StreamChunk    4096 chars ( 4161 B): issue=  203,400 msg/s   846.3 MB/s, recv=  203,057 msg/s   844.9 MB/s
  Query         16384 chars (16481 B): issue=  101,000 msg/s  1664.6 MB/s, recv=  100,600 msg/s  1658.0 MB/s
  QueryResult   16384 chars (16466 B): issue=  100,400 msg/s  1653.2 MB/s, recv=  100,000 msg/s  1646.6 MB/s
  StreamChunk   16384 chars (16449 B): issue=  100,000 msg/s  1644.9 MB/s, recv=   99,600 msg/s  1638.3 MB/s
  Query         32768 chars (32865 B): issue=   55,000 msg/s  1807.6 MB/s, recv=   54,600 msg/s  1794.4 MB/s
  QueryResult   32768 chars (32850 B): issue=   57,800 msg/s  1898.7 MB/s, recv=   57,400 msg/s  1885.6 MB/s
  StreamChunk   32768 chars (32833 B): issue=   61,000 msg/s  2002.8 MB/s, recv=   60,600 msg/s  1989.7 MB/s

old:

(.venv) $ pytest tests/performance/endpoint_client/transport/test_zmq_old_asyncio.py -s --no-cov 2>&1 | grep "chars"

  Query            32 chars (  128 B): issue=   56,760 msg/s     7.3 MB/s, recv=   56,744 msg/s     7.3 MB/s
  QueryResult      32 chars (  113 B): issue=   56,420 msg/s     6.4 MB/s, recv=   56,405 msg/s     6.4 MB/s
  StreamChunk      32 chars (   96 B): issue=   57,620 msg/s     5.5 MB/s, recv=   57,617 msg/s     5.5 MB/s
  Query           128 chars (  224 B): issue=   57,320 msg/s    12.8 MB/s, recv=   57,319 msg/s    12.8 MB/s
  QueryResult     128 chars (  209 B): issue=   56,460 msg/s    11.8 MB/s, recv=   56,427 msg/s    11.8 MB/s
  StreamChunk     128 chars (  192 B): issue=   53,980 msg/s    10.4 MB/s, recv=   53,974 msg/s    10.4 MB/s
  Query           512 chars (  609 B): issue=   56,020 msg/s    34.1 MB/s, recv=   55,983 msg/s    34.1 MB/s
  QueryResult     512 chars (  594 B): issue=   55,020 msg/s    32.7 MB/s, recv=   54,995 msg/s    32.7 MB/s
  StreamChunk     512 chars (  577 B): issue=   57,020 msg/s    32.9 MB/s, recv=   57,017 msg/s    32.9 MB/s
  Query          1024 chars ( 1121 B): issue=   53,820 msg/s    60.3 MB/s, recv=   53,818 msg/s    60.3 MB/s
  QueryResult    1024 chars ( 1106 B): issue=   54,340 msg/s    60.1 MB/s, recv=   54,332 msg/s    60.1 MB/s
  StreamChunk    1024 chars ( 1089 B): issue=   55,580 msg/s    60.5 MB/s, recv=   55,567 msg/s    60.5 MB/s
  Query          4096 chars ( 4193 B): issue=   53,100 msg/s   222.6 MB/s, recv=   53,075 msg/s   222.5 MB/s
  QueryResult    4096 chars ( 4178 B): issue=   52,980 msg/s   221.4 MB/s, recv=   52,940 msg/s   221.2 MB/s
  StreamChunk    4096 chars ( 4161 B): issue=   55,020 msg/s   228.9 MB/s, recv=   54,994 msg/s   228.8 MB/s
  Query         16384 chars (16481 B): issue=   44,380 msg/s   731.4 MB/s, recv=   44,340 msg/s   730.8 MB/s
  QueryResult   16384 chars (16466 B): issue=   43,580 msg/s   717.6 MB/s, recv=   43,500 msg/s   716.3 MB/s
  StreamChunk   16384 chars (16449 B): issue=   41,420 msg/s   681.3 MB/s, recv=   41,380 msg/s   680.7 MB/s
  Query         32768 chars (32865 B): issue=   35,700 msg/s  1173.3 MB/s, recv=   35,699 msg/s  1173.2 MB/s
  QueryResult   32768 chars (32850 B): issue=   31,800 msg/s  1044.6 MB/s, recv=   31,777 msg/s  1043.9 MB/s
  StreamChunk   32768 chars (32833 B): issue=   29,780 msg/s   977.8 MB/s, recv=   29,763 msg/s   977.2 MB/s
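
To make the comparison concrete, the following sketch computes the new/old ratio for a few rows copied from the two runs above, and checks how the MB/s column relates to msg/s and wire size. The helper names are hypothetical, and the assumption that 1 MB means 10**6 bytes is inferred from the printed numbers rather than taken from the benchmark source.

```python
# Issue rates in msg/s copied from the "new" and "old" runs above,
# keyed by (message type, payload length in chars).
NEW_ISSUE = {
    ("Query", 32): 254_200,
    ("StreamChunk", 32): 322_400,
    ("Query", 32768): 55_000,
    ("StreamChunk", 32768): 61_000,
}
OLD_ISSUE = {
    ("Query", 32): 56_760,
    ("StreamChunk", 32): 57_620,
    ("Query", 32768): 35_700,
    ("StreamChunk", 32768): 29_780,
}


def speedup(key: tuple[str, int]) -> float:
    """Ratio of new to old issue rate for one benchmark row."""
    return NEW_ISSUE[key] / OLD_ISSUE[key]


def throughput_mb_s(msgs_per_s: int, wire_bytes: int) -> float:
    """MB/s as the benchmark appears to report it (assumes 1 MB = 10**6 bytes)."""
    return msgs_per_s * wire_bytes / 1e6


if __name__ == "__main__":
    for key in NEW_ISSUE:
        print(key, f"{speedup(key):.2f}x")
    # Query, 32 chars, 128 B on the wire, 254,200 msg/s in the new run.
    print(f"{throughput_mb_s(254_200, 128):.1f} MB/s")
```

For these rows the ratio is about 4.5x to 5.6x at 32 chars and about 1.5x to 2x at 32768 chars, and the 128 B row reproduces the reported 32.5 MB/s.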

Type of change

  • Bug fix
  • New feature
  • Documentation update
  • Refactor/cleanup

Related issues

Testing

  • Tests added/updated
  • All tests pass locally
  • Manual testing completed

Checklist

  • Code follows project style
  • Pre-commit hooks pass
  • Documentation updated (if needed)

@viraatc viraatc requested a review from a team as a code owner January 9, 2026 01:30
Copilot AI review requested due to automatic review settings January 9, 2026 01:30

github-actions bot commented Jan 9, 2026

MLCommons CLA bot All contributors have signed the MLCommons CLA ✍️ ✅

Copilot AI left a comment

Pull request overview

This PR introduces a ZMQ transport protocol abstraction for worker pool inter-process communication. The changes refactor the existing ZMQ implementation into a clean protocol-based architecture that decouples transport mechanisms from worker logic.

Key Changes:

  • Introduced transport protocol abstractions (WorkerPoolTransport, WorkerConnector, ReceiverTransport, SenderTransport)
  • Implemented ZMQ transport with event-driven I/O using asyncio event loop integration
  • Moved SSE accumulator logic out of Worker into dedicated protocol and implementations
  • Removed ZMQConfig in favor of transport-specific configuration
  • Updated worker initialization to use connector pattern instead of direct ZMQ socket management
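
As a rough illustration of what a protocol-based transport abstraction can look like, here is a minimal sketch using `typing.Protocol`. Only the class names `SenderTransport` and `ReceiverTransport` come from this PR. The method names (`send`, `recv`, `close`), the `QueueChannel` stand-in, and `echo_worker` are assumptions made for the example and may not match the real interfaces in transport/protocol.py.

```python
import asyncio
from typing import Protocol


class SenderTransport(Protocol):
    # Method names here are assumed for illustration.
    async def send(self, payload: bytes) -> None: ...
    def close(self) -> None: ...


class ReceiverTransport(Protocol):
    async def recv(self) -> bytes: ...
    def close(self) -> None: ...


class QueueChannel:
    """In-memory stand-in that satisfies both protocols structurally."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def send(self, payload: bytes) -> None:
        await self._queue.put(payload)

    async def recv(self) -> bytes:
        return await self._queue.get()

    def close(self) -> None:
        pass  # nothing to release for an in-memory queue


async def echo_worker(requests: ReceiverTransport, responses: SenderTransport) -> None:
    """Worker logic written only against the protocols, not a concrete transport."""
    payload = await requests.recv()
    await responses.send(payload.upper())


async def demo() -> bytes:
    requests, responses = QueueChannel(), QueueChannel()
    await requests.send(b"hello")
    await echo_worker(requests, responses)
    return await responses.recv()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the worker depends only on the protocol types, a ZMQ-backed class and an in-memory test double can be swapped without touching worker code, which is the decoupling the overview describes.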

Reviewed changes

Copilot reviewed 27 out of 27 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| src/inference_endpoint/endpoint_client/transport/protocol.py | Defines transport protocol interfaces for worker IPC abstraction |
| src/inference_endpoint/endpoint_client/transport/zmq.py | ZMQ implementation with event-driven transport and automatic temp directory management |
| src/inference_endpoint/endpoint_client/transport/__init__.py | Transport module exports and documentation |
| src/inference_endpoint/endpoint_client/worker.py | Refactored to use transport protocols; removed ZMQ-specific code and accumulator classes |
| src/inference_endpoint/endpoint_client/worker_manager.py | New dedicated worker manager using transport factory pattern |
| src/inference_endpoint/endpoint_client/configs.py | Added accumulator and transport configuration; removed ZMQConfig |
| src/inference_endpoint/endpoint_client/accumulator_protocol.py | Protocol definition for SSE accumulators |
| src/inference_endpoint/openai/accumulator.py | OpenAI SSE accumulator implementation |
| src/inference_endpoint/sglang/accumulator.py | SGLang SSE accumulator implementation |
| src/inference_endpoint/endpoint_client/http_client.py | Updated to use worker manager and simplified initialization |
| tests/unit/endpoint_client/test_zmq_config.py | Replaced ZMQConfig tests with ZmqWorkerPoolTransport factory tests |
| tests/integration/endpoint_client/test_zmq_communication.py | Simplified integration tests using new transport layer |
| Multiple test files | Updated to remove ZMQConfig usage and use new transport connector pattern |
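
The "automatic temp directory management" mentioned for transport/zmq.py can be pictured with the sketch below. It is an assumption about the general pattern, not the PR's code: `IpcAddressFactory`, its methods, and the socket file naming are all hypothetical. The only ZMQ-specific fact relied on is that IPC endpoints are written as `ipc://` followed by a filesystem path.

```python
import os
import tempfile


class IpcAddressFactory:
    """Hypothetical helper: owns a temp directory and hands out ipc:// endpoints in it."""

    def __init__(self, prefix: str = "worker_pool_") -> None:
        self._tmpdir = tempfile.TemporaryDirectory(prefix=prefix)

    @property
    def directory(self) -> str:
        return self._tmpdir.name

    def address(self, name: str) -> str:
        # One socket file per logical channel, all inside the owned directory.
        return "ipc://" + os.path.join(self._tmpdir.name, f"{name}.sock")

    def cleanup(self) -> None:
        # Removes the directory and any socket files left inside it.
        self._tmpdir.cleanup()


if __name__ == "__main__":
    factory = IpcAddressFactory()
    print(factory.address("worker_0_requests"))
    factory.cleanup()
```

Keeping every endpoint under one owned directory lets the pool clean up stale socket files in a single step when it shuts down.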

@gemini-code-assist
Summary of Changes

Hello @viraatc, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly refactors the internal architecture of the inference endpoint client by introducing a robust abstraction layer for inter-process communication (IPC) and Server-Sent Events (SSE) stream accumulation. This change decouples the core client and worker logic from specific implementation details of ZMQ and SSE handling, enhancing modularity, maintainability, and extensibility. The refactoring involves encapsulating ZMQ-specific logic into a new transport module, moving SSE accumulators to dedicated files, and streamlining the interfaces of the HTTPEndpointClient and worker processes.

Highlights

  • IPC Abstraction Layer: Introduced new transport protocols (ReceiverTransport, SenderTransport, WorkerConnector, WorkerPoolTransport) to abstract inter-process communication, making the client and worker modules independent of the specific IPC technology.
  • ZMQ Encapsulation: The ZeroMQ implementation is now fully encapsulated within src/inference_endpoint/endpoint_client/transport/zmq.py, replacing the previous zmq_utils.py and direct ZMQ usage in http_client and worker.
  • Worker Manager Refactor: The WorkerManager logic has been extracted into its own module (src/inference_endpoint/endpoint_client/worker_manager.py), improving modularity and responsibility separation.
  • SSE Accumulator Abstraction: Server-Sent Events (SSE) stream accumulators are now defined by a SSEAccumulatorProtocol and their implementations (OpenAI, SGLang) are moved to dedicated files (src/inference_endpoint/openai/accumulator.py, src/inference_endpoint/sglang/accumulator.py).
  • Simplified Client/Worker Interfaces: The HTTPEndpointClient and Worker classes have been simplified by removing direct ZMQConfig dependencies, relying instead on the new transport and accumulator abstractions.
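
A minimal sketch of what an accumulator protocol could look like is shown below. Only the name `SSEAccumulatorProtocol` comes from this PR. The method names (`add_chunk`, `get_final_output`), the `ConcatAccumulator` class, and the `drain` helper are hypothetical and chosen for illustration; the real OpenAI and SGLang accumulators parse their own chunk formats.

```python
from typing import Optional, Protocol


class SSEAccumulatorProtocol(Protocol):
    # Assumed interface: feed text deltas in, read the joined result out.
    def add_chunk(self, delta: str) -> Optional[str]: ...
    def get_final_output(self) -> str: ...


class ConcatAccumulator:
    """Toy accumulator that concatenates non-empty deltas."""

    def __init__(self) -> None:
        self._parts: list[str] = []

    def add_chunk(self, delta: str) -> Optional[str]:
        if not delta:
            return None  # nothing worth forwarding for an empty delta
        self._parts.append(delta)
        return delta

    def get_final_output(self) -> str:
        return "".join(self._parts)


def drain(accumulator: SSEAccumulatorProtocol, deltas: list[str]) -> str:
    """Caller-side logic that depends only on the protocol."""
    for delta in deltas:
        accumulator.add_chunk(delta)
    return accumulator.get_final_output()


if __name__ == "__main__":
    print(drain(ConcatAccumulator(), ["Hel", "", "lo"]))
```

With this shape, the worker can accept any object that matches the protocol, so backend-specific parsing stays in the backend's own module.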

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a significant and well-executed refactoring of the endpoint client's inter-process communication. By abstracting the transport layer into a separate protocol-based component, the ZMQ-specific logic has been cleanly separated from the core client and worker implementation. This greatly improves modularity, maintainability, and testability. The introduction of SSEAccumulatorProtocol and moving accumulator logic to dedicated classes is also a welcome improvement.

My review includes a few suggestions to address potential bugs and improve code clarity, particularly in the probe command and one of the integration tests. Overall, this is a high-quality refactoring that strengthens the client's architecture.

Copilot AI review requested due to automatic review settings January 9, 2026 09:36
Copilot AI left a comment

Pull request overview

Copilot reviewed 30 out of 30 changed files in this pull request and generated 9 comments.


@viraatc viraatc force-pushed the feat/viraatc-zmq-transport branch from 2fe5c29 to 237fb7f Compare January 9, 2026 09:50
Copilot AI review requested due to automatic review settings January 9, 2026 09:51
@viraatc viraatc force-pushed the feat/viraatc-zmq-transport branch from 237fb7f to 5b5c50d Compare January 9, 2026 09:51
Copilot AI left a comment

Pull request overview

Copilot reviewed 27 out of 27 changed files in this pull request and generated 7 comments.


Copilot AI review requested due to automatic review settings January 10, 2026 00:35
Copilot AI left a comment

Pull request overview

Copilot reviewed 29 out of 29 changed files in this pull request and generated 1 comment.


Copilot AI review requested due to automatic review settings January 10, 2026 00:55
Copilot AI left a comment

Pull request overview

Copilot reviewed 31 out of 31 changed files in this pull request and generated no new comments.


@viraatc viraatc force-pushed the feat/viraatc-zmq-transport branch 2 times, most recently from 59fc3e6 to 0ba3f97 Compare January 10, 2026 01:14
Copilot AI review requested due to automatic review settings January 12, 2026 07:02
Copilot AI left a comment

Pull request overview

Copilot reviewed 32 out of 32 changed files in this pull request and generated no new comments.


@viraatc viraatc force-pushed the feat/viraatc-zmq-transport branch from ec1bc22 to e63e7d9 Compare January 12, 2026 21:06
Copilot AI review requested due to automatic review settings January 12, 2026 21:09
@viraatc viraatc force-pushed the feat/viraatc-zmq-transport branch from e63e7d9 to 3306e6c Compare January 12, 2026 21:09
Copilot AI left a comment

Pull request overview

Copilot reviewed 34 out of 34 changed files in this pull request and generated no new comments.


@viraatc viraatc force-pushed the feat/viraatc-zmq-transport branch from 9b077b8 to e02a762 Compare January 12, 2026 22:28
Copilot AI review requested due to automatic review settings January 12, 2026 23:18
Copilot AI left a comment

Pull request overview

Copilot reviewed 35 out of 35 changed files in this pull request and generated no new comments.


@viraatc viraatc force-pushed the feat/viraatc-zmq-transport branch from 0656ebf to cbe58d8 Compare January 12, 2026 23:19
Collaborator

@arekay-nv arekay-nv left a comment

Much nicer to read now. Thanks!

@viraatc viraatc mentioned this pull request Jan 13, 2026
5 tasks
Copilot AI review requested due to automatic review settings January 13, 2026 21:53
Copilot AI left a comment

Pull request overview

Copilot reviewed 35 out of 35 changed files in this pull request and generated no new comments.

Comments suppressed due to low confidence (3)

tests/integration/endpoint_client/test_zmq_communication.py:1

  • The copyright year is 2026, which is in the future. This should be changed to 2025 or the actual year when the code was created.

tests/unit/endpoint_client/test_zmq_config.py:1

  • The copyright year is 2026, which is in the future. This should be changed to 2025 or the actual year when the code was created.

src/inference_endpoint/endpoint_client/zmq_utils.py:1

  • The copyright year is 2026, which is in the future. This should be changed to 2025 or the actual year when the code was created.

@viraatc viraatc merged commit ef89455 into main Jan 14, 2026
4 checks passed
@github-actions github-actions bot locked and limited conversation to collaborators Jan 14, 2026
@viraatc viraatc deleted the feat/viraatc-zmq-transport branch February 6, 2026 23:07
5 participants