Skip to content

Commit

Permalink
server: Make extend_recipe requests poll if they take too long
Browse files Browse the repository at this point in the history
The current (simplistic) behavior of *always* running `extend_recipe`
synchronously doesn't work well in the case of *very* large migrations,
which can take arbitrarily long - basically whatever we set the RPC
timeout to, there will be a migration at some point that takes longer
than that timeout. We don't want to *not* set timeouts, though, since
timeouts are important to catch cases such as network hangs and
partitions, etc. To work around that, this commit splits the
extend_recipe RPC into two parts:

1. The existing /extend_recipe RPC, which tries to run the migration
   synchronously for up to 5 seconds. If it takes longer, it leaves it
   running in the background (storing its JoinHandle inside a new
   `running_migrations` field in the Leader) and returns a "handle" to
   it which can be used by:
2. A new /migration_status RPC, which checks the status of a migration
   that's running in the background.

The `extend_recipe` method in Controller now uses these two RPCs,
polling /migration_status every 1 second if /extend_recipe returns
Pending. This has no real effect on the user (everything still *appears*
synchronous) except that migrations which take longer than
`self.migration_timeout` no longer time out.

Fixes: ENG-3029
Change-Id: Id5e58510ca7961bd6cb3d81b2a365726453da83a
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/4925
Tested-by: Buildkite CI
Reviewed-by: Fran Noriega <fran@readyset.io>
  • Loading branch information
glittershark committed May 17, 2023
1 parent 2af1dea commit 1b77183
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 15 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 26 additions & 3 deletions readyset-client/src/controller.rs
Expand Up @@ -27,7 +27,7 @@ use crate::debug::info::GraphInfo;
use crate::debug::stats;
use crate::metrics::MetricsDump;
use crate::recipe::changelist::ChangeList;
use crate::recipe::ExtendRecipeSpec;
use crate::recipe::{ExtendRecipeResult, ExtendRecipeSpec, MigrationStatus};
use crate::replication::ReplicationOffsets;
use crate::status::ReadySetStatus;
use crate::table::{Table, TableBuilder, TableRpc};
Expand All @@ -36,6 +36,8 @@ use crate::{NodeSize, ReplicationOffset, TableStatus, ViewCreateRequest, ViewFil

mod rpc;

/// How long to sleep between successive `migration_status` requests while waiting for a migration
/// that `extend_recipe` reported as still pending.
const EXTEND_RECIPE_POLL_INTERVAL: Duration = Duration::from_secs(1);

/// Describes a running controller instance.
///
/// A serialized version of this struct is stored in ZooKeeper so that clients can reach the
Expand Down Expand Up @@ -536,7 +538,7 @@ impl ReadySetHandle {
pub fn dry_run(
&mut self,
changes: ChangeList,
) -> impl Future<Output = ReadySetResult<()>> + '_ {
) -> impl Future<Output = ReadySetResult<ExtendRecipeResult>> + '_ {
let request = ExtendRecipeSpec::from(changes);

self.rpc("dry_run", request, self.migration_timeout)
Expand All @@ -551,7 +553,28 @@ impl ReadySetHandle {
) -> impl Future<Output = ReadySetResult<()>> + '_ {
let request = ExtendRecipeSpec::from(changes);

self.rpc("extend_recipe", request, self.migration_timeout)
async move {
match self
.rpc("extend_recipe", request, self.migration_timeout)
.await?
{
ExtendRecipeResult::Done => Ok(()),
ExtendRecipeResult::Pending(migration_id) => {
while self
.rpc::<_, MigrationStatus>(
"migration_status",
migration_id,
self.migration_timeout,
)
.await?
.is_pending()
{
tokio::time::sleep(EXTEND_RECIPE_POLL_INTERVAL).await;
}
Ok(())
}
}
}
}

/// Extend the existing recipe with the given set of queries and don't require leader ready.
Expand Down
29 changes: 29 additions & 0 deletions readyset-client/src/recipe/mod.rs
Expand Up @@ -31,3 +31,32 @@ impl<'a> From<ChangeList> for ExtendRecipeSpec<'a> {
}
}
}

/// The result of a request to extend a recipe
///
/// This is the response type of the `extend_recipe` RPC: either the migration finished while the
/// request was being served, or it is still running and has to be polled for separately.
// NOTE(review): this type is serialized between client and server; the variant order and payload
// are presumably part of the wire format - confirm before reordering or changing them.
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
pub enum ExtendRecipeResult {
/// The extend recipe request has completed, and the graph has been successfully modified
Done,
/// The extend recipe request is still running, and its status can be queried using the given
/// token
///
/// The token is the migration id to send as the body of a `migration_status` request.
Pending(u64),
}

