Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions lib/pools/src/utils/crdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,26 @@ where
bail!("transaction failed with retry too many times");
}

/// Runs a transaction without retrying.
#[tracing::instrument(skip_all)]
pub async fn tx_no_retry<T, F>(crdb: &CrdbPool, f: F) -> GlobalResult<T>
where
for<'t> F: Fn(&'t mut sqlx::Transaction<'_, sqlx::Postgres>) -> AsyncResult<'t, T>,
{
let mut tx = crdb.begin().await?;

match f(&mut tx).await {
Err(err) => {
tx.rollback().await?;
Err(err)
}
Ok(x) => {
tx.commit().await?;
Ok(x)
}
}
}

// TODO: This seems to leak connections on retries, even though it matches the
// CRDB spec. This is likely because of odd behavior in the sqlx driver.
///// Runs a transaction. This explicitly handles retry errors.
Expand Down
27 changes: 24 additions & 3 deletions svc/pkg/cluster/worker/src/workers/server_drain.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chirp_worker::prelude::*;
use futures_util::FutureExt;
use nomad_client::{
apis::{configuration::Configuration, nodes_api},
models,
Expand All @@ -14,24 +15,44 @@ struct Server {
datacenter_id: Uuid,
pool_type: i64,
nomad_node_id: Option<String>,
is_not_draining: bool,
}

#[worker(name = "cluster-server-drain")]
async fn worker(ctx: &OperationContext<cluster::msg::server_drain::Message>) -> GlobalResult<()> {
rivet_pools::utils::crdb::tx_no_retry(&ctx.crdb().await?, |tx| inner(ctx.clone(), tx).boxed())
.await?;

Ok(())
}

async fn inner(
ctx: OperationContext<cluster::msg::server_drain::Message>,
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> GlobalResult<()> {
let server_id = unwrap_ref!(ctx.server_id).as_uuid();

let server = sql_fetch_one!(
[ctx, Server]
[ctx, Server, @tx tx]
"
SELECT
datacenter_id, pool_type, nomad_node_id
datacenter_id,
pool_type,
nomad_node_id,
(drain_ts IS NULL) AS is_not_draining
FROM db_cluster.servers
WHERE server_id = $1
FOR UPDATE
",
server_id,
)
.await?;

if server.is_not_draining {
tracing::error!("attempting to drain server that was not set as draining");
return Ok(());
}

// Fetch datacenter config
let datacenter_res = op!([ctx] cluster_datacenter_get {
datacenter_ids: vec![server.datacenter_id.into()],
Expand Down Expand Up @@ -95,7 +116,7 @@ async fn worker(ctx: &OperationContext<cluster::msg::server_drain::Message>) ->
backend::cluster::PoolType::Gg => {
// Delete DNS record
msg!([ctx] cluster::msg::server_dns_delete(server_id) {
server_id: ctx.server_id,
server_id: Some(server_id.into()),
})
.await?;
}
Expand Down
29 changes: 25 additions & 4 deletions svc/pkg/cluster/worker/src/workers/server_undrain.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chirp_worker::prelude::*;
use futures_util::FutureExt;
use nomad_client::{
apis::{configuration::Configuration, nodes_api},
models,
Expand All @@ -14,25 +15,45 @@ struct Server {
datacenter_id: Uuid,
pool_type: i64,
nomad_node_id: Option<String>,
is_draining: bool,
}

#[worker(name = "cluster-server-undrain")]
async fn worker(ctx: &OperationContext<cluster::msg::server_undrain::Message>) -> GlobalResult<()> {
rivet_pools::utils::crdb::tx_no_retry(&ctx.crdb().await?, |tx| inner(ctx.clone(), tx).boxed())
.await?;

Ok(())
}

async fn inner(
ctx: OperationContext<cluster::msg::server_undrain::Message>,
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> GlobalResult<()> {
let server_id = unwrap_ref!(ctx.server_id).as_uuid();

// NOTE: `drain_ts` will already be set to null before this worker is called
let server = sql_fetch_one!(
[ctx, Server]
[ctx, Server, @tx tx]
"
SELECT
datacenter_id, pool_type, nomad_node_id
datacenter_id,
pool_type,
nomad_node_id,
(drain_ts IS NOT NULL) AS is_draining
FROM db_cluster.servers
WHERE server_id = $1
FOR UPDATE
",
server_id
server_id,
)
.await?;

if server.is_draining {
tracing::error!("attempting to undrain server that was not set as undraining");
return Ok(());
}

let pool_type = unwrap!(backend::cluster::PoolType::from_i32(
server.pool_type as i32
));
Expand Down Expand Up @@ -77,7 +98,7 @@ async fn worker(ctx: &OperationContext<cluster::msg::server_undrain::Message>) -
backend::cluster::PoolType::Gg => {
// Recreate DNS record
msg!([ctx] cluster::msg::server_dns_create(server_id) {
server_id: ctx.server_id,
server_id: Some(server_id.into()),
})
.await?;
}
Expand Down