diff --git a/.github/workflows/e2e-bm.yaml b/.github/workflows/e2e-bm.yaml index 3b77921c..0301fae6 100644 --- a/.github/workflows/e2e-bm.yaml +++ b/.github/workflows/e2e-bm.yaml @@ -106,6 +106,15 @@ jobs: echo "=== Check flmctl connectivity ===" flmctl list -a || echo "Warning: Could not list applications" echo "" + echo "=== Flame nodes ===" + flmctl list -n || echo "Warning: Could not list nodes" + echo "" + echo "=== Flame node details ===" + for node in $(flmctl list -n 2>/dev/null | awk 'NR > 1 {print $1}'); do + echo "--- Node: $node ---" + flmctl view -n "$node" || echo "Warning: Could not view node $node" + done + echo "" echo "=== Verify object cache is accessible ===" curl -s http://127.0.0.1:9090/ || echo "Note: Object cache gRPC endpoint (expected no HTTP response)" diff --git a/common/src/apis/from_rpc.rs b/common/src/apis/from_rpc.rs index d1f5f5ef..43ea13a4 100644 --- a/common/src/apis/from_rpc.rs +++ b/common/src/apis/from_rpc.rs @@ -82,6 +82,15 @@ impl From for Event { } } +impl From for FlameResult { + fn from(result: rpc::Result) -> Self { + Self { + return_code: result.return_code, + message: result.message, + } + } +} + impl TryFrom for TaskContext { type Error = FlameError; diff --git a/common/src/apis/session.rs b/common/src/apis/session.rs index a2bec4c6..a7f4fb5d 100644 --- a/common/src/apis/session.rs +++ b/common/src/apis/session.rs @@ -21,6 +21,10 @@ impl Session { self.status.state == SessionState::Closed } + pub fn is_ready(&self, retry_limits: u32) -> bool { + self.retry_count < retry_limits + } + pub fn update_task(&mut self, task: &Task) -> Result<(), FlameError> { let task_ptr = TaskPtr::new(task.clone().into()); @@ -127,3 +131,27 @@ impl Session { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn is_ready_uses_transient_retry_count() { + assert!(Session { + retry_count: 1, + ..Default::default() + } + .is_ready(2)); + assert!(!Session { + retry_count: 2, + ..Default::default() + } + .is_ready(2)); + 
assert!(!Session { + retry_count: 3, + ..Default::default() + } + .is_ready(2)); + } +} diff --git a/common/src/apis/to_rpc.rs b/common/src/apis/to_rpc.rs index e3c2116a..edd7c829 100644 --- a/common/src/apis/to_rpc.rs +++ b/common/src/apis/to_rpc.rs @@ -94,6 +94,15 @@ impl From for rpc::Event { } } +impl From for rpc::Result { + fn from(result: FlameResult) -> Self { + Self { + return_code: result.return_code, + message: result.message, + } + } +} + impl From for rpc::TaskContext { fn from(ctx: TaskContext) -> Self { Self { diff --git a/common/src/apis/types.rs b/common/src/apis/types.rs index db46ccbf..d24286ab 100644 --- a/common/src/apis/types.rs +++ b/common/src/apis/types.rs @@ -21,6 +21,14 @@ use stdng::MutexPtr; pub const DEFAULT_MAX_INSTANCES: u32 = 1_000_000; pub const DEFAULT_DELAY_RELEASE: Duration = Duration::seconds(60); +pub const BIND_RESULT_OK: i32 = 0; +pub const BIND_RESULT_APPLICATION_INSTALL_FAILED: i32 = 10; +pub const BIND_RESULT_SHIM_CREATE_FAILED: i32 = 11; +pub const BIND_RESULT_ON_SESSION_ENTER_FAILED: i32 = 12; +pub const BIND_RESULT_UNKNOWN_FAILED: i32 = 19; +pub const SESSION_EVENT_TASK_ID: i64 = 0; +pub const SESSION_BIND_FAILED: i32 = 1001; +pub const SESSION_RETRY_LIMIT_REACHED: i32 = 1002; pub type SessionID = String; pub type TaskID = i64; @@ -42,6 +50,15 @@ pub struct EventOwner { pub session_id: SessionID, } +impl EventOwner { + pub fn session(session_id: SessionID) -> Self { + Self { + session_id, + task_id: SESSION_EVENT_TASK_ID, + } + } +} + #[derive(Clone, Debug)] pub struct Event { pub code: i32, @@ -49,6 +66,12 @@ pub struct Event { pub creation_time: DateTime, } +#[derive(Clone, Debug, Default, Eq, PartialEq)] +pub struct FlameResult { + pub return_code: i32, + pub message: Option, +} + #[derive(Clone, Debug, Default)] pub struct TaskResult { pub state: TaskState, @@ -226,6 +249,7 @@ pub struct Session { pub batch_size: u32, pub priority: u32, pub resreq: Option, + pub retry_count: u32, } #[derive(Clone, Copy, Debug, 
Default, Eq, PartialEq, Hash, strum_macros::Display)] @@ -735,6 +759,21 @@ mod tests { assert!(!rr(2, 2, 5).great(&rr(1, 2, 3))); } + #[test] + fn flame_result_converts_to_and_from_rpc_result() { + let result = FlameResult { + return_code: 12, + message: Some("bind failed".to_string()), + }; + + let rpc_result: rpc::flame::v1::Result = result.clone().into(); + assert_eq!(rpc_result.return_code, 12); + assert_eq!(rpc_result.message.as_deref(), Some("bind failed")); + + let parsed = FlameResult::from(rpc_result); + assert_eq!(parsed, result); + } + #[test] fn add_sums_all_three_fields() { let mut a = rr(1, 2, 3); diff --git a/common/src/ctx.rs b/common/src/ctx.rs index 34eb0d79..090872d6 100644 --- a/common/src/ctx.rs +++ b/common/src/ctx.rs @@ -32,6 +32,7 @@ const DEFAULT_FLAME_ENDPOINT: &str = "http://127.0.0.1:8080"; pub const DEFAULT_POLICIES: &[&str] = &["priority", "drf", "gang"]; const DEFAULT_STORAGE: &str = "sqlite://flame.db"; const DEFAULT_MAX_EXECUTORS_PER_NODE: u32 = 128; +pub const DEFAULT_SESSION_RETRY_LIMITS: u32 = 5; const DEFAULT_SCHEDULE_INTERVAL: u64 = 100; const DEFAULT_SHIM: &str = "host"; const DEFAULT_FLAME_CACHE_ENDPOINT: &str = "http://127.0.0.1:9090"; @@ -69,6 +70,18 @@ struct FlameClusterYaml { pub limits: Option, /// pprof profiling configuration pub pprof: Option, + /// Recovery configuration + pub recovery: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct FlameRecoveryYaml { + pub session: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct FlameSessionRecoveryYaml { + pub retry_limits: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -148,6 +161,7 @@ pub struct FlameCluster { pub executors: FlameExecutors, pub tls: Option, pub limits: FlameLimits, + pub recovery: FlameRecovery, pub pprof: Option, } @@ -162,6 +176,16 @@ pub struct FlameLimits { pub max_executors: u32, } +#[derive(Debug, Clone, Default)] +pub struct FlameRecovery { + pub session: FlameSessionRecovery, +} + 
+#[derive(Debug, Clone)] +pub struct FlameSessionRecovery { + pub retry_limits: u32, +} + #[derive(Debug, Clone)] pub struct FlamePprof { pub port: u16, @@ -393,6 +417,10 @@ impl TryFrom for FlameCluster { let limits = cluster.limits.map(FlameLimits::from).unwrap_or_default(); let pprof = cluster.pprof.map(FlamePprof::from); + let recovery = cluster + .recovery + .map(FlameRecovery::from) + .unwrap_or_default(); Ok(FlameCluster { name: cluster.name, @@ -412,11 +440,39 @@ impl TryFrom for FlameCluster { executors, tls, limits, + recovery, pprof, }) } } +impl From for FlameRecovery { + fn from(yaml: FlameRecoveryYaml) -> Self { + FlameRecovery { + session: yaml + .session + .map(FlameSessionRecovery::from) + .unwrap_or_default(), + } + } +} + +impl From for FlameSessionRecovery { + fn from(yaml: FlameSessionRecoveryYaml) -> Self { + FlameSessionRecovery { + retry_limits: yaml.retry_limits.unwrap_or(DEFAULT_SESSION_RETRY_LIMITS), + } + } +} + +impl Default for FlameSessionRecovery { + fn default() -> Self { + FlameSessionRecovery { + retry_limits: DEFAULT_SESSION_RETRY_LIMITS, + } + } +} + impl TryFrom for FlameExecutors { type Error = FlameError; fn try_from(executors: FlameExecutorsYaml) -> Result { @@ -464,6 +520,7 @@ impl Default for FlameCluster { executors: FlameExecutors::default(), tls: None, limits: FlameLimits::default(), + recovery: FlameRecovery::default(), pprof: None, } } @@ -579,6 +636,29 @@ cluster: Ok(()) } + #[test] + fn test_flame_context_with_session_recovery_retry_limits() -> Result<(), FlameError> { + let context_string = r#"--- +cluster: + name: flame + endpoint: "http://flame-session-manager:8080" + recovery: + session: + retry_limits: 2 + "#; + + let tmp_dir = TempDir::new().unwrap(); + let tmp_file = tmp_dir.path().join("flame-cluster.yaml"); + + fs::write(&tmp_file, context_string).map_err(|e| FlameError::Internal(e.to_string()))?; + + let ctx = FlameClusterContext::from_file(Some(tmp_file.to_string_lossy().to_string()))?; + + 
assert_eq!(ctx.cluster.recovery.session.retry_limits, 2); + + Ok(()) + } + #[test] fn test_flame_context_with_cache_eviction() -> Result<(), FlameError> { let context_string = r#"--- diff --git a/docs/designs/RFE25-session-bind-failure-recovery/FS.md b/docs/designs/RFE25-session-bind-failure-recovery/FS.md new file mode 100644 index 00000000..94238c38 --- /dev/null +++ b/docs/designs/RFE25-session-bind-failure-recovery/FS.md @@ -0,0 +1,584 @@ +--- +Issue: #25 +Author: klaus +Date: 2026-05-19 +Status: Draft +--- + +# RFE25: Session Bind Failure Recovery + +## 1. Motivation + +**Background:** + +Flame currently separates session assignment from service initialization: + +1. The session manager scheduler selects an executor and moves it to `Binding`. +2. The executor manager receives the assignment through the node stream. +3. The executor manager installs the application, creates a shim instance, and calls `OnSessionEnter`. +4. On success, the executor manager calls `BindExecutorCompleted`, and the session manager moves the executor to `Bound`. + +This makes `OnSessionEnter` the real admission point for a session on a host. If that call fails today, the executor manager retries locally, logs the failure, transitions the local executor to `Unbinding`, and eventually returns the same executor to `Idle`. The session manager does not record a user-visible event for the bind failure or enforce a session-level retry limit, leaving users with little visibility into why the session is not making progress. + +**Target:** + +When session bind or enter fails, Flame should: + +1. Record a session-level event that identifies the session, executor, node, session retry count, error code, and error message. +2. Detach the failed executor binding so the session can be scheduled again. +3. Remove local `OnSessionEnter` retry loops from executor-manager; each bind attempt calls `OnSessionEnter` once. +4. 
Enforce `recovery.session.retry_limits` with a transient retry counter on the in-memory session object. +5. Preserve existing task state semantics; no task should be marked failed merely because a session enter attempt failed before any task was launched. + +## 2. Function Specification + +### Configuration + +Use the existing recovery/session namespace for session admission recovery: + +| Parameter | Description | Default | +| --------- | ----------- | ------- | +| `recovery.session.retry_limits` | Maximum failed session bind completions allowed for one session before Flame stops retrying that session. | `5` | + +Example: + +```yaml +cluster: + recovery: + session: + retry_limits: 5 +``` + +### API + +#### Backend RPC + +Extend `BindExecutorCompletedRequest` so executor-manager reports the result of the session bind/enter attempt through the completion RPC using the existing wire `Result` message. At the Rust implementation boundary, convert this wire type into an internal `FlameResult`; session-manager controller and state logic should use `FlameResult`, not `rpc::Result`. + +```protobuf +message BindExecutorCompletedRequest { + string executor_id = 1; + optional Result result = 2; // omitted or return_code == 0 means success +} +``` + +Behavior: + +- `result` omitted or `result.return_code == 0`: `BindingState::bind_session_completed(result)` resets the session object's transient `retry_count` and moves the executor from `Binding` to `Bound`. +- `result.return_code != 0`: `BindingState::bind_session_completed(result)` increments the session object's transient `retry_count`, records the bind failure event with `result.return_code` and `result.message`, detaches the session from the executor, and transitions it from `Binding` to `Unbinding` for cleanup. +- If the session `retry_count` reaches `recovery.session.retry_limits`, record a session event that the retry limit was reached. 
Keep the session open, but do not assign, pipeline, or bind a new executor to it while the retry count remains at or above the limit. Existing executors already assigned to the session are not changed by retry-limit exhaustion. +- Do not clamp `retry_count` at `retry_limits`. Multiple executors may already be in `Binding` for the same session when the limit is reached, so later in-flight bind failures can push `retry_count` above the configured limit. + +Define shared constants for `Result.return_code` so the failure phase is machine-readable without adding a new RPC message: + +| Constant | Code | Meaning | +| -------- | ---- | ------- | +| `BIND_RESULT_OK` | `0` | Bind completed successfully. | +| `BIND_RESULT_APPLICATION_INSTALL_FAILED` | `10` | Executor-manager failed to install or prepare the application package. | +| `BIND_RESULT_SHIM_CREATE_FAILED` | `11` | Executor-manager failed to create the shim or service instance. | +| `BIND_RESULT_ON_SESSION_ENTER_FAILED` | `12` | The service instance returned an error from `OnSessionEnter`. | +| `BIND_RESULT_UNKNOWN_FAILED` | `19` | Bind failed before executor-manager could classify the phase. | + +`Result.message` should carry human-readable details and any source-specific code, for example `installer exited with status 127` or `service_return_code=7: missing model file`. `Result.return_code` should identify the Flame bind phase, not carry arbitrary plugin or service codes. + +No frontend RPC changes are required. Session events are returned through existing `GetSession` and `ListSession` responses by populating `SessionStatus.events`. + +### Event Model + +Existing event storage is task-scoped by `EventOwner { session_id, task_id }`, while `SessionStatus` already exposes `events`. This RFE reserves `task_id = 0` as the session-level event owner. Real task IDs start at `1`, so this does not conflict with normal task lifecycle events. 
+ +Add common event code constants rather than scattering raw integers: + +| Constant | Code | Owner | Meaning | +| -------- | ---- | ----- | ------- | +| `SESSION_BIND_FAILED` | `1001` | session (`task_id = 0`) | A session bind/enter attempt failed. | +| `SESSION_RETRY_LIMIT_REACHED` | `1002` | session (`task_id = 0`) | A session reached its retry limit and will not receive new executor assignments. Existing assigned executors are unaffected. | + +Event messages should be concise and structured enough for humans: + +```text +Executor <e-a> on node <host-a> failed to bind session with return_code <12>: service_return_code=7: missing model file +Session retry limit reached after 5/5 failed attempts; new executor assignment is paused for this session; existing executors are unchanged. +``` + +### CLI + +`flmctl view --session <session-id>` should show recent session events in table output. JSON output already includes the session object and should include `status.events` after the backend populates it. + +Example table addition: + +```text +Events: + 10:31:02.125 Executor <e-a> on node <host-a> failed to bind session with return_code <12>: service_return_code=7: missing dependency (1001) +``` + +### Scope + +**In Scope:** + +- User-visible session events for bind/enter failures. +- A bind result report path from executor-manager to session-manager. +- Detaching failed `Binding` executors from the session so the session can be scheduled again. +- Configuring the global session retry limit with `recovery.session.retry_limits`. +- A transient `retry_count` on the in-memory session object. +- Focused tests for failure reporting, event recording, and unbind cleanup. + +**Out of Scope:** + +- Adding a terminal `SessionState::Failed`. +- Failing pending tasks because session enter failed. +- Checkpointing or resuming partially entered service state. +- Node health diagnosis. A bind failure affects only the session/node pair, not the global node state. +- Task retry counts from RFE384. 
This RFE handles pre-task session admission, not running task retry. +- Host avoidance behavior such as blocking hosts, excluding hosts from scheduling, or forcing rebinding to a different host. That can be handled in a later RFE. + +**Limitations:** + +- After a bind failure, the normal scheduler may select the same host again. This design only records the failure and releases the binding; host avoidance is out of scope. +- If an application is broken on every host, Flame will keep retrying according to normal scheduling behavior until the session reaches `recovery.session.retry_limits`. +- When the session reaches `recovery.session.retry_limits`, Flame keeps the session open but stops assigning or binding new executors to it. Existing assigned executors are not preempted or unbound by this condition. Pending tasks remain pending until the user closes the session or the transient retry count is cleared by a session-manager restart. + +### Feature Interaction + +**Related Features:** + +- Scheduler dispatch and allocation actions select idle, void, and unbinding executors. +- RFE384 connection recovery handles node disconnect and task retry. This RFE handles successful node connectivity with failed session admission. +- Existing task event storage and `SessionStatus.events`. + +**Compatibility:** + +- Adding an optional `Result` to `BindExecutorCompletedRequest` is wire compatible if omitted values are treated as success. +- Existing executor managers that do not send `result` continue current behavior. +- Existing clients can ignore new event codes. + +**Breaking Changes:** + +None. + +## 3. 
Implementation Detail + +### Architecture + +```mermaid +sequenceDiagram + participant SCH as Scheduler + participant SM as Session Manager + participant EM as Executor Manager + participant SVC as Service Instance + + SCH->>SM: bind_session(executor, session) + SM-->>EM: Executor state Binding(session) + EM->>SM: BindExecutor(executor) + SM-->>EM: Application + Session + EM->>SVC: OnSessionEnter(session) + SVC-->>EM: failure + EM->>SM: BindExecutorCompleted(executor, result=error) + SM->>SM: record session event task_id=0 + SM->>SM: increment session.retry_count + SM->>SM: detach executor from session; state=Unbinding + EM->>SVC: OnSessionLeave best effort + EM->>SM: UnbindExecutorCompleted(executor) + SM->>SM: executor state Idle + SCH->>SM: next cycle skips session if !session.is_ready(retry_limits) +``` + +### Components + +| Component | Responsibility | +| --------- | -------------- | +| `executor_manager::states::idle` | Call `OnSessionEnter` once per bind attempt; report bind success or failure through `BindExecutorCompletedRequest.result`. | +| `executor_manager::states::unbinding` | Clean up shim state. For failed enter, `OnSessionLeave` is best effort because enter did not complete. | +| `session_manager::apiserver::backend` | Convert `BindExecutorCompletedRequest.result` from `rpc::Result` to `FlameResult` and forward it without interpreting success or failure. | +| `session_manager::controller` | Convert the backend completion into a state-machine call, persist the resulting executor state, and notify the node. It should not interpret bind result success/failure itself. | +| `session_manager::controller::executors::binding` | Interpret bind completion success/failure, validate binding state/session attachment, update the session object's transient `retry_count`, record bind failure and retry-limit events, and move the executor to `Bound` or `Unbinding`. 
| +| `session_manager::scheduler` | Skip sessions where `SessionInfo::is_ready(retry_limits)` is false; do not modify executors already assigned to that session. | +| `session_manager::storage` | Record session-level events. Do not persist `retry_count`. | +| `flmctl` | Display session-level events in session table output. | + +### Data Structures + +Reuse the existing RPC `Result` on the wire, but use an internal `FlameResult` in Rust code: + +```rust +pub struct FlameResult { + pub return_code: i32, + pub message: Option<String>, +} + +impl From<rpc::Result> for FlameResult { ... } +impl From<FlameResult> for rpc::Result { ... } + +pub const BIND_RESULT_OK: i32 = 0; +pub const BIND_RESULT_APPLICATION_INSTALL_FAILED: i32 = 10; +pub const BIND_RESULT_SHIM_CREATE_FAILED: i32 = 11; +pub const BIND_RESULT_ON_SESSION_ENTER_FAILED: i32 = 12; +pub const BIND_RESULT_UNKNOWN_FAILED: i32 = 19; +``` + +Add a transient `retry_count` to the session manager's in-memory session object. This is the global session recovery retry count, not a bind-only counter. This RFE increments it on failed bind completion, resets it on successful bind completion, and clears it when the session is deleted or the session manager restarts. Do not add this field to the persisted storage engine record or public RPC `Session` object: + +```rust +pub struct Session { + // existing fields... + pub retry_count: u32, +} + +impl Session { + pub fn is_ready(&self, retry_limits: u32) -> bool { + self.retry_count < retry_limits + } +} +``` + +`SessionInfo` should mirror `retry_count` when scheduler snapshots are built, so scheduler decisions do not need a separate retry map: + +```rust +pub struct SessionInfo { + // existing fields... + pub retry_count: u32, +} + +impl SessionInfo { + pub fn is_ready(&self, retry_limits: u32) -> bool { + self.retry_count < retry_limits + } +} +``` + +Use these helpers for admission checks instead of open-coding `retry_count < retry_limits`. 
The helper name describes readiness for new executor assignment, not session liveness; a session can be open but not ready for new executors after it reaches the retry limit. + +`retry_count` is not capped by `retry_limits`. The limit controls whether the session is ready for new executor assignment; it does not cancel bind attempts that are already in flight. + +Add session-level event helpers: + +```rust +pub const SESSION_EVENT_TASK_ID: TaskID = 0; + +impl EventOwner { + pub fn session(session_id: SessionID) -> Self { + Self { + session_id, + task_id: SESSION_EVENT_TASK_ID, + } + } +} +``` + +`Storage::get_session` and `Storage::list_session` should populate `session.events` from `EventOwner::session(session.id.clone())`. + +### Algorithms + +#### Executor Manager Enter Failure + +```text +function bind_idle_executor(executor): + session = backend.BindExecutor(executor.id) + if session is none: + transition executor to Releasing + return + + if install application fails: + backend.BindExecutorCompleted( + executor.id, + result = { + return_code: BIND_RESULT_APPLICATION_INSTALL_FAILED, + message: install_error, + }, + ) + transition executor to Unbinding + return + + if create shim fails: + backend.BindExecutorCompleted( + executor.id, + result = { + return_code: BIND_RESULT_SHIM_CREATE_FAILED, + message: shim_error, + }, + ) + transition executor to Unbinding + return + + enter_result = shim.OnSessionEnter(session) + if enter_result succeeds: + backend.BindExecutorCompleted( + executor.id, + result = { + return_code: BIND_RESULT_OK, + }, + ) + transition executor to Bound + return + + backend.BindExecutorCompleted( + executor.id, + result = { + return_code: BIND_RESULT_ON_SESSION_ENTER_FAILED, + message: format("service_return_code={}: {}", enter_result.return_code, enter_result.error_message), + }, + ) + transition executor to Unbinding +``` + +Application install and shim creation failures use the same `BindExecutorCompleted` report path with phase-specific 
`Result.return_code` values and clear `Result.message` strings. If no shim instance exists, executor-manager should skip `OnSessionLeave` during local cleanup. + +#### Session Manager Completion Handling + +```text +function bind_executor_completed(executor_id, result): + executor = storage.get_executor(executor_id) + state = executor_state_machine(executor) + state.bind_session_completed(result) + storage.update_executor(executor) + notify executor node + +function BindingState.bind_session_completed(result): + executor = self.executor + + if result is omitted or result.return_code == 0: + if executor.ssn_id exists: + session = storage.get_session_ptr(executor.ssn_id) + session.retry_count = 0 + executor.state = Bound + return + + if executor.ssn_id is none: + return INVALID_STATE + + session_id = executor.ssn_id + now = clock.now() + + storage.record_event( + EventOwner::session(session_id), + Event { + code: SESSION_BIND_FAILED, + message: format( + "Executor <{}> on node <{}> failed to bind session with return_code <{}>: {}", + executor.id, + executor.node, + result.return_code, + result.message.unwrap_or_default(), + ), + creation_time: now, + }, + ) + + increment_session_retry_count(session_id) + executor.state = Unbinding + executor.ssn_id = none + executor.task_id = none + +function increment_session_retry_count(session_id): + session = storage.get_session_ptr(session_id) + previous_retry_count = session.retry_count + session.retry_count += 1 + retry_count = session.retry_count + retry_limit = config.recovery.session.retry_limits + + if previous_retry_count < retry_limit and retry_count >= retry_limit: + storage.record_event( + EventOwner::session(session_id), + Event { + code: SESSION_RETRY_LIMIT_REACHED, + message: format( + "Session retry limit reached after {}/{} failed attempts; new executor assignment is paused for this session; existing executors are unchanged.", + retry_count, + retry_limit, + ), + creation_time: clock.now(), + }, + ) + + // 
Additional in-flight binding executors may fail after the limit is reached. + // Keep incrementing retry_count and recording SESSION_BIND_FAILED, but do not + // duplicate SESSION_RETRY_LIMIT_REACHED unless the count crosses the threshold. + +``` + +Detaching `ssn_id` for a failed `Binding` executor is important. Existing scheduler accounting counts executors with `ssn_id` as allocated to the session, so leaving the session attached until unbind completion can delay the next bind attempt. + +#### Scheduler Readiness + +```text +function select_schedulable_sessions(snapshot): + retry_limit = config.recovery.session.retry_limits + return snapshot.find_sessions( + SessionFilter::by_state(Open).with_predicate( + |session| session.is_ready(retry_limit) + ) + ) +``` + +The scheduler should skip not-ready sessions during normal session candidate selection by applying readiness through `SessionFilter`, rather than fetching open sessions and filtering them in a separate pass. No separate `Statement` guard is required: skipped sessions never produce allocation, pipeline, or bind operations in that scheduling cycle. This readiness check is admission-only; it does not preempt, unbind, or otherwise modify executors already assigned or bound to the session. + +#### Retry After Failure + +When the session's transient `retry_count` is below `recovery.session.retry_limits`, `UnbindExecutorCompleted` returns the executor to `Idle` and the session remains open. The existing scheduler is responsible for any next bind attempt. This RFE does not add host avoidance state, so the next attempt may target the same host or a different host depending on ordinary scheduler state. + +When the session's transient `retry_count` reaches `recovery.session.retry_limits`, the session manager records `SESSION_RETRY_LIMIT_REACHED`, leaves the session open, and prevents future executor assignment or binding for that session. 
Existing assigned executors keep their current states and follow normal lifecycle rules. This uses the existing open session state instead of introducing `SessionState::Failed` or closing the session. + +Because scheduler decisions and executor-manager bind completions are asynchronous, several executors may already be binding to the same session when the session becomes not ready. Those in-flight bind attempts continue to completion. If they fail, session-manager still records `SESSION_BIND_FAILED` and increments `retry_count`, so `retry_count` can exceed `recovery.session.retry_limits`. The retry-limit event should be recorded when the count first crosses the threshold, not for every later in-flight failure. + +### System Considerations + +**Performance:** + +Bind failure handling adds one event write on each reported failed bind completion, plus one retry-limit event when the transient `retry_count` reaches the limit. The scheduler adds only a simple per-session readiness check while selecting schedulable sessions. + +**Scalability:** + +The number of stored events grows with reported bind failures. This matches existing task event behavior and can use the same retention or cleanup policy. + +**Reliability:** + +The failure path is idempotent at the executor state level. Repeated reports may create repeated session events, but they should still leave the executor unbound and schedulable. Event recording failure should not leave the executor permanently bound; log the error and continue unbinding where possible. + +**Resource Usage:** + +No additional scheduler-owned state is introduced. The only new state is `retry_count` on the in-memory session object, and it is not persisted. + +**Security:** + +Failure messages come from service code. Sanitize control characters and truncate them to an internal bounded size before storing or returning them to clients. 
+ +**Observability:** + +Add logs and metrics: + +- `flame_session_retries_total{reason="bind_failure",result_code,node,application}` + +Logs should include session ID, executor ID, node, result code, session retry count, retry limit, and error message. + +**Operational:** + +Operators can inspect session events through `flmctl view --session <session-id>` and JSON output. There are no bind-failure host records to inspect or clear. + +### Dependencies + +- Existing backend `BindExecutor` and `BindExecutorCompleted` RPCs. +- Existing backend `UnbindExecutorCompleted` RPC for local cleanup after failed enter. +- Existing backend `UnbindExecutor` RPC for normal unbind behavior. +- Existing session/task event storage. +- Existing executor state machine. + +## 4. Use Cases + +**Example 1: Session Enter Fails** + +1. Session `s1` is scheduled to executor `e-a` on node `host-a`. +2. Application install succeeds, but one `OnSessionEnter` call fails because a local dependency is missing. +3. Executor-manager calls `BindExecutorCompleted(e-a, result={ return_code: BIND_RESULT_ON_SESSION_ENTER_FAILED, message: "service_return_code=7: missing dependency" })`. +4. Session-manager increments `s1.retry_count` and records `SESSION_BIND_FAILED`. +5. Session-manager detaches the failed binding and returns the executor through normal unbind cleanup. +6. A later scheduler pass retries the still-open session according to existing scheduling rules. + +Expected outcome: users see the bind failure event, and the session can be scheduled again. + +**Example 2: Application Installation Fails** + +1. Session `s2` is scheduled to executor `e-b`. +2. Executor-manager cannot install the application package. +3. Executor-manager calls `BindExecutorCompleted(e-b, result={ return_code: BIND_RESULT_APPLICATION_INSTALL_FAILED, message: "installer exited with status 127" })`. +4. Session-manager records `SESSION_BIND_FAILED` with the installation failure code and increments `s2.retry_count`. +5. 
Executor `e-b` moves from `Binding` to `Unbinding` for cleanup. + +Expected outcome: users and metrics can distinguish an installation failure from an `OnSessionEnter` failure using `Result.return_code`. + +**Example 3: Repeated Enter Failures** + +1. Session `s3` fails to enter on `host-a`. +2. Session-manager records `SESSION_BIND_FAILED`. +3. The executor is unbound and becomes eligible again. +4. The scheduler retries using existing policies. It may select the same host again because host avoidance is out of scope. +5. When `s3.retry_count` reaches `recovery.session.retry_limits`, session-manager records `SESSION_RETRY_LIMIT_REACHED`. +6. Later scheduler passes do not assign, pipeline, or bind a new executor to `s3` while the retry count remains at or above the limit. +7. Any other executor that was already assigned or bound to `s3` keeps running through its existing lifecycle. + +Expected outcome: users get repeated session events that expose the failure, Flame stops assigning executors after the configured limit, and the session remains open with pending tasks unchanged. + +**Example 4: Concurrent Bind Failures Exceed Limit** + +1. Session `s4` has three executors already in `Binding`. +2. `recovery.session.retry_limits = 2`. +3. The first two bind completions fail. Session-manager records two `SESSION_BIND_FAILED` events, increments `s4.retry_count` to `2`, and records `SESSION_RETRY_LIMIT_REACHED`. +4. The third in-flight bind completion fails later. Session-manager records another `SESSION_BIND_FAILED` and increments `s4.retry_count` to `3`, but does not record a duplicate retry-limit event. +5. Later scheduler passes skip `s4` because `s4.is_ready(2)` is false. + +Expected outcome: `retry_count` can exceed `retry_limits`; the limit stops only new scheduling, not in-flight bind completion handling. + +**Example 5: Normal Unbind** + +1. Scheduler preempts a bound executor or a session closes. +2. Executor-manager calls `UnbindExecutor`. +3. 
Session-manager follows existing unbind behavior. + +Expected outcome: no bind failure event. + +## 5. Verification Plan + +**Unit Tests:** + +- `BindExecutorCompletedRequest` with failed `result` records a session event owned by `task_id = 0`. +- `BindExecutorCompletedRequest` with omitted or successful `result` preserves the current success path. +- Executor-manager uses `BIND_RESULT_APPLICATION_INSTALL_FAILED` for application installation failures. +- Executor-manager uses `BIND_RESULT_SHIM_CREATE_FAILED` for shim or service instance creation failures. +- Executor-manager uses `BIND_RESULT_ON_SESSION_ENTER_FAILED` for `OnSessionEnter` failures and keeps the service return code in `Result.message`. +- Executor-manager calls `OnSessionEnter` once per bind attempt and reports failure through `BindExecutorCompletedRequest.result` instead of retrying locally. +- A failed bind result returns `INVALID_STATE` if the executor no longer has an assigned `ssn_id`. +- A failed bind result returns `INVALID_STATE` if the executor is not in `Binding`, so a stale failure cannot move `Bound` to `Unbinding`. +- Binding-state failure detach clears `ssn_id` and `task_id` while moving the executor to `Unbinding`. +- Successful bind completion resets the session object's transient `retry_count`. +- `Session::is_ready(retry_limits)` and `SessionInfo::is_ready(retry_limits)` return true only when `retry_count < retry_limits`. +- Scheduler session selection uses `is_ready` rather than open-coding retry-count comparisons. +- Reaching `recovery.session.retry_limits` records `SESSION_RETRY_LIMIT_REACHED`, leaves the session open, and prevents new executor assignment or binding. +- `retry_count` is not capped at `recovery.session.retry_limits`; in-flight bind failures can increment it above the limit. +- `SESSION_RETRY_LIMIT_REACHED` is recorded when `retry_count` first crosses the limit, not for every later failure above the limit. 
+- Reaching `recovery.session.retry_limits` does not preempt, unbind, or change executors already assigned to the session. +- The retry count is held on the in-memory session object and is not persisted or exposed through public session RPCs. +- `Storage::get_session` and `Storage::list_session` populate session-level events. + +**Integration Tests:** + +- Force `OnSessionEnter` to fail. Verify: + - session events include `SESSION_BIND_FAILED`; + - the event includes `BIND_RESULT_ON_SESSION_ENTER_FAILED`; + - the executor returns through unbind cleanup; + - the task remains pending. +- Force application installation to fail and verify the bind completion uses `BIND_RESULT_APPLICATION_INSTALL_FAILED`. +- Configure `recovery.session.retry_limits = 2` and verify session-manager records the retry-limit event after two failed bind completions, keeps the session open, does not assign or bind another executor to that session, and leaves any already assigned executor unchanged. +- With `recovery.session.retry_limits = 2`, fail three already-binding executors for the same session and verify `retry_count == 3`, all three `SESSION_BIND_FAILED` events are recorded, and only one `SESSION_RETRY_LIMIT_REACHED` event is recorded. + +**Manual Smoke Test:** + +1. Start a local cluster. +2. Register a test application whose `OnSessionEnter` fails. +3. Create a session and a task. +4. Run `flmctl view --session <session-id>` and confirm the bind failure event is visible. +5. Confirm the task remains pending and the executor is no longer stuck in `Binding`. + +## 6.
References + +**Issue:** + +- https://github.com/xflops/flame/issues/25 + +**Related Documents:** + +- `docs/designs/RFE384-flame-recovery/FS.md` +- `docs/designs/RFE400-batch-session/FS.md` + +**Implementation References:** + +- `session_manager/src/scheduler/actions/dispatch.rs` +- `session_manager/src/scheduler/actions/allocate.rs` +- `session_manager/src/scheduler/statement.rs` +- `session_manager/src/controller/mod.rs` +- `session_manager/src/controller/executors/binding.rs` +- `session_manager/src/apiserver/backend.rs` +- `session_manager/src/storage/mod.rs` +- `session_manager/src/events/mod.rs` +- `executor_manager/src/states/idle.rs` +- `executor_manager/src/states/unbinding.rs` +- `rpc/protos/backend.proto` +- `rpc/protos/types.proto` diff --git a/e2e/pyproject.toml b/e2e/pyproject.toml index a4abdef3..8846830f 100644 --- a/e2e/pyproject.toml +++ b/e2e/pyproject.toml @@ -8,6 +8,7 @@ authors = [ ] requires-python = ">=3.9" dependencies = [ + "cloudpickle>=3.1", "grpcio>=1.80", "grpcio-tools>=1.80", ] diff --git a/e2e/src/e2e/error_svc.py b/e2e/src/e2e/error_svc.py index ec83d1ff..e9a5fa69 100644 --- a/e2e/src/e2e/error_svc.py +++ b/e2e/src/e2e/error_svc.py @@ -47,5 +47,14 @@ def on_session_leave(self): self._session_context = None +class EnterFailureTestService(ErrorTestService): + """Service that raises an exception in on_session_enter for bind failure tests.""" + + def on_session_enter(self, context: flamepy.SessionContext): + """Handle session enter by raising an exception.""" + logger.info(f"Failing session enter: session_id={context.session_id}") + raise RuntimeError("intentional session enter failure") + + if __name__ == "__main__": flamepy.run(ErrorTestService()) diff --git a/e2e/tests/test_failure_recovery.py b/e2e/tests/test_failure_recovery.py index 1eab5a4d..1b9a7e36 100644 --- a/e2e/tests/test_failure_recovery.py +++ b/e2e/tests/test_failure_recovery.py @@ -20,6 +20,7 @@ - Recovery after failures """ +import time from concurrent.futures import 
ThreadPoolExecutor, as_completed import flamepy @@ -28,10 +29,14 @@ from e2e.api import TestRequest from e2e.helpers import FAIL_INPUT, invoke_task, serialize_request +from tests.utils import random_string FLM_TEST_SVC_APP = "flme2e-svc" FLM_ERROR_SVC_APP = "flme2e-error-svc" +FLM_ENTER_FAILURE_APP = "flme2e-enter-failure-svc" +ENTER_FAILURE_SERVICE_ENTRYPOINT = "import flamepy; from e2e.error_svc import EnterFailureTestService; flamepy.run(EnterFailureTestService())" TASK_FAILED_EVENT_CODE = int(TaskState.FAILED) +SESSION_BIND_FAILED_EVENT_CODE = 1001 def _wait_for_terminal_task(session, task_id): @@ -48,6 +53,21 @@ def _create_failing_basic_task(session): return task_update +def _wait_for_session_event(session_id, event_code, timeout_seconds=60): + deadline = time.monotonic() + timeout_seconds + last_events = [] + + while time.monotonic() < deadline: + session = flamepy.get_session(session_id) + last_events = session.events + matched_events = [event for event in last_events if event.code == event_code] + if matched_events: + return session, matched_events + time.sleep(0.5) + + pytest.fail(f"Session {session_id} did not record event {event_code}; last events: {[(e.code, e.message) for e in last_events]}") + + @pytest.fixture(scope="module", autouse=True) def setup_test_env(): """Setup test environment with test services.""" @@ -75,12 +95,26 @@ def setup_test_env(): ), ) + flamepy.register_application( + FLM_ENTER_FAILURE_APP, + flamepy.ApplicationAttributes( + command="python3", + working_directory="/opt/e2e", + environments={ + "FLAME_LOG_LEVEL": "DEBUG", + "PYTHONPATH": "/opt/e2e/src", + }, + arguments=["-c", ENTER_FAILURE_SERVICE_ENTRYPOINT], + installer="python", + ), + ) + yield # Clean up sessions owned by this module before unregistering. 
sessions = flamepy.list_sessions() for sess in sessions: - if sess.application not in {FLM_TEST_SVC_APP, FLM_ERROR_SVC_APP}: + if sess.application not in {FLM_TEST_SVC_APP, FLM_ERROR_SVC_APP, FLM_ENTER_FAILURE_APP}: continue try: flamepy.close_session(sess.id) @@ -89,6 +123,7 @@ def setup_test_env(): flamepy.unregister_application(FLM_TEST_SVC_APP) flamepy.unregister_application(FLM_ERROR_SVC_APP) + flamepy.unregister_application(FLM_ENTER_FAILURE_APP) # ============================================================================= @@ -163,6 +198,41 @@ def test_session_continues_after_task_failure(self): session.close() +# ============================================================================= +# Session Bind Failure Tests +# ============================================================================= + + +class TestSessionBindFailureRecovery: + """Tests for session bind failure recovery.""" + + def test_on_session_enter_failure_records_session_event(self): + """Test that on_session_enter failure is reported as a session event.""" + session_id = f"test-enter-failure-{random_string(8)}" + session = flamepy.create_session( + application=FLM_ENTER_FAILURE_APP, + session_id=session_id, + common_data=None, + max_instances=1, + ) + + try: + session.create_task(b"trigger session bind") + + current_session, bind_failed_events = _wait_for_session_event( + session_id, + SESSION_BIND_FAILED_EVENT_CODE, + ) + + assert current_session.state == flamepy.SessionState.OPEN + event_messages = [event.message or "" for event in bind_failed_events] + assert any("failed to bind session" in message for message in event_messages) + assert any("intentional session enter failure" in message for message in event_messages) + + finally: + flamepy.close_session(session_id) + + # ============================================================================= # Partial Failure Tests # ============================================================================= diff --git 
a/executor_manager/src/appmgr/downloader.rs b/executor_manager/src/appmgr/downloader.rs index 11dec48c..89815e30 100644 --- a/executor_manager/src/appmgr/downloader.rs +++ b/executor_manager/src/appmgr/downloader.rs @@ -356,6 +356,7 @@ mod tests { let mut src_file = tokio::fs::File::create(&src_path).await.unwrap(); src_file.write_all(b"test content").await.unwrap(); + src_file.sync_all().await.unwrap(); drop(src_file); let registry = DownloaderRegistry::new(); diff --git a/executor_manager/src/client.rs b/executor_manager/src/client.rs index 27bdaf00..a50a6701 100644 --- a/executor_manager/src/client.rs +++ b/executor_manager/src/client.rs @@ -28,7 +28,8 @@ use ::rpc::flame::v1::{ use crate::executor::Executor; use common::apis::{ - Application, Node, ResourceRequirement, Session, SessionContext, Shim, TaskContext, TaskResult, + Application, FlameResult, Node, ResourceRequirement, Session, SessionContext, Shim, + TaskContext, TaskResult, }; use common::ctx::FlameClusterContext; use common::net::host_for_uri; @@ -222,9 +223,14 @@ impl BackendClient { } } - pub async fn bind_executor_completed(&mut self, exe: &Executor) -> Result<(), FlameError> { + pub async fn bind_executor_completed( + &mut self, + exe: &Executor, + result: Option, + ) -> Result<(), FlameError> { let req = BindExecutorCompletedRequest { executor_id: exe.id.clone(), + result: result.map(rpc::Result::from), }; self.client diff --git a/executor_manager/src/executor.rs b/executor_manager/src/executor.rs index 38a83813..9bb37c5c 100644 --- a/executor_manager/src/executor.rs +++ b/executor_manager/src/executor.rs @@ -169,7 +169,27 @@ pub fn start(client: BackendClient, executor: ExecutorPtr, app_manager: Arc { - tracing::error!("Failed to execute: {e}"); + let session_id = exec + .session + .as_ref() + .map(|session| session.session_id.as_str()); + let application = exec + .session + .as_ref() + .map(|session| session.application.name.as_str()); + let task_id = exec.task.as_ref().map(|task| 
task.task_id.as_str()); + let task_session_id = exec.task.as_ref().map(|task| task.session_id.as_str()); + tracing::error!( + executor_id = %exec.id, + node = %exec.node, + state = %exec.state, + session_id = ?session_id, + application = ?application, + task_session_id = ?task_session_id, + task_id = ?task_id, + error = %e, + "Failed to execute executor state" + ); } } } diff --git a/executor_manager/src/states/bound.rs b/executor_manager/src/states/bound.rs index 1e0594b2..d3aa91bc 100644 --- a/executor_manager/src/states/bound.rs +++ b/executor_manager/src/states/bound.rs @@ -17,7 +17,7 @@ use stdng::{logs::TraceFn, trace_fn}; use crate::client::BackendClient; use crate::executor::Executor; use crate::states::State; -use common::apis::ExecutorState; +use common::apis::{ExecutorState, TaskResult, TaskState}; use common::FlameError; #[derive(Clone)] @@ -46,7 +46,23 @@ impl State for BoundState { ))?; let task_result = { let mut shim = shim_ptr.lock().await; - shim.on_task_invoke(&task_ctx).await? 
+ match shim.on_task_invoke(&task_ctx).await { + Ok(task_result) => task_result, + Err(e) => { + tracing::error!( + "Task <{}/{}> failed during shim invocation on executor <{}>: {}", + task_ctx.session_id, + task_ctx.task_id, + self.executor.id, + e + ); + TaskResult { + state: TaskState::Failed, + output: None, + message: Some(e.to_string()), + } + } + } }; self.client diff --git a/executor_manager/src/states/idle.rs b/executor_manager/src/states/idle.rs index 7a7d82f5..6d859648 100644 --- a/executor_manager/src/states/idle.rs +++ b/executor_manager/src/states/idle.rs @@ -21,12 +21,12 @@ use crate::client::BackendClient; use crate::executor::Executor; use crate::shims; use crate::states::State; -use common::apis::ExecutorState; +use common::apis::{ + ExecutorState, FlameResult, BIND_RESULT_APPLICATION_INSTALL_FAILED, BIND_RESULT_OK, + BIND_RESULT_ON_SESSION_ENTER_FAILED, BIND_RESULT_SHIM_CREATE_FAILED, +}; use common::FlameError; -const ON_SESSION_ENTER_MAX_RETRIES: u32 = 5; -const ON_SESSION_ENTER_RETRY_DELAY_SECS: u64 = 5; - pub struct IdleState { pub client: BackendClient, pub executor: Executor, @@ -88,54 +88,57 @@ impl State for IdleState { &ssn.session_id.clone() ); - let env_vars = self.app_manager.install(&ssn.application).await?; - - let shim_ptr = shims::new(&self.executor.clone(), &ssn.application, &env_vars).await?; - - let mut last_error: Option = None; - for attempt in 1..=ON_SESSION_ENTER_MAX_RETRIES { - let mut shim = shim_ptr.lock().await; - match shim.on_session_enter(&ssn).await { - Ok(()) => { - tracing::debug!("Shim on_session_enter completed on attempt {}.", attempt); - last_error = None; - break; - } - Err(e) => { - tracing::warn!( - "on_session_enter failed on attempt {}/{}: {}", - attempt, - ON_SESSION_ENTER_MAX_RETRIES, - e - ); - last_error = Some(e); - - if attempt < ON_SESSION_ENTER_MAX_RETRIES { - let delay = (attempt * attempt) as u64 * ON_SESSION_ENTER_RETRY_DELAY_SECS; - tracing::debug!("Retrying in {} seconds...", delay); - 
tokio::time::sleep(std::time::Duration::from_secs(delay)).await; - } - } + let env_vars = match self.app_manager.install(&ssn.application).await { + Ok(env_vars) => env_vars, + Err(e) => { + self.bind_executor_failed( + BIND_RESULT_APPLICATION_INSTALL_FAILED, + format!("application installation failed: {e}"), + &ssn, + None, + ) + .await?; + return Ok(self.executor.clone()); } - } - - if let Some(e) = last_error { - tracing::warn!( - "Executor <{}> failed to enter session <{}>: {}, transitioning to unbinding state", - &self.executor.id, - &ssn.session_id, - e - ); + }; - self.executor.session = Some(ssn.clone()); - self.executor.shim_instance = Some(shim_ptr.clone()); - self.executor.state = ExecutorState::Unbinding; + let shim_ptr = match shims::new(&self.executor.clone(), &ssn.application, &env_vars).await { + Ok(shim_ptr) => shim_ptr, + Err(e) => { + self.bind_executor_failed( + BIND_RESULT_SHIM_CREATE_FAILED, + format!("shim creation failed: {e}"), + &ssn, + None, + ) + .await?; + return Ok(self.executor.clone()); + } + }; + let enter_result = { + let mut shim = shim_ptr.lock().await; + shim.on_session_enter(&ssn).await + }; + if let Err(e) = enter_result { + self.bind_executor_failed( + BIND_RESULT_ON_SESSION_ENTER_FAILED, + format!("on_session_enter failed: {e}"), + &ssn, + Some(shim_ptr.clone()), + ) + .await?; return Ok(self.executor.clone()); } self.client - .bind_executor_completed(&self.executor.clone()) + .bind_executor_completed( + &self.executor.clone(), + Some(FlameResult { + return_code: BIND_RESULT_OK, + message: None, + }), + ) .await?; self.executor.shim_instance = Some(shim_ptr.clone()); @@ -151,3 +154,33 @@ impl State for IdleState { Ok(self.executor.clone()) } } + +impl IdleState { + async fn bind_executor_failed( + &mut self, + return_code: i32, + message: String, + ssn: &common::apis::SessionContext, + shim_ptr: Option, + ) -> Result<(), FlameError> { + tracing::warn!( + "Executor <{}> failed to bind session <{}>: {}", + self.executor.id, + 
ssn.session_id, + message + ); + self.client + .bind_executor_completed( + &self.executor.clone(), + Some(FlameResult { + return_code, + message: Some(message), + }), + ) + .await?; + self.executor.session = Some(ssn.clone()); + self.executor.shim_instance = shim_ptr; + self.executor.state = ExecutorState::Unbinding; + Ok(()) + } +} diff --git a/executor_manager/src/states/unbinding.rs b/executor_manager/src/states/unbinding.rs index ffe0ad82..9c40c677 100644 --- a/executor_manager/src/states/unbinding.rs +++ b/executor_manager/src/states/unbinding.rs @@ -32,17 +32,14 @@ impl State for UnbindingState { trace_fn!("UnbindingState::execute"); self.client.unbind_executor(&self.executor.clone()).await?; - let shim_ptr = &mut self - .executor - .shim_instance - .clone() - .ok_or(FlameError::InvalidState( - "no shim instance in unbinding state".to_string(), - ))?; - - { + if let Some(shim_ptr) = self.executor.shim_instance.clone() { let mut shim = shim_ptr.lock().await; shim.on_session_leave().await?; + } else { + tracing::debug!( + "Executor <{}> has no shim instance during unbinding; skip on_session_leave", + self.executor.id + ); } self.client diff --git a/flmctl/src/view.rs b/flmctl/src/view.rs index 27013aed..c5451a1c 100644 --- a/flmctl/src/view.rs +++ b/flmctl/src/view.rs @@ -60,15 +60,7 @@ async fn view_task( println!("{:<15}{}", "Application:", session.application); println!("{:<15}{}", "State:", task.state); println!("{:<15}", "Events:"); - - for event in task.events { - println!( - " {}: {} ({})", - event.creation_time.format("%H:%M:%S%.3f"), - event.message.unwrap_or_default(), - event.code - ); - } + print!("{}", format_events(&task.events)); Ok(()) } @@ -121,9 +113,27 @@ fn view_session_table(session: &client::Session) -> Result<(), Box> { ]); println!("{table}"); + + println!("{:<15}", "Events:"); + print!("{}", format_events(&session.events)); + Ok(()) } +fn format_events(events: &[client::Event]) -> String { + events + .iter() + .map(|event| { + format!( 
+ " {}: {} ({})\n", + event.creation_time.format("%H:%M:%S%.3f"), + event.message.clone().unwrap_or_default(), + event.code + ) + }) + .collect() +} + fn view_session_json(session: &client::Session) -> Result<(), Box> { let json = serde_json::to_string_pretty(session).unwrap(); println!("{json}"); @@ -247,3 +257,25 @@ fn get_type(schema: Option) -> Result { None => Ok("-".to_string()), } } + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + use flame_rs::client::Event; + + #[test] + fn format_events_includes_session_event_details() { + let event = Event { + code: 1001, + message: Some("bind failed".to_string()), + creation_time: chrono::Utc.with_ymd_and_hms(2026, 5, 8, 10, 1, 2).unwrap(), + }; + + let formatted = format_events(&[event]); + + assert!(formatted.contains("10:01:02.000")); + assert!(formatted.contains("bind failed")); + assert!(formatted.contains("(1001)")); + } +} diff --git a/rpc/protos/backend.proto b/rpc/protos/backend.proto index f5b95fae..e38bbc9b 100644 --- a/rpc/protos/backend.proto +++ b/rpc/protos/backend.proto @@ -54,6 +54,7 @@ message BindExecutorResponse { message BindExecutorCompletedRequest { string executor_id = 1; + optional Result result = 2; } message UnbindExecutorRequest { diff --git a/sdk/python/src/flamepy/core/client.py b/sdk/python/src/flamepy/core/client.py index 43234658..daf3426d 100644 --- a/sdk/python/src/flamepy/core/client.py +++ b/sdk/python/src/flamepy/core/client.py @@ -79,6 +79,17 @@ def _schema_from_application_spec(spec: ApplicationSpec) -> Optional[Application ) +def _events_from_proto(events) -> List[Event]: + return [ + Event( + code=event.code, + message=event.message, + creation_time=datetime.fromtimestamp(event.creation_time / 1000, tz=timezone.utc), + ) + for event in events + ] + + def _application_from_proto(app) -> Application: spec = app.spec environments = {env.name: env.value for env in spec.environments} @@ -453,6 +464,7 @@ def create_session(self, attrs: SessionAttributes) -> 
"Session": failed=response.status.failed, completion_time=(datetime.fromtimestamp(response.status.completion_time / 1000, tz=timezone.utc) if response.status.HasField("completion_time") else None), common_data=common_data_bytes, + events=_events_from_proto(response.status.events), ) return session except grpc.RpcError as e: @@ -483,6 +495,7 @@ def list_sessions(self) -> List["Session"]: failed=session.status.failed, completion_time=(datetime.fromtimestamp(session.status.completion_time / 1000, tz=timezone.utc) if session.status.HasField("completion_time") else None), common_data=common_data_bytes, + events=_events_from_proto(session.status.events), ) ) @@ -539,6 +552,7 @@ def open_session(self, session_id: SessionID, spec: Optional[SessionAttributes] failed=response.status.failed, completion_time=(datetime.fromtimestamp(response.status.completion_time / 1000, tz=timezone.utc) if response.status.HasField("completion_time") else None), common_data=common_data_bytes, + events=_events_from_proto(response.status.events), ) except grpc.RpcError as e: @@ -566,6 +580,7 @@ def get_session(self, session_id: SessionID) -> "Session": failed=response.status.failed, completion_time=(datetime.fromtimestamp(response.status.completion_time / 1000, tz=timezone.utc) if response.status.HasField("completion_time") else None), common_data=common_data_bytes, + events=_events_from_proto(response.status.events), ) except grpc.RpcError as e: @@ -593,6 +608,7 @@ def close_session(self, session_id: SessionID) -> "Session": failed=response.status.failed, completion_time=(datetime.fromtimestamp(response.status.completion_time / 1000, tz=timezone.utc) if response.status.HasField("completion_time") else None), common_data=common_data_bytes, + events=_events_from_proto(response.status.events), ) except grpc.RpcError as e: @@ -611,6 +627,7 @@ class Session: succeed: int = 0 failed: int = 0 completion_time: Optional[datetime] = None + events: Optional[List[Event]] = None _common_data: 
Optional[bytes] = None """Client for session-specific operations.""" @@ -627,6 +644,7 @@ def __init__( failed: int, completion_time: Optional[datetime], common_data: Optional[bytes] = None, + events: Optional[List[Event]] = None, ): self.connection = connection self.id = id @@ -640,6 +658,7 @@ def __init__( self.completion_time = completion_time self.mutex = threading.Lock() self._common_data = common_data + self.events = events or [] def common_data(self) -> Optional[bytes]: """Get the common data of Session as bytes.""" @@ -669,14 +688,7 @@ def create_task(self, input_data: bytes) -> Task: creation_time=datetime.fromtimestamp(response.status.creation_time / 1000, tz=timezone.utc), input=input_data, completion_time=(datetime.fromtimestamp(response.status.completion_time / 1000, tz=timezone.utc) if response.status.HasField("completion_time") else None), - events=[ - Event( - code=event.code, - message=event.message, - creation_time=datetime.fromtimestamp(event.creation_time / 1000, tz=timezone.utc), - ) - for event in response.status.events - ], + events=_events_from_proto(response.status.events), ) except grpc.RpcError as e: @@ -697,14 +709,7 @@ def get_task(self, task_id: TaskID) -> Task: input=response.spec.input if response.spec.HasField("input") else None, output=response.spec.output if response.spec.HasField("output") else None, completion_time=(datetime.fromtimestamp(response.status.completion_time / 1000, tz=timezone.utc) if response.status.HasField("completion_time") else None), - events=[ - Event( - code=event.code, - message=event.message, - creation_time=datetime.fromtimestamp(event.creation_time / 1000, tz=timezone.utc), - ) - for event in response.status.events - ], + events=_events_from_proto(response.status.events), ) except grpc.RpcError as e: @@ -823,14 +828,7 @@ def _task_from_proto(response, session_id: str) -> Task: input=response.spec.input if response.spec.HasField("input") else None, output=response.spec.output if 
response.spec.HasField("output") else None, completion_time=(datetime.fromtimestamp(response.status.completion_time / 1000, tz=timezone.utc) if response.status.HasField("completion_time") else None), - events=[ - Event( - code=event.code, - message=event.message, - creation_time=datetime.fromtimestamp(event.creation_time / 1000, tz=timezone.utc), - ) - for event in response.status.events - ], + events=_events_from_proto(response.status.events), ) diff --git a/sdk/python/tests/test_core.py b/sdk/python/tests/test_core.py index 66836913..75b93f65 100644 --- a/sdk/python/tests/test_core.py +++ b/sdk/python/tests/test_core.py @@ -195,6 +195,41 @@ def test_session_common_data_returns_bytes(self): ) assert session.common_data() == b"test-data" + def test_get_session_preserves_events(self): + from flamepy.proto.types_pb2 import Event as EventProto + from flamepy.proto.types_pb2 import Metadata, SessionSpec, SessionStatus + from flamepy.proto.types_pb2 import Session as SessionProto + + event_time = int(time.time() * 1000) + + class DummyFrontendWithSession: + def GetSession(self, req): # noqa: N802 + return SessionProto( + metadata=Metadata(id=req.session_id), + spec=SessionSpec(application="test-app"), + status=SessionStatus( + state=SessionState.OPEN, + creation_time=event_time, + events=[ + EventProto( + code=1001, + message="failed to bind session", + creation_time=event_time, + ) + ], + ), + ) + + connection = client.Connection("http://localhost:1234", DummyChannel("http://localhost:1234"), DummyFrontendWithSession()) + try: + session = connection.get_session("sess-events") + finally: + connection.close() + + assert len(session.events) == 1 + assert session.events[0].code == 1001 + assert session.events[0].message == "failed to bind session" + def test_session_get_task_preserves_empty_optional_bytes(self): from flamepy.core.client import Session, SessionState from flamepy.proto.types_pb2 import Metadata, Task, TaskSpec, TaskStatus diff --git 
a/sdk/rust/src/client/mod.rs b/sdk/rust/src/client/mod.rs index 3f0e19b0..487b3b91 100644 --- a/sdk/rust/src/client/mod.rs +++ b/sdk/rust/src/client/mod.rs @@ -1678,6 +1678,47 @@ mod tests { assert_eq!(parsed.creation_time, expected); } + #[test] + fn session_try_from_extracts_events() { + let when = Utc.with_ymd_and_hms(2026, 5, 8, 10, 0, 0).unwrap(); + let rpc_session = rpc::Session { + metadata: Some(rpc::Metadata { + id: "ssn-1".to_string(), + name: String::new(), + }), + spec: Some(rpc::SessionSpec { + application: "app".to_string(), + common_data: None, + min_instances: 0, + max_instances: None, + batch_size: 1, + priority: 0, + resreq: None, + }), + status: Some(rpc::SessionStatus { + state: rpc::SessionState::Open as i32, + creation_time: when.timestamp_millis(), + completion_time: None, + pending: 0, + running: 0, + succeed: 0, + failed: 0, + cancelled: 0, + events: vec![rpc::Event { + code: 1001, + message: Some("bind failed".to_string()), + creation_time: when.timestamp_millis(), + }], + }), + }; + + let parsed = Session::try_from(&rpc_session).expect("Session::try_from should succeed"); + + assert_eq!(parsed.events.len(), 1); + assert_eq!(parsed.events[0].code, 1001); + assert_eq!(parsed.events[0].message.as_deref(), Some("bind failed")); + } + /// Verifies that `From for ResourceRequirement` /// preserves all fields when converting from the wire type into the SDK type. #[test] diff --git a/session_manager/src/apiserver/backend.rs b/session_manager/src/apiserver/backend.rs index bafc1020..8c496cab 100644 --- a/session_manager/src/apiserver/backend.rs +++ b/session_manager/src/apiserver/backend.rs @@ -31,7 +31,7 @@ use ::rpc::flame::v1 as rpc; use crate::apiserver::Flame; use crate::controller::ControllerPtr; use crate::model::Executor; -use common::apis::{ExecutorState, Node, Shim, TaskResult}; +use common::apis::{ExecutorState, FlameResult, Node, Shim, TaskResult}; use common::FlameError; /// Timeout for heartbeat in seconds. 
If no heartbeat is received within this @@ -403,7 +403,7 @@ impl Backend for Flame { let req = req.into_inner(); self.controller - .bind_session_completed(req.executor_id) + .bind_executor_completed(req.executor_id, req.result.map(FlameResult::from)) .await?; Ok(Response::new(rpc::Result::default())) diff --git a/session_manager/src/controller/executors/binding.rs b/session_manager/src/controller/executors/binding.rs index dfa9860d..54562b6c 100644 --- a/session_manager/src/controller/executors/binding.rs +++ b/session_manager/src/controller/executors/binding.rs @@ -11,12 +11,16 @@ See the License for the specific language governing permissions and limitations under the License. */ +use chrono::Utc; use stdng::{lock_ptr, logs::TraceFn, trace_fn, MutexPtr}; use crate::controller::executors::States; use crate::model::ExecutorPtr; use crate::storage::StoragePtr; -use common::apis::{ExecutorState, SessionPtr, Task, TaskPtr, TaskResult}; +use common::apis::{ + Event, EventOwner, ExecutorState, FlameResult, SessionPtr, Task, TaskPtr, TaskResult, + BIND_RESULT_OK, SESSION_BIND_FAILED, SESSION_RETRY_LIMIT_REACHED, +}; use common::FlameError; pub struct BindingState { @@ -24,6 +28,116 @@ pub struct BindingState { pub executor: ExecutorPtr, } +impl BindingState { + async fn bind_session_success(&self) -> Result<(), FlameError> { + let ssn_id = { + let e = lock_ptr!(self.executor)?; + e.ssn_id.clone().ok_or_else(|| { + FlameError::InvalidState(format!( + "Executor <{}> has no bound session on successful bind completion", + e.id + )) + })? 
+ }; + self.storage.get_session_ptr(ssn_id)?; + + let mut e = lock_ptr!(self.executor)?; + e.state = ExecutorState::Bound; + + Ok(()) + } + + async fn bind_session_failed(&self, result: &FlameResult) -> Result<(), FlameError> { + let executor = { lock_ptr!(self.executor)?.clone() }; + + self.record_bind_failure(&executor, result).await?; + self.increment_session_retry_count(&executor).await?; + + let mut e = lock_ptr!(self.executor)?; + e.state = ExecutorState::Unbinding; + e.ssn_id = None; + e.task_id = None; + + Ok(()) + } + + async fn increment_session_retry_count( + &self, + executor: &crate::model::Executor, + ) -> Result<(), FlameError> { + let ssn_id = executor.ssn_id.clone().ok_or_else(|| { + FlameError::InvalidState(format!("Executor <{}> has no bound session", executor.id)) + })?; + let retry_limit = self.storage.session_retry_limits(); + let ssn_ptr = self.storage.get_session_ptr(ssn_id.clone())?; + let (retry_count, crossed_retry_limit) = { + let mut ssn = lock_ptr!(ssn_ptr)?; + let previous_retry_count = ssn.retry_count; + ssn.retry_count = ssn.retry_count.saturating_add(1); + ( + ssn.retry_count, + previous_retry_count < retry_limit && ssn.retry_count >= retry_limit, + ) + }; + + if crossed_retry_limit { + self.record_retry_limit_reached(&ssn_id, retry_count, retry_limit) + .await?; + } + + Ok(()) + } + + async fn record_bind_failure( + &self, + executor: &crate::model::Executor, + result: &FlameResult, + ) -> Result<(), FlameError> { + let ssn_id = executor.ssn_id.clone().ok_or_else(|| { + FlameError::InvalidState(format!("Executor <{}> has no bound session", executor.id)) + })?; + let detail = result.message.clone().unwrap_or_default(); + + self.storage + .record_event( + EventOwner::session(ssn_id), + Event { + code: SESSION_BIND_FAILED, + message: Some(format!( + "Executor <{}> on node <{}> failed to bind session with return_code <{}>: {}", + executor.id, + executor.node, + result.return_code, + detail + )), + creation_time: Utc::now(), + }, + ) + 
.await + } + + async fn record_retry_limit_reached( + &self, + ssn_id: &str, + retry_count: u32, + retry_limit: u32, + ) -> Result<(), FlameError> { + self.storage + .record_event( + EventOwner::session(ssn_id.to_string()), + Event { + code: SESSION_RETRY_LIMIT_REACHED, + message: Some(format!( + "Session retry limit reached after {}/{} failed attempts; new executor assignment is paused for this session; existing executors are unchanged.", + retry_count, retry_limit + )), + creation_time: Utc::now(), + }, + ) + .await + } +} + #[async_trait::async_trait] impl States for BindingState { async fn register_executor(&self) -> Result<(), FlameError> { @@ -41,7 +155,16 @@ impl States for BindingState { async fn unregister_executor(&self) -> Result<(), FlameError> { trace_fn!("BindingState::unregister_executor"); - Err(FlameError::InvalidState("Executor is binding".to_string())) + let mut e = lock_ptr!(self.executor)?; + tracing::debug!( + "Executor <{}> unregistering from binding state, moving to released", + e.id + ); + e.state = ExecutorState::Released; + e.ssn_id = None; + e.task_id = None; + + Ok(()) } async fn bind_session(&self, ssn_ptr: SessionPtr) -> Result<(), FlameError> { @@ -59,11 +182,15 @@ impl States for BindingState { Ok(()) } - async fn bind_session_completed(&self) -> Result<(), FlameError> { - trace_fn!("BindingState::bind_session"); + async fn bind_session_completed(&self, result: Option) -> Result<(), FlameError> { + trace_fn!("BindingState::bind_session_completed"); - let mut e = lock_ptr!(self.executor)?; - e.state = ExecutorState::Bound; + match result.as_ref() { + Some(result) if result.return_code != BIND_RESULT_OK => { + self.bind_session_failed(result).await? 
+ } + _ => self.bind_session_success().await?, + } Ok(()) } diff --git a/session_manager/src/controller/executors/bound.rs b/session_manager/src/controller/executors/bound.rs index ee716825..a6620784 100644 --- a/session_manager/src/controller/executors/bound.rs +++ b/session_manager/src/controller/executors/bound.rs @@ -14,7 +14,7 @@ limitations under the License. use stdng::{lock_ptr, logs::TraceFn, trace_fn}; use crate::model::ExecutorPtr; -use common::apis::{ExecutorState, SessionPtr, Task, TaskPtr, TaskResult, TaskState}; +use common::apis::{ExecutorState, FlameResult, SessionPtr, Task, TaskPtr, TaskResult, TaskState}; use common::FlameError; use crate::controller::executors::States; @@ -51,7 +51,7 @@ impl States for BoundState { Err(FlameError::InvalidState("Executor is bound".to_string())) } - async fn bind_session_completed(&self) -> Result<(), FlameError> { + async fn bind_session_completed(&self, _result: Option) -> Result<(), FlameError> { trace_fn!("BoundState::bind_session_completed"); Err(FlameError::InvalidState("Executor is bound".to_string())) diff --git a/session_manager/src/controller/executors/idle.rs b/session_manager/src/controller/executors/idle.rs index b84573c4..ba2ec713 100644 --- a/session_manager/src/controller/executors/idle.rs +++ b/session_manager/src/controller/executors/idle.rs @@ -17,7 +17,7 @@ use crate::controller::executors::States; use crate::model::ExecutorPtr; use crate::storage::StoragePtr; -use common::apis::{ExecutorState, SessionPtr, Task, TaskPtr, TaskResult}; +use common::apis::{ExecutorState, FlameResult, SessionPtr, Task, TaskPtr, TaskResult}; use common::FlameError; pub struct IdleState { @@ -63,7 +63,7 @@ impl States for IdleState { Ok(()) } - async fn bind_session_completed(&self) -> Result<(), FlameError> { + async fn bind_session_completed(&self, _result: Option) -> Result<(), FlameError> { trace_fn!("IdleState::bind_session_completed"); Err(FlameError::InvalidState("Executor is idle".to_string())) diff --git 
a/session_manager/src/controller/executors/mod.rs b/session_manager/src/controller/executors/mod.rs index 6a5c8982..a6caef00 100644 --- a/session_manager/src/controller/executors/mod.rs +++ b/session_manager/src/controller/executors/mod.rs @@ -20,7 +20,7 @@ use crate::controller::executors::{ use crate::storage::StoragePtr; use crate::model::ExecutorPtr; -use common::apis::{ExecutorState, SessionPtr, Task, TaskPtr, TaskResult}; +use common::apis::{ExecutorState, FlameResult, SessionPtr, Task, TaskPtr, TaskResult}; use common::FlameError; use stdng::{lock_ptr, new_ptr, MutexPtr}; @@ -71,7 +71,7 @@ pub trait States: Send + Sync + 'static { async fn unregister_executor(&self) -> Result<(), FlameError>; async fn bind_session(&self, ssn: SessionPtr) -> Result<(), FlameError>; - async fn bind_session_completed(&self) -> Result<(), FlameError>; + async fn bind_session_completed(&self, result: Option) -> Result<(), FlameError>; async fn unbind_executor(&self) -> Result<(), FlameError>; async fn unbind_executor_completed(&self) -> Result<(), FlameError>; @@ -91,7 +91,7 @@ mod tests { use super::*; use crate::model::Executor; use chrono::Utc; - use common::apis::{ResourceRequirement, Shim}; + use common::apis::{ApplicationAttributes, ResourceRequirement, SessionAttributes, Shim}; use common::ctx::{FlameCluster, FlameClusterContext, FlameExecutors, FlameLimits}; fn create_test_executor(id: &str, state: ExecutorState) -> ExecutorPtr { @@ -139,6 +139,7 @@ mod tests { max_sessions: None, max_executors: 10, }, + recovery: Default::default(), pprof: None, }, cache: None, @@ -147,6 +148,40 @@ mod tests { crate::storage::new_ptr(&ctx).await.unwrap() } + fn unique_test_id(prefix: &str) -> String { + format!( + "{}-{}-{:?}", + prefix, + chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0), + std::thread::current().id() + ) + } + + async fn create_stored_test_session(storage: &StoragePtr) -> (String, SessionPtr) { + let ssn_id = unique_test_id("ssn"); + let app_name = 
unique_test_id("app"); + storage + .register_application( + app_name.clone(), + ApplicationAttributes { + shim: Shim::default(), + ..Default::default() + }, + ) + .await + .unwrap(); + storage + .create_session(SessionAttributes { + id: ssn_id.clone(), + application: app_name, + ..Default::default() + }) + .await + .unwrap(); + let ssn_ptr = storage.get_session_ptr(ssn_id.clone()).unwrap(); + (ssn_id, ssn_ptr) + } + mod void_state_tests { use super::*; @@ -218,7 +253,7 @@ mod tests { executor: exe_ptr.clone(), }; - let result = state.bind_session_completed().await; + let result = state.bind_session_completed(None).await; assert!(result.is_err()); assert!(matches!(result, Err(FlameError::InvalidState(_)))); @@ -350,7 +385,7 @@ mod tests { executor: exe_ptr.clone(), }; - let result = state.bind_session_completed().await; + let result = state.bind_session_completed(None).await; assert!(result.is_err()); assert!(matches!(result, Err(FlameError::InvalidState(_)))); @@ -394,18 +429,81 @@ mod tests { async fn test_bind_session_completed_transitions_to_bound() { let exe_ptr = create_test_executor("exe-1", ExecutorState::Binding); let storage = create_mock_storage().await; + let (ssn_id, _) = create_stored_test_session(&storage).await; + { + let mut exe = lock_ptr!(exe_ptr).unwrap(); + exe.ssn_id = Some(ssn_id); + } let state = BindingState { storage, executor: exe_ptr.clone(), }; - let result = state.bind_session_completed().await; + let result = state.bind_session_completed(None).await; assert!(result.is_ok()); assert_eq!(get_state(&exe_ptr).unwrap(), ExecutorState::Bound); } + #[tokio::test] + async fn test_successful_bind_session_completed_without_session_fails() { + let exe_ptr = create_test_executor("exe-1", ExecutorState::Binding); + let storage = create_mock_storage().await; + + let state = BindingState { + storage, + executor: exe_ptr.clone(), + }; + + let result = state.bind_session_completed(None).await; + + assert!(result.is_err()); + assert!(matches!(result, 
Err(FlameError::InvalidState(_)))); + assert_eq!(get_state(&exe_ptr).unwrap(), ExecutorState::Binding); + } + + #[tokio::test] + async fn test_failed_bind_session_completed_transitions_to_unbinding() { + let exe_ptr = create_test_executor("exe-1", ExecutorState::Binding); + let storage = create_mock_storage().await; + let (ssn_id, _) = create_stored_test_session(&storage).await; + { + let mut exe = lock_ptr!(exe_ptr).unwrap(); + exe.ssn_id = Some(ssn_id.clone()); + exe.task_id = Some(1); + } + + let state = BindingState { + storage: storage.clone(), + executor: exe_ptr.clone(), + }; + + let result = state + .bind_session_completed(Some(FlameResult { + return_code: common::apis::BIND_RESULT_ON_SESSION_ENTER_FAILED, + message: Some("enter failed".to_string()), + })) + .await; + + assert!(result.is_ok()); + let exe = lock_ptr!(exe_ptr).unwrap(); + assert_eq!(exe.state, ExecutorState::Unbinding); + assert_eq!(exe.ssn_id, None); + assert_eq!(exe.task_id, None); + + let session = storage.get_session(ssn_id).unwrap(); + assert_eq!(session.retry_count, 1); + assert_eq!( + session + .events + .iter() + .filter(|event| event.code == common::apis::SESSION_BIND_FAILED) + .count(), + 1 + ); + } + #[tokio::test] async fn test_bind_session_is_idempotent() { let exe_ptr = create_test_executor("exe-1", ExecutorState::Binding); @@ -457,8 +555,13 @@ mod tests { } #[tokio::test] - async fn test_unregister_executor_fails() { + async fn test_unregister_executor_transitions_to_released() { let exe_ptr = create_test_executor("exe-1", ExecutorState::Binding); + { + let mut exe = lock_ptr!(exe_ptr).unwrap(); + exe.ssn_id = Some("ssn-1".to_string()); + exe.task_id = Some(1); + } let state = BindingState { storage: create_mock_storage().await, executor: exe_ptr.clone(), @@ -466,8 +569,11 @@ mod tests { let result = state.unregister_executor().await; - assert!(result.is_err()); - assert!(matches!(result, Err(FlameError::InvalidState(_)))); + assert!(result.is_ok()); + let exe = 
lock_ptr!(exe_ptr).unwrap(); + assert_eq!(exe.state, ExecutorState::Released); + assert_eq!(exe.ssn_id, None); + assert_eq!(exe.task_id, None); } #[tokio::test] @@ -585,7 +691,7 @@ mod tests { executor: exe_ptr.clone(), }; - let result = state.bind_session_completed().await; + let result = state.bind_session_completed(None).await; assert!(result.is_err()); assert!(matches!(result, Err(FlameError::InvalidState(_)))); @@ -716,7 +822,7 @@ mod tests { executor: exe_ptr.clone(), }; - let result = state.bind_session_completed().await; + let result = state.bind_session_completed(None).await; assert!(result.is_err()); assert!(matches!(result, Err(FlameError::InvalidState(_)))); @@ -809,7 +915,7 @@ mod tests { executor: exe_ptr.clone(), }; - let result = state.bind_session_completed().await; + let result = state.bind_session_completed(None).await; assert!(result.is_err()); assert!(matches!(result, Err(FlameError::InvalidState(_)))); @@ -983,19 +1089,15 @@ mod tests { storage: storage.clone(), executor: exe_ptr.clone(), }; - let ssn = common::apis::Session { - id: "ssn-1".to_string(), - ..Default::default() - }; - let ssn_ptr = new_ptr(ssn); - idle_state.bind_session(ssn_ptr.clone()).await.unwrap(); + let (_, ssn_ptr) = create_stored_test_session(&storage).await; + idle_state.bind_session(ssn_ptr).await.unwrap(); assert_eq!(get_state(&exe_ptr).unwrap(), ExecutorState::Binding); let binding_state = BindingState { storage: storage.clone(), executor: exe_ptr.clone(), }; - binding_state.bind_session_completed().await.unwrap(); + binding_state.bind_session_completed(None).await.unwrap(); assert_eq!(get_state(&exe_ptr).unwrap(), ExecutorState::Bound); let bound_state = BoundState { diff --git a/session_manager/src/controller/executors/releasing.rs b/session_manager/src/controller/executors/releasing.rs index 22abb287..a48dbb91 100644 --- a/session_manager/src/controller/executors/releasing.rs +++ b/session_manager/src/controller/executors/releasing.rs @@ -17,7 +17,7 @@ use 
crate::controller::executors::States; use crate::model::ExecutorPtr; use crate::storage::StoragePtr; -use common::apis::{ExecutorState, SessionPtr, Task, TaskPtr, TaskResult}; +use common::apis::{ExecutorState, FlameResult, SessionPtr, Task, TaskPtr, TaskResult}; use common::FlameError; pub struct ReleasingState { @@ -60,7 +60,7 @@ impl States for ReleasingState { )) } - async fn bind_session_completed(&self) -> Result<(), FlameError> { + async fn bind_session_completed(&self, _result: Option) -> Result<(), FlameError> { trace_fn!("ReleasingState::bind_session_completed"); Err(FlameError::InvalidState( diff --git a/session_manager/src/controller/executors/unbinding.rs b/session_manager/src/controller/executors/unbinding.rs index d8164233..d7e5ff57 100644 --- a/session_manager/src/controller/executors/unbinding.rs +++ b/session_manager/src/controller/executors/unbinding.rs @@ -16,7 +16,7 @@ use crate::storage::StoragePtr; use stdng::{lock_ptr, logs::TraceFn, trace_fn, MutexPtr}; use crate::model::ExecutorPtr; -use common::apis::{ExecutorState, SessionPtr, Task, TaskPtr, TaskResult, TaskState}; +use common::apis::{ExecutorState, FlameResult, SessionPtr, Task, TaskPtr, TaskResult, TaskState}; use common::FlameError; pub struct UnbindingState { @@ -58,7 +58,7 @@ impl States for UnbindingState { )) } - async fn bind_session_completed(&self) -> Result<(), FlameError> { + async fn bind_session_completed(&self, _result: Option) -> Result<(), FlameError> { trace_fn!("UnbindingState::bind_session_completed"); Err(FlameError::InvalidState( @@ -71,6 +71,8 @@ impl States for UnbindingState { let mut e = lock_ptr!(self.executor)?; e.state = ExecutorState::Unbinding; + e.ssn_id = None; + e.task_id = None; Ok(()) } diff --git a/session_manager/src/controller/executors/void.rs b/session_manager/src/controller/executors/void.rs index b9145f0d..bf9b71ac 100644 --- a/session_manager/src/controller/executors/void.rs +++ b/session_manager/src/controller/executors/void.rs @@ -16,7 +16,7 
@@ use crate::storage::StoragePtr; use stdng::{lock_ptr, logs::TraceFn, trace_fn, MutexPtr}; use crate::model::ExecutorPtr; -use common::apis::{ExecutorState, SessionPtr, Task, TaskPtr, TaskResult}; +use common::apis::{ExecutorState, FlameResult, SessionPtr, Task, TaskPtr, TaskResult}; use common::FlameError; pub struct VoidState { @@ -52,7 +52,7 @@ impl States for VoidState { Err(FlameError::InvalidState("Executor is void".to_string())) } - async fn bind_session_completed(&self) -> Result<(), FlameError> { + async fn bind_session_completed(&self, _result: Option) -> Result<(), FlameError> { trace_fn!("VoidState::bind_session_completed"); Err(FlameError::InvalidState("Executor is void".to_string())) diff --git a/session_manager/src/controller/mod.rs b/session_manager/src/controller/mod.rs index c80e8bf9..1b9e7594 100644 --- a/session_manager/src/controller/mod.rs +++ b/session_manager/src/controller/mod.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use common::apis::{ Application, ApplicationAttributes, ApplicationID, CommonData, Event, EventOwner, ExecutorID, - ExecutorState, Node, NodeState, Session, SessionAttributes, SessionID, SessionPtr, + ExecutorState, FlameResult, Node, NodeState, Session, SessionAttributes, SessionID, SessionPtr, SessionState, Task, TaskGID, TaskID, TaskInput, TaskPtr, TaskResult, TaskState, }; @@ -535,8 +535,16 @@ impl Controller { } }; - let ssn_ptr = self.storage.get_session_ptr(ssn_id)?; + let ssn_ptr = match self.storage.get_session_ptr(ssn_id.clone()) { + Ok(ssn_ptr) => ssn_ptr, + Err(FlameError::NotFound(_)) => return Ok(None), + Err(e) => return Err(e), + }; + let ssn = lock_ptr!(ssn_ptr)?; + if ssn.status.state == SessionState::Closed { + return Ok(None); + } Ok(Some((*ssn).clone())) } @@ -572,13 +580,17 @@ impl Controller { Ok(()) } - pub async fn bind_session_completed(&self, id: ExecutorID) -> Result<(), FlameError> { + pub async fn bind_session_completed( + &self, + id: ExecutorID, + result: Option, + ) -> Result<(), FlameError> { 
trace_fn!("Controller::bind_session_completed"); let exe_ptr = self.storage.get_executor_ptr(id.clone())?; let state = executors::from(self.storage.clone(), exe_ptr.clone())?; - state.bind_session_completed().await?; + state.bind_session_completed(result).await?; let executor = { let exe = lock_ptr!(exe_ptr)?; @@ -601,6 +613,15 @@ impl Controller { Ok(()) } + pub async fn bind_executor_completed( + &self, + id: ExecutorID, + result: Option, + ) -> Result<(), FlameError> { + trace_fn!("Controller::bind_executor_completed"); + self.bind_session_completed(id, result).await + } + pub async fn launch_task(&self, id: ExecutorID) -> Result, FlameError> { trace_fn!("Controller::launch_task"); let exe_ptr = self.storage.get_executor_ptr(id)?; @@ -904,12 +925,22 @@ impl Controller { #[cfg(test)] mod tests { use super::*; - use common::apis::{Node, NodeInfo, NodeState, ResourceRequirement, Shim}; - use common::ctx::{FlameCluster, FlameClusterContext, FlameExecutors, FlameLimits}; + use common::apis::{ + Node, NodeInfo, NodeState, ResourceRequirement, Shim, BIND_RESULT_OK, SESSION_BIND_FAILED, + SESSION_RETRY_LIMIT_REACHED, + }; + use common::ctx::{ + FlameCluster, FlameClusterContext, FlameExecutors, FlameLimits, FlameRecovery, + FlameSessionRecovery, + }; use tokio::sync::mpsc; /// Creates a test storage with a unique SQLite database. 
async fn create_test_storage() -> StoragePtr { + create_test_storage_with_retry_limits(common::ctx::DEFAULT_SESSION_RETRY_LIMITS).await + } + + async fn create_test_storage_with_retry_limits(retry_limits: u32) -> StoragePtr { let unique_id = format!( "{}_{:?}", chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0), @@ -936,6 +967,9 @@ mod tests { max_sessions: None, max_executors: 10, }, + recovery: FlameRecovery { + session: FlameSessionRecovery { retry_limits }, + }, pprof: None, }, cache: None, @@ -966,10 +1000,292 @@ mod tests { } } + fn create_test_application() -> ApplicationAttributes { + ApplicationAttributes { + shim: Shim::default(), + ..Default::default() + } + } + + fn create_test_session_attr(id: &str) -> SessionAttributes { + SessionAttributes { + id: id.to_string(), + application: "test-app".to_string(), + common_data: None, + min_instances: 0, + max_instances: None, + batch_size: 1, + priority: 0, + resreq: Some(ResourceRequirement { + cpu: 1, + memory: 1024, + gpu: 0, + }), + } + } + + async fn create_binding_executor(controller: &ControllerPtr, ssn_id: &str) -> String { + controller + .register_application("test-app".to_string(), create_test_application()) + .await + .unwrap(); + controller + .storage() + .register_node(&create_test_node("bind-node")) + .await + .unwrap(); + controller + .create_session(create_test_session_attr(ssn_id)) + .await + .unwrap(); + let executor = controller + .create_executor("bind-node".to_string(), ssn_id.to_string()) + .await + .unwrap(); + controller.register_executor(&executor).await.unwrap(); + controller + .bind_session(executor.id.clone(), ssn_id.to_string()) + .await + .unwrap(); + executor.id + } + // ======================================================================== // Controller::register_node Tests // ======================================================================== + mod bind_session_failed_tests { + use super::*; + use uuid::Uuid; + + async fn fail_binding( + controller: &ControllerPtr, + 
executor_id: &str, + ) -> Result<(), FlameError> { + controller + .bind_executor_completed( + executor_id.to_string(), + Some(FlameResult { + return_code: common::apis::BIND_RESULT_ON_SESSION_ENTER_FAILED, + message: Some("on_session_enter failed: boom".to_string()), + }), + ) + .await + } + + #[tokio::test] + async fn records_failure_event_and_moves_executor_to_unbinding() { + let storage = create_test_storage_with_retry_limits(2).await; + let controller = new_ptr(storage.clone()); + let ssn_id = format!("bind-failed-{}", Uuid::new_v4()); + let executor_id = create_binding_executor(&controller, &ssn_id).await; + + fail_binding(&controller, &executor_id).await.unwrap(); + + let executor = controller.get_executor(executor_id).unwrap(); + assert_eq!(executor.state, ExecutorState::Unbinding); + assert_eq!(executor.ssn_id, None); + assert_eq!(executor.task_id, None); + + let session = controller.get_session(ssn_id).unwrap(); + assert_eq!(session.retry_count, 1); + assert_eq!( + session + .events + .iter() + .filter(|event| event.code == SESSION_BIND_FAILED) + .count(), + 1 + ); + assert!(session.events.iter().any(|event| event + .message + .as_deref() + .is_some_and(|message| message.contains("on_session_enter failed: boom")))); + } + + #[tokio::test] + async fn records_retry_limit_once_and_allows_count_to_exceed_limit() { + let storage = create_test_storage_with_retry_limits(2).await; + let controller = new_ptr(storage.clone()); + let ssn_id = format!("retry-limit-{}", Uuid::new_v4()); + let executor_id = create_binding_executor(&controller, &ssn_id).await; + + fail_binding(&controller, &executor_id).await.unwrap(); + controller + .unbind_executor_completed(executor_id.clone()) + .await + .unwrap(); + controller + .bind_session(executor_id.clone(), ssn_id.clone()) + .await + .unwrap(); + + fail_binding(&controller, &executor_id).await.unwrap(); + controller + .unbind_executor_completed(executor_id.clone()) + .await + .unwrap(); + controller + 
.bind_session(executor_id.clone(), ssn_id.clone()) + .await + .unwrap(); + + fail_binding(&controller, &executor_id).await.unwrap(); + + let session = controller.get_session(ssn_id).unwrap(); + assert_eq!(session.retry_count, 3); + assert_eq!( + session + .events + .iter() + .filter(|event| event.code == SESSION_BIND_FAILED) + .count(), + 3 + ); + assert_eq!( + session + .events + .iter() + .filter(|event| event.code == SESSION_RETRY_LIMIT_REACHED) + .count(), + 1 + ); + } + + #[tokio::test] + async fn bind_success_keeps_retry_count() { + let storage = create_test_storage_with_retry_limits(2).await; + let controller = new_ptr(storage.clone()); + let ssn_id = format!("retry-keep-{}", Uuid::new_v4()); + let executor_id = create_binding_executor(&controller, &ssn_id).await; + + fail_binding(&controller, &executor_id).await.unwrap(); + controller + .unbind_executor_completed(executor_id.clone()) + .await + .unwrap(); + controller + .bind_session(executor_id.clone(), ssn_id.clone()) + .await + .unwrap(); + + controller + .bind_executor_completed( + executor_id.clone(), + Some(FlameResult { + return_code: BIND_RESULT_OK, + message: None, + }), + ) + .await + .unwrap(); + + let session = controller.get_session(ssn_id).unwrap(); + assert_eq!(session.retry_count, 1); + assert_eq!( + controller.get_executor(executor_id).unwrap().state, + ExecutorState::Bound + ); + } + + #[tokio::test] + async fn bind_success_requires_attached_session() { + let storage = create_test_storage_with_retry_limits(2).await; + let controller = new_ptr(storage.clone()); + let ssn_id = format!("missing-session-{}", Uuid::new_v4()); + let executor_id = create_binding_executor(&controller, &ssn_id).await; + + { + let executor = storage.get_executor_ptr(executor_id.clone()).unwrap(); + let mut executor = lock_ptr!(executor).unwrap(); + executor.ssn_id = None; + } + + let err = controller + .bind_executor_completed( + executor_id.clone(), + Some(FlameResult { + return_code: BIND_RESULT_OK, + message: 
None, + }), + ) + .await + .unwrap_err(); + + assert!(matches!(err, FlameError::InvalidState(_))); + assert_eq!( + controller.get_executor(executor_id).unwrap().state, + ExecutorState::Binding + ); + } + } + + mod wait_for_session_tests { + use super::*; + use uuid::Uuid; + + #[tokio::test] + async fn returns_none_when_assigned_session_is_closed() { + let storage = create_test_storage().await; + let controller = new_ptr(storage); + let ssn_id = format!("closed-session-{}", Uuid::new_v4()); + let executor_id = create_binding_executor(&controller, &ssn_id).await; + + controller.close_session(ssn_id.clone()).await.unwrap(); + + let session = controller + .wait_for_session(executor_id.clone()) + .await + .unwrap(); + assert!(session.is_none()); + + let executor = controller.get_executor(executor_id.clone()).unwrap(); + assert_eq!(executor.state, ExecutorState::Binding); + assert_eq!(executor.ssn_id, Some(ssn_id)); + + controller + .unregister_executor(executor_id.clone()) + .await + .unwrap(); + + assert!(matches!( + controller.get_executor(executor_id).unwrap_err(), + FlameError::NotFound(_) + )); + } + + #[tokio::test] + async fn returns_none_when_assigned_session_is_missing() { + let storage = create_test_storage().await; + let controller = new_ptr(storage); + let ssn_id = format!("missing-session-{}", Uuid::new_v4()); + let executor_id = create_binding_executor(&controller, &ssn_id).await; + + controller.close_session(ssn_id.clone()).await.unwrap(); + controller.delete_session(ssn_id.clone()).await.unwrap(); + + let session = controller + .wait_for_session(executor_id.clone()) + .await + .unwrap(); + assert!(session.is_none()); + + let executor = controller.get_executor(executor_id.clone()).unwrap(); + assert_eq!(executor.state, ExecutorState::Binding); + assert_eq!(executor.ssn_id, Some(ssn_id)); + + controller + .unregister_executor(executor_id.clone()) + .await + .unwrap(); + + assert!(matches!( + controller.get_executor(executor_id).unwrap_err(), + 
FlameError::NotFound(_) + )); + } + } + mod register_node_tests { use super::*; use std::collections::HashSet; diff --git a/session_manager/src/controller/nodes/mod.rs b/session_manager/src/controller/nodes/mod.rs index 2621a3bd..7ee23b88 100644 --- a/session_manager/src/controller/nodes/mod.rs +++ b/session_manager/src/controller/nodes/mod.rs @@ -133,6 +133,7 @@ mod tests { max_sessions: None, max_executors: 10, }, + recovery: Default::default(), pprof: None, }, cache: None, diff --git a/session_manager/src/events/fs.rs b/session_manager/src/events/fs.rs index 38d4a2e5..9c81654a 100644 --- a/session_manager/src/events/fs.rs +++ b/session_manager/src/events/fs.rs @@ -167,9 +167,9 @@ impl EventManager for FsEventManager { fn find_events(&self, owner: EventOwner) -> Result, FlameError> { let mut event_storage = lock_ptr!(self.event_storage)?; - let storage = event_storage - .get_mut(&owner.session_id) - .ok_or(FlameError::Internal("Event storage not found".to_string()))?; + let Some(storage) = event_storage.get_mut(&owner.session_id) else { + return Ok(vec![]); + }; let events = lock_ptr!(self.events)?; let event_daos = events diff --git a/session_manager/src/events/mod.rs b/session_manager/src/events/mod.rs index e1209594..54ce6a5e 100644 --- a/session_manager/src/events/mod.rs +++ b/session_manager/src/events/mod.rs @@ -204,7 +204,21 @@ mod tests { manager.remove_events(session_id.clone()).unwrap(); let result = manager.find_events(owner); - assert!(result.is_err() || result.unwrap().is_empty()); + assert!(result.unwrap().is_empty()); + } + + #[test] + fn test_fs_event_manager_find_nonexistent_session() { + let temp_dir = tempfile::tempdir().unwrap(); + let manager = FsEventManager::new(temp_dir.path().to_str().unwrap()).unwrap(); + + let events = manager + .find_events(EventOwner { + session_id: "missing-session".to_string(), + task_id: 0, + }) + .unwrap(); + assert!(events.is_empty()); } #[test] diff --git a/session_manager/src/model/mod.rs 
b/session_manager/src/model/mod.rs index be54b6f8..decac939 100644 --- a/session_manager/src/model/mod.rs +++ b/session_manager/src/model/mod.rs @@ -28,16 +28,19 @@ use common::apis::{ Application, ExecutorID, ExecutorState, Node, NodeState, ResourceRequirement, Session, SessionID, SessionState, Shim, Task, TaskID, TaskState, }; -use common::FlameError; +use common::{ctx::DEFAULT_SESSION_RETRY_LIMITS, FlameError}; use rpc::flame::v1 as rpc; pub type SessionInfoPtr = Arc; pub type ExecutorInfoPtr = Arc; pub type NodeInfoPtr = Arc; pub type AppInfoPtr = Arc; +pub type SessionPredicate = fn(&SessionInfo, u32) -> bool; #[derive(Clone)] pub struct SnapShot { + pub session_retry_limits: u32, + pub applications: MutexPtr>, pub sessions: MutexPtr>, @@ -59,7 +62,12 @@ impl Default for SnapShot { impl SnapShot { pub fn new() -> Self { + Self::new_with_session_retry_limits(DEFAULT_SESSION_RETRY_LIMITS) + } + + pub fn new_with_session_retry_limits(session_retry_limits: u32) -> Self { SnapShot { + session_retry_limits, applications: Arc::new(Mutex::new(HashMap::new())), sessions: Arc::new(Mutex::new(HashMap::new())), ssn_index: Arc::new(Mutex::new(HashMap::new())), @@ -132,6 +140,13 @@ pub struct SessionInfo { pub batch_size: u32, pub priority: u32, pub resreq: Option, + pub retry_count: u32, +} + +impl SessionInfo { + pub fn is_ready(&self, retry_limits: u32) -> bool { + self.retry_count < retry_limits + } } #[derive(Clone, Debug, Default)] @@ -235,6 +250,7 @@ impl From<&Session> for SessionInfo { batch_size: ssn.batch_size.max(1), priority: ssn.priority, resreq: ssn.resreq.clone(), + retry_count: ssn.retry_count, } } } @@ -248,6 +264,8 @@ pub struct SessionFilter { pub state: Option, /// Filter by session IDs pub ids: Option>, + /// Additional in-memory predicate filter. 
+ pub predicate: Option, } impl SessionFilter { @@ -256,6 +274,7 @@ impl SessionFilter { Self { state: None, ids: None, + predicate: None, } } @@ -264,6 +283,7 @@ impl SessionFilter { Self { state: Some(state), ids: None, + predicate: None, } } @@ -272,8 +292,15 @@ impl SessionFilter { Self { state: None, ids: Some(ids), + predicate: None, } } + + /// Adds an in-memory predicate filter. + pub const fn with_predicate(mut self, predicate: SessionPredicate) -> Self { + self.predicate = Some(predicate); + self + } } impl Default for SessionFilter { @@ -283,6 +310,10 @@ impl Default for SessionFilter { } pub const OPEN_SESSION: Option = Some(SessionFilter::by_state(SessionState::Open)); +pub const READY_SESSION: Option = Some( + SessionFilter::by_state(SessionState::Open) + .with_predicate(|ssn, retry_limits| ssn.is_ready(retry_limits)), +); /// Filter for listing executors. /// All fields are Option: @@ -567,6 +598,14 @@ impl SnapShot { .collect(), }; + let filtered: Vec = match filter.predicate { + None => filtered, + Some(predicate) => filtered + .into_iter() + .filter(|ssn| predicate(ssn.as_ref(), self.session_retry_limits)) + .collect(), + }; + Ok(filtered .into_iter() .map(|ssn| (ssn.id.clone(), ssn)) @@ -950,9 +989,37 @@ mod tests { batch_size: 1, priority: 0, resreq, + retry_count: 0, }) } + #[test] + fn session_info_ready_uses_retry_limit() { + let mut session = create_test_session("ssn-ready", Some(unit_rr()), SessionState::Open) + .as_ref() + .clone(); + + session.retry_count = 1; + assert!(session.is_ready(2)); + + session.retry_count = 2; + assert!(!session.is_ready(2)); + } + + #[test] + fn session_info_from_session_copies_retry_count() { + let session = Session { + id: "ssn-1".to_string(), + application: "test-app".to_string(), + retry_count: 7, + ..Default::default() + }; + + let info = SessionInfo::from(&session); + + assert_eq!(info.retry_count, 7); + } + /// Default per-slot unit kept for backward-compatible test scaffolding. 
fn unit_rr() -> ResourceRequirement { ResourceRequirement { @@ -1030,6 +1097,29 @@ mod tests { assert_eq!(all_ssns.len(), 3); } + #[test] + fn test_snapshot_find_ready_sessions() { + let ss = SnapShot::new_with_session_retry_limits(2); + + let ready_open = create_test_session("ssn-ready", Some(slots_rr(2)), SessionState::Open); + let mut not_ready_open = + create_test_session("ssn-not-ready", Some(slots_rr(2)), SessionState::Open) + .as_ref() + .clone(); + not_ready_open.retry_count = 2; + let closed_ready = + create_test_session("ssn-closed-ready", Some(slots_rr(2)), SessionState::Closed); + + ss.add_session(ready_open.clone()).unwrap(); + ss.add_session(Arc::new(not_ready_open)).unwrap(); + ss.add_session(closed_ready.clone()).unwrap(); + + let ready_ssns = ss.find_sessions(READY_SESSION).unwrap(); + + assert_eq!(ready_ssns.len(), 1); + assert!(ready_ssns.contains_key("ssn-ready")); + } + /// Test that update_executor_state correctly updates the exec_index. #[test] fn test_snapshot_update_executor_state() { diff --git a/session_manager/src/scheduler/actions/allocate.rs b/session_manager/src/scheduler/actions/allocate.rs index 38c3e788..a0375ad8 100644 --- a/session_manager/src/scheduler/actions/allocate.rs +++ b/session_manager/src/scheduler/actions/allocate.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use stdng::collections::{BinaryHeap, Cmp}; use stdng::{logs::TraceFn, trace_fn}; -use crate::model::{ALL_NODE, OPEN_SESSION, UNBINDING_EXECUTOR, VOID_EXECUTOR}; +use crate::model::{ALL_NODE, READY_SESSION, UNBINDING_EXECUTOR, VOID_EXECUTOR}; use crate::scheduler::actions::{Action, ActionPtr}; use crate::scheduler::plugins::node_order_fn; use crate::scheduler::plugins::ssn_order_fn; @@ -42,7 +42,7 @@ impl Action for AllocateAction { ss.debug()?; let mut open_ssns = BinaryHeap::new(ssn_order_fn(ctx)); - let ssn_list = ss.find_sessions(OPEN_SESSION)?; + let ssn_list = ss.find_sessions(READY_SESSION)?; for ssn in ssn_list.values() { open_ssns.push(ssn.clone()); } diff 
--git a/session_manager/src/scheduler/actions/dispatch.rs b/session_manager/src/scheduler/actions/dispatch.rs index 8c0ab475..e7b11f60 100644 --- a/session_manager/src/scheduler/actions/dispatch.rs +++ b/session_manager/src/scheduler/actions/dispatch.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use stdng::collections::{BinaryHeap, Cmp}; use stdng::{logs::TraceFn, trace_fn}; -use crate::model::{IDLE_EXECUTOR, OPEN_SESSION}; +use crate::model::{IDLE_EXECUTOR, READY_SESSION}; use crate::scheduler::actions::{Action, ActionPtr}; use crate::scheduler::plugins::ssn_order_fn; use crate::scheduler::statement::Statement; @@ -41,7 +41,7 @@ impl Action for DispatchAction { ss.debug()?; let mut open_ssns = BinaryHeap::new(ssn_order_fn(ctx)); - let ssn_list = ss.find_sessions(OPEN_SESSION)?; + let ssn_list = ss.find_sessions(READY_SESSION)?; for ssn in ssn_list.values() { open_ssns.push(ssn.clone()); } diff --git a/session_manager/src/scheduler/actions/shuffle.rs b/session_manager/src/scheduler/actions/shuffle.rs index cadd2795..2674c78d 100644 --- a/session_manager/src/scheduler/actions/shuffle.rs +++ b/session_manager/src/scheduler/actions/shuffle.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use stdng::collections::{BinaryHeap, Cmp}; use stdng::{logs::TraceFn, trace_fn}; -use crate::model::{BOUND_EXECUTOR, IDLE_EXECUTOR, OPEN_SESSION}; +use crate::model::{BOUND_EXECUTOR, IDLE_EXECUTOR, READY_SESSION}; use crate::scheduler::actions::{Action, ActionPtr}; use crate::scheduler::ctx::Context; use crate::scheduler::plugins::ssn_order_fn; @@ -38,7 +38,7 @@ impl Action for ShuffleAction { let ss = ctx.snapshot.clone(); let mut underused = BinaryHeap::new(ssn_order_fn(ctx)); - let open_ssns = ss.find_sessions(OPEN_SESSION)?; + let open_ssns = ss.find_sessions(READY_SESSION)?; for ssn in open_ssns.values() { if ctx.is_underused(ssn)? 
{ underused.push(ssn.clone()); diff --git a/session_manager/src/scheduler/mod.rs b/session_manager/src/scheduler/mod.rs index 804c6934..a10e7e18 100644 --- a/session_manager/src/scheduler/mod.rs +++ b/session_manager/src/scheduler/mod.rs @@ -79,8 +79,7 @@ mod tests { use common::apis::{ Application, ApplicationAttributes, Node, NodeInfo, NodeState, ResourceRequirement, Shim, }; - use common::ctx::FlameCluster; - use common::ctx::FlameClusterContext; + use common::ctx::{FlameCluster, FlameClusterContext, FlameRecovery, FlameSessionRecovery}; use common::FlameError; use std::collections::HashMap; use std::sync::Arc; @@ -133,6 +132,10 @@ mod tests { impl TestEnv { pub fn new() -> Result { + Self::new_with_retry_limit(common::ctx::DEFAULT_SESSION_RETRY_LIMITS) + } + + pub fn new_with_retry_limit(retry_limits: u32) -> Result { let filter = tracing_subscriber::EnvFilter::from_default_env() .add_directive("h2=error".parse()?) .add_directive("hyper_util=error".parse()?) @@ -150,6 +153,9 @@ mod tests { let config = FlameClusterContext { cluster: FlameCluster { storage: format!("sqlite:///{url}"), + recovery: FlameRecovery { + session: FlameSessionRecovery { retry_limits }, + }, ..Default::default() }, ..Default::default() @@ -272,4 +278,65 @@ mod tests { assert_eq!(plugins_ptr, Arc::as_ptr(&ctx.plugins)); Ok(()) } + + #[test] + fn test_scheduler_skips_not_ready_session() -> Result<(), FlameError> { + let env = TestEnv::new_with_retry_limit(1)?; + let controller = env.controller.clone(); + + tokio_test::block_on( + controller.register_application("flmtest".to_string(), new_test_application()), + )?; + tokio_test::block_on( + controller + .storage() + .register_node(&new_test_node("node_1".to_string())), + )?; + + let ssn_id = format!("not-ready-{}", Uuid::new_v4()); + tokio_test::block_on(controller.create_session(common::apis::SessionAttributes { + id: ssn_id.clone(), + application: "flmtest".to_string(), + common_data: None, + min_instances: 0, + max_instances: None, + 
batch_size: 1, + priority: 0, + resreq: Some(common::apis::ResourceRequirement { + cpu: 1, + memory: 1024 * 1024 * 1024, + gpu: 0, + }), + }))?; + tokio_test::block_on(controller.create_task(ssn_id.clone(), None))?; + + { + let ssn_ptr = controller.storage().get_session_ptr(ssn_id.clone())?; + let mut ssn = stdng::lock_ptr!(ssn_ptr)?; + ssn.retry_count = 1; + } + + let executor = + tokio_test::block_on(controller.create_executor("node_1".to_string(), ssn_id.clone()))?; + tokio_test::block_on(controller.register_executor(&executor))?; + + let default_policies: Vec = common::ctx::DEFAULT_POLICIES + .iter() + .map(|s| s.to_string()) + .collect(); + let mut ctx = Context::new(controller.clone(), &default_policies)?; + + let dispatch = DispatchAction::new_ptr(); + tokio_test::block_on(dispatch.execute(&mut ctx))?; + + let alloc = AllocateAction::new_ptr(); + tokio_test::block_on(alloc.execute(&mut ctx))?; + + let executors = controller.list_executor()?; + assert_eq!(executors.len(), 1); + assert_eq!(executors[0].state, common::apis::ExecutorState::Idle); + assert_eq!(executors[0].ssn_id, None); + + Ok(()) + } } diff --git a/session_manager/src/scheduler/plugins/gang.rs b/session_manager/src/scheduler/plugins/gang.rs index 198f735f..51eec191 100644 --- a/session_manager/src/scheduler/plugins/gang.rs +++ b/session_manager/src/scheduler/plugins/gang.rs @@ -151,6 +151,7 @@ mod tests { memory: 1024, gpu: 0, }), + retry_count: 0, }) } diff --git a/session_manager/src/scheduler/plugins/priority.rs b/session_manager/src/scheduler/plugins/priority.rs index dd711d8d..d719a579 100644 --- a/session_manager/src/scheduler/plugins/priority.rs +++ b/session_manager/src/scheduler/plugins/priority.rs @@ -459,6 +459,7 @@ mod tests { batch_size, priority, resreq: Some(slots_to_rr(slots.into())), + retry_count: 0, }) } diff --git a/session_manager/src/storage/engine/filesystem.rs b/session_manager/src/storage/engine/filesystem.rs index 64430e7c..1114ac8b 100644 --- 
a/session_manager/src/storage/engine/filesystem.rs +++ b/session_manager/src/storage/engine/filesystem.rs @@ -778,6 +778,7 @@ impl FilesystemEngine { batch_size: meta.batch_size.max(1), priority: meta.priority, resreq, + retry_count: 0, }) } diff --git a/session_manager/src/storage/engine/none.rs b/session_manager/src/storage/engine/none.rs index c078db4c..bba02ce2 100644 --- a/session_manager/src/storage/engine/none.rs +++ b/session_manager/src/storage/engine/none.rs @@ -200,6 +200,7 @@ impl Engine for NoneEngine { tasks: HashMap::new(), tasks_index: HashMap::new(), events: vec![], + retry_count: 0, }) } diff --git a/session_manager/src/storage/engine/types.rs b/session_manager/src/storage/engine/types.rs index f0a92837..6c0ef92e 100644 --- a/session_manager/src/storage/engine/types.rs +++ b/session_manager/src/storage/engine/types.rs @@ -174,6 +174,7 @@ impl TryFrom<&SessionDao> for Session { batch_size: ssn.batch_size.max(1) as u32, priority: ssn.priority as u32, resreq, + retry_count: 0, }) } } diff --git a/session_manager/src/storage/load_data_tests.rs b/session_manager/src/storage/load_data_tests.rs index b9ca6b01..e5047524 100644 --- a/session_manager/src/storage/load_data_tests.rs +++ b/session_manager/src/storage/load_data_tests.rs @@ -39,6 +39,7 @@ mod tests { max_sessions: None, max_executors: 10, }, + recovery: Default::default(), pprof: None, }, cache: None, diff --git a/session_manager/src/storage/mod.rs b/session_manager/src/storage/mod.rs index 016a5855..57098c97 100644 --- a/session_manager/src/storage/mod.rs +++ b/session_manager/src/storage/mod.rs @@ -84,8 +84,12 @@ fn derive_events_path(storage_url: &str) -> String { } impl Storage { + pub fn session_retry_limits(&self) -> u32 { + self.context.cluster.recovery.session.retry_limits + } + pub fn snapshot(&self) -> Result { - let res = SnapShot::new(); + let res = SnapShot::new_with_session_retry_limits(self.session_retry_limits()); { let node_map = lock_ptr!(self.nodes)?; @@ -536,7 +540,11 @@ impl 
Storage { pub fn get_session(&self, id: SessionID) -> Result { let ssn_ptr = self.get_session_ptr(id)?; let ssn = lock_ptr!(ssn_ptr)?; - Ok(ssn.clone()) + let mut ssn = ssn.clone(); + ssn.events = self + .event_manager + .find_events(EventOwner::session(ssn.id.clone()))?; + Ok(ssn) } pub fn get_session_ptr(&self, id: SessionID) -> Result { @@ -637,7 +645,11 @@ impl Storage { for ssn in ssn_map.deref().values() { let ssn = lock_ptr!(ssn)?; - ssn_list.push(ssn.clone()); + let mut ssn = ssn.clone(); + ssn.events = self + .event_manager + .find_events(EventOwner::session(ssn.id.clone()))?; + ssn_list.push(ssn); } Ok(ssn_list) diff --git a/session_manager/src/storage/session_tests.rs b/session_manager/src/storage/session_tests.rs index c0126b62..834b8af9 100644 --- a/session_manager/src/storage/session_tests.rs +++ b/session_manager/src/storage/session_tests.rs @@ -14,7 +14,10 @@ limitations under the License. #[cfg(test)] mod tests { use crate::storage; - use common::apis::{ResourceRequirement, SessionAttributes, SessionState, TaskState}; + use chrono::Utc; + use common::apis::{ + Event, EventOwner, ResourceRequirement, SessionAttributes, SessionState, TaskState, + }; use common::ctx::{FlameCluster, FlameClusterContext}; fn test_context() -> FlameClusterContext { @@ -125,6 +128,32 @@ mod tests { let ssn = stdng::lock_ptr!(ssn_ptr).unwrap(); assert_eq!(ssn.id, "ptr-test-ssn"); } + + #[tokio::test] + async fn includes_session_events() { + let ctx = test_context(); + let storage = storage::new_ptr(&ctx).await.unwrap(); + + let attr = create_session_attr("event-test-ssn"); + storage.create_session(attr).await.unwrap(); + storage + .record_event( + EventOwner::session("event-test-ssn".to_string()), + Event { + code: 1001, + message: Some("bind failed".to_string()), + creation_time: Utc::now(), + }, + ) + .await + .unwrap(); + + let ssn = storage.get_session("event-test-ssn".to_string()).unwrap(); + + assert_eq!(ssn.events.len(), 1); + assert_eq!(ssn.events[0].code, 1001); + 
assert_eq!(ssn.events[0].message.as_deref(), Some("bind failed")); + } } mod open_session { @@ -372,5 +401,38 @@ mod tests { assert_eq!(open_count, 1); assert_eq!(closed_count, 1); } + + #[tokio::test] + async fn includes_session_events() { + let ctx = test_context(); + let storage = storage::new_ptr(&ctx).await.unwrap(); + + let attr = create_session_attr("list-event-ssn"); + storage.create_session(attr).await.unwrap(); + storage + .record_event( + EventOwner::session("list-event-ssn".to_string()), + Event { + code: 1002, + message: Some("retry limit reached".to_string()), + creation_time: Utc::now(), + }, + ) + .await + .unwrap(); + + let sessions = storage.list_session().unwrap(); + let session = sessions + .iter() + .find(|session| session.id == "list-event-ssn") + .expect("session should be listed"); + + assert_eq!(session.events.len(), 1); + assert_eq!(session.events[0].code, 1002); + assert_eq!( + session.events[0].message.as_deref(), + Some("retry limit reached") + ); + } } }