Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(feat) Implement the ActorSpreadScaler #75

Merged
merged 3 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 19 additions & 21 deletions src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ macro_rules! from_impl {
}

/// All possible compensatory commands for a lattice
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
// In order to keep things clean, we are using untagged enum serialization. So it will just try to
// match on data without tagging (see https://serde.rs/enum-representations.html for more info on
// what the other options look like). These types are purely internal to wadm, but for greater
Expand All @@ -35,7 +35,7 @@ pub enum Command {
}

/// Struct for the StartActor command
#[derive(Clone, Debug, Eq, Serialize, Deserialize, Default)]
#[derive(Clone, Debug, Serialize, Deserialize, Default, Eq)]
pub struct StartActor {
/// The OCI or bindle reference to start
pub reference: String,
Expand All @@ -45,25 +45,24 @@ pub struct StartActor {
pub count: usize,
/// The name of the model/manifest that generated this command
pub model_name: String,
/// Additional annotations to attach on this command
pub annotations: HashMap<String, String>,
}

from_impl!(StartActor);

impl PartialEq for StartActor {
fn eq(&self, other: &StartActor) -> bool {
self.reference == other.reference && self.host_id == other.host_id
}
}

impl Hash for StartActor {
fn hash<H: Hasher>(&self, state: &mut H) {
self.reference.hash(state);
self.host_id.hash(state);
fn eq(&self, other: &Self) -> bool {
self.reference == other.reference
&& self.host_id == other.host_id
&& self.count == other.count
&& self.model_name == other.model_name
&& self.annotations == other.annotations
}
}

/// Struct for the StopActor command
#[derive(Clone, Debug, Eq, Serialize, Deserialize, Default)]
#[derive(Clone, Debug, Serialize, Deserialize, Default, Eq)]
pub struct StopActor {
/// The ID of the actor to stop
pub actor_id: String,
Expand All @@ -73,20 +72,19 @@ pub struct StopActor {
pub count: usize,
/// The name of the model/manifest that generated this command
pub model_name: String,
/// Additional annotations to attach on this command
pub annotations: HashMap<String, String>,
}

from_impl!(StopActor);

impl PartialEq for StopActor {
fn eq(&self, other: &StopActor) -> bool {
self.actor_id == other.actor_id && self.host_id == other.host_id
}
}

impl Hash for StopActor {
fn hash<H: Hasher>(&self, state: &mut H) {
self.actor_id.hash(state);
self.host_id.hash(state);
fn eq(&self, other: &Self) -> bool {
self.actor_id == other.actor_id
&& self.host_id == other.host_id
&& self.count == other.count
&& self.model_name == other.model_name
&& self.annotations == other.annotations
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ pub struct SpreadScalerProperty {
}

/// Configuration for various spreading requirements
#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Default, Debug, Serialize, Deserialize, Clone)]
pub struct Spread {
/// The name of this spread requirement
pub name: String,
Expand Down
9 changes: 5 additions & 4 deletions src/scaler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use anyhow::Result;
use async_trait::async_trait;
use std::collections::HashSet;

use crate::{commands::Command, events::Event};

mod simplescaler;
pub mod spreadscaler;

/// A trait describing a struct that can be configured to compute the difference between
/// desired state and configured state, returning a set of commands to approach desired state.
Expand All @@ -21,11 +21,12 @@ pub trait Scaler {
type Config: Send + Sync;

/// Provide a scaler with configuration to use internally when computing commands
fn update_config(&mut self, config: Self::Config) -> Result<bool>;
/// This should trigger a reconcile with the new configuration
async fn update_config(&mut self, config: Self::Config) -> Result<Vec<Command>>;

/// Compute commands that must be taken given an event that changes the lattice state
async fn handle_event(&self, event: Event) -> Result<HashSet<Command>>;
async fn handle_event(&self, event: &Event) -> Result<Vec<Command>>;

/// Compute commands that must be taken to achieve desired state as specified in config
async fn reconcile(&self) -> Result<HashSet<Command>>;
async fn reconcile(&self) -> Result<Vec<Command>>;
}
90 changes: 59 additions & 31 deletions src/scaler/simplescaler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use async_trait::async_trait;
use std::collections::HashSet;
use std::collections::HashMap;

use crate::{
commands::{Command, StartActor, StopActor},
Expand All @@ -18,6 +18,8 @@ struct SimpleScalerConfig {
lattice_id: String,
// Required configuration in the `simplescaler` block
replicas: usize,
// Name of the model this SimpleScaler is associated with
model_name: String,
}

/// The SimpleScaler ensures that a certain number of replicas are running
Expand All @@ -35,44 +37,52 @@ struct SimpleActorScaler<S: ReadStore + Send + Sync> {
impl<S: ReadStore + Send + Sync> Scaler for SimpleActorScaler<S> {
type Config = SimpleScalerConfig;

fn update_config(&mut self, config: Self::Config) -> Result<bool> {
async fn update_config(&mut self, config: Self::Config) -> Result<Vec<Command>> {
self.config = config;
Ok(true)

self.reconcile().await
}

async fn handle_event(&self, event: Event) -> Result<HashSet<Command>> {
async fn handle_event(&self, event: &Event) -> Result<Vec<Command>> {
match event {
Event::ActorStarted(_) | Event::ActorStopped(_) | Event::HostStopped(_) => {
self.compute_actor_commands(&self.store).await
}
// No other event impacts the job of this scaler so we can ignore it
_ => Ok(HashSet::new()),
_ => Ok(Vec::new()),
}
}

async fn reconcile(&self) -> Result<HashSet<Command>> {
async fn reconcile(&self) -> Result<Vec<Command>> {
self.compute_actor_commands(&self.store).await
}
}

impl<S: ReadStore + Send + Sync> SimpleActorScaler<S> {
#[allow(unused)]
/// Construct a new SimpleActorScaler with specified configuration values
fn new(store: S, actor_reference: String, lattice_id: String, replicas: usize) -> Self {
fn new(
store: S,
actor_reference: String,
lattice_id: String,
replicas: usize,
model_name: String,
) -> Self {
Self {
store,
config: SimpleScalerConfig {
actor_reference,
lattice_id,
replicas,
model_name,
},
}
}

/// Given a readable store containing the state of the lattice, compute the
/// required commands to either stop extra actor instances or start new
/// actor instances to reach the configured replica count
async fn compute_actor_commands(&self, store: &S) -> Result<HashSet<Command>> {
async fn compute_actor_commands(&self, store: &S) -> Result<Vec<Command>> {
// NOTE(brooksmtownsend): This will fail to look up the actor ID if an actor is not running in the lattice currently.
// This is acceptable for the simplescaler but might require a helper function in the future
let actor_id = store
Expand All @@ -99,46 +109,49 @@ impl<S: ReadStore + Send + Sync> SimpleActorScaler<S> {
#[allow(clippy::comparison_chain)]
if count > 0 {
// Choosing to retrieve the first host that an actor is running on over querying the store for efficiency
let host_id = actors.count.keys().next().cloned().unwrap_or_default();
let host_id = actors.instances.keys().next().cloned().unwrap_or_default();

HashSet::from_iter([Command::StartActor(StartActor {
vec![Command::StartActor(StartActor {
reference: self.config.actor_reference.to_owned(),
count: count as usize, // It's a positive integer so we know this will succeed
host_id,
model_name: "fake".into(),
})])
model_name: self.config.model_name.clone(),
annotations: HashMap::new(),
})]
} else if count < 0 {
// This is written iteratively rather than functionally just because it reads better.
let mut remaining = count.unsigned_abs() as usize;
let mut commands = HashSet::new();
let mut commands = Vec::new();

// For each host running this actor, request actor stops until
// the total number of stops equals the number of extra instances
for (host_id, count) in actors.count {
for (host_id, instances) in actors.instances {
if remaining == 0 {
break;
} else if remaining >= count {
commands.insert(Command::StopActor(StopActor {
} else if remaining >= instances.len() {
commands.push(Command::StopActor(StopActor {
actor_id: actor_id.to_owned(),
host_id,
count,
count: instances.len(),
model_name: "fake".into(),
annotations: HashMap::new(),
}));
remaining -= count;
remaining -= instances.len();
} else {
commands.insert(Command::StopActor(StopActor {
commands.push(Command::StopActor(StopActor {
actor_id: actor_id.to_owned(),
host_id,
count: remaining,
model_name: "fake".into(),
annotations: HashMap::new(),
}));
remaining = 0;
}
}

commands
} else {
HashSet::new()
Vec::new()
}
}
None => {
Expand All @@ -149,12 +162,13 @@ impl<S: ReadStore + Send + Sync> SimpleActorScaler<S> {
.next()
.map(|(host_id, _host)| host_id)
{
HashSet::from_iter([Command::StartActor(StartActor {
vec![Command::StartActor(StartActor {
reference: self.config.actor_reference.to_owned(),
count: self.config.replicas,
host_id: host_id.to_owned(),
model_name: "fake".into(),
})])
model_name: self.config.model_name.clone(),
annotations: HashMap::new(),
})]
} else {
return Err(anyhow::anyhow!(
"No hosts running, unable to return actor start commands"
Expand All @@ -170,18 +184,21 @@ impl<S: ReadStore + Send + Sync> SimpleActorScaler<S> {
mod test {
use std::{collections::HashMap, sync::Arc};

use tokio::sync::RwLock;

use crate::{
commands::{Command, StartActor},
consumers::{manager::Worker, ScopedMessage},
events::{ActorClaims, ActorStarted, Event, HostStarted},
scaler::{simplescaler::SimpleActorScaler, Scaler},
test_util::TestStore,
test_util::{TestLatticeSource, TestStore},
workers::EventWorker,
};

#[tokio::test]
async fn can_return_error_with_no_hosts() {
let lattice_id = "hoohah_no_host";
let model_name = "FAKEECHO";
let actor_reference = "fakecloud.azurecr.io/echo:0.3.4".to_string();
let replicas = 12;

Expand All @@ -191,6 +208,7 @@ mod test {
actor_reference,
lattice_id.to_string(),
replicas,
model_name.to_string(),
);

let cmds = simple_scaler.reconcile().await;
Expand All @@ -203,6 +221,7 @@ mod test {

#[tokio::test]
async fn can_request_start_actor() {
let model_name = "FAKEECHO";
let lattice_id = "hoohah_start_actor";
let actor_reference = "fakecloud.azurecr.io/echo:0.3.4".to_string();
let replicas = 12;
Expand All @@ -211,7 +230,11 @@ mod test {
// Lattice State: One empty host

let store = Arc::new(TestStore::default());
let worker = EventWorker::new(store.clone(), HashMap::default());
let lattice_source = TestLatticeSource {
claims: HashMap::default(),
inventory: Arc::new(RwLock::new(HashMap::default())),
};
let worker = EventWorker::new(store.clone(), lattice_source);

let host_id = "NASDASDIAMAREALHOST".to_string();
let host_name = "I am a real host".to_string();
Expand All @@ -238,37 +261,41 @@ mod test {
actor_reference.to_string(),
lattice_id.to_string(),
replicas,
model_name.to_string(),
);

let cmds = simple_scaler
.reconcile()
.await
.expect("Should have computed a set of commands");
assert_eq!(cmds.len(), 1);
let command = cmds
.iter()
.next()
.expect("Should have computed one command");
let command = cmds.first().expect("Should have computed one command");
assert_eq!(
command,
&Command::StartActor(StartActor {
reference: actor_reference,
host_id,
count: replicas,
model_name: "fake".into(),
model_name: model_name.to_string(),
annotations: HashMap::new()
})
)
}

#[tokio::test]
async fn can_request_multiple_stop_actor() {
let model_name = "MULTI_STOP";
let lattice_id = "hoohah_multi_stop_actor";
let actor_reference = "fakecloud.azurecr.io/echo:0.3.4".to_string();
let actor_id = "MASDASDIAMAREALACTOR";
let replicas = 2;

let store = Arc::new(TestStore::default());
let worker = EventWorker::new(store.clone(), HashMap::default());
let lattice_source = TestLatticeSource {
claims: HashMap::default(),
inventory: Arc::new(RwLock::new(HashMap::default())),
};
let worker = EventWorker::new(store.clone(), lattice_source);

// *** STATE SETUP BEGIN ***
// Lattice State: One host with 4 instances of the actor, and one host with 3 instances
Expand Down Expand Up @@ -355,6 +382,7 @@ mod test {
actor_reference.to_string(),
lattice_id.to_string(),
replicas,
model_name.to_string(),
);

let cmds = simple_scaler
Expand Down