Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions errors/servers/servers_server_failed_to_create.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
name = "SERVERS_SERVER_FAILED_TO_CREATE"
description = "Server failed to create."
http_status = 400
---

# Server Failed To Create

Server failed to create.
15 changes: 13 additions & 2 deletions svc/api/servers/src/route/servers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,16 @@ pub async fn create(

let server_id = Uuid::new_v4();

let mut sub = ctx
let mut create_sub = ctx
.subscribe::<ds::workflows::server::CreateComplete>(&json!({
"server_id": server_id,
}))
.await?;
let mut fail_sub = ctx
.subscribe::<ds::workflows::server::CreateFailed>(&json!({
"server_id": server_id,
}))
.await?;

ctx.dispatch_tagged_workflow(
&json!({
Expand Down Expand Up @@ -136,7 +141,13 @@ pub async fn create(
)
.await?;

sub.next().await?;
tokio::select! {
res = create_sub.next() => { res?; },
res = fail_sub.next() => {
res?;
bail_with!(SERVERS_SERVER_FAILED_TO_CREATE);
}
}

let servers_res = ctx
.op(ds::ops::server::get::Input {
Expand Down
61 changes: 45 additions & 16 deletions svc/pkg/ds/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,48 @@ pub struct Port {

#[workflow]
pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
let res = setup(ctx, input).await;
match ctx.catch_unrecoverable(res)? {
Ok(_) => {}
// If we cannot recover a setup error, send a failed signal
Err(err) => {
tracing::warn!("unrecoverable setup");

// TODO: Cleanup

ctx.msg(
json!({
"server_id": input.server_id,
}),
CreateFailed {},
)
.await?;

// Throw the original error from the setup activities
return Err(err);
}
};

ctx.msg(
json!({
"server_id": input.server_id
}),
CreateComplete {},
)
.await?;

let destroy_sig = ctx.listen::<Destroy>().await?;

ctx.workflow(destroy::Input {
server_id: input.server_id,
override_kill_timeout_ms: destroy_sig.override_kill_timeout_ms,
})
.await?;

Ok(())
}

async fn setup(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
let (_, prereq) = ctx
.join((
InsertDbInput {
Expand Down Expand Up @@ -140,22 +182,6 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
})
.await?;

ctx.msg(
json!({
"server_id": input.server_id
}),
CreateComplete {},
)
.await?;

let destroy_sig = ctx.listen::<Destroy>().await?;

ctx.workflow(destroy::Input {
server_id: input.server_id,
override_kill_timeout_ms: destroy_sig.override_kill_timeout_ms,
})
.await?;

Ok(())
}

Expand Down Expand Up @@ -1072,6 +1098,9 @@ async fn update_db(ctx: &ActivityCtx, input: &UpdateDbInput) -> GlobalResult<()>
#[message("ds_server_create_complete")]
pub struct CreateComplete {}

#[message("ds_server_create_failed")]
pub struct CreateFailed {}

#[signal("ds_server_destroy")]
pub struct Destroy {
pub override_kill_timeout_ms: i64,
Expand Down
2 changes: 1 addition & 1 deletion svc/pkg/linode/src/workflows/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub async fn linode_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult
Ok(x) => x,
// If we cannot recover a provisioning error, send a failed signal and clean up resources
Err(err) => {
tracing::warn!(?err, "unrecoverable provision, cleaning up");
tracing::warn!("unrecoverable provision, cleaning up");

ctx.dispatch_workflow(cleanup::Input {
api_token: input.api_token.clone(),
Expand Down