spreadscaler can compute commands
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
brooksmtownsend committed Apr 6, 2023
1 parent bb5d18f commit 70e8ee4
Showing 1 changed file with 195 additions and 23 deletions.
218 changes: 195 additions & 23 deletions src/scaler/spreadscaler.rs
@@ -2,13 +2,14 @@ use anyhow::Result;
use async_trait::async_trait;
use std::{
cmp::{self, Ordering},
collections::HashSet,
collections::{HashMap, HashSet},
};
use tracing::log::warn;

use crate::{
commands::{Command, StartActor, StopActor},
events::Event,
model::{Spread, SpreadScalerProperty},
model::{Spread, SpreadScalerProperty, DEFAULT_SPREAD_WEIGHT},
scaler::Scaler,
storage::{Actor, Host, ReadStore},
};
@@ -85,34 +86,109 @@ impl<S: ReadStore + Send + Sync> ActorSpreadScaler<S> {

/// Given a spread config, compute the necessary commands to properly spread actors across a set
/// of hosts
fn spread_commands(&self, spread_config: SpreadScalerProperty) -> Result<HashSet<Command>> {
// To go from a Vec<Spread> and a Vec<Host> to a set of commands, we need to:

// convert replicas + weights to concrete numbers of actors. Relative spread
let spread = compute_spread(&spread_config)?;

// get all hosts

// partition hosts based on labels that meet the spread requirements

// create host commands
todo!()
async fn spread_commands(&self) -> Result<HashSet<Command>> {
let spread_requirements = compute_spread(&self.config.spread_config)?;

let hosts = self.store.list::<Host>(&self.config.lattice_id).await?;

let actor_id = self
.store
.list::<Actor>(&self.config.lattice_id)
.await?
.iter()
.find(|(_id, actor)| actor.reference == self.config.actor_reference)
.map(|(id, _actor)| id.to_owned())
// Default here means the below `get` will find zero running actors, which is fine because
// that accurately describes the current lattice having zero instances.
.unwrap_or_default();

// NOTE(brooksmtownsend): it's easier to assign one host per set of requirements than
// to balance actors across every host that satisfies them. Users should be specific
// with their requirements, as wadm is not responsible for resolving ambiguity.

let commands = spread_requirements
.iter()
.filter_map(|(spread, count)| {
let eligible_hosts = eligible_hosts(&hosts, spread);
if let Some(first_host) = eligible_hosts.get(0) {
// NOTE(brooksmtownsend): Once we care about annotations, we'll need to check that annotations
// match here to compute the current count
// Compute all current actors running on this spread's eligible hosts
let current_count = eligible_hosts.iter().fold(0, |total, host| {
total + *host.actors.get(&actor_id).unwrap_or(&0)
});
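// Illustrative example (not part of this commit): if this spread wants 5 instances
// and its eligible hosts currently run 3, the comparison below is Ordering::Less and
// a StartActor command for the remaining 2 instances is emitted on the first host.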

let final_cmd = match current_count.cmp(count) {
// No action needed
Ordering::Equal => None,
// Start actors to reach desired replicas
Ordering::Less => Some(Command::StartActor(StartActor {
reference: self.config.actor_reference.to_owned(),
host_id: first_host.id.to_owned(),
count: count - current_count,
})),
// Stop actors to reach desired replicas
Ordering::Greater => Some(Command::StopActor(StopActor {
actor_id: actor_id.to_owned(),
host_id: first_host.id.to_owned(),
count: current_count - count,
})),
};
final_cmd
} else {
// No hosts were eligible, so we can't attempt to add or remove actors
None
}
})
// Collapse multiple commands for the same actor and same host
// into a single StartActor command
.fold(HashSet::new(), |mut cmd_set, cmd| {
if let Some(prev_cmd) = cmd_set.get(&cmd) {
match (prev_cmd, cmd) {
(Command::StartActor(prev), Command::StartActor(new)) => {
let thing_to_add = Command::StartActor(StartActor {
count: prev.count + new.count,
..new
});
cmd_set.replace(thing_to_add);
cmd_set
}
_ => cmd_set,
}
} else {
cmd_set.insert(cmd);
cmd_set
}
});
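// Illustrative (not part of this commit): if two spreads produce StartActor commands
// that compare equal (same reference and host), the fold above merges them by summing
// their counts, e.g. counts of 2 and 3 become a single command starting 5 instances.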

Ok(commands)
}
}

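/// Returns the hosts whose labels satisfy every key/value requirement of the given
/// spread. For example (illustrative), a spread requiring `cloud=aws` only matches
/// hosts whose labels contain exactly that pair; hosts missing the label, or carrying
/// a different value, are filtered out.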
fn eligible_hosts<'a>(all_hosts: &'a HashMap<String, Host>, spread: &Spread) -> Vec<&'a Host> {
all_hosts
.iter()
.filter(|(_id, host)| {
spread
.requirements
.iter()
.all(|(key, value)| host.labels.get(key).map(|v| v.eq(value)).unwrap_or(false))
})
.map(|(_id, host)| host)
.collect::<Vec<&Host>>()
}

/// Given a spread config, return a vector of tuples that represents the spread
/// and the actual number of actors to start for a specific spread requirement
fn compute_spread(spread_config: &SpreadScalerProperty) -> Result<Vec<(&Spread, usize)>> {
// ideal return would be a Vec<(num_actors, Spread)>?
let replicas = spread_config.replicas;
// TODO: what should the default weight be? 100 would clobber anyone who uses 1, maybe default is the
// same as the minimum weight specified?
let default_weight: usize = 100;

let total_weight = spread_config
.spread
.iter()
.map(|s| s.weight.unwrap_or(default_weight))
.map(|s| s.weight.unwrap_or(DEFAULT_SPREAD_WEIGHT))
.sum::<usize>();

let spreads: Vec<(&Spread, usize)> = spread_config
@@ -122,7 +198,7 @@ fn compute_spread(spread_config: &SpreadScalerProperty) -> Result<Vec<(&Spread,
(
s,
// Order is important here since usizes chop off remaining decimals
(replicas * s.weight.unwrap_or(default_weight)) / total_weight,
(replicas * s.weight.unwrap_or(DEFAULT_SPREAD_WEIGHT)) / total_weight,
)
})
.collect();
@@ -146,9 +222,13 @@ fn compute_spread(spread_config: &SpreadScalerProperty) -> Result<Vec<(&Spread,
})
.collect()
}
// Greater is not possible with the way we do decimal division,
// usizes run down
Ordering::Greater | Ordering::Equal => spreads,
// This isn't possible (usizes round down) but I added an arm _just in case_
// there was a case that I didn't imagine
Ordering::Greater => {
warn!("Requesting more actor instances than were specified");
spreads
}
Ordering::Equal => spreads,
};

Ok(spreads)
@@ -157,9 +237,20 @@ fn compute_spread(spread_config: &SpreadScalerProperty) -> Result<Vec<(&Spread,
#[cfg(test)]
mod test {
use anyhow::Result;
use std::collections::BTreeMap;
use chrono::Utc;
use std::{
collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
};

use crate::model::{Spread, SpreadScalerProperty};
use crate::{
commands::{Command, StartActor},
model::{Spread, SpreadScalerProperty},
scaler::spreadscaler::ActorSpreadScaler,
storage::{Host, Store},
test_util::TestStore,
workers::EventWorker,
};

use super::compute_spread;

@@ -313,4 +404,85 @@ mod test {

Ok(())
}

#[tokio::test]
async fn can_compute_spread_commands() -> Result<()> {
let lattice_id = "hoohah_multi_stop_actor";
let actor_reference = "fakecloud.azurecr.io/echo:0.3.4".to_string();
let actor_id = "MASDASDIAMAREALACTOR";
let host_id = "NASDASDIMAREALHOST";

let store = Arc::new(TestStore::default());
let worker = EventWorker::new(store.clone(), HashMap::default());

// STATE SETUP BEGIN, ONE HOST
store
.store(
lattice_id,
host_id.to_string(),
Host {
actors: HashMap::new(),
friendly_name: "hey".to_string(),
labels: HashMap::new(),
annotations: HashMap::new(),
providers: HashSet::new(),
uptime_seconds: 123,
version: None,
id: host_id.to_string(),
last_seen: Utc::now(),
},
)
.await?;

// Ensure we compute a complex spread of explicitly specified, uneven weights correctly
let complex_spread = SpreadScalerProperty {
replicas: 103,
spread: vec![
Spread {
// 9 + 1 (remainder trip)
name: "ComplexOne".to_string(),
requirements: BTreeMap::new(),
weight: Some(42),
},
Spread {
// 0 + 1 (remainder trip)
name: "ComplexTwo".to_string(),
requirements: BTreeMap::new(),
weight: Some(3),
},
Spread {
// 8
name: "ComplexThree".to_string(),
requirements: BTreeMap::new(),
weight: Some(37),
},
Spread {
// 84
name: "ComplexFour".to_string(),
requirements: BTreeMap::new(),
weight: Some(384),
},
],
};
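// Illustrative arithmetic for the counts noted above: the total weight is
// 42 + 3 + 37 + 384 = 466, and integer division of 103 * weight / 466 yields
// 9, 0, 8 and 84 (101 total). The remaining 2 replicas are handed out one each
// to the first two spreads, giving 10 + 1 + 8 + 84 = 103.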

let spreadscaler = ActorSpreadScaler::new(
store.clone(),
actor_reference.to_string(),
lattice_id.to_string(),
complex_spread,
);

let cmds = spreadscaler.spread_commands().await?;
assert_eq!(cmds.len(), 1);
assert_eq!(
cmds.iter().next().expect("should have one command"),
&Command::StartActor(StartActor {
reference: actor_reference.to_string(),
host_id: host_id.to_string(),
count: 103,
})
);

Ok(())
}
}
