From f55b7e694b15d2709bf6d98b92c9eb15c11ccd0b Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Thu, 18 Apr 2024 01:19:00 +0000 Subject: [PATCH] fix: add transacitons (#689) ## Changes --- lib/pools/src/utils/crdb.rs | 20 +++++++++++++ .../worker/src/workers/server_drain.rs | 27 +++++++++++++++-- .../worker/src/workers/server_undrain.rs | 29 ++++++++++++++++--- 3 files changed, 69 insertions(+), 7 deletions(-) diff --git a/lib/pools/src/utils/crdb.rs b/lib/pools/src/utils/crdb.rs index df3a3da490..f476c6cdae 100644 --- a/lib/pools/src/utils/crdb.rs +++ b/lib/pools/src/utils/crdb.rs @@ -55,6 +55,26 @@ where bail!("transaction failed with retry too many times"); } +/// Runs a transaction without retrying. +#[tracing::instrument(skip_all)] +pub async fn tx_no_retry(crdb: &CrdbPool, f: F) -> GlobalResult +where + for<'t> F: Fn(&'t mut sqlx::Transaction<'_, sqlx::Postgres>) -> AsyncResult<'t, T>, +{ + let mut tx = crdb.begin().await?; + + match f(&mut tx).await { + Err(err) => { + tx.rollback().await?; + Err(err) + } + Ok(x) => { + tx.commit().await?; + Ok(x) + } + } +} + // TODO: This seems to leak connections on retries, even though it matches the // CRDB spec. This is likely because of odd behavior in the sqlx driver. ///// Runs a transaction. This explicitly handles retry errors. diff --git a/svc/pkg/cluster/worker/src/workers/server_drain.rs b/svc/pkg/cluster/worker/src/workers/server_drain.rs index 6395436e78..f61bf93d4e 100644 --- a/svc/pkg/cluster/worker/src/workers/server_drain.rs +++ b/svc/pkg/cluster/worker/src/workers/server_drain.rs @@ -1,4 +1,5 @@ use chirp_worker::prelude::*; +use futures_util::FutureExt; use nomad_client::{ apis::{configuration::Configuration, nodes_api}, models, @@ -14,24 +15,44 @@ struct Server { datacenter_id: Uuid, pool_type: i64, nomad_node_id: Option, + is_not_draining: bool, } #[worker(name = "cluster-server-drain")] async fn worker(ctx: &OperationContext) -> GlobalResult<()> { + rivet_pools::utils::crdb::tx_no_retry(&ctx.crdb().await?, |tx| inner(ctx.clone(), tx).boxed()) + .await?; + + Ok(()) +} + +async fn inner( + ctx: OperationContext, + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, +) -> GlobalResult<()> { let server_id = unwrap_ref!(ctx.server_id).as_uuid(); let server = sql_fetch_one!( - [ctx, Server] + [ctx, Server, @tx tx] " SELECT - datacenter_id, pool_type, nomad_node_id + datacenter_id, + pool_type, + nomad_node_id, + (drain_ts IS NULL) AS is_not_draining FROM db_cluster.servers WHERE server_id = $1 + FOR UPDATE ", server_id, ) .await?; + if server.is_not_draining { + tracing::error!("attempting to drain server that was not set as draining"); + return Ok(()); + } + // Fetch datacenter config let datacenter_res = op!([ctx] cluster_datacenter_get { datacenter_ids: vec![server.datacenter_id.into()], @@ -95,7 +116,7 @@ async fn worker(ctx: &OperationContext) -> backend::cluster::PoolType::Gg => { // Delete DNS record msg!([ctx] cluster::msg::server_dns_delete(server_id) { - server_id: ctx.server_id, + server_id: Some(server_id.into()), }) .await?; } diff --git a/svc/pkg/cluster/worker/src/workers/server_undrain.rs b/svc/pkg/cluster/worker/src/workers/server_undrain.rs index f20672fd90..8a2e3a3612 100644 --- a/svc/pkg/cluster/worker/src/workers/server_undrain.rs +++ b/svc/pkg/cluster/worker/src/workers/server_undrain.rs @@ -1,4 +1,5 @@ use chirp_worker::prelude::*; +use futures_util::FutureExt; use nomad_client::{ apis::{configuration::Configuration, nodes_api}, models, @@ -14,25 +15,45 @@ struct Server { datacenter_id: Uuid, pool_type: i64, nomad_node_id: Option, + is_draining: bool, } #[worker(name = "cluster-server-undrain")] async fn worker(ctx: &OperationContext) -> GlobalResult<()> { + rivet_pools::utils::crdb::tx_no_retry(&ctx.crdb().await?, |tx| inner(ctx.clone(), tx).boxed()) + .await?; + + Ok(()) +} + +async fn inner( + ctx: OperationContext, + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, +) -> GlobalResult<()> { let server_id = unwrap_ref!(ctx.server_id).as_uuid(); // NOTE: `drain_ts` will already be set to null before this worker is called let server = sql_fetch_one!( - [ctx, Server] + [ctx, Server, @tx tx] " SELECT - datacenter_id, pool_type, nomad_node_id + datacenter_id, + pool_type, + nomad_node_id, + (drain_ts IS NOT NULL) AS is_draining FROM db_cluster.servers WHERE server_id = $1 + FOR UPDATE ", - server_id + server_id, ) .await?; + if server.is_draining { + tracing::error!("attempting to undrain server that was not set as undraining"); + return Ok(()); + } + let pool_type = unwrap!(backend::cluster::PoolType::from_i32( server.pool_type as i32 )); @@ -77,7 +98,7 @@ async fn worker(ctx: &OperationContext) - backend::cluster::PoolType::Gg => { // Recreate DNS record msg!([ctx] cluster::msg::server_dns_create(server_id) { - server_id: ctx.server_id, + server_id: Some(server_id.into()), }) .await?; }