/// The status of an actively running migration
///
/// This is the response type of the `migration_status` RPC, which is used to poll a migration
/// that an earlier `extend_recipe` request reported as pending.
// NOTE(review): this type is serialized between client and server; the variant order is
// presumably part of the wire format - confirm before reordering.
#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
pub enum MigrationStatus {
/// The migration has completed
///
/// The server only reports this after checking the migration's result; a migration that failed
/// is surfaced as an error from the RPC rather than as `Done`.
Done,
/// The migration has not yet completed
Pending,
}

impl MigrationStatus {
    /// Reports whether the migration is still in progress.
    ///
    /// Yields `true` only for [`Pending`]; a [`Done`] status yields `false`.
    ///
    /// [`Pending`]: MigrationStatus::Pending
    /// [`Done`]: MigrationStatus::Done
    #[must_use]
    pub fn is_pending(&self) -> bool {
        // Spelled out exhaustively so that adding a variant forces this method to be revisited.
        match self {
            Self::Pending => true,
            Self::Done => false,
        }
    }
}
4 changes: 4 additions & 0 deletions readyset-errors/src/lib.rs
Expand Up @@ -654,6 +654,10 @@ pub enum ReadySetError {
#[error("Connection to the upstream database was lost: {0}")]
UpstreamConnectionLost(String),

/// An unknown pending migration was referenced
#[error("Unknown migration: {0}")]
UnknownMigration(u64),

/// Error interacting with a Consul server
#[error("Consul error: {0}")]
ConsulError(String),
Expand Down
1 change: 1 addition & 0 deletions readyset-server/Cargo.toml
Expand Up @@ -68,6 +68,7 @@ streaming-iterator = "0.1"
proptest = "1.0.0"
once_cell = "1.14"
enum-kinds = "0.5.1"
slotmap = "1.0.6"

timekeeper = { version = "0.3.2", default-features = false }
vec_map = { version = "0.8.0", features = ["eders"] }
Expand Down
90 changes: 78 additions & 12 deletions readyset-server/src/controller/inner.rs
Expand Up @@ -13,28 +13,42 @@ use std::time::Duration;

use database_utils::UpstreamConfig;
use failpoint_macros::failpoint;
use futures::future::Fuse;
use futures::FutureExt;
use hyper::Method;
use readyset_client::consensus::Authority;
use readyset_client::internal::ReplicaAddress;
use readyset_client::recipe::ExtendRecipeSpec;
use readyset_client::recipe::{ExtendRecipeResult, ExtendRecipeSpec, MigrationStatus};
use readyset_client::replication::ReplicationOffset;
use readyset_client::status::{ReadySetStatus, SnapshotStatus};
use readyset_client::WorkerDescriptor;
use readyset_errors::{ReadySetError, ReadySetResult};
use readyset_errors::{internal_err, ReadySetError, ReadySetResult};
use readyset_telemetry_reporter::TelemetrySender;
use readyset_util::futures::abort_on_panic;
use readyset_util::shutdown::ShutdownReceiver;
use readyset_version::RELEASE_VERSION;
use reqwest::Url;
use slotmap::{DefaultKey, Key, KeyData, SlotMap};
use tokio::select;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Notify;
use tokio::sync::{Mutex, Notify};
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};

use crate::controller::state::{DfState, DfStateHandle};
use crate::controller::{ControllerRequest, ControllerState, Worker, WorkerIdentifier};
use crate::coordination::DomainDescriptor;
use crate::worker::WorkerRequestKind;

/// Maximum amount of time to wait for an `extend_recipe` request to run synchronously, before we
/// let it run in the background and return [`ExtendRecipeResult::Pending`].
///
/// A migration that exceeds this limit keeps running; its handle is stored in
/// [`Leader::running_migrations`] so that its status can be queried via `/migration_status`.
const EXTEND_RECIPE_MAX_SYNC_TIME: Duration = Duration::from_secs(5);

/// A handle to a migration running in the background. Used as part of
/// [`Leader::running_migrations`].
///
/// Resolves to the migration's own result once the spawned task finishes. A failure of the task
/// itself (the `JoinHandle` error) is turned into an internal error by the request handlers that
/// poll this handle.
// NOTE(review): presumably wrapped in `Fuse` so the handle can be polled by reference in `select!`
// and then polled again later from `/migration_status` - confirm.
type RunningMigration = Fuse<JoinHandle<ReadySetResult<()>>>;

