[WIP] Async Scheduler Prototype #19831


Closed

Conversation

LucasWilkinson
Collaborator

@LucasWilkinson LucasWilkinson commented Jun 19, 2025

Overview

This PR prototypes async scheduling for vLLM V1 by decoupling scheduler state updates from model output processing, enabling the scheduler to run ahead of model execution for improved throughput. Rather than introducing a separate async scheduler that would have to be maintained across the feature matrix, the prototype keeps a single scheduler code path that works both when it runs asynchronously and when it must run synchronously for features like spec decode.

Current Status

This is a prototype implementation focusing on core functionality. The naming is not well thought out yet and is open to suggestions.

  • Scheduler Tests Passing (synchronously) test_scheduler.py
  • Able to run engine core loop asynchronously
  • Benchmark Perf
  • V0 working (check for breakages)

Note: The RequestGenerationState and RequestSchedulerState classes currently use an __getattribute__ hack to minimize the diff in this PR, making it easier to review. However, we should come up with a better solution for the final implementation.
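
For reviewers unfamiliar with the trick, here is a minimal sketch of the kind of __getattribute__ delegation meant here; the class and field names are illustrative, not the actual code in this PR:

```python
class RequestSchedulerState:
    """Illustrative sketch (not the actual PR code): unknown attribute
    lookups fall through to the shared, immutable RequestParams so call
    sites written against the old monolithic Request keep working with a
    minimal diff."""

    def __init__(self, params):
        self._params = params
        self.num_computed_tokens = 0  # example of scheduler-owned state

    def __getattribute__(self, name):
        try:
            # Prefer attributes defined on the scheduler-state object itself.
            return object.__getattribute__(self, name)
        except AttributeError:
            # Fall back to the immutable request parameters.
            return getattr(object.__getattribute__(self, "_params"), name)
```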

Caching Update: We now cache_blocks after model execution when the tokens are available, but we still need to figure out the best way to re-cache allocated blocks so that "If an allocated block is already full of tokens, we immediately add it to the Cache Block, so that the block can be reused by other requests in the same batch."
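
Conceptually, the new ordering looks something like the following rough sketch; the helper name, the requests lookup, and the cache_blocks call shape are hypothetical placeholders, not the actual call sites:

```python
# Rough sketch with hypothetical names: blocks are cached only once the
# sampled token IDs are known, i.e. after model execution instead of at
# allocate_slots() time.
async def cache_blocks_after_execution(scheduler, model_output_future):
    model_output = await model_output_future
    for req_id, sampled_ids in zip(model_output.req_ids,
                                   model_output.sampled_token_ids):
        state = scheduler.requests[req_id]          # hypothetical lookup
        scheduler.kv_cache_manager.cache_blocks(    # hypothetical API shape
            req_id,
            num_tokens=state.num_computed_tokens + len(sampled_ids))
```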

Key Changes

1. Split scheduler.update_from_output into Two Components

  • scheduler._update_from_output: Updates internal scheduler state required for the next scheduler.schedule() call
  • scheduler._process_model_output_tasks: Handles downstream responsibilities including:
    • Constructing EngineCoreOutputs
    • KV block caching for automatic prefix caching
    • Stop condition checking
    • KV connector updates

2. Asynchronous Model Output Processing

The scheduler._update_from_output method now accepts a Future[ModelRunnerOutput] instead of the model outputs themselves. This lets the scheduler proceed to the next scheduler.schedule() call without waiting for the model to finish, provided scheduler._update_from_output never awaits the model output Future. When a feature does require model outputs (e.g., speculative decoding, structured output), it can simply await the Future; this blocks the scheduler thread until the model finishes executing, giving a seamless fallback to synchronous scheduling.
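
As a self-contained illustration of the pattern (toy code, not the vLLM classes): the state needed for the next schedule() call is advanced using only token counts, and the future is awaited only when a feature genuinely needs the sampled tokens:

```python
import asyncio


async def update_from_output(scheduler_state: dict[str, int],
                             num_scheduled_tokens: dict[str, int],
                             model_output: asyncio.Future,
                             needs_model_output: bool) -> None:
    """Toy version of the pattern described above (not vLLM code)."""
    # Token *counts* can be advanced without knowing the sampled token IDs,
    # so the scheduler can run ahead of the model here.
    for req_id, n in num_scheduled_tokens.items():
        scheduler_state[req_id] = scheduler_state.get(req_id, 0) + n

    if needs_model_output:
        # Synchronous fallback: e.g. spec decode must see which draft tokens
        # were accepted before the next schedule() call can be made.
        output = await model_output
        del output  # a real implementation would reconcile token counts here
```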

The scheduler._process_model_output_tasks takes the model output Future and converts it to a Future[EngineCoreOutputs], allowing the EngineCore to queue up an asyncio task to send outputs back to clients without blocking the core loop (i.e., the next scheduler.schedule() call).
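
A sketch of the resulting core-loop shape; names like has_requests, execute_model_async, and send_outputs are placeholders rather than the actual EngineCore API:

```python
import asyncio


async def core_loop(scheduler, executor, send_outputs) -> None:
    """Placeholder shapes only: shows how the next schedule() call can start
    while the previous step's outputs are still being produced and sent."""
    while scheduler.has_requests():
        scheduler_output = scheduler.schedule()
        # Kick off model execution without blocking the loop.
        model_future = asyncio.ensure_future(
            executor.execute_model_async(scheduler_output))
        # Update only the state needed for the next schedule() call; this
        # awaits the future only if some feature requires the model output.
        await scheduler.update_from_output(scheduler_output, model_future)
        # Convert Future[ModelRunnerOutput] -> Future[EngineCoreOutputs] and
        # hand it to a background task so sending never blocks the loop.
        outputs_future = scheduler.process_model_output_tasks(
            scheduler_output, model_future)
        asyncio.create_task(send_outputs(outputs_future))
```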

This design balances the advantages of both approaches: asynchronous execution when possible, with automatic synchronous fallback when required by specific features.

3. Request Object Decomposition

The main challenge for a scheme like this is that vLLM uses a monolithic Request object that tracks essentially everything related to a request, and many feature APIs take this Request object as input despite using only a small subset of its attributes. Such a monolithic object is problematic for async scheduling: attributes related to scheduling may run ahead of other attributes in the same object, creating potential inconsistencies if callers assume the object is internally consistent.

To address this, we split the request object into focused components:

  • RequestParams: Immutable request parameters (request inputs) that remain constant during generation and can be shared around since they are always internally consistent, even when the scheduler is running ahead of the generation state. This includes:

    • Request metadata (request_id, client_index, arrival_time)
    • Model inputs (prompt_token_ids, sampling_params, multi_modal_inputs)
    • LoRA and structured output configurations
    • Stop conditions and token limits
  • RequestSchedulerState: Scheduler-specific mutable state that tracks:

    • Request status (WAITING, RUNNING, PREEMPTED, etc.)
    • Token counts for scheduling decisions (num_computed_tokens, num_tokens_with_spec)
    • KV cache management state (num_cached_tokens, kv_transfer_params)
    • Event logging for performance monitoring
    • Critically, this does not track the actual token IDs, which is what allows it to run ahead of the model. This was the major blocker with the monolithic request object: because it contained all_token_ids, keeping it internally consistent meant waiting for the model output before it could be updated.
  • RequestGenerationState: Generation-specific mutable state that handles:

    • Output token accumulation and management
    • Stop condition evaluation
    • Sampling parameter enforcement
    • Output token ID sequences

This decomposition required extensive updates across the codebase: many APIs only needed request_id or a small subset of the request object, so they now receive just the component(s) they actually use, since the monolithic object no longer exists.
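
To make the split concrete, here is a compact, illustrative sketch of the three components; the field sets are abridged from the description above and are not the actual class definitions in the PR:

```python
from dataclasses import dataclass, field
from enum import Enum, auto


class RequestStatus(Enum):
    WAITING = auto()
    RUNNING = auto()
    PREEMPTED = auto()


@dataclass(frozen=True)
class RequestParams:
    """Immutable inputs: always internally consistent, safe to share."""
    request_id: str
    prompt_token_ids: list[int]
    arrival_time: float
    max_tokens: int


@dataclass
class RequestSchedulerState:
    """Mutable scheduler view: token counts only, so it may run ahead."""
    params: RequestParams
    status: RequestStatus = RequestStatus.WAITING
    num_computed_tokens: int = 0
    num_cached_tokens: int = 0


@dataclass
class RequestGenerationState:
    """Mutable generation view: owns the actual output token IDs."""
    params: RequestParams
    output_token_ids: list[int] = field(default_factory=list)
```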



mergify bot commented Jun 19, 2025

This pull request has merge conflicts that must be resolved before it can be
merged. Please rebase the PR, @LucasWilkinson.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

@mergify mergify bot added the needs-rebase label Jun 19, 2025
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Summary of Changes

Hello @LucasWilkinson, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a prototype for an asynchronous scheduler, fundamentally restructuring how requests are managed and processed within the system. It aims to improve concurrency by decoupling model execution from scheduler state updates, centralizing request state management in the EngineCore, and refining the interfaces for various components like the KV cache manager and KV connectors.

Highlights

  • Asynchronous Scheduler: The core scheduler's update_from_output method has been refactored to be asynchronous, now accepting a Future for the model's output. This enables non-blocking operations and allows the system to potentially perform other tasks while waiting for model execution results.
  • Request Object Refactoring: The monolithic Request class has been split into three distinct classes: RequestParams (for immutable request parameters), RequestState (for mutable state managed by the EngineCore), and SchedulerRequestState (for mutable state specific to the scheduler's view). This clarifies ownership and responsibility for different aspects of a request's lifecycle.
  • Decoupled State Updates: The responsibility for processing ModelRunnerOutput and updating request states, including determining finished requests, has been moved from the Scheduler to the EngineCore. This centralizes request state management within the EngineCore and streamlines the data flow.
  • Stop Condition Relocation: The logic for checking request stop conditions (e.g., End-of-Sequence token, maximum token length) has been moved from the Scheduler to the EngineCore's post-processing of model outputs. This change is reflected in updated test cases and a clearer separation of concerns.
  • KV Cache Manager Interface Changes: Methods within the kv_cache_manager (e.g., allocate_slots, free, free_block_hashes) now directly accept request_id and num_computed_tokens instead of full Request objects. This promotes better modularity and reduces coupling between components.

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces significant architectural changes, primarily focused on making the scheduler asynchronous and refactoring the Request object into RequestParams and RequestState/SchedulerRequestState. These changes propagate through many parts of the codebase, including tests, KV cache management, and the engine core.

Key observations:

  • The introduction of async operations in the scheduler (scheduler.update_from_output) and EngineCore.step is a major shift.
  • The Request object has been split, which generally improves separation of concerns (immutable parameters vs. mutable state).
  • Helper functions like scheduler_update_from_output in tests are good additions for managing async calls.

However, there are several areas that need attention:

  • Test Coverage: Large portions of tests, especially test_stop_via_update_from_output, are commented out with TODOs. This indicates a regression in test coverage or incomplete migration of functionality related to stop token handling. This is a high-priority concern.
  • API Changes: The scheduler.update_from_output method's signature and return type have changed significantly. The EngineCore now seems to handle more of the output processing logic that was previously in the scheduler. This needs to be carefully reviewed for correctness.
  • State Management: There's a new self.requests dictionary in EngineCore, which might duplicate state already managed by the scheduler. This could lead to complexity or inconsistencies.
  • Functional Changes: The removal of self.coordinator.cache_blocks from KVCacheManager.allocate_slots and changes to num_draft_tokens handling in slot allocation need clear justification and verification that caching and speculative decoding behavior remain correct.
  • Potential Bugs: There's a critical scope issue in gpu_model_runner.py where req_state is used without definition, and a potential incorrect attribute access in structured_output/__init__.py.

Overall, while the async refactoring is a substantial undertaking, the current state of the PR has some critical issues and missing test coverage that need to be addressed before merging. The PR description should also be updated to reflect the WIP nature and the significant changes, especially regarding stop token handling and test status.

Comment on lines +1484 to +1494
for req_id in self.input_batch.req_ids:
    idx = self.input_batch.req_id_to_index[req_id]
    num_new_tokens = len(valid_sampled_token_ids[idx])
    self.input_batch.num_computed_tokens_cpu[idx] += num_new_tokens

    if num_new_tokens == 1:
        # Avoid slicing list in most common case.
        req_state.output_token_ids.append(valid_sampled_token_ids[-1])
    elif num_new_tokens > 0:
        req_state.output_token_ids.extend(
            valid_sampled_token_ids[-num_new_tokens:])

critical

In the new loop added to execute_model, req_state is used (e.g., req_state.output_token_ids.append(...)) but it's not defined within this loop's scope. It seems like it should be fetched from self.requests using req_id, similar to how self.requests[req_id] is used elsewhere in the codebase after the refactoring.

For example: req_state = self.requests[req_id].

Suggested change

Original:

for req_id in self.input_batch.req_ids:
    idx = self.input_batch.req_id_to_index[req_id]
    num_new_tokens = len(valid_sampled_token_ids[idx])
    self.input_batch.num_computed_tokens_cpu[idx] += num_new_tokens
    if num_new_tokens == 1:
        # Avoid slicing list in most common case.
        req_state.output_token_ids.append(valid_sampled_token_ids[-1])
    elif num_new_tokens > 0:
        req_state.output_token_ids.extend(
            valid_sampled_token_ids[-num_new_tokens:])

Suggested:

for req_id in self.input_batch.req_ids:
    idx = self.input_batch.req_id_to_index[req_id]
    req_state = self.requests[req_id]  # Define req_state
    num_new_tokens = len(valid_sampled_token_ids[idx])
    self.input_batch.num_computed_tokens_cpu[idx] += num_new_tokens
    if num_new_tokens == 1:
        # Avoid slicing list in most common case.
        req_state.output_token_ids.append(valid_sampled_token_ids[idx][-1])  # Use idx here
    elif num_new_tokens > 0:
        req_state.output_token_ids.extend(
            valid_sampled_token_ids[idx][-num_new_tokens:])  # Use idx here

Comment on lines 483 to 570
# TODO(lucas): new tests; stop tokens no longer handled by the scheduler
# scheduler_output = SchedulerOutput(scheduled_new_reqs=[],
# scheduled_cached_reqs=[],
# num_scheduled_tokens={
# requests[0].request_id: 1,
# requests[1].request_id: 2
# },
# total_num_scheduled_tokens=3,
# scheduled_encoder_inputs={},
# scheduled_spec_decode_tokens={
# requests[0].request_id: [],
# requests[1].request_id: [10]
# },
# num_common_prefix_blocks=0,
# finished_req_ids=set(),
# free_encoder_input_ids=[],
# structured_output_request_ids={},
# grammar_bitmask=None)

# model_output = ModelRunnerOutput(
# req_ids=[req.request_id for req in requests],
# req_id_to_index={
# req.request_id: i
# for i, req in enumerate(requests)
# },
# sampled_token_ids=[[EOS_TOKEN_ID],
# [10,
# 11]], # First request hits EOS, second continues
# spec_token_ids=None,
# logprobs=None,
# prompt_logprobs_dict={})

# scheduler_update_from_output(scheduler, scheduler_output, model_output)

# # Verify first request stopped, second continues
# assert len(scheduler.running) == 1
# assert scheduler.running[0].request_id == requests[1].request_id
# assert requests[0].status == RequestStatus.FINISHED_STOPPED
# assert requests[0].request_id in scheduler.finished_req_ids
# assert list(requests[0].output_token_ids) == [EOS_TOKEN_ID]
# assert list(requests[1].output_token_ids) == [10, 11]

# # Test case 2: Stop on custom stop token
# scheduler = create_scheduler(num_speculative_tokens=2)
# requests = create_requests(num_requests=2,
# max_tokens=10,
# stop_token_ids=[42, 43])
# for req in requests:
# req.num_computed_tokens = req.num_tokens
# scheduler.requests[req.request_id] = req
# scheduler.running.append(req)

# scheduler_output = SchedulerOutput(scheduled_new_reqs=[],
# scheduled_cached_reqs=[],
# num_scheduled_tokens={
# requests[0].request_id: 3,
# requests[1].request_id: 2
# },
# total_num_scheduled_tokens=5,
# scheduled_encoder_inputs={},
# scheduled_spec_decode_tokens={
# requests[0].request_id: [10, 42],
# requests[1].request_id: [13]
# },
# num_common_prefix_blocks=0,
# finished_req_ids=set(),
# free_encoder_input_ids=[],
# structured_output_request_ids={},
# grammar_bitmask=None)

# model_output = ModelRunnerOutput(
# req_ids=[req.request_id for req in requests],
# req_id_to_index={
# req.request_id: i
# for i, req in enumerate(requests)
# },
# sampled_token_ids=[[10, 42, 12],
# [13, 14]], # First request hits stop token
# spec_token_ids=None,
# logprobs=None,
# prompt_logprobs_dict={})

# scheduler_update_from_output(scheduler, scheduler_output, model_output)

# # Verify first request stopped on custom token
# assert len(scheduler.running) == 1
# assert scheduler.running[0].request_id == requests[1].request_id
# assert requests[0].status == RequestStatus.FINISHED_STOPPED
# assert requests[0].stop_reason == 42
# assert requests[0].request_id in scheduler.finished_req_ids
# assert list(requests[0].output_token_ids) == [10, 42]
# assert list(requests[1].output_token_ids) == [13, 14]

high

A large portion of this test (test_stop_via_update_from_output for EOS and custom stop tokens) has been commented out with the TODO: new tests; stop tokens no longer handled by the scheduler. This indicates a significant change in functionality where stop token handling has moved out of the scheduler.

This PR should either include the new tests or clearly document that these tests will be addressed in a follow-up PR. Leaving critical test functionality commented out reduces test coverage and makes it harder to verify the correctness of the new stop token handling logic.

Comment on lines 626 to 670
# TODO(lucas): new tests; stop tokens no longer handled by the scheduler
# # Test case 4: Ignore EOS flag
# scheduler = create_scheduler(num_speculative_tokens=2)
# requests = create_requests(num_requests=1, max_tokens=10)
# requests[0].sampling_params.ignore_eos = True
# requests[0].num_computed_tokens = requests[0].num_tokens
# scheduler.requests[requests[0].request_id] = requests[0]
# scheduler.running.append(requests[0])

# scheduler_output = SchedulerOutput(
# scheduled_new_reqs=[],
# scheduled_cached_reqs=[],
# num_scheduled_tokens={requests[0].request_id: 3},
# total_num_scheduled_tokens=3,
# scheduled_encoder_inputs={},
# scheduled_spec_decode_tokens={
# requests[0].request_id: [EOS_TOKEN_ID, 10]
# },
# num_common_prefix_blocks=0,
# finished_req_ids=set(),
# free_encoder_input_ids=[],
# structured_output_request_ids={},
# grammar_bitmask=None)

# model_output = ModelRunnerOutput(
# req_ids=[requests[0].request_id],
# req_id_to_index={requests[0].request_id: 0},
# sampled_token_ids=[[EOS_TOKEN_ID, 10, 11]],
# spec_token_ids=None,
# logprobs=None,
# prompt_logprobs_dict={})

# scheduler_update_from_output(scheduler, scheduler_output, model_output)

# # Verify request continues past EOS
# assert len(scheduler.running) == 1
# assert not requests[0].is_finished()
# assert list(requests[0].output_token_ids) == [EOS_TOKEN_ID, 10, 11]

high

Similar to the previous comment, a significant part of test_stop_via_update_from_output (specifically 'Test case 4: Ignore EOS flag') is commented out with a TODO. This needs to be addressed to ensure full test coverage for the new stop token handling mechanism.

Comment on lines 281 to 298
# P/D: delay caching blocks if we have to recv from
# remote. Update state for locally cached blocks.
if not self.enable_caching or delay_cache_blocks:
return KVCacheBlocks(new_blocks)

# Speculated tokens might be rejected in the future, so we does
# not cache any speculated tokens. We only cache blocks with
# generated (accepted) tokens.
self.coordinator.cache_blocks(
request, self.req_to_block_hashes[request.request_id],
num_computed_tokens + num_new_tokens - num_draft_tokens)

return KVCacheBlocks(new_blocks)

high

The call to self.coordinator.cache_blocks has been removed from allocate_slots. Previously, this was responsible for caching blocks with generated (accepted) tokens, explicitly not caching speculated tokens.

What is the new mechanism for caching blocks after allocation, especially considering the comment about not caching speculated tokens has also been removed? This change could impact prefix caching behavior and performance if blocks are not cached appropriately.

Comment on lines 243 to 251
 new_blocks = self.kv_cache_manager.allocate_slots(
-    request,
+    request.request_id,
     num_new_tokens,
-    num_draft_tokens=num_draft_tokens,
+    request.num_computed_tokens,
     num_lookahead_tokens=self.num_lookahead_tokens)

high

The allocate_slots method in KVCacheManager now requires request_id and num_computed_tokens instead of the full request object. Additionally, the num_draft_tokens parameter has been removed from this call.

How is the number of draft tokens now accounted for during slot allocation, if at all? The previous logic explicitly used num_draft_tokens when calling self.coordinator.cache_blocks (which is now removed from allocate_slots). This change might affect how KV cache is managed for speculative decoding.

Comment on lines 301 to 304
# TODO(lucas): there is an edge case here where this request gets
# preempted by the next scheduler step (which may be past the stop token
# due to async scheduling)
all_full = request.num_tokens % self.block_size == 0

medium

The change from request.num_computed_tokens to request.num_tokens for calculating all_full is noted. The TODO comment highlights a potential edge case related to preemption and async scheduling affecting request.num_tokens. This is an important consideration for correctness.

Could you elaborate on the specific scenario where request.num_tokens might be past the stop token due to async scheduling and how this change addresses or relates to that? Ensuring all_full is calculated correctly is crucial for KV cache management.

@@ -85,22 +85,21 @@ class CachedRequestData:
# the request's block IDs. If True, new_block_ids will be used as the
# request's block IDs instead of appending to the existing block IDs.
resumed_from_preemption: bool
new_token_ids: list[int]
# new_token_ids: list[int] # TODO: remove (need to solve PP case)

medium

The new_token_ids field and its associated logic have been commented out with a TODO: remove (need to solve PP case). This is a significant change to CachedRequestData.

What is the impact of removing new_token_ids on pipeline parallelism (PP)? If this is a known issue, it should be tracked and addressed. Ensure that the functionality relying on new_token_ids (especially in PP scenarios) is either refactored or the TODO is resolved.


assert request.is_finished()

delay_free_blocks, kv_xfer_params = self._connector_finished(request)
self.encoder_cache_manager.free(request)

medium

In _free_request, self.encoder_cache_manager.free(request) is called. Given that request is of type SchedulerRequestState, and EncoderCacheManager.free expects RequestParams, this should be self.encoder_cache_manager.free(request.params) to align with the type hints and the refactoring pattern seen elsewhere.

Suggested change
self.encoder_cache_manager.free(request)
self.encoder_cache_manager.free(request.params)

@@ -71,6 +73,7 @@ def __init__(self,
VLLM_VERSION, vllm_config)

self.log_stats = log_stats
self.requests: dict[str, RequestState] = {}

medium

The EngineCore class now has its own self.requests: dict[str, RequestState]. The Scheduler also maintains a self.requests: dict[str, SchedulerRequestState]. While RequestState and SchedulerRequestState are different, they both store state related to requests.

Is there a reason for this duplication? Could this lead to inconsistencies or increased complexity in managing request lifecycle and state? Consider if the scheduler's request store could be the single source of truth, or if EngineCore truly needs its own separate mapping.

@@ -88,13 +88,13 @@ def grammar_init(self, request: Request) -> None:
f"Unsupported structured output backend: {backend}")

grammar = self.executor.submit(self._async_create_grammar, request)
request.structured_output_request.grammar = grammar # type: ignore[assignment]
request.params.structured_output_request.grammar = grammar # type: ignore[assignment]

medium

In grammar_init, request is of type RequestParams. The line request.params.structured_output_request.grammar = grammar attempts to access request.params.structured_output_request. However, structured_output_request is a direct attribute of RequestParams.

This should likely be request.structured_output_request.grammar = grammar.

Suggested change
request.params.structured_output_request.grammar = grammar # type: ignore[assignment]
request.structured_output_request.grammar = grammar # type: ignore[assignment]

@mergify mergify bot added the tpu Related to Google TPUs label Jun 19, 2025
Commits (all signed off by Lucas Wilkinson <lwilkinson@neuralmagic.com>): wip; wip; renaming plus more tests passing; updates; more tests passing; fix mm test failure; restore caching behavior; tests passing; fix tests; wip

@LucasWilkinson LucasWilkinson force-pushed the lwilkinson/async-scheduler branch from 8a40787 to a57d588 on June 23, 2025 21:21

@mergify mergify bot removed the tpu Related to Google TPUs label Jun 23, 2025
@erictanjn


Hello, suppose that Scheduling step 2 requires the output of Execution step 1. How would asynchronous scheduling be implemented in that case? Can Scheduling 2 completely overlap with Execution 1?

@LucasWilkinson
Collaborator Author

Superseded by #19970; the request decomposition will be split out into its own PR.
