Skip to content

Commit

Permalink
readyset: Make migration task upstream conn fault tolerant
Browse files Browse the repository at this point in the history
When running with async migrations, the migration background task needs
to load the server-configured schema search path from the upstream db
on startup. Previously, this process was not fault tolerant, and would
cause a panic triggering a full-process abort if the connection failed.
This commit changes it to sleep a second and automatically retry if it
encounters an error instead.

Fixes: ENG-2835
Change-Id: I8d464a55998029704ec79e9a031d8d30f006221c
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/4807
Tested-by: Buildkite CI
Reviewed-by: Dan Wilbanks <dan@readyset.io>
  • Loading branch information
glittershark committed May 2, 2023
1 parent 52177a2 commit 020f58a
Showing 1 changed file with 44 additions and 20 deletions.
64 changes: 44 additions & 20 deletions readyset/src/lib.rs
Expand Up @@ -72,6 +72,12 @@ const AWS_METADATA_TOKEN_ENDPOINT: &str = "http://169.254.169.254/latest/api/tok
/// Timeout to use when connecting to the upstream database
const UPSTREAM_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);

/// Retry interval to use when attempting to connect to the upstream database
const UPSTREAM_CONNECTION_RETRY_INTERVAL: Duration = Duration::from_secs(1);

/// Retry interval to use when attempting to load the schema search path from the upstream database
const LOAD_SCHEMA_SEARCH_PATH_RETRY_INTERVAL: Duration = Duration::from_secs(1);

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
Expand Down Expand Up @@ -838,28 +844,46 @@ where
rs_connect.in_scope(|| info!("Spawning migration handler task"));
let fut = async move {
let connection = span!(Level::INFO, "migration task upstream database connection");
let mut upstream =
if upstream_config.upstream_db_url.is_some()
&& !no_upstream_connections
&& !dry_run
{
Some(
H::UpstreamDatabase::connect(upstream_config, fallback_cache)
.instrument(connection.in_scope(|| {
span!(Level::INFO, "Connecting to upstream database")
}))
.await
.unwrap(),
let schema_search_path = if upstream_config.upstream_db_url.is_none()
|| no_upstream_connections
|| dry_run
{
Default::default()
} else {
loop {
let mut upstream = match H::UpstreamDatabase::connect(
upstream_config.clone(),
fallback_cache.clone(),
)
} else {
None
};
.instrument(
connection
.in_scope(|| span!(Level::INFO, "Connecting to upstream database")),
)
.await
{
Ok(upstream) => upstream,
Err(error) => {
error!(
%error,
"Error connecting to upstream database, retrying after 1s"
);
tokio::time::sleep(UPSTREAM_CONNECTION_RETRY_INTERVAL).await;
continue;
}
};

let schema_search_path = if let Some(upstream) = &mut upstream {
// TODO(ENG-1710): figure out a better error handling story for this task
upstream.schema_search_path().await.unwrap()
} else {
Default::default()
match upstream.schema_search_path().await {
Ok(ssp) => break ssp,
Err(error) => {
error!(
%error,
"Error loading schema search path from upstream database, retrying after 1s"
);
tokio::time::sleep(LOAD_SCHEMA_SEARCH_PATH_RETRY_INTERVAL).await;
continue;
}
}
}
};

let noria =
Expand Down

0 comments on commit 020f58a

Please sign in to comment.