Skip to content

Commit

Permalink
ensured worker updates host state
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
  • Loading branch information
brooksmtownsend committed Apr 10, 2023
1 parent 5972f6e commit 01a6194
Showing 1 changed file with 145 additions and 37 deletions.
182 changes: 145 additions & 37 deletions src/workers/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,6 @@ where
// multiple error cases, it was just easier to catch it into an anyhow Error and then convert at
// the end

// TODO(thomastaylor312): Initially I thought we'd have to update the host state as well with
// the new provider/actor. However, do we actually need to update the host info or just let the
// heartbeat take care of it? I think we might be ok because the actor/provider data should have
// all the info needed to make a decision for scaling in conjunction with the basic host info
// (like labels). We might have to revisit this when a) we implement the `Scaler` trait or b) if
// we start serving up lattice state to consumers of wadm (in which case we'd want state changes
// reflected immediately)

#[instrument(level = "debug", skip(self, actor), fields(actor_id = %actor.public_key, host_id = %actor.host_id))]
async fn handle_actor_started(
&self,
Expand All @@ -116,6 +108,23 @@ where
// Merge in current counts
actor_data.count = current.count;
}
// Update actor count in the host
if let Some(mut host) = self.store.get::<Host>(lattice_id, &actor.host_id).await? {
trace!(host = ?host, "Found existing host data");
match host.actors.get(&actor.public_key) {
Some(existing_count) => {
host.actors
.insert(actor.public_key.to_owned(), *existing_count + 1);
}
None => {
host.actors.insert(actor.public_key.to_owned(), 1);
}
}

self.store
.store(lattice_id, host.id.to_owned(), host)
.await?
}

// Update count of the data
actor_data
Expand Down Expand Up @@ -166,6 +175,27 @@ where
.await
}?;
}

// Update actor count in the host
if let Some(mut host) = self.store.get::<Host>(lattice_id, &actor.host_id).await? {
trace!(host = ?host, "Found existing host data");
match host.actors.get(&actor.public_key) {
Some(existing_count) if *existing_count <= 1 => {
host.actors.remove(&actor.public_key);
}
Some(existing_count) => {
host.actors
.insert(actor.public_key.to_owned(), *existing_count - 1);
}
// you cannot delete what doesn't exist
None => (),
}

self.store
.store(lattice_id, host.id.to_owned(), host)
.await?
}

Ok(())
}

Expand Down Expand Up @@ -331,6 +361,28 @@ where
prov.hosts = HashMap::from([(provider.host_id.clone(), ProviderStatus::default())]);
prov
};

// Insert provider into host map
if let Some(mut host) = self
.store
.get::<Host>(lattice_id, &provider.host_id)
.await?
{
trace!(host = ?host, "Found existing host data");

// Only one provider with this configuration is allowed to run on this host, so overwrites
// shouldn't happen but are acceptable
host.providers.insert(ProviderInfo {
contract_id: provider.contract_id.to_owned(),
link_name: provider.link_name.to_owned(),
public_key: provider.public_key.to_owned(),
});

self.store
.store(lattice_id, host.id.to_owned(), host)
.await?
}

debug!("Storing updated provider in store");
self.store
.store(lattice_id, id, provider_data)
Expand All @@ -355,6 +407,26 @@ where
debug!("Handling provider stopped event");
let id = crate::storage::provider_id(&provider.public_key, &provider.link_name);
trace!("Fetching current data from store");

// Remove provider from host map
if let Some(mut host) = self
.store
.get::<Host>(lattice_id, &provider.host_id)
.await?
{
trace!(host = ?host, "Found existing host data");

host.providers.remove(&ProviderInfo {
contract_id: provider.contract_id.to_owned(),
link_name: provider.link_name.to_owned(),
public_key: provider.public_key.to_owned(),
});

self.store
.store(lattice_id, host.id.to_owned(), host)
.await?
}

