[releases/2026/0][Legacy] Remove disconnected requests from execution queue (#3968) #3973
mzegla merged 2 commits into releases/2026/0
Conversation
Pull request overview
This PR adds request-level disconnection tracking to legacy LLM/VLM servables so disconnected clients’ queued requests can be skipped during execution.
Changes:
- Add a `clientDisconnected` flag to legacy execution contexts.
- Register a client disconnection callback during scheduling to flip the flag.
- Update legacy executors to skip generation when the request is marked disconnected (a rough sketch of the combined pattern follows).
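For orientation, here is a minimal, self-contained sketch of that pattern. The `Client`, `ExecutionContext`, free functions, and queue shape are simplified stand-ins, not the repository's classes; only the `std::atomic<bool>` flag, the `weak_ptr` capture in the callback, and the skip-on-disconnect check mirror the diffs reviewed below.

```cpp
#include <atomic>
#include <functional>
#include <iostream>
#include <memory>
#include <vector>

// Simplified stand-ins for the real client and execution-context types.
struct Client {
    // Invoked by the transport layer when the peer goes away.
    void registerDisconnectionCallback(std::function<void()> cb) { onDisconnect = std::move(cb); }
    void disconnect() { if (onDisconnect) onDisconnect(); }
    std::function<void()> onDisconnect;
};

struct ExecutionContext {
    std::shared_ptr<Client> client;
    std::atomic<bool> clientDisconnected{false};
    bool success{true};
};

// Scheduling: flip the flag through a weak_ptr so the callback never keeps the context alive.
void schedule(const std::shared_ptr<ExecutionContext>& ctx,
              std::vector<std::shared_ptr<ExecutionContext>>& queue) {
    std::weak_ptr<ExecutionContext> weakCtx = ctx;
    ctx->client->registerDisconnectionCallback([weakCtx]() {
        if (auto c = weakCtx.lock()) {
            c->clientDisconnected = true;
        }
    });
    queue.push_back(ctx);
}

// Execution: skip generation for requests whose client has already disconnected.
void execute(std::vector<std::shared_ptr<ExecutionContext>>& queue) {
    for (auto& ctx : queue) {
        if (ctx->clientDisconnected.load(std::memory_order_relaxed)) {
            ctx->success = false;
            std::cout << "Client disconnected, skipping request processing.\n";
            continue;
        }
        std::cout << "Generating...\n";  // pipe->generate(...) would run here
    }
}

int main() {
    std::vector<std::shared_ptr<ExecutionContext>> queue;
    auto ctx = std::make_shared<ExecutionContext>();
    ctx->client = std::make_shared<Client>();
    schedule(ctx, queue);
    ctx->client->disconnect();  // client goes away while the request is still queued
    execute(queue);             // the queued request is skipped
}
```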
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
Summary per file:
| File | Description |
|---|---|
| src/llm/visual_language_model/legacy/servable.hpp | Add clientDisconnected flag to VLM legacy execution context. |
| src/llm/visual_language_model/legacy/servable.cpp | Register disconnection callback during VLM scheduling. |
| src/llm/visual_language_model/legacy/legacy_executor.cpp | Skip VLM generation when clientDisconnected is set. |
| src/llm/language_model/legacy/servable.hpp | Add clientDisconnected flag to LLM legacy execution context. |
| src/llm/language_model/legacy/servable.cpp | Register disconnection callback during LLM scheduling. |
| src/llm/language_model/legacy/legacy_executor.cpp | Skip LLM generation when clientDisconnected is set. |
In `src/llm/visual_language_model/legacy/servable.cpp`:

```cpp
std::weak_ptr<VisualLanguageModelLegacyServableExecutionContext> weakContext = legacyExecutionContext;
legacyExecutionContext->payload.client->registerDisconnectionCallback([weakContext]() {
    if (auto context = weakContext.lock()) {
        context->clientDisconnected = true;
    }
});
if (legacyExecutionContext->payload.client->isDisconnected()) {
    return absl::CancelledError();
}
```
Registering the disconnection callback before the isDisconnected() early-return can add unnecessary callback registrations for already-disconnected clients. Consider checking isDisconnected() first and returning absl::CancelledError() before registering the callback.
Suggested change:

```diff
-std::weak_ptr<VisualLanguageModelLegacyServableExecutionContext> weakContext = legacyExecutionContext;
-legacyExecutionContext->payload.client->registerDisconnectionCallback([weakContext]() {
-    if (auto context = weakContext.lock()) {
-        context->clientDisconnected = true;
-    }
-});
-if (legacyExecutionContext->payload.client->isDisconnected()) {
-    return absl::CancelledError();
-}
+if (legacyExecutionContext->payload.client->isDisconnected()) {
+    return absl::CancelledError();
+}
+std::weak_ptr<VisualLanguageModelLegacyServableExecutionContext> weakContext = legacyExecutionContext;
+legacyExecutionContext->payload.client->registerDisconnectionCallback([weakContext]() {
+    if (auto context = weakContext.lock()) {
+        context->clientDisconnected = true;
+    }
+});
```
In `src/llm/language_model/legacy/servable.cpp`:

```cpp
std::weak_ptr<LegacyServableExecutionContext> weakContext = legacyExecutionContext;
legacyExecutionContext->payload.client->registerDisconnectionCallback([weakContext]() {
    if (auto context = weakContext.lock()) {
        context->clientDisconnected = true;
    }
});
if (legacyExecutionContext->payload.client->isDisconnected()) {
    return absl::CancelledError();
}
```
Same pattern as the VLM path: checking isDisconnected() before registering the callback avoids unnecessary callback registrations when the client is already disconnected.
Suggested change:

```diff
-std::weak_ptr<LegacyServableExecutionContext> weakContext = legacyExecutionContext;
-legacyExecutionContext->payload.client->registerDisconnectionCallback([weakContext]() {
-    if (auto context = weakContext.lock()) {
-        context->clientDisconnected = true;
-    }
-});
-if (legacyExecutionContext->payload.client->isDisconnected()) {
-    return absl::CancelledError();
-}
+if (legacyExecutionContext->payload.client->isDisconnected()) {
+    return absl::CancelledError();
+}
+std::weak_ptr<LegacyServableExecutionContext> weakContext = legacyExecutionContext;
+legacyExecutionContext->payload.client->registerDisconnectionCallback([weakContext]() {
+    if (auto context = weakContext.lock()) {
+        context->clientDisconnected = true;
+    }
+});
```
In `src/llm/language_model/legacy/legacy_executor.cpp`:

```cpp
auto& requestExecutionContext = requests.front();
if (requestExecutionContext->clientDisconnected) {
    requestExecutionContext->success = false;
    SPDLOG_LOGGER_DEBUG(llm_executor_logger, "Client disconnected, skipping request processing.");
```
Prefer explicitly reading the atomic via load() (optionally with std::memory_order_relaxed) rather than relying on implicit conversion, to make the concurrency intent clear and consistent across platforms/toolchains.
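A sketch of the suggested read applied to the excerpt above; whether `std::memory_order_relaxed` is sufficient here is an assumption (it looks adequate because the flag only gates skipping work, but a stronger ordering may be preferred if other data is published through it):

```cpp
auto& requestExecutionContext = requests.front();
// Explicit atomic read makes the concurrency intent visible at the call site.
if (requestExecutionContext->clientDisconnected.load(std::memory_order_relaxed)) {
    requestExecutionContext->success = false;
    SPDLOG_LOGGER_DEBUG(llm_executor_logger, "Client disconnected, skipping request processing.");
}
```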
In `src/llm/visual_language_model/legacy/legacy_executor.cpp`:

```cpp
auto& requestExecutionContext = requests.front();
if (requestExecutionContext->clientDisconnected) {
    requestExecutionContext->success = false;
    SPDLOG_LOGGER_DEBUG(llm_executor_logger, "Client disconnected, skipping request processing.");
```
Same as the LLM executor: use requestExecutionContext->clientDisconnected.load(...) for a clearer, explicit atomic read.
```cpp
    SPDLOG_LOGGER_TRACE(llm_executor_logger, "Generation started");
    try {
        requestExecutionContext->results = pipe->generate(requestExecutionContext->inputText, requestExecutionContext->inputImages, requestExecutionContext->generationConfigBuilder->getConfig(), requestExecutionContext->textStreamer);
    } catch (std::exception& e) {
        requestExecutionContext->success = false;
        SPDLOG_LOGGER_ERROR(llm_executor_logger, "VLM pipeline generation failed: {}.", e.what());
    }
    SPDLOG_LOGGER_TRACE(llm_executor_logger, "Generation ended");
}
```
The disconnection flag is only checked before starting pipe->generate(...). If a client disconnects during generation, the callback will set clientDisconnected, but the request will still run to completion and be reported as successful unless an exception occurs. If cancellation during generation is a goal, consider re-checking clientDisconnected after generation and marking success = false, or integrating a cancellation mechanism supported by pipe->generate(...) (if available).
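A minimal sketch of the re-check variant described above. The post-generation check and its log message are illustrative additions, not code from this PR, and they only stop the result from being reported as successful; they do not interrupt a generation already in flight:

```cpp
SPDLOG_LOGGER_TRACE(llm_executor_logger, "Generation started");
try {
    requestExecutionContext->results = pipe->generate(requestExecutionContext->inputText, requestExecutionContext->inputImages, requestExecutionContext->generationConfigBuilder->getConfig(), requestExecutionContext->textStreamer);
} catch (std::exception& e) {
    requestExecutionContext->success = false;
    SPDLOG_LOGGER_ERROR(llm_executor_logger, "VLM pipeline generation failed: {}.", e.what());
}
// Hypothetical re-check: the client may have disconnected while generate() was running.
if (requestExecutionContext->clientDisconnected.load(std::memory_order_relaxed)) {
    requestExecutionContext->success = false;
    SPDLOG_LOGGER_DEBUG(llm_executor_logger, "Client disconnected during generation, discarding result.");
}
SPDLOG_LOGGER_TRACE(llm_executor_logger, "Generation ended");
```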