server: Add controller RPC for notifying about domain failure
Add a controller RPC that a worker can use to notify the controller that
a single one of the domain replicas running on that worker has failed
(a hypothetical worker-side call is sketched after the list below). This:

1. Removes the domain replica's worker assignment from the df state
2. Kills all downstream domains of that replica (without regard for
   replica index - which is fine for now, since we only fan out replicas
   at reader domains, but for efficiency's sake this should be fixed to
   track replica fanout once we allow replicating internal and/or base
   domains)
3. Runs recovery (in the background!) to try to re-place the failed and
   killed domains.
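
For illustration, a hypothetical worker-side call to the new RPC via
`ReadySetHandle::domain_died`. The surrounding function, the import paths
marked below, and how the worker obtains the replica address are
assumptions for the sketch, not part of this diff:

    use readyset_client::internal::ReplicaAddress;
    use readyset_client::ReadySetHandle; // path assumed; the method itself is added in this commit
    use readyset_errors::ReadySetResult; // path assumed

    /// Hypothetical worker-side hook, invoked once the worker has observed that a
    /// single domain replica (not the whole worker) has exited.
    async fn report_replica_failure(
        controller: &mut ReadySetHandle,
        addr: ReplicaAddress,
    ) -> ReadySetResult<()> {
        // The controller removes the replica's assignment, kills downstream domains,
        // and kicks off recovery in the background; this call only blocks until the
        // RPC itself is acknowledged.
        controller.domain_died(addr).await
    }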

That background recovery process uses a new, somewhat general mechanism
in the controller for notifying about background-task failure - this
could hypothetically be reused elsewhere for other fallible
controller-related background tasks.
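
Condensed into a self-contained sketch (the names and the String error
type are illustrative - the real pieces are `Leader::spawn_background_task`
and a new arm in the controller's `select!` loop, both in the diff below),
the mechanism is just:

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // One bounded channel shared (via clones of the sender) by all background tasks.
        let (failed_tx, mut failed_rx) = mpsc::channel::<String>(1);

        // A fallible background task...
        let task = tokio::spawn(async {
            Result::<(), String>::Err("background task failed".to_string())
        });

        // ...and a watcher that forwards its error (or panic) to the controller's channel.
        tokio::spawn(async move {
            let error = match task.await {
                Ok(Ok(())) => return,
                Ok(Err(e)) => e,
                Err(join_error) => format!("task panicked: {join_error}"),
            };
            // If the receiver is gone there is nobody left to notify; real code just logs this.
            let _ = failed_tx.send(error).await;
        });

        // The controller's main loop treats any reported failure as fatal.
        tokio::select! {
            Some(error) = failed_rx.recv() => eprintln!("background task failed: {error}"),
            // ... other arms: HTTP requests, authority updates, shutdown, etc.
        }
    }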

Note that this whole thing is *quite* deadlock-prone without running the
entire process (most notably the acquisition of the dataflow state
handle's write lock) in the background: since lots of things in the
controller hold the dataflow state handle's write lock while blocking on
requests to workers, we need to make sure that we're *never* holding
that write lock while a worker is waiting on a request *to the
controller* (i.e. in the other direction).
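
To make the hazard concrete, here is a toy reproduction (not ReadySet
code - just tokio primitives standing in for the dataflow state lock, a
worker, and the controller's RPC handler):

    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::{oneshot, RwLock};

    #[tokio::main]
    async fn main() {
        let state = Arc::new(RwLock::new(0u32));

        // "Controller": takes the write lock first, then blocks on the worker.
        let guard = state.write().await;

        let (reply_tx, reply_rx) = oneshot::channel::<()>();

        // "Worker": before it can finish, it is waiting on a reply from the
        // controller (think of the new domain_died RPC).
        let worker = tokio::spawn(async move {
            let _ = reply_rx.await;
        });

        // "Controller handler" for the worker's request: it also needs the write
        // lock, so as long as the lock above is held it can never send the reply.
        let handler_state = Arc::clone(&state);
        let handler = tokio::spawn(async move {
            let _guard = handler_state.write().await; // blocked behind the guard held below
            let _ = reply_tx.send(());
        });

        // Holding the write lock while waiting on the worker: this never completes.
        let deadlocked = tokio::time::timeout(Duration::from_secs(1), worker)
            .await
            .is_err();
        println!("deadlocked while holding the write lock: {deadlocked}"); // -> true

        // Releasing the lock (i.e. doing that work in a background task instead)
        // unblocks everyone.
        drop(guard);
        let _ = handler.await;
    }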

Refs: REA-3186
Change-Id: I6527a94a3b93dfe324e27ac29e8ac1fb011abd17
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5642
Reviewed-by: Luke Osborne <luke@readyset.io>
Reviewed-by: Dan Wilbanks <dan@readyset.io>
Tested-by: Buildkite CI
glittershark committed Aug 15, 2023
1 parent 569d1a2 commit 30490ac
Showing 3 changed files with 130 additions and 4 deletions.
10 changes: 9 additions & 1 deletion readyset-client/src/controller.rs
@@ -26,7 +26,7 @@ use url::Url;
use crate::consensus::{Authority, AuthorityControl};
use crate::debug::info::GraphInfo;
use crate::debug::stats;
use crate::internal::DomainIndex;
use crate::internal::{DomainIndex, ReplicaAddress};
use crate::metrics::MetricsDump;
use crate::recipe::changelist::ChangeList;
use crate::recipe::{ExtendRecipeResult, ExtendRecipeSpec, MigrationStatus};
@@ -933,4 +933,12 @@ impl ReadySetHandle {
) -> impl Future<Output = ReadySetResult<()>> + '_ {
self.rpc("failpoint", (name, action), self.request_timeout)
}

/// Notify the controller that a running domain replica has died
pub fn domain_died(
&mut self,
replica_address: ReplicaAddress,
) -> impl Future<Output = ReadySetResult<()>> + '_ {
self.rpc("domain_died", replica_address, self.request_timeout)
}
}
108 changes: 106 additions & 2 deletions readyset-server/src/controller/inner.rs
@@ -15,10 +15,11 @@ use database_utils::UpstreamConfig;
use dataflow::DomainIndex;
use failpoint_macros::failpoint;
use futures::future::Fuse;
use futures::FutureExt;
use futures::{Future, FutureExt};
use hyper::Method;
use readyset_client::consensus::{Authority, AuthorityControl};
use readyset_client::debug::stats::PersistentStats;
use readyset_client::internal::ReplicaAddress;
use readyset_client::recipe::{ExtendRecipeResult, ExtendRecipeSpec, MigrationStatus};
use readyset_client::status::{ReadySetStatus, SnapshotStatus};
use readyset_client::{SingleKeyEviction, WorkerDescriptor};
@@ -32,7 +33,7 @@ use replicators::ReplicatorMessage;
use reqwest::Url;
use slotmap::{DefaultKey, Key, KeyData, SlotMap};
use tokio::select;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio::sync::{watch, Mutex};
use tokio::task::JoinHandle;
use tokio::time::sleep;
@@ -82,6 +83,9 @@ pub struct Leader {
/// `/migration_status`.
running_migrations: Mutex<SlotMap<DefaultKey, RunningMigration>>,

/// A channel that will be notified if a background task for the controller fails
pub(super) background_task_failed: mpsc::Sender<ReadySetError>,

pub(super) running_recovery: Option<watch::Receiver<ReadySetResult<()>>>,
}

@@ -577,6 +581,11 @@ impl Leader {
self.dataflow_state_handle.commit(writer, authority).await?;
return_serialized!(());
}
(&Method::POST, "/domain_died") => {
let body = bincode::deserialize(&body)?;
self.handle_failed_domain(body).await?;
return_serialized!(());
}
_ => Err(ReadySetError::UnknownEndpoint),
}
}
@@ -720,11 +729,105 @@ impl Leader {
.await
}

pub(super) async fn handle_failed_domain(&self, addr: ReplicaAddress) -> ReadySetResult<()> {
// It's important that this happens in the background not just for parallelism /
// performance, but because the worker thread blocks on this RPC completing before it can
// accept any additional domain requests from us - both the "run this domain" request that
// gets sent by the eventual recovery, and other domain requests which might already be
// holding on to a lock of the dataflow state handle (eg as part of a migration)

let dataflow_state_handle = Arc::clone(&self.dataflow_state_handle);
let authority = Arc::clone(&self.authority);
self.spawn_background_task(async move {
warn!(domain = %addr, "Handling failure of domain");
let mut writer = dataflow_state_handle.write().await;
let ds = writer.as_mut();

// 1. Remove the domain from our internal state
let Some(dh) = ds.domains.get_mut(&addr.domain_index) else {
warn!(domain = %addr, "Notified about failure of unknown domain");
return Ok(())
};
dh.remove_assignment(addr.shard, addr.replica);

let mut domains_to_recover = vec![addr.domain_index];

// 2. Kill and clean up any downstream domains
let downstream_domains = ds.downstream_domains(addr.domain_index)?;
if !downstream_domains.is_empty() {
info!(
num_downstream_domains = downstream_domains.len(),
"Killing domains downstream of failed domain"
);
domains_to_recover.extend(downstream_domains.iter().copied());
ds.kill_domains(downstream_domains).await?;
}

// 3. Try to recover all now-non-running domains
info!(?domains_to_recover, "Recovering domains");
#[allow(clippy::indexing_slicing)] // Internal data structure invariant
let domain_nodes: HashMap<_, HashSet<_>> = domains_to_recover
.into_iter()
.map(|d| (d, ds.domain_nodes[&d].values().copied().collect()))
.collect();
info!(num_domains = %domain_nodes.len(), "Recovering domains");
let dmp = ds.plan_recovery(&domain_nodes).await?;

if dmp.failed_placement().is_empty() {
info!("Finished planning recovery with all domains placed");
} else {
info!(
num_unplaced_domains = dmp.failed_placement().len(),
"Finished planning recovery with some domains unplaced"
);
}

dataflow_state_handle.commit(writer, &authority).await?;

// 4. Apply the plan for recovery.
let mut writer = dataflow_state_handle.write().await;
if let Err(error) = dmp.apply(writer.as_mut()).await {
error!(%error, "Error applying domain migration plan");
Err(error)
} else {
dataflow_state_handle.commit(writer, &authority).await?;
Ok(())
}
})
.await;
Ok(())
}

/// Spawn a background task which notifies the controller if it fails or panics
async fn spawn_background_task<F>(&self, fut: F)
where
F: Future<Output = ReadySetResult<()>> + Send + 'static,
{
let task = tokio::spawn(fut);

let failed_tx = self.background_task_failed.clone();
tokio::spawn(async move {
let send_res = match task.await {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => failed_tx.send(e).await,
Err(e) => failed_tx.send(internal_err!("{e}")).await,
};

if let Err(mpsc::error::SendError(error)) = send_res {
error!(
%error,
"Controller background task failed, but could not notify controller of error"
);
}
});
}

/// Construct a new `Leader`
pub(super) fn new(
state: ControllerState,
controller_uri: Url,
authority: Arc<Authority>,
background_task_failed: mpsc::Sender<ReadySetError>,
replicator_statement_logging: bool,
replicator_config: UpstreamConfig,
worker_request_timeout: Duration,
@@ -744,6 +847,7 @@
authority,
worker_request_timeout,
running_migrations: Default::default(),
background_task_failed,
running_recovery: None,
}
}
16 changes: 15 additions & 1 deletion readyset-server/src/controller/mod.rs
@@ -30,7 +30,7 @@ use readyset_util::shutdown::ShutdownReceiver;
use replicators::ReplicatorMessage;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::{error, info, info_span, warn};
use tracing_futures::Instrument;
@@ -357,6 +357,10 @@ pub struct Controller {
http_rx: Receiver<ControllerRequest>,
/// Receives requests from the controller's `Handle`.
handle_rx: Receiver<HandleRequest>,
/// Receives notifications that background tasks have failed
background_task_failed_rx: Receiver<ReadySetError>,
/// Clone to send notifications that background tasks have failed
background_task_failed_tx: Sender<ReadySetError>,
/// A `ControllerDescriptor` that describes this server instance.
our_descriptor: ControllerDescriptor,
/// The descriptor of the worker this controller's server is running.
@@ -393,12 +397,15 @@
) -> Self {
// If we don't have an upstream, we allow permissive writes to base tables.
let permissive_writes = config.replicator_config.upstream_db_url.is_none();
let (background_task_failed_tx, background_task_failed_rx) = mpsc::channel(1);
Self {
inner: Arc::new(LeaderHandle::new()),
authority,
worker_tx,
http_rx: controller_rx,
handle_rx,
background_task_failed_tx,
background_task_failed_rx,
our_descriptor,
worker_descriptor,
config,
@@ -495,10 +502,12 @@ impl Controller {
AuthorityUpdate::WonLeaderElection(state) => {
info!("won leader election, creating Leader");
gauge!(recorded::CONTROLLER_IS_LEADER, 1f64);
let background_task_failed_tx = self.background_task_failed_tx.clone();
let mut leader = Leader::new(
state,
self.our_descriptor.controller_uri.clone(),
self.authority.clone(),
background_task_failed_tx,
self.config.replicator_statement_logging,
self.config.replicator_config.clone(),
self.config.worker_request_timeout,
@@ -705,6 +714,11 @@ impl Controller {
leader.running_recovery = None;
}
}
err = self.background_task_failed_rx.recv() => {
if let Some(err) = err {
return Err(err)
}
}
_ = self.shutdown_rx.recv() => {
info!("Controller shutting down after shutdown signal received");
break;