if let Some(mut current) = self.store.get::<Provider>(lattice_id, &id).await? {
if current.hosts.remove(&provider.host_id).is_none() {
trace!(host_id = %provider.host_id, "Did not find host entry in provider");
Expand Down Expand Up @@ -760,6 +832,17 @@ mod test {
assert_eq!(actors.len(), 1, "Should only be 1 actor in state");
assert_actor(&actors, &actor1, &[(&host1_id, 1)]);

// The stored host should also now have this actor in its map
let host = store
.get::<Host>(lattice_id, &host1_id)
.await
.expect("Should be able to access store")
.expect("Should have the host in the store");
assert_eq!(
*host.actors.get(&actor1.public_key).unwrap_or(&0),
1 as usize
);

worker
.handle_actor_started(lattice_id, &actor1)
.await
Expand Down Expand Up @@ -870,8 +953,31 @@ mod test {
assert_eq!(providers.len(), 2, "Should only be 2 providers in state");
assert_provider(&providers, &provider2, &[&host1_id, &host2_id]);

// TODO(thomastaylor312): This is just a reminder that if we add in host state updating on
// provider/actor events, we should test that here before we hit a host heartbeat
// Check that hosts got updated properly
let hosts = store.list::<Host>(lattice_id).await.unwrap();
assert_eq!(hosts.len(), 2, "Should only have 2 hosts");
let host = hosts.get(&host1_id).expect("Host should still exist");
assert_eq!(
host.actors.len(),
2,
"Should have two different actors running"
);
assert_eq!(
host.providers.len(),
2,
"Should have two different providers running"
);
let host = hosts.get(&host2_id).expect("Host should still exist");
assert_eq!(
host.actors.len(),
2,
"Should have two different actors running"
);
assert_eq!(
host.providers.len(),
1,
"Should have a single provider running"
);

/***********************************************************/
/******************* Host Heartbeat Test *******************/
Expand Down Expand Up @@ -934,32 +1040,6 @@ mod test {
.await
.expect("Should be able to handle host heartbeat");

// Check that hosts got updated properly
let hosts = store.list::<Host>(lattice_id).await.unwrap();
assert_eq!(hosts.len(), 2, "Should only have 2 hosts");
let host = hosts.get(&host1_id).expect("Host should still exist");
assert_eq!(
host.actors.len(),
2,
"Should have two different actors running"
);
assert_eq!(
host.providers.len(),
2,
"Should have two different providers running"
);
let host = hosts.get(&host2_id).expect("Host should still exist");
assert_eq!(
host.actors.len(),
2,
"Should have two different actors running"
);
assert_eq!(
host.providers.len(),
1,
"Should have a single provider running"
);

// Check that our actor and provider data is still correct.
let providers = store.list::<Provider>(lattice_id).await.unwrap();
assert_eq!(providers.len(), 2, "Should still have 2 providers in state");
Expand Down Expand Up @@ -995,6 +1075,20 @@ mod test {
assert_actor(&actors, &actor1, &[(&host2_id, 2)]);
assert_actor(&actors, &actor2, &[(&host1_id, 2), (&host2_id, 2)]);

let host = store
.get::<Host>(lattice_id, &host2_id)
.await
.expect("Should be able to access store")
.expect("Should have the host in the store");
assert_eq!(
*host.actors.get(&actor1.public_key).unwrap_or(&0),
2 as usize
);
assert_eq!(
*host.actors.get(&actor2.public_key).unwrap_or(&0),
2 as usize
);

// Now stop on the other
let stopped = ActorStopped {
host_id: host2_id.clone(),
Expand Down Expand Up @@ -1039,6 +1133,20 @@ mod test {
assert_provider(&providers, &provider1, &[&host1_id]);
assert_provider(&providers, &provider2, &[&host2_id]);

// Check that hosts got updated properly
let hosts = store.list::<Host>(lattice_id).await.unwrap();
assert_eq!(hosts.len(), 2, "Should only have 2 hosts");
let host = hosts.get(&host1_id).expect("Host should still exist");
assert_eq!(host.actors.len(), 1, "Should have 1 actor running");
assert_eq!(host.providers.len(), 1, "Should have 1 provider running");
let host = hosts.get(&host2_id).expect("Host should still exist");
assert_eq!(host.actors.len(), 1, "Should have 1 actor running");
assert_eq!(
host.providers.len(),
1,
"Should have a single provider running"
);

/***********************************************************/
/***************** Heartbeat Tests Part 2 ******************/
/***********************************************************/
Expand Down Expand Up @@ -1088,7 +1196,7 @@ mod test {
.await
.expect("Should be able to handle host heartbeat");

// Check that hosts got updated properly
// Check that the heartbeat kept state consistent
let hosts = store.list::<Host>(lattice_id).await.unwrap();
assert_eq!(hosts.len(), 2, "Should only have 2 hosts");
let host = hosts.get(&host1_id).expect("Host should still exist");
Expand Down

0 comments on commit 01a6194

Please sign in to comment.