Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions lib/chirp-workflow/core/src/db/pg_nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,16 @@ impl DatabasePgNats {
T: 'a,
{
let mut backoff = rivet_util::Backoff::new(3, None, QUERY_RETRY_MS, 50);
let mut i = 0;

for _ in 0..MAX_QUERY_RETRIES {
loop {
match cb().await {
Err(WorkflowError::Sqlx(err)) => {
i += 1;
if i > MAX_QUERY_RETRIES {
return Err(WorkflowError::MaxSqlRetries(err));
}

use sqlx::Error::*;
match &err {
// Retry transaction errors immediately
Expand All @@ -88,13 +94,13 @@ impl DatabasePgNats {
.message()
.contains("TransactionRetryWithProtoRefreshError")
{
tracing::info!(message=%db_err.message(), "transaction retry");
tracing::warn!(message=%db_err.message(), "transaction retry");
}
}
// Retry internal errors with a backoff
Io(_) | Tls(_) | Protocol(_) | PoolTimedOut | PoolClosed
| WorkerCrashed => {
tracing::info!(?err, "query retry");
tracing::warn!(?err, "query retry");
backoff.tick().await;
}
// Throw error
Expand All @@ -104,8 +110,6 @@ impl DatabasePgNats {
x => return x,
}
}

Err(WorkflowError::MaxSqlRetries)
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ pub enum WorkflowError {
#[error("sql: {0}")]
Sqlx(sqlx::Error),

#[error("max sql retries")]
MaxSqlRetries,
#[error("max sql retries (last error: {0})")]
MaxSqlRetries(sqlx::Error),

#[error("pools: {0}")]
Pools(#[from] rivet_pools::Error),
Expand Down