From 273e5a332f9ce82d5c46c17d08916a0b6b19b5ef Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Sun, 25 Aug 2024 22:50:54 +0000 Subject: [PATCH] fix(clusters): fix dns and unrecoverable error bugs (#1083) ## Changes --- lib/chirp-workflow/core/src/ctx/workflow.rs | 24 +++++++++++---- svc/pkg/cluster/src/util/mod.rs | 30 +++++++++++++++++-- .../src/workflows/server/dns_delete.rs | 12 ++++++-- svc/pkg/cluster/src/workflows/server/mod.rs | 9 +++++- svc/pkg/ds/src/workflows/server/mod.rs | 2 +- svc/pkg/linode/src/workflows/server/mod.rs | 2 +- 6 files changed, 65 insertions(+), 14 deletions(-) diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index 345001562f..77ff2060ee 100644 --- a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -23,8 +23,8 @@ use crate::{ time::{DurationToMillis, TsToMillis}, GlobalErrorExt, Location, }, - workflow::{Workflow, WorkflowInput}, worker, + workflow::{Workflow, WorkflowInput}, }; // Time to delay a workflow from retrying after an error @@ -763,10 +763,24 @@ impl WorkflowCtx { res: GlobalResult, ) -> GlobalResult> { match res { - Err(err) if !err.is_workflow_recoverable() => { - self.location_idx += 1; - - Ok(Err(err)) + Err(GlobalError::Raw(inner_err)) => { + match inner_err.downcast::() { + Ok(inner_err) => { + // Despite "history diverged" errors being unrecoverable, they should not have be returned + // by this function because the state of the history is already messed up and no new + // workflow items can be run. + if !inner_err.is_recoverable() + && !matches!(*inner_err, WorkflowError::HistoryDiverged(_)) + { + return Ok(Err(GlobalError::raw(inner_err))); + } else { + return Err(GlobalError::raw(inner_err)); + } + } + Err(err) => { + return Err(GlobalError::Raw(err)); + } + } } Err(err) => Err(err), Ok(x) => Ok(Ok(x)), diff --git a/svc/pkg/cluster/src/util/mod.rs b/svc/pkg/cluster/src/util/mod.rs index 306b029129..597c3c10c2 100644 --- a/svc/pkg/cluster/src/util/mod.rs +++ b/svc/pkg/cluster/src/util/mod.rs @@ -90,8 +90,32 @@ pub(crate) async fn create_dns_record( cf::dns::DnsContent::TXT { .. } => "TXT", cf::dns::DnsContent::SRV { .. } => "SRV", }; - let list_records_res = - get_dns_record(cf_token, zone_id, record_name, dns_type).await?; + + // Find record to delete + let list_records_res = match content { + cf::dns::DnsContent::A { .. } => { + get_dns_record(cf_token, zone_id, record_name, dns_type).await? + } + cf::dns::DnsContent::TXT { .. } => { + // Get DNS record with content comparison + client + .request(&cf::dns::ListDnsRecords { + zone_identifier: zone_id, + params: cf::dns::ListDnsRecordsParams { + record_type: Some(content.clone()), + name: Some(record_name.to_string()), + ..Default::default() + }, + }) + .await? + .result + .into_iter() + .next() + } + _ => { + unimplemented!("must configure whether to search for records via content vs no content for this DNS record type"); + } + }; if let Some(record) = list_records_res { delete_dns_record(client, zone_id, &record.id).await?; @@ -140,7 +164,7 @@ pub(crate) async fn delete_dns_record( Ok(()) } -/// Fetches the dns record by name. +/// Fetches a dns record by name and type, not content. async fn get_dns_record( cf_token: &str, zone_id: &str, diff --git a/svc/pkg/cluster/src/workflows/server/dns_delete.rs b/svc/pkg/cluster/src/workflows/server/dns_delete.rs index 0c560a73f1..5fa0e6b2d5 100644 --- a/svc/pkg/cluster/src/workflows/server/dns_delete.rs +++ b/svc/pkg/cluster/src/workflows/server/dns_delete.rs @@ -51,7 +51,7 @@ struct GetDnsRecordsInput { server_id: Uuid, } -#[derive(Debug, sqlx::FromRow, Serialize, Deserialize, Hash)] +#[derive(Debug, Default, sqlx::FromRow, Serialize, Deserialize, Hash)] struct GetDnsRecordsOutput { dns_record_id: Option, secondary_dns_record_id: Option, @@ -62,7 +62,7 @@ async fn get_dns_records( ctx: &ActivityCtx, input: &GetDnsRecordsInput, ) -> GlobalResult { - sql_fetch_one!( + let row = sql_fetch_optional!( [ctx, GetDnsRecordsOutput] " SELECT dns_record_id, secondary_dns_record_id @@ -73,7 +73,13 @@ async fn get_dns_records( ", &input.server_id, ) - .await + .await?; + + if row.is_none() { + tracing::warn!("server has no DNS record row"); + } + + Ok(row.unwrap_or_default()) } #[derive(Debug, Serialize, Deserialize, Hash)] diff --git a/svc/pkg/cluster/src/workflows/server/mod.rs b/svc/pkg/cluster/src/workflows/server/mod.rs index 6908879f6f..ead5e25558 100644 --- a/svc/pkg/cluster/src/workflows/server/mod.rs +++ b/svc/pkg/cluster/src/workflows/server/mod.rs @@ -306,7 +306,14 @@ pub(crate) async fn cluster_server(ctx: &mut WorkflowCtx, input: &Input) -> Glob } } - cleanup(ctx, input, &dc.provider, provider_server_workflow_id, true).await?; + cleanup( + ctx, + input, + &dc.provider, + provider_server_workflow_id, + state.has_dns, + ) + .await?; Ok(()) } diff --git a/svc/pkg/ds/src/workflows/server/mod.rs b/svc/pkg/ds/src/workflows/server/mod.rs index 858ad865c5..0c93d9a31b 100644 --- a/svc/pkg/ds/src/workflows/server/mod.rs +++ b/svc/pkg/ds/src/workflows/server/mod.rs @@ -82,7 +82,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> Ok(_) => {} // If we cannot recover a setup error, send a failed signal Err(err) => { - tracing::warn!("unrecoverable setup"); + tracing::warn!(?err, "unrecoverable setup"); // TODO: Cleanup diff --git a/svc/pkg/linode/src/workflows/server/mod.rs b/svc/pkg/linode/src/workflows/server/mod.rs index db0ffcd17e..f28a916f5a 100644 --- a/svc/pkg/linode/src/workflows/server/mod.rs +++ b/svc/pkg/linode/src/workflows/server/mod.rs @@ -33,7 +33,7 @@ pub async fn linode_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult Ok(x) => x, // If we cannot recover a provisioning error, send a failed signal and clean up resources Err(err) => { - tracing::warn!("unrecoverable provision, cleaning up"); + tracing::warn!(?err, "unrecoverable provision, cleaning up"); ctx.dispatch_workflow(cleanup::Input { api_token: input.api_token.clone(),