Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/chirp-workflow/core/src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ macro_rules! join_signal {
)else*

else {
unreachable!("received signal that wasn't queried for");
unreachable!("received signal that wasn't queried for: {}, expected {:?}", name, &[$($signals::NAME),*]);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions svc/pkg/cluster/src/workflows/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult<()>
Ok(())
}

#[message("cluster-create-complete")]
#[message("cluster_create_complete")]
pub struct CreateComplete {}

#[signal("cluster-game-link")]
#[signal("cluster_game_link")]
pub struct GameLink {
pub game_id: Uuid,
}

#[signal("cluster-datacenter-create")]
#[signal("cluster_datacenter_create")]
pub struct DatacenterCreate {
pub datacenter_id: Uuid,
pub name_id: String,
Expand Down
10 changes: 5 additions & 5 deletions svc/pkg/cluster/src/workflows/datacenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,19 +217,19 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult<()>
Ok(())
}

#[signal("cluster-datacenter-update")]
#[signal("cluster_datacenter_update")]
pub struct Update {
pub pools: Vec<PoolUpdate>,
pub prebakes_enabled: Option<bool>,
}

#[signal("cluster-datacenter-scale")]
#[signal("cluster_datacenter_scale")]
pub struct Scale {}

#[signal("cluster-datacenter-tls-renew")]
#[signal("cluster_datacenter_tls_renew")]
pub struct TlsRenew {}

#[signal("cluster-datacenter-server-create")]
#[signal("cluster_datacenter_server_create")]
pub struct ServerCreate {
pub server_id: Uuid,
pub pool_type: PoolType,
Expand All @@ -238,7 +238,7 @@ pub struct ServerCreate {

join_signal!(Main, [Update, Scale, ServerCreate, TlsRenew]);

#[message("cluster-datacenter-create-complete")]
#[message("cluster_datacenter_create_complete")]
pub struct CreateComplete {}

#[derive(Debug, Serialize, Deserialize, Hash)]
Expand Down
120 changes: 76 additions & 44 deletions svc/pkg/cluster/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,32 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob

// Install components on server
if !already_installed {
ctx.workflow(install::Input {
datacenter_id: input.datacenter_id,
server_id: Some(input.server_id),
public_ip,
pool_type: input.pool_type.clone(),
initialize_immediately: true,
})
.await?;
let install_res = ctx
.workflow(install::Input {
datacenter_id: input.datacenter_id,
server_id: Some(input.server_id),
public_ip,
pool_type: input.pool_type.clone(),
initialize_immediately: true,
})
.await;

// If the server failed all attempts to install, clean it up
match ctx.catch_unrecoverable(install_res)? {
Ok(_) => {}
Err(err) => {
tracing::warn!(?err, "failed installing server, cleaning up");

ctx.activity(MarkDestroyedInput {
server_id: input.server_id,
})
.await?;

cleanup(ctx, input, &dc.provider, provider_server_workflow_id, false).await?;

return Err(err);
}
}
}

// Scale to get rid of tainted servers
Expand Down Expand Up @@ -279,34 +297,7 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob
}
}

// Cleanup DNS
if let PoolType::Gg = input.pool_type {
ctx.workflow(dns_delete::Input {
server_id: input.server_id,
})
.await?;
}

// Cleanup server
match dc.provider {
Provider::Linode => {
tracing::info!(server_id=?input.server_id, "destroying linode server");

ctx.tagged_signal(
&json!({
"server_id": input.server_id,
}),
linode::workflows::server::Destroy {},
)
.await?;

// Wait for workflow to complete
ctx.wait_for_workflow::<linode::workflows::server::Workflow>(
provider_server_workflow_id,
)
.await?;
}
}
cleanup(ctx, input, &dc.provider, provider_server_workflow_id, true).await?;

Ok(())
}
Expand Down Expand Up @@ -677,6 +668,47 @@ async fn set_drain_complete(ctx: &ActivityCtx, input: &SetDrainCompleteInput) ->
Ok(())
}

