Skip to content

feat(spider-execution-manager): Update Runtime implementation to track the consumed task assignment records.#350

Merged
LinZhihao-723 merged 3 commits into
y-scope:mainfrom
LinZhihao-723:em-runtime-update
Jun 20, 2026
Merged

feat(spider-execution-manager): Update Runtime implementation to track the consumed task assignment records.#350
LinZhihao-723 merged 3 commits into
y-scope:mainfrom
LinZhihao-723:em-runtime-update

Conversation

@LinZhihao-723

@LinZhihao-723 LinZhihao-723 commented Jun 18, 2026

Copy link
Copy Markdown
Member

Description

This PR updates the implementation of Runtime to reflect the latest changes in our em-to-scheduler protocol: the runtime now keeps track of a queue of previously consumed task assignments. These records are reported back to the scheduler on every task dispatch or on exit. The runtime also sets up a background task to send heartbeats to the scheduler periodically.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • Ensure all workflows pass.
  • Update unit tests to make sure that on a graceful termination, the scheduler should receive feedbacks of all assigned tasks.
  • Add new unit test cases to make sure if the runtime is cancelled before a the storage responses for task registration, the task won't be marked as consumed.

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced task assignment lifecycle tracking and acknowledgement
    • Added scheduler heartbeat monitoring for improved system coordination
  • Improvements

    • Refined error handling and reporting during task registration
    • Optimized shutdown procedure with better resource cleanup
  • Tests

    • Expanded test coverage for heartbeat synchronization and cancellation scenarios

@LinZhihao-723 LinZhihao-723 requested review from a team and sitaowang1998 as code owners June 18, 2026 22:47
@coderabbitai

coderabbitai Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Walkthrough

Introduces TaskAssignment and TaskAssignmentRecord types in spider-core, consolidates TaskAssignment from spider-scheduler into spider-core, refactors SchedulerClient/SchedulerResponse to carry these types with acknowledgement and heartbeat semantics, wires a periodic scheduler heartbeat task and prev_assignments tracking into Runtime, and updates mocks and integration tests accordingly.

Changes

Task Assignment Lifecycle and Scheduler Heartbeat

Layer / File(s) Summary
Core task-assignment types
components/spider-core/src/types/scheduler.rs
Adds TaskAssignment and TaskAssignmentRecord public structs, expands ID imports, and introduces TaskAssignmentRecord::new.
SchedulerClient trait and SchedulerResponse refactor
components/spider-execution-manager/src/client/scheduler.rs
Refactors SchedulerResponse to hold task_assignment and scheduler_id; updates next_task to accept prev_assignment: Option<TaskAssignmentRecord> and shutdown to accept prev_assignments: Vec<TaskAssignmentRecord>.
TaskAssignment consolidation in spider-scheduler
components/spider-scheduler/src/types.rs
Removes local TaskAssignment struct and replaces it with a re-export from spider-core.
Runtime state, scheduler heartbeat task, and shutdown wiring
components/spider-execution-manager/src/runtime.rs
Adds scheduler_heartbeat_interval to RuntimeConfig, replaces StorageInvalidInput error variant with transparent StorageResponse, extends Runtime state with scheduler_heartbeat_join and prev_assignments, spawns a periodic scheduler heartbeat Tokio task, and refactors run shutdown to join both background tasks and call scheduler_client.shutdown with drained assignments.
Runtime main loop and register_task_instance refactor
components/spider-execution-manager/src/runtime.rs
Passes prev_assignments.pop_front() into next_task, updates session comparison and ExecuteRequest construction from response, and refactors register_task_instance to mark consumed assignments and route stale-cache/stale-session errors to Ok(None).
Mock scheduler and storage infrastructure
tests/huntsman/test-utils/Cargo.toml, tests/huntsman/test-utils/src/mock.rs
Adds dashmap; extends MockScheduler with outstanding tracking and heartbeat_count; adds inspector APIs; reworks SchedulerClient mock to acknowledge assignments; adds block_register control to MockStorage.
Integration test updates and new test cases
tests/huntsman/em-runtime/tests/test_runtime.rs
Updates helper construction and error injection to use CacheStale/StaleSession; adds scheduler.outstanding() == &[] post-cancellation assertions; adds liveness_and_scheduler_heartbeats_advance_in_lockstep and cancellation_during_register_leaves_assignment_unacked tests.

Sequence Diagram(s)

sequenceDiagram
  participant Runtime
  participant SchedulerHeartbeatTask
  participant SchedulerClient
  participant StorageClient

  note over Runtime: create()
  Runtime->>SchedulerHeartbeatTask: spawn periodic heartbeat loop
  note over Runtime: run() main loop
  Runtime->>SchedulerClient: next_task(em_id, prev_assignments.pop_front())
  SchedulerClient-->>Runtime: SchedulerResponse(task_assignment, scheduler_id, session_id)
  Runtime->>StorageClient: register_task_instance(response)
  alt registration ok
    StorageClient-->>Runtime: Ok(Some(instance))
    Runtime->>Runtime: mark_consume(assignment)
  else CacheStale or StaleSession
    StorageClient-->>Runtime: Err(CacheStale | StaleSession)
    Runtime->>Runtime: refresh liveness, mark_consume, return Ok(None)
  else other storage error
    StorageClient-->>Runtime: Err(other)
    Runtime-->>Runtime: RuntimeError::StorageResponse
  end
  note over Runtime: shutdown
  Runtime->>SchedulerHeartbeatTask: cancel token
  Runtime->>SchedulerClient: shutdown(em_id, prev_assignments.into())
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • y-scope/spider#329: Introduced the execution-manager Runtime::run/main_loop and the SchedulerClient::next_task interface that this PR directly extends with assignment acknowledgement and heartbeat wiring.
  • y-scope/spider#348: Evolved the shared task-assignment model to carry a globally unique TaskAssignmentId, directly preceding this PR's consolidation of TaskAssignment into spider-core and its use throughout the scheduler/EM boundary.
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main change: updating the Runtime implementation to track consumed task assignment records, which is the central objective of this PR.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

result = self.scheduler_client.next_task(self.em_id) => {
result = self.scheduler_client.next_task(
self.em_id,
self.prev_assignments.pop_front()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not report all received assignments?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure yet what the multi-executor implementation would look like, so I made the next_task only send one processed task assignment, assuming the task assignment is from its own previous execution. I think it should be fair for now since we only have one assignment per EM. We could promote this param to a vector in the future, though, if we found it makes more sense for a multi-executor case.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants