Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ci/flame-cluster-benchmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ cluster:
name: flame
endpoint: "https://flame-session-manager:8080"
slot: "cpu=1,mem=1g"
policy: priority
policies:
- priority
- fairshare
- gang
storage: none
schedule_interval: 100
executors:
Expand Down
5 changes: 4 additions & 1 deletion ci/flame-cluster-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ cluster:
name: flame
endpoint: "https://flame-session-manager:8080"
slot: "cpu=1,mem=1g"
policy: priority
policies:
- priority
- fairshare
- gang
storage: fs://data/
schedule_interval: 500 # Scheduler loop interval in milliseconds (default: 500)
executors:
Expand Down
5 changes: 4 additions & 1 deletion ci/flame-cluster-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ cluster:
name: flame
endpoint: "http://127.0.0.1:30080"
slot: "cpu=1,mem=1g"
policy: priority
policies:
- priority
- fairshare
- gang
storage: fs://data/
schedule_interval: 100 # Scheduler loop interval in milliseconds (default: 500)
executors:
Expand Down
4 changes: 3 additions & 1 deletion ci/flame-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ cluster:
name: flame
endpoint: "https://flame-session-manager:8080"
slot: "cpu=1,mem=1g"
policy: priority
policies:
- priority
- gang
storage: none
schedule_interval: 100
executors:
Expand Down
21 changes: 14 additions & 7 deletions common/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ const DEFAULT_FLAME_CONF: &str = "flame-cluster.yaml";
const DEFAULT_CONTEXT_NAME: &str = "flame";
const DEFAULT_FLAME_ENDPOINT: &str = "http://127.0.0.1:8080";
const DEFAULT_SLOT: &str = "cpu=1,mem=2g";
const DEFAULT_POLICY: &str = "proportion";
/// Default policies to enable when none specified in config.
/// Available configurable policies: "priority", "fairshare", "gang"
/// Note: "shim" plugin is always enabled (required for executor matching)
pub const DEFAULT_POLICIES: &[&str] = &["priority", "fairshare", "gang"];
const DEFAULT_STORAGE: &str = "sqlite://flame.db";
const DEFAULT_MAX_EXECUTORS_PER_NODE: u32 = 128;
const DEFAULT_SCHEDULE_INTERVAL: u64 = 100;
Expand All @@ -52,7 +55,7 @@ struct FlameClusterYaml {
pub name: String,
pub endpoint: String,
pub slot: Option<String>,
pub policy: Option<String>,
pub policies: Option<Vec<String>>,
pub storage: Option<String>,
/// Schedule interval in milliseconds for the session scheduler loop
pub schedule_interval: Option<u64>,
Expand Down Expand Up @@ -135,7 +138,7 @@ pub struct FlameCluster {
pub name: String,
pub endpoint: String,
pub slot: ResourceRequirement,
pub policy: String,
pub policies: Vec<String>,
pub storage: String,
pub schedule_interval: u64,
pub executors: FlameExecutors,
Expand Down Expand Up @@ -386,7 +389,9 @@ impl TryFrom<FlameClusterYaml> for FlameCluster {
name: cluster.name,
endpoint: cluster.endpoint,
slot: ResourceRequirement::from(&cluster.slot.unwrap_or(DEFAULT_SLOT.to_string())),
policy: cluster.policy.unwrap_or(DEFAULT_POLICY.to_string()),
policies: cluster
.policies
.unwrap_or_else(|| DEFAULT_POLICIES.iter().map(|s| s.to_string()).collect()),
storage: cluster.storage.unwrap_or(DEFAULT_STORAGE.to_string()),
schedule_interval: cluster
.schedule_interval
Expand Down Expand Up @@ -440,7 +445,7 @@ impl Default for FlameCluster {
name: DEFAULT_CONTEXT_NAME.to_string(),
endpoint: DEFAULT_FLAME_ENDPOINT.to_string(),
slot: ResourceRequirement::from(&DEFAULT_SLOT.to_string()),
policy: DEFAULT_POLICY.to_string(),
policies: DEFAULT_POLICIES.iter().map(|s| s.to_string()).collect(),
storage: DEFAULT_STORAGE.to_string(),
schedule_interval: DEFAULT_SCHEDULE_INTERVAL,
executors: FlameExecutors::default(),
Expand Down Expand Up @@ -530,7 +535,9 @@ cluster:
name: flame
endpoint: "http://flame-session-manager:8080"
slot: "cpu=1,mem=1g"
policy: priority
policies:
- priority
- gang
storage: sqlite://flame.db
executors:
shim: host
Expand All @@ -548,7 +555,7 @@ cluster:
assert_eq!(ctx.cluster.name, "flame");
assert_eq!(ctx.cluster.endpoint, "http://flame-session-manager:8080");
assert_eq!(ctx.cluster.slot, ResourceRequirement::from("cpu=1,mem=1g"));
assert_eq!(ctx.cluster.policy, "priority");
assert_eq!(ctx.cluster.policies, vec!["priority", "gang"]);
assert_eq!(ctx.cluster.storage, "sqlite://flame.db");
assert_eq!(ctx.cluster.executors.shim, Shim::Host);
assert_eq!(ctx.cluster.limits.max_executors, 10);
Expand Down
5 changes: 4 additions & 1 deletion flmadm/src/managers/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ cluster:
name: flame
endpoint: "http://127.0.0.1:8080"
slot: "cpu=1,mem=2g"
policy: proportion
policies:
- priority
- fairshare
- gang
storage: "fs://{prefix}/data"
executors:
shim: host
Expand Down
39 changes: 0 additions & 39 deletions installer/flame-cluster.yaml

This file was deleted.

30 changes: 0 additions & 30 deletions installer/flame-console.yaml

This file was deleted.

30 changes: 0 additions & 30 deletions installer/flame-executor-manager.yaml

This file was deleted.

36 changes: 0 additions & 36 deletions installer/flame-session-manager.yaml

This file was deleted.

4 changes: 0 additions & 4 deletions installer/flame-system.yaml

This file was deleted.

13 changes: 0 additions & 13 deletions installer/fsm-service.yaml

This file was deleted.

15 changes: 0 additions & 15 deletions installer/kustomization.yaml

This file was deleted.

5 changes: 4 additions & 1 deletion perf/flamepy/conf/flame-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ cluster:
name: flame
endpoint: "http://flame-session-manager:8080"
slot: "cpu=2,mem=1g"
policy: priority
policies:
- priority
- fairshare
- gang
storage: none
schedule_interval: 100
pprof:
Expand Down
2 changes: 1 addition & 1 deletion session_manager/src/controller/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ mod tests {
endpoint: "http://localhost:8080".to_string(),
storage: url,
slot: ResourceRequirement::default(),
policy: "fifo".to_string(),
policies: vec!["priority".to_string(), "gang".to_string()],
schedule_interval: 1000,
executors: FlameExecutors {
shim: Shim::default(),
Expand Down
2 changes: 1 addition & 1 deletion session_manager/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ mod tests {
endpoint: "http://localhost:8080".to_string(),
storage: url,
slot: ResourceRequirement::default(),
policy: "fifo".to_string(),
policies: vec!["priority".to_string(), "gang".to_string()],
schedule_interval: 1000,
executors: FlameExecutors {
shim: Shim::default(),
Expand Down
2 changes: 1 addition & 1 deletion session_manager/src/controller/nodes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ mod tests {
endpoint: "http://localhost:8080".to_string(),
storage: url,
slot: ResourceRequirement::default(),
policy: "fifo".to_string(),
policies: vec!["priority".to_string(), "gang".to_string()],
schedule_interval: 1000,
executors: FlameExecutors {
shim: Shim::default(),
Expand Down
4 changes: 2 additions & 2 deletions session_manager/src/scheduler/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ pub struct Context {
}

impl Context {
pub fn new(controller: ControllerPtr) -> Result<Self, FlameError> {
pub fn new(controller: ControllerPtr, policies: &[String]) -> Result<Self, FlameError> {
let snapshot = controller.snapshot()?;
let plugins = PluginManager::setup(&snapshot.clone())?;
let plugins = PluginManager::setup(&snapshot.clone(), policies)?;

Ok(Context {
snapshot,
Expand Down
21 changes: 17 additions & 4 deletions session_manager/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,15 @@ struct ScheduleRunner {
impl FlameThread for ScheduleRunner {
async fn run(&self, flame_ctx: FlameClusterContext) -> Result<(), FlameError> {
let schedule_interval = flame_ctx.cluster.schedule_interval;
tracing::info!("Scheduler started with interval: {}ms", schedule_interval);
let policies = &flame_ctx.cluster.policies;
tracing::info!(
"Scheduler started with interval: {}ms, enabled policies: {:?}",
schedule_interval,
policies
);

loop {
let mut ctx = Context::new(self.controller.clone())?;
let mut ctx = Context::new(self.controller.clone(), policies)?;

// Same `ctx` (and thus same in-memory `plugins`) for every action: Dispatch mutations
// are visible to Allocate (e.g. Gang `is_fulfilled` / `is_ready` after binds).
Expand Down Expand Up @@ -199,7 +204,11 @@ mod tests {

for i in 0..10 {
let snapshot = controller.snapshot()?;
let plugins = PluginManager::setup(&snapshot.clone())?;
let default_policies: Vec<String> = common::ctx::DEFAULT_POLICIES
.iter()
.map(|s| s.to_string())
.collect();
let plugins = PluginManager::setup(&snapshot.clone(), &default_policies)?;

let mut ctx = Context {
snapshot: snapshot.clone(),
Expand Down Expand Up @@ -245,7 +254,11 @@ mod tests {
.register_node(&new_test_node("node_1".to_string())),
)?;

let mut ctx = Context::new(controller.clone())?;
let default_policies: Vec<String> = common::ctx::DEFAULT_POLICIES
.iter()
.map(|s| s.to_string())
.collect();
let mut ctx = Context::new(controller.clone(), &default_policies)?;
let plugins_ptr = Arc::as_ptr(&ctx.plugins);
for action in ctx.actions.clone() {
tokio_test::block_on(action.execute(&mut ctx))?;
Expand Down
Loading
Loading