diff --git a/lib/chirp-workflow/core/src/db/pg_nats.rs b/lib/chirp-workflow/core/src/db/pg_nats.rs index 7184d4e285..4cb5d650b5 100644 --- a/lib/chirp-workflow/core/src/db/pg_nats.rs +++ b/lib/chirp-workflow/core/src/db/pg_nats.rs @@ -76,10 +76,16 @@ impl DatabasePgNats { T: 'a, { let mut backoff = rivet_util::Backoff::new(3, None, QUERY_RETRY_MS, 50); + let mut i = 0; - for _ in 0..MAX_QUERY_RETRIES { + loop { match cb().await { Err(WorkflowError::Sqlx(err)) => { + i += 1; + if i > MAX_QUERY_RETRIES { + return Err(WorkflowError::MaxSqlRetries(err)); + } + use sqlx::Error::*; match &err { // Retry transaction errors immediately @@ -88,13 +94,13 @@ impl DatabasePgNats { .message() .contains("TransactionRetryWithProtoRefreshError") { - tracing::info!(message=%db_err.message(), "transaction retry"); + tracing::warn!(message=%db_err.message(), "transaction retry"); } } // Retry internal errors with a backoff Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed | WorkerCrashed => { - tracing::info!(?err, "query retry"); + tracing::warn!(?err, "query retry"); backoff.tick().await; } // Throw error @@ -104,8 +110,6 @@ impl DatabasePgNats { x => return x, } } - - Err(WorkflowError::MaxSqlRetries) } } diff --git a/lib/chirp-workflow/core/src/error.rs b/lib/chirp-workflow/core/src/error.rs index f55f902565..2631e34a36 100644 --- a/lib/chirp-workflow/core/src/error.rs +++ b/lib/chirp-workflow/core/src/error.rs @@ -117,8 +117,8 @@ pub enum WorkflowError { #[error("sql: {0}")] Sqlx(sqlx::Error), - #[error("max sql retries")] - MaxSqlRetries, + #[error("max sql retries (last error: {0})")] + MaxSqlRetries(sqlx::Error), #[error("pools: {0}")] Pools(#[from] rivet_pools::Error),