/// The ReadySet leader, responsible for making control-plane decisions for the whole of a ReadySet
/// cluster.
///
Expand All @@ -45,7 +59,7 @@ use crate::worker::WorkerRequestKind;
/// `Migration`, which can be performed using `Leader::migrate`. Only one `Migration` can
/// occur at any given point in time.
pub struct Leader {
pub(super) dataflow_state_handle: DfStateHandle,
pub(super) dataflow_state_handle: Arc<DfStateHandle>,

pending_recovery: bool,

Expand All @@ -60,6 +74,13 @@ pub struct Leader {
pub(super) replicator_config: UpstreamConfig,
/// A client to the current authority.
pub(super) authority: Arc<Authority>,

/// A map of currently running migrations.
///
/// Requests to `/extend_recipe` run synchronously for up to [`EXTEND_RECIPE_MAX_SYNC_TIME`]. If
/// the migration has not finished by then, a handle to it is placed here, where its status can
/// be queried via an rpc to `/migration_status`.
running_migrations: Mutex<SlotMap<DefaultKey, RunningMigration>>,
}

impl Leader {
Expand Down Expand Up @@ -459,15 +480,15 @@ impl Leader {
if body.require_leader_ready {
require_leader_ready()?;
}
let ret = futures::executor::block_on(async move {
futures::executor::block_on(async move {
let mut state_copy: DfState = {
let reader = self.dataflow_state_handle.read().await;
check_quorum!(reader);
reader.clone()
};
state_copy.extend_recipe(body, true).await
})?;
return_serialized!(ret);
return_serialized!(ExtendRecipeResult::Done);
}
(&Method::GET | &Method::POST, "/supports_pagination") => {
let ds = futures::executor::block_on(self.dataflow_state_handle.read());
Expand Down Expand Up @@ -498,14 +519,58 @@ impl Leader {
require_leader_ready()?;
}
let ret = futures::executor::block_on(async move {
let mut writer = self.dataflow_state_handle.write().await;
check_quorum!(writer.as_ref());
let r = writer.as_mut().extend_recipe(body, false).await?;
self.dataflow_state_handle.commit(writer, authority).await?;
Ok(r)
check_quorum!(self.dataflow_state_handle.read().await);

// Start the migration running in the background
let dataflow_state_handle = Arc::clone(&self.dataflow_state_handle);
let authority = Arc::clone(authority);
let mut migration = tokio::spawn(async move {
let mut writer = dataflow_state_handle.write().await;
writer.as_mut().extend_recipe(body, false).await?;
dataflow_state_handle.commit(writer, &authority).await?;
Ok(())
})
.fuse();

// Either the migration completes synchronously (under
// EXTEND_RECIPE_MAX_SYNC_TIME), or we place it in `self.running_migrations` and
// return a `Pending` result.
select! {
res = &mut migration => {
res.map_err(|e| internal_err!("{e}"))?.map(|_| ExtendRecipeResult::Done)
}
_ = sleep(EXTEND_RECIPE_MAX_SYNC_TIME) => {
let mut running_migrations = self.running_migrations.lock().await;
let migration_id = running_migrations.insert(migration);
Ok(ExtendRecipeResult::Pending(migration_id.data().as_ffi()))
}
}
})?;
return_serialized!(ret);
}
(&Method::POST, "/migration_status") => {
let migration_id: u64 = bincode::deserialize(&body)?;
let migration_key = DefaultKey::from(KeyData::from_ffi(migration_id));
let ret = futures::executor::block_on(async move {
let mut running_migrations = self.running_migrations.lock().await;
let mut migration: &mut RunningMigration = running_migrations
.get_mut(migration_key)
.ok_or_else(|| ReadySetError::UnknownMigration(migration_id))?;

match (&mut migration).now_or_never() {
None => ReadySetResult::Ok(MigrationStatus::Pending),
Some(res) => {
// Migration is done, remove it from the map
// Note that this means that only one thread can poll on the status of a
// particular migration!
running_migrations.remove(migration_key);
res.map_err(|e| internal_err!("{e}"))??;
Ok(MigrationStatus::Done)
}
}
})?;
return_serialized!(ret)
}
(&Method::POST, "/remove_query") => {
require_leader_ready()?;
let query_name = bincode::deserialize(&body)?;
Expand Down Expand Up @@ -697,7 +762,7 @@ impl Leader {
// [`ControllerState`] itself.
let pending_recovery = state.dataflow_state.ingredients.node_indices().count() > 1;

let dataflow_state_handle = DfStateHandle::new(state.dataflow_state);
let dataflow_state_handle = Arc::new(DfStateHandle::new(state.dataflow_state));

Leader {
dataflow_state_handle,
Expand All @@ -711,6 +776,7 @@ impl Leader {
replicator_config,
authority,
worker_request_timeout,
running_migrations: Default::default(),
}
}
}
Expand Down

0 comments on commit 1b77183

Please sign in to comment.