/// Tears down the external resources belonging to a server.
///
/// When `cleanup_dns` is set and the server's pool type is `Gg`, the `dns_delete` workflow is run
/// first. Afterwards the provider-side server is destroyed: for Linode, a `Destroy` signal tagged
/// with the `server_id` is sent and the workflow identified by `provider_server_workflow_id` is
/// awaited.
///
/// # Errors
///
/// Propagates any error from the DNS deletion workflow, the signal dispatch, or the wait on the
/// provider workflow.
async fn cleanup(
    ctx: &mut WorkflowCtx,
    input: &Input,
    provider: &Provider,
    provider_server_workflow_id: Uuid,
    cleanup_dns: bool,
) -> GlobalResult<()> {
    // DNS is only removed on request, and only for servers of the `Gg` pool type
    if cleanup_dns && matches!(input.pool_type, PoolType::Gg) {
        let dns_input = dns_delete::Input {
            server_id: input.server_id,
        };
        ctx.workflow(dns_input).await?;
    }

    // Destroy the provider server
    match provider {
        Provider::Linode => {
            tracing::info!(server_id=?input.server_id, "destroying linode server");

            let tags = json!({
                "server_id": input.server_id,
            });
            ctx.tagged_signal(&tags, linode::workflows::server::Destroy {})
                .await?;

            // Hold here until the provider workflow has completed
            ctx.wait_for_workflow::<linode::workflows::server::Workflow>(
                provider_server_workflow_id,
            )
            .await?;
        }
    }

    Ok(())
}

/// Finite state machine for handling server updates.
struct State {
draining: bool,
Expand Down Expand Up @@ -782,30 +814,30 @@ type ProvisionComplete = linode::workflows::server::ProvisionComplete;
type ProvisionFailed = linode::workflows::server::ProvisionFailed;
join_signal!(pub(crate) Linode, [ProvisionComplete, ProvisionFailed]);

#[signal("cluster-server-drain")]
#[signal("cluster_server_drain")]
pub struct Drain {}

#[signal("cluster-server-undrain")]
#[signal("cluster_server_undrain")]
pub struct Undrain {}

#[signal("cluster-server-taint")]
#[signal("cluster_server_taint")]
pub struct Taint {}

#[signal("cluster-server-dns-create")]
#[signal("cluster_server_dns_create")]
pub struct DnsCreate {}

#[signal("cluster-server-dns-delete")]
#[signal("cluster_server_dns_delete")]
pub struct DnsDelete {}

#[signal("cluster-server-destroy")]
#[signal("cluster_server_destroy")]
pub struct Destroy {}

#[signal("cluster-server-nomad-registered")]
#[signal("cluster_server_nomad_registered")]
pub struct NomadRegistered {
pub node_id: String,
}

#[signal("cluster-server-nomad-drain-complete")]
#[signal("cluster_server_nomad_drain_complete")]
pub struct NomadDrainComplete {}

join_signal!(
Expand Down
4 changes: 2 additions & 2 deletions svc/pkg/linode/src/workflows/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ async fn create_custom_image(
Ok(create_image_res.id)
}

#[signal("linode-image-create-complete")]
#[signal("linode_image_create_complete")]
pub struct CreateComplete {
pub image_id: String,
}

#[signal("linode-image-destroy")]
#[signal("linode_image_destroy")]
pub struct Destroy {}

#[derive(Debug, Serialize, Deserialize, Hash)]
Expand Down
6 changes: 3 additions & 3 deletions svc/pkg/linode/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,15 +417,15 @@ async fn get_public_ip(ctx: &ActivityCtx, input: &GetPublicIpInput) -> GlobalRes
api::get_public_ip(&client, input.linode_id).await
}

#[signal("linode-server-provision-complete")]
#[signal("linode_server_provision_complete")]
pub struct ProvisionComplete {
pub linode_id: u64,
pub public_ip: Ipv4Addr,
pub boot_disk_id: u64,
}

#[signal("linode-server-provision-failed")]
#[signal("linode_server_provision_failed")]
pub struct ProvisionFailed {}

#[signal("linode-server-destroy")]
#[signal("linode_server_destroy")]
pub struct Destroy {}