mostly finished with state refactor, Scaler vec
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
brooksmtownsend committed Apr 11, 2023
1 parent ef779e7 commit 19c2140
Showing 9 changed files with 2,359 additions and 2,120 deletions.
32 changes: 18 additions & 14 deletions src/commands/mod.rs
@@ -35,7 +35,7 @@ pub enum Command {
}

/// Struct for the StartActor command
#[derive(Clone, Debug, Eq, Serialize, Deserialize, Default)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct StartActor {
/// The OCI or bindle reference to start
pub reference: String,
@@ -45,25 +45,27 @@ pub struct StartActor {
pub count: usize,
/// The name of the model/manifest that generated this command
pub model_name: String,
/// Additional annotations to attach to this command
pub annotations: HashMap<String, String>,
}

from_impl!(StartActor);

impl PartialEq for StartActor {
fn eq(&self, other: &StartActor) -> bool {
self.reference == other.reference && self.host_id == other.host_id
}
}

impl Hash for StartActor {
fn hash<H: Hasher>(&self, state: &mut H) {
self.reference.hash(state);
self.host_id.hash(state);
self.count.hash(state);
self.model_name.hash(state);

for annotation in &self.annotations {
annotation.hash(state)
}
}
}

/// Struct for the StopActor command
#[derive(Clone, Debug, Eq, Serialize, Deserialize, Default)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct StopActor {
/// The ID of the actor to stop
pub actor_id: String,
@@ -73,20 +75,22 @@ pub struct StopActor {
pub count: usize,
/// The name of the model/manifest that generated this command
pub model_name: String,
/// Additional annotations to attach to this command
pub annotations: HashMap<String, String>,
}

from_impl!(StopActor);

impl PartialEq for StopActor {
fn eq(&self, other: &StopActor) -> bool {
self.actor_id == other.actor_id && self.host_id == other.host_id
}
}

impl Hash for StopActor {
fn hash<H: Hasher>(&self, state: &mut H) {
self.actor_id.hash(state);
self.host_id.hash(state);
self.count.hash(state);
self.model_name.hash(state);

for annotation in &self.annotations {
annotation.hash(state)
}
}
}
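A note on the hand-written Hash impls above: with PartialEq now derived over every field of StartActor and StopActor, the Hash/Eq contract (k1 == k2 implies hash(k1) == hash(k2)) requires equal values to hash identically, yet iterating a HashMap visits entries in arbitrary order, so two equal annotation maps can hash differently. A minimal order-independent sketch for the annotations part of those hash fns (a suggestion, not part of this commit; it assumes the entries are String pairs, which implement Ord):

    // Sort the entries so equal maps always hash in the same order
    let mut entries: Vec<_> = self.annotations.iter().collect();
    entries.sort();
    for (key, value) in entries {
        key.hash(state);
        value.hash(state);
    }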

6 changes: 3 additions & 3 deletions src/scaler/mod.rs
@@ -23,11 +23,11 @@ pub trait Scaler {

/// Provide a scaler with configuration to use internally when computing commands
/// This should trigger a reconcile with the new configuration
async fn update_config(&mut self, config: Self::Config) -> Result<HashSet<Command>>;
async fn update_config(&mut self, config: Self::Config) -> Result<Vec<Command>>;

/// Compute commands that must be taken given an event that changes the lattice state
async fn handle_event(&self, event: Event) -> Result<HashSet<Command>>;
async fn handle_event(&self, event: &Event) -> Result<Vec<Command>>;

/// Compute commands that must be taken to achieve desired state as specified in config
async fn reconcile(&self) -> Result<HashSet<Command>>;
async fn reconcile(&self) -> Result<Vec<Command>>;
}
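Since these methods now return Vec<Command> rather than HashSet<Command>, duplicate commands are no longer collapsed automatically. A caller that still wants set semantics can dedupe at the boundary; a minimal sketch, assuming some `scaler` implementing this trait and that Command keeps its Eq and Hash implementations:

    use std::collections::HashSet;

    let commands: Vec<Command> = scaler.reconcile().await?;
    // Collecting into a HashSet restores the old dedup behavior where needed
    let unique: HashSet<Command> = commands.into_iter().collect();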
110 changes: 83 additions & 27 deletions src/scaler/simplescaler.rs
@@ -1,6 +1,6 @@
use anyhow::Result;
use async_trait::async_trait;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use crate::{
commands::{Command, StartActor, StopActor},
@@ -18,6 +18,8 @@ struct SimpleScalerConfig {
lattice_id: String,
// Required configuration in the `simplescaler` block
replicas: usize,
// Name of the model this SimpleScaler is associated with
model_name: String,
}
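With model_name added, the config now carries the originating manifest along with the scaling target. A brief construction sketch with made-up values (actor_reference is declared above the visible hunk; the reference below is borrowed from the tests, the other strings are illustrative):

    let config = SimpleScalerConfig {
        actor_reference: "fakecloud.azurecr.io/echo:0.3.4".to_string(),
        lattice_id: "default".to_string(),
        replicas: 3,
        model_name: "echo-model".to_string(),
    };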

/// The SimpleScaler ensures that a certain number of replicas are running
@@ -35,45 +37,52 @@ struct SimpleActorScaler<S: ReadStore + Send + Sync> {
impl<S: ReadStore + Send + Sync> Scaler for SimpleActorScaler<S> {
type Config = SimpleScalerConfig;

async fn update_config(&mut self, config: Self::Config) -> Result<HashSet<Command>> {
async fn update_config(&mut self, config: Self::Config) -> Result<Vec<Command>> {
self.config = config;

self.reconcile().await
}

async fn handle_event(&self, event: Event) -> Result<HashSet<Command>> {
async fn handle_event(&self, event: &Event) -> Result<Vec<Command>> {
match event {
Event::ActorStarted(_) | Event::ActorStopped(_) | Event::HostStopped(_) => {
self.compute_actor_commands(&self.store).await
}
// No other event impacts the job of this scaler so we can ignore it
_ => Ok(HashSet::new()),
_ => Ok(Vec::new()),
}
}

async fn reconcile(&self) -> Result<HashSet<Command>> {
async fn reconcile(&self) -> Result<Vec<Command>> {
self.compute_actor_commands(&self.store).await
}
}

impl<S: ReadStore + Send + Sync> SimpleActorScaler<S> {
#[allow(unused)]
/// Construct a new SimpleActorScaler with specified configuration values
fn new(store: S, actor_reference: String, lattice_id: String, replicas: usize) -> Self {
fn new(
store: S,
actor_reference: String,
lattice_id: String,
replicas: usize,
model_name: String,
) -> Self {
Self {
store,
config: SimpleScalerConfig {
actor_reference,
lattice_id,
replicas,
model_name,
},
}
}

/// Given a readable store containing the state of the lattice, compute the
/// required commands to either stop extra actor instances or start new
/// actor instances to reach the configured replica count
async fn compute_actor_commands(&self, store: &S) -> Result<HashSet<Command>> {
async fn compute_actor_commands(&self, store: &S) -> Result<Vec<Command>> {
// NOTE(brooksmtownsend): This will fail to look up the actor ID if an actor is not running in the lattice currently.
// This is acceptable for the simplescaler but might require a helper function in the future
let actor_id = store
@@ -100,46 +109,49 @@ impl<S: ReadStore + Send + Sync> SimpleActorScaler<S> {
#[allow(clippy::comparison_chain)]
if count > 0 {
// Choosing to retrieve the first host that an actor is running on over querying the store for efficiency
let host_id = actors.count.keys().next().cloned().unwrap_or_default();
let host_id = actors.instances.keys().next().cloned().unwrap_or_default();

HashSet::from_iter([Command::StartActor(StartActor {
vec![Command::StartActor(StartActor {
reference: self.config.actor_reference.to_owned(),
count: count as usize, // It's a positive integer so we know this will succeed
host_id,
model_name: "fake".into(),
})])
model_name: self.config.model_name.clone(),
annotations: HashMap::new(),
})]
} else if count < 0 {
// This is written iteratively rather than functionally just because it reads better.
let mut remaining = count.unsigned_abs() as usize;
let mut commands = HashSet::new();
let mut commands = Vec::new();

// For each host running this actor, request actor stops until
// the total number of stops equals the number of extra instances
for (host_id, count) in actors.count {
for (host_id, instances) in actors.instances {
if remaining == 0 {
break;
} else if remaining >= count {
commands.insert(Command::StopActor(StopActor {
} else if remaining >= instances.len() {
commands.push(Command::StopActor(StopActor {
actor_id: actor_id.to_owned(),
host_id,
count,
count: instances.len(),
model_name: "fake".into(),
annotations: HashMap::new(),
}));
remaining -= count;
remaining -= instances.len();
} else {
commands.insert(Command::StopActor(StopActor {
commands.push(Command::StopActor(StopActor {
actor_id: actor_id.to_owned(),
host_id,
count: remaining,
count: instances.len(),
model_name: "fake".into(),
annotations: HashMap::new(),
}));
remaining = 0;
}
}

commands
} else {
HashSet::new()
Vec::new()
}
}
None => {
@@ -150,12 +162,13 @@ impl<S: ReadStore + Send + Sync> SimpleActorScaler<S> {
.next()
.map(|(host_id, _host)| host_id)
{
HashSet::from_iter([Command::StartActor(StartActor {
vec![Command::StartActor(StartActor {
reference: self.config.actor_reference.to_owned(),
count: self.config.replicas,
host_id: host_id.to_owned(),
model_name: "fake".into(),
})])
model_name: self.config.model_name.clone(),
annotations: HashMap::new(),
})]
} else {
return Err(anyhow::anyhow!(
"No hosts running, unable to return actor start commands"
@@ -171,18 +184,47 @@ impl<S: ReadStore + Send + Sync> SimpleActorScaler<S> {
mod test {
use std::{collections::HashMap, sync::Arc};

use wasmcloud_control_interface::HostInventory;

use crate::{
commands::{Command, StartActor},
consumers::{manager::Worker, ScopedMessage},
events::{ActorClaims, ActorStarted, Event, HostStarted},
scaler::{simplescaler::SimpleActorScaler, Scaler},
test_util::TestStore,
workers::EventWorker,
workers::{Claims, ClaimsSource, EventWorker, InventorySource},
};

//TODO: move to common util
struct TestLatticeSource {
claims: HashMap<String, Claims>,
inventory: HashMap<String, HostInventory>,
}

#[async_trait::async_trait]
impl ClaimsSource for TestLatticeSource {
async fn get_claims(&self) -> anyhow::Result<HashMap<String, Claims>> {
Ok(self.claims.clone())
}
}

#[async_trait::async_trait]
impl InventorySource for TestLatticeSource {
async fn get_inventory(&self, host_id: &str) -> anyhow::Result<HostInventory> {
// A working version of the commented-out lookup; assumes
// `HostInventory` implements `Clone` and `Default`
Ok(self.inventory.get(host_id).cloned().unwrap_or_default())
}
}

#[tokio::test]
async fn can_return_error_with_no_hosts() {
let lattice_id = "hoohah_no_host";
let model_name = "FAKEECHO";
let actor_reference = "fakecloud.azurecr.io/echo:0.3.4".to_string();
let replicas = 12;

@@ -192,6 +234,7 @@ mod test {
actor_reference,
lattice_id.to_string(),
replicas,
model_name.to_string(),
);

let cmds = simple_scaler.reconcile().await;
@@ -204,6 +247,7 @@

#[tokio::test]
async fn can_request_start_actor() {
let model_name = "FAKEECHO";
let lattice_id = "hoohah_start_actor";
let actor_reference = "fakecloud.azurecr.io/echo:0.3.4".to_string();
let replicas = 12;
@@ -212,7 +256,11 @@
// Lattice State: One empty host

let store = Arc::new(TestStore::default());
let worker = EventWorker::new(store.clone(), HashMap::default());
let lattice_source = TestLatticeSource {
claims: HashMap::default(),
inventory: HashMap::default(),
};
let worker = EventWorker::new(store.clone(), lattice_source);

let host_id = "NASDASDIAMAREALHOST".to_string();
let host_name = "I am a real host".to_string();
@@ -239,6 +287,7 @@
actor_reference.to_string(),
lattice_id.to_string(),
replicas,
model_name.to_string(),
);

let cmds = simple_scaler
@@ -256,20 +305,26 @@
reference: actor_reference,
host_id,
count: replicas,
model_name: "fake".into(),
model_name: model_name.to_string(),
annotations: HashMap::new()
})
)
}

#[tokio::test]
async fn can_request_multiple_stop_actor() {
let model_name = "MULTI_STOP";
let lattice_id = "hoohah_multi_stop_actor";
let actor_reference = "fakecloud.azurecr.io/echo:0.3.4".to_string();
let actor_id = "MASDASDIAMAREALACTOR";
let replicas = 2;

let store = Arc::new(TestStore::default());
let worker = EventWorker::new(store.clone(), HashMap::default());
let lattice_source = TestLatticeSource {
claims: HashMap::default(),
inventory: HashMap::default(),
};
let worker = EventWorker::new(store.clone(), lattice_source);

// *** STATE SETUP BEGIN ***
// Lattice State: One host with 4 instances of the actor, and one host with 3 instances
@@ -356,6 +411,7 @@
actor_reference.to_string(),
lattice_id.to_string(),
replicas,
model_name.to_string(),
);

let cmds = simple_scaler
