Skip to content

Commit

Permalink
reworked spreadscaler to use annotations
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

satisfied clippy

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

mostly finished with state refactor, Scaler vec

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

Corrected simplescaler to match new state

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

modified tests in workers to use inventory

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

mostly updated spreadscaler tests

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

updated existing tests to work with annotations

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

reworked spreadscaler to use annotations

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

ensure spreadscaler annotations include wadm

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

pr cleanup, satisfy clippy overlord

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

updated state for event test

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

resolved nits in PR comments

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

compute spread requirements and store, once

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

removed Hash impls and derives from command

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

efficiently initialize actor ID, remove extra annotation

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
  • Loading branch information
brooksmtownsend committed Apr 12, 2023
1 parent 5fe0a34 commit 5674ec5
Show file tree
Hide file tree
Showing 14 changed files with 1,236 additions and 495 deletions.
40 changes: 19 additions & 21 deletions src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ macro_rules! from_impl {
}

/// All possible compensatory commands for a lattice
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
// In order to keep things clean, we are using untagged enum serialization. So it will just try to
// match on data without tagging (see https://serde.rs/enum-representations.html for more info on
// what the other options look like). These types are purely internal to wadm, but for greater
Expand All @@ -35,7 +35,7 @@ pub enum Command {
}

/// Struct for the StartActor command
#[derive(Clone, Debug, Eq, Serialize, Deserialize, Default)]
#[derive(Clone, Debug, Serialize, Deserialize, Default, Eq)]
pub struct StartActor {
/// The OCI or bindle reference to start
pub reference: String,
Expand All @@ -45,25 +45,24 @@ pub struct StartActor {
pub count: usize,
/// The name of the model/manifest that generated this command
pub model_name: String,
/// Additional annotations to attach on this command
pub annotations: HashMap<String, String>,
}

from_impl!(StartActor);

impl PartialEq for StartActor {
fn eq(&self, other: &StartActor) -> bool {
self.reference == other.reference && self.host_id == other.host_id
}
}

impl Hash for StartActor {
fn hash<H: Hasher>(&self, state: &mut H) {
self.reference.hash(state);
self.host_id.hash(state);
fn eq(&self, other: &Self) -> bool {
self.reference == other.reference
&& self.host_id == other.host_id
&& self.count == other.count
&& self.model_name == other.model_name
&& self.annotations == other.annotations
}
}

/// Struct for the StopActor command
#[derive(Clone, Debug, Eq, Serialize, Deserialize, Default)]
#[derive(Clone, Debug, Serialize, Deserialize, Default, Eq)]
pub struct StopActor {
/// The ID of the actor to stop
pub actor_id: String,
Expand All @@ -73,20 +72,19 @@ pub struct StopActor {
pub count: usize,
/// The name of the model/manifest that generated this command
pub model_name: String,
/// Additional annotations to attach on this command
pub annotations: HashMap<String, String>,
}

from_impl!(StopActor);

impl PartialEq for StopActor {
fn eq(&self, other: &StopActor) -> bool {
self.actor_id == other.actor_id && self.host_id == other.host_id
}
}

impl Hash for StopActor {
fn hash<H: Hasher>(&self, state: &mut H) {
self.actor_id.hash(state);
self.host_id.hash(state);
fn eq(&self, other: &Self) -> bool {
self.actor_id == other.actor_id
&& self.host_id == other.host_id
&& self.count == other.count
&& self.model_name == other.model_name
&& self.annotations == other.annotations
}
}

Expand Down
7 changes: 3 additions & 4 deletions src/scaler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::Result;
use async_trait::async_trait;
use std::collections::HashSet;

use crate::{commands::Command, events::Event};

Expand All @@ -23,11 +22,11 @@ pub trait Scaler {

/// Provide a scaler with configuration to use internally when computing commands
/// This should trigger a reconcile with the new configuration
async fn update_config(&mut self, config: Self::Config) -> Result<HashSet<Command>>;
async fn update_config(&mut self, config: Self::Config) -> Result<Vec<Command>>;

/// Compute commands that must be taken given an event that changes the lattice state
async fn handle_event(&self, event: Event) -> Result<HashSet<Command>>;
async fn handle_event(&self, event: &Event) -> Result<Vec<Command>>;

/// Compute commands that must be taken to achieve desired state as specified in config
async fn reconcile(&self) -> Result<HashSet<Command>>;
async fn reconcile(&self) -> Result<Vec<Command>>;
}
87 changes: 57 additions & 30 deletions src/scaler/simplescaler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use async_trait::async_trait;
use std::collections::HashSet;
use std::collections::HashMap;

use crate::{
commands::{Command, StartActor, StopActor},
Expand All @@ -18,6 +18,8 @@ struct SimpleScalerConfig {
lattice_id: String,
// Required configuration in the `simplescaler` block
replicas: usize,
// Name of the model this SimpleScaler is associated with
model_name: String,
}

/// The SimpleScaler ensures that a certain number of replicas are running
Expand All @@ -35,45 +37,52 @@ struct SimpleActorScaler<S: ReadStore + Send + Sync> {
impl<S: ReadStore + Send + Sync> Scaler for SimpleActorScaler<S> {
type Config = SimpleScalerConfig;

async fn update_config(&mut self, config: Self::Config) -> Result<HashSet<Command>> {
async fn update_config(&mut self, config: Self::Config) -> Result<Vec<Command>> {
self.config = config;

self.reconcile().await
}

async fn handle_event(&self, event: Event) -> Result<HashSet<Command>> {
async fn handle_event(&self, event: &Event) -> Result<Vec<Command>> {
match event {
Event::ActorStarted(_) | Event::ActorStopped(_) | Event::HostStopped(_) => {
self.compute_actor_commands(&self.store).await
}
// No other event impacts the job of this scaler so we can ignore it
_ => Ok(HashSet::new()),
_ => Ok(Vec::new()),
}
}

async fn reconcile(&self) -> Result<HashSet<Command>> {
async fn reconcile(&self) -> Result<Vec<Command>> {
self.compute_actor_commands(&self.store).await
}
}

impl<S: ReadStore + Send + Sync> SimpleActorScaler<S> {
#[allow(unused)]
/// Construct a new SimpleActorScaler with specified configuration values
fn new(store: S, actor_reference: String, lattice_id: String, replicas: usize) -> Self {
fn new(
store: S,
actor_reference: String,
lattice_id: String,
replicas: usize,
model_name: String,
) -> Self {
Self {
store,
config: SimpleScalerConfig {
actor_reference,
lattice_id,
replicas,
model_name,
},
}
}

/// Given a readable store containing the state of the lattice, compute the
/// required commands to either stop extra actor instances or start new
/// actor instances to reach the configured replica count
async fn compute_actor_commands(&self, store: &S) -> Result<HashSet<Command>> {
async fn compute_actor_commands(&self, store: &S) -> Result<Vec<Command>> {
// NOTE(brooksmtownsend): This will fail to look up the actor ID if an actor is not running in the lattice currently.
// This is acceptable for the simplescaler but might require a helper function in the future
let actor_id = store
Expand All @@ -100,46 +109,49 @@ impl<S: ReadStore + Send + Sync> SimpleActorScaler<S> {
#[allow(clippy::comparison_chain)]
if count > 0 {
// Choosing to retrieve the first host that an actor is running on over querying the store for efficiency
let host_id = actors.count.keys().next().cloned().unwrap_or_default();
let host_id = actors.instances.keys().next().cloned().unwrap_or_default();

HashSet::from_iter([Command::StartActor(StartActor {
vec![Command::StartActor(StartActor {
reference: self.config.actor_reference.to_owned(),
count: count as usize, // It's a positive integer so we know this will succeed
host_id,
model_name: "fake".into(),
})])
model_name: self.config.model_name.clone(),
annotations: HashMap::new(),
})]
} else if count < 0 {
// This is written iteratively rather than functionally just because it reads better.
let mut remaining = count.unsigned_abs() as usize;
let mut commands = HashSet::new();
let mut commands = Vec::new();

// For each host running this actor, request actor stops until
// the total number of stops equals the number of extra instances
for (host_id, count) in actors.count {
for (host_id, instances) in actors.instances {
if remaining == 0 {
break;
} else if remaining >= count {
commands.insert(Command::StopActor(StopActor {
} else if remaining >= instances.len() {
commands.push(Command::StopActor(StopActor {
actor_id: actor_id.to_owned(),
host_id,
count,
count: instances.len(),
model_name: "fake".into(),
annotations: HashMap::new(),
}));
remaining -= count;
remaining -= instances.len();
} else {
commands.insert(Command::StopActor(StopActor {
commands.push(Command::StopActor(StopActor {
actor_id: actor_id.to_owned(),
host_id,
count: remaining,
model_name: "fake".into(),
annotations: HashMap::new(),
}));
remaining = 0;
}
}

commands
} else {
HashSet::new()
Vec::new()
}
}
None => {
Expand All @@ -150,12 +162,13 @@ impl<S: ReadStore + Send + Sync> SimpleActorScaler<S> {
.next()
.map(|(host_id, _host)| host_id)
{
HashSet::from_iter([Command::StartActor(StartActor {
vec![Command::StartActor(StartActor {
reference: self.config.actor_reference.to_owned(),
count: self.config.replicas,
host_id: host_id.to_owned(),
model_name: "fake".into(),
})])
model_name: self.config.model_name.clone(),
annotations: HashMap::new(),
})]
} else {
return Err(anyhow::anyhow!(
"No hosts running, unable to return actor start commands"
Expand All @@ -171,18 +184,21 @@ impl<S: ReadStore + Send + Sync> SimpleActorScaler<S> {
mod test {
use std::{collections::HashMap, sync::Arc};

use tokio::sync::RwLock;

use crate::{
commands::{Command, StartActor},
consumers::{manager::Worker, ScopedMessage},
events::{ActorClaims, ActorStarted, Event, HostStarted},
scaler::{simplescaler::SimpleActorScaler, Scaler},
test_util::TestStore,
test_util::{TestLatticeSource, TestStore},
workers::EventWorker,
};

#[tokio::test]
async fn can_return_error_with_no_hosts() {
let lattice_id = "hoohah_no_host";
let model_name = "FAKEECHO";
let actor_reference = "fakecloud.azurecr.io/echo:0.3.4".to_string();
let replicas = 12;

Expand All @@ -192,6 +208,7 @@ mod test {
actor_reference,
lattice_id.to_string(),
replicas,
model_name.to_string(),
);

let cmds = simple_scaler.reconcile().await;
Expand All @@ -204,6 +221,7 @@ mod test {

#[tokio::test]
async fn can_request_start_actor() {
let model_name = "FAKEECHO";
let lattice_id = "hoohah_start_actor";
let actor_reference = "fakecloud.azurecr.io/echo:0.3.4".to_string();
let replicas = 12;
Expand All @@ -212,7 +230,11 @@ mod test {
// Lattice State: One empty host

let store = Arc::new(TestStore::default());
let worker = EventWorker::new(store.clone(), HashMap::default());
let lattice_source = TestLatticeSource {
claims: HashMap::default(),
inventory: Arc::new(RwLock::new(HashMap::default())),
};
let worker = EventWorker::new(store.clone(), lattice_source);

let host_id = "NASDASDIAMAREALHOST".to_string();
let host_name = "I am a real host".to_string();
Expand All @@ -239,37 +261,41 @@ mod test {
actor_reference.to_string(),
lattice_id.to_string(),
replicas,
model_name.to_string(),
);

let cmds = simple_scaler
.reconcile()
.await
.expect("Should have computed a set of commands");
assert_eq!(cmds.len(), 1);
let command = cmds
.iter()
.next()
.expect("Should have computed one command");
let command = cmds.first().expect("Should have computed one command");
assert_eq!(
command,
&Command::StartActor(StartActor {
reference: actor_reference,
host_id,
count: replicas,
model_name: "fake".into(),
model_name: model_name.to_string(),
annotations: HashMap::new()
})
)
}

#[tokio::test]
async fn can_request_multiple_stop_actor() {
let model_name = "MULTI_STOP";
let lattice_id = "hoohah_multi_stop_actor";
let actor_reference = "fakecloud.azurecr.io/echo:0.3.4".to_string();
let actor_id = "MASDASDIAMAREALACTOR";
let replicas = 2;

let store = Arc::new(TestStore::default());
let worker = EventWorker::new(store.clone(), HashMap::default());
let lattice_source = TestLatticeSource {
claims: HashMap::default(),
inventory: Arc::new(RwLock::new(HashMap::default())),
};
let worker = EventWorker::new(store.clone(), lattice_source);

// *** STATE SETUP BEGIN ***
// Lattice State: One host with 4 instances of the actor, and one host with 3 instances
Expand Down Expand Up @@ -356,6 +382,7 @@ mod test {
actor_reference.to_string(),
lattice_id.to_string(),
replicas,
model_name.to_string(),
);

let cmds = simple_scaler
Expand Down

0 comments on commit 5674ec5

Please sign in to comment.