Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/e2e-bm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ jobs:
echo "=== Check flmctl connectivity ==="
flmctl list -a || echo "Warning: Could not list applications"
echo ""
echo "=== Flame nodes ==="
flmctl list -n || echo "Warning: Could not list nodes"
echo ""
echo "=== Flame node details ==="
for node in $(flmctl list -n 2>/dev/null | awk 'NR > 1 {print $1}'); do
echo "--- Node: $node ---"
flmctl view -n "$node" || echo "Warning: Could not view node $node"
done
echo ""
echo "=== Verify object cache is accessible ==="
curl -s http://127.0.0.1:9090/ || echo "Note: Object cache gRPC endpoint (expected no HTTP response)"

Expand Down
9 changes: 9 additions & 0 deletions common/src/apis/from_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ impl From<rpc::Event> for Event {
}
}

impl From<rpc::Result> for FlameResult {
fn from(result: rpc::Result) -> Self {
Self {
return_code: result.return_code,
message: result.message,
}
}
}

impl TryFrom<rpc::Task> for TaskContext {
type Error = FlameError;

Expand Down
28 changes: 28 additions & 0 deletions common/src/apis/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ impl Session {
self.status.state == SessionState::Closed
}

pub fn is_ready(&self, retry_limits: u32) -> bool {
self.retry_count < retry_limits
}

pub fn update_task(&mut self, task: &Task) -> Result<(), FlameError> {
let task_ptr = TaskPtr::new(task.clone().into());

Expand Down Expand Up @@ -127,3 +131,27 @@ impl Session {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn is_ready_uses_transient_retry_count() {
assert!(Session {
retry_count: 1,
..Default::default()
}
.is_ready(2));
assert!(!Session {
retry_count: 2,
..Default::default()
}
.is_ready(2));
assert!(!Session {
retry_count: 3,
..Default::default()
}
.is_ready(2));
}
}
9 changes: 9 additions & 0 deletions common/src/apis/to_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ impl From<Event> for rpc::Event {
}
}

impl From<FlameResult> for rpc::Result {
fn from(result: FlameResult) -> Self {
Self {
return_code: result.return_code,
message: result.message,
}
}
}

impl From<TaskContext> for rpc::TaskContext {
fn from(ctx: TaskContext) -> Self {
Self {
Expand Down
39 changes: 39 additions & 0 deletions common/src/apis/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ use stdng::MutexPtr;

pub const DEFAULT_MAX_INSTANCES: u32 = 1_000_000;
pub const DEFAULT_DELAY_RELEASE: Duration = Duration::seconds(60);
pub const BIND_RESULT_OK: i32 = 0;
pub const BIND_RESULT_APPLICATION_INSTALL_FAILED: i32 = 10;
pub const BIND_RESULT_SHIM_CREATE_FAILED: i32 = 11;
pub const BIND_RESULT_ON_SESSION_ENTER_FAILED: i32 = 12;
pub const BIND_RESULT_UNKNOWN_FAILED: i32 = 19;
pub const SESSION_EVENT_TASK_ID: i64 = 0;
pub const SESSION_BIND_FAILED: i32 = 1001;
pub const SESSION_RETRY_LIMIT_REACHED: i32 = 1002;

pub type SessionID = String;
pub type TaskID = i64;
Expand All @@ -42,13 +50,28 @@ pub struct EventOwner {
pub session_id: SessionID,
}

impl EventOwner {
pub fn session(session_id: SessionID) -> Self {
Self {
session_id,
task_id: SESSION_EVENT_TASK_ID,
}
}
}

#[derive(Clone, Debug)]
pub struct Event {
pub code: i32,
pub message: Option<String>,
pub creation_time: DateTime<Utc>,
}

#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct FlameResult {
pub return_code: i32,
pub message: Option<String>,
}

#[derive(Clone, Debug, Default)]
pub struct TaskResult {
pub state: TaskState,
Expand Down Expand Up @@ -226,6 +249,7 @@ pub struct Session {
pub batch_size: u32,
pub priority: u32,
pub resreq: Option<ResourceRequirement>,
pub retry_count: u32,
}

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash, strum_macros::Display)]
Expand Down Expand Up @@ -735,6 +759,21 @@ mod tests {
assert!(!rr(2, 2, 5).great(&rr(1, 2, 3)));
}

#[test]
fn flame_result_converts_to_and_from_rpc_result() {
let result = FlameResult {
return_code: 12,
message: Some("bind failed".to_string()),
};

let rpc_result: rpc::flame::v1::Result = result.clone().into();
assert_eq!(rpc_result.return_code, 12);
assert_eq!(rpc_result.message.as_deref(), Some("bind failed"));

let parsed = FlameResult::from(rpc_result);
assert_eq!(parsed, result);
}

#[test]
fn add_sums_all_three_fields() {
let mut a = rr(1, 2, 3);
Expand Down
80 changes: 80 additions & 0 deletions common/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const DEFAULT_FLAME_ENDPOINT: &str = "http://127.0.0.1:8080";
pub const DEFAULT_POLICIES: &[&str] = &["priority", "drf", "gang"];
const DEFAULT_STORAGE: &str = "sqlite://flame.db";
const DEFAULT_MAX_EXECUTORS_PER_NODE: u32 = 128;
pub const DEFAULT_SESSION_RETRY_LIMITS: u32 = 5;
const DEFAULT_SCHEDULE_INTERVAL: u64 = 100;
const DEFAULT_SHIM: &str = "host";
const DEFAULT_FLAME_CACHE_ENDPOINT: &str = "http://127.0.0.1:9090";
Expand Down Expand Up @@ -69,6 +70,18 @@ struct FlameClusterYaml {
pub limits: Option<FlameLimitsYaml>,
/// pprof profiling configuration
pub pprof: Option<FlamePprofYaml>,
/// Recovery configuration
pub recovery: Option<FlameRecoveryYaml>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct FlameRecoveryYaml {
pub session: Option<FlameSessionRecoveryYaml>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct FlameSessionRecoveryYaml {
pub retry_limits: Option<u32>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -148,6 +161,7 @@ pub struct FlameCluster {
pub executors: FlameExecutors,
pub tls: Option<FlameTls>,
pub limits: FlameLimits,
pub recovery: FlameRecovery,
pub pprof: Option<FlamePprof>,
}

Expand All @@ -162,6 +176,16 @@ pub struct FlameLimits {
pub max_executors: u32,
}

#[derive(Debug, Clone, Default)]
pub struct FlameRecovery {
pub session: FlameSessionRecovery,
}

#[derive(Debug, Clone)]
pub struct FlameSessionRecovery {
pub retry_limits: u32,
}

#[derive(Debug, Clone)]
pub struct FlamePprof {
pub port: u16,
Expand Down Expand Up @@ -393,6 +417,10 @@ impl TryFrom<FlameClusterYaml> for FlameCluster {
let limits = cluster.limits.map(FlameLimits::from).unwrap_or_default();

let pprof = cluster.pprof.map(FlamePprof::from);
let recovery = cluster
.recovery
.map(FlameRecovery::from)
.unwrap_or_default();

Ok(FlameCluster {
name: cluster.name,
Expand All @@ -412,11 +440,39 @@ impl TryFrom<FlameClusterYaml> for FlameCluster {
executors,
tls,
limits,
recovery,
pprof,
})
}
}

impl From<FlameRecoveryYaml> for FlameRecovery {
fn from(yaml: FlameRecoveryYaml) -> Self {
FlameRecovery {
session: yaml
.session
.map(FlameSessionRecovery::from)
.unwrap_or_default(),
}
}
}

impl From<FlameSessionRecoveryYaml> for FlameSessionRecovery {
fn from(yaml: FlameSessionRecoveryYaml) -> Self {
FlameSessionRecovery {
retry_limits: yaml.retry_limits.unwrap_or(DEFAULT_SESSION_RETRY_LIMITS),
}
}
}

impl Default for FlameSessionRecovery {
fn default() -> Self {
FlameSessionRecovery {
retry_limits: DEFAULT_SESSION_RETRY_LIMITS,
}
}
}

impl TryFrom<FlameExecutorsYaml> for FlameExecutors {
type Error = FlameError;
fn try_from(executors: FlameExecutorsYaml) -> Result<Self, Self::Error> {
Expand Down Expand Up @@ -464,6 +520,7 @@ impl Default for FlameCluster {
executors: FlameExecutors::default(),
tls: None,
limits: FlameLimits::default(),
recovery: FlameRecovery::default(),
pprof: None,
}
}
Expand Down Expand Up @@ -579,6 +636,29 @@ cluster:
Ok(())
}

#[test]
fn test_flame_context_with_session_recovery_retry_limits() -> Result<(), FlameError> {
let context_string = r#"---
cluster:
name: flame
endpoint: "http://flame-session-manager:8080"
recovery:
session:
retry_limits: 2
"#;

let tmp_dir = TempDir::new().unwrap();
let tmp_file = tmp_dir.path().join("flame-cluster.yaml");

fs::write(&tmp_file, context_string).map_err(|e| FlameError::Internal(e.to_string()))?;

let ctx = FlameClusterContext::from_file(Some(tmp_file.to_string_lossy().to_string()))?;

assert_eq!(ctx.cluster.recovery.session.retry_limits, 2);

Ok(())
}

#[test]
fn test_flame_context_with_cache_eviction() -> Result<(), FlameError> {
let context_string = r#"---
Expand Down
Loading
Loading