Skip to content

Commit

Permalink
server: Run all controller requests in the background
Browse files Browse the repository at this point in the history
Rather than launching three dedicated tasks, each to handle
a completely distinct subset of controller requests (but implemented
using the same pattern match!) it's way simpler to just spawn a new
tokio task for each controller request - we have basically everything in
place to do this safely, including RwLocks around all the stuff that
matters, a *channel* rather than a return value to send our response,
etc. This significantly simplifies the implementation of the controller
request handling code, and also makes all controller requests as
non-blocking and concurrent as possible!

Refs: ENG-3058
Change-Id: Id4f624d7826300a8062079575c8c194a312ec53c
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/4956
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke.o@readyset.io>
  • Loading branch information
glittershark committed May 19, 2023
1 parent 8869ae6 commit 120fb30
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 130 deletions.
25 changes: 1 addition & 24 deletions readyset-server/src/controller/inner.rs
Expand Up @@ -37,7 +37,7 @@ use tokio::time::sleep;
use tracing::{debug, error, info, warn};

use crate::controller::state::{DfState, DfStateHandle};
use crate::controller::{ControllerRequest, ControllerState, Worker, WorkerIdentifier};
use crate::controller::{ControllerState, Worker, WorkerIdentifier};
use crate::coordination::DomainDescriptor;
use crate::worker::WorkerRequestKind;

Expand Down Expand Up @@ -761,26 +761,3 @@ impl Leader {
}
}
}

/// Helper method to distinguish if the given [`ControllerRequest`] actually
/// requires modifying the dataflow graph state.
pub(super) fn request_type(req: &ControllerRequest) -> ControllerRequestType {
match (&req.method, req.path.as_ref()) {
(&Method::GET, "/flush_partial")
| (&Method::GET | &Method::POST, "/controller_uri")
| (&Method::POST, "/extend_recipe")
| (&Method::POST, "/remove_query")
| (&Method::POST, "/remove_all_queries")
| (&Method::POST, "/set_replication_offset")
| (&Method::POST, "/replicate_readers")
| (&Method::POST, "/remove_node") => ControllerRequestType::Write,
(&Method::POST, "/dry_run") => ControllerRequestType::DryRun,
_ => ControllerRequestType::Read,
}
}

pub(super) enum ControllerRequestType {
Write,
Read,
DryRun,
}
118 changes: 12 additions & 106 deletions readyset-server/src/controller/mod.rs
Expand Up @@ -28,11 +28,11 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender};
use tokio::sync::{Notify, RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::{debug, error, info, info_span, warn};
use tracing::{error, info, info_span, warn};
use tracing_futures::Instrument;
use url::Url;

use crate::controller::inner::{ControllerRequestType, Leader};
use crate::controller::inner::Leader;
use crate::controller::migrate::Migration;
use crate::controller::sql::Recipe;
use crate::controller::state::DfState;
Expand Down Expand Up @@ -563,29 +563,6 @@ impl Controller {
.instrument(info_span!("authority")),
);

let (writer_tx, writer_rx) = tokio::sync::mpsc::channel(16);
tokio::spawn(
crate::controller::controller_req_processing_runner(
writer_rx,
self.authority.clone(),
self.inner.clone(),
self.shutdown_rx.clone(),
self.leader_ready.clone(),
)
.instrument(info_span!("write_processing")),
);
let (dry_run_tx, dry_run_rx) = tokio::sync::mpsc::channel(16);
tokio::spawn(
crate::controller::controller_req_processing_runner(
dry_run_rx,
self.authority.clone(),
self.inner.clone(),
self.shutdown_rx.clone(),
self.leader_ready.clone(),
)
.instrument(info_span!("dry_run_processing")),
);

let leader_ready = self.leader_ready.clone();
loop {
select! {
Expand All @@ -600,50 +577,13 @@ impl Controller {
}
req = self.http_rx.recv() => {
if let Some(req) = req {
// Check if the request is a write request, dry run request, or read
// request.
// If it's a read request, then we can handle the request on this thread,
// since it will just read the current dataflow state.
// If it is a write request, we pass the request to the write processing
// task, which will also handle the request in the same way, but on a
// different thread. This is how we avoid blocking reads.
// Likewise if the request is a dry run request we handle the request on a
// dedicated dry run thread. This is to avoid blocking migrations
match crate::controller::inner::request_type(&req) {
ControllerRequestType::Read => {
let leader_ready = leader_ready.load(Ordering::Acquire);
crate::controller::handle_controller_request(
req,
self.authority.clone(),
self.inner.clone(),
leader_ready
).await?;
}
ControllerRequestType::Write => {
if writer_tx.send(req).await.is_err() {
if self.shutdown_rx.signal_received() {
// If we've encountered an error but the shutdown signal has been received, the
// error probably occurred because the server is shutting down
info!("Controller shutting down after shutdown signal received");
break;
} else {
internal!("write processing handle hung up but no shutdown signal was received!")
}
}
}
ControllerRequestType::DryRun => {
if dry_run_tx.send(req).await.is_err() {
if self.shutdown_rx.signal_received() {
// If we've encountered an error but the shutdown signal has been received, the
// error probably occurred because the server is shutting down
info!("Controller shutting down after shutdown signal received");
break;
} else {
internal!("dry run processing handle hung up but no shutdown signal was received!")
}
}
}
}
let leader_ready = leader_ready.load(Ordering::Acquire);
tokio::spawn(handle_controller_request(
req,
self.authority.clone(),
self.inner.clone(),
leader_ready
));
}
else {
info!("Controller shutting down after HTTP handle dropped");
Expand Down Expand Up @@ -1066,45 +1006,12 @@ pub(crate) async fn authority_runner(
Ok(())
}

/// Designed to be spun up in a task that handles [`ControllerRequest`]s of various types.
async fn controller_req_processing_runner(
mut request_rx: Receiver<ControllerRequest>,
authority: Arc<Authority>,
leader_handle: Arc<LeaderHandle>,
mut shutdown_rx: ShutdownReceiver,
leader_ready: Arc<AtomicBool>,
) -> anyhow::Result<()> {
loop {
select! {
request = request_rx.recv() => {
if let Some(req) = request {
let leader_ready = leader_ready.load(Ordering::Acquire);
crate::controller::handle_controller_request(
req,
authority.clone(),
leader_handle.clone(),
leader_ready
).await?;
} else {
debug!("Controller shutting down after write processing handle dropped");
break;
}
},
_ = shutdown_rx.recv() => {
debug!("Write processing task shutting down after shutdown signal received");
break;
}
}
}
Ok(())
}

async fn handle_controller_request(
req: ControllerRequest,
authority: Arc<Authority>,
leader_handle: Arc<LeaderHandle>,
leader_ready: bool,
) -> ReadySetResult<()> {
) {
let ControllerRequest {
method,
path,
Expand All @@ -1131,10 +1038,10 @@ async fn handle_controller_request(
Ok(Ok(r)) => Ok(Ok(r)),
Ok(Err(ReadySetError::NoQuorum)) => Err(StatusCode::SERVICE_UNAVAILABLE),
Ok(Err(ReadySetError::UnknownEndpoint)) => Err(StatusCode::NOT_FOUND),
Ok(Err(e)) => Ok(Err(bincode::serialize(&e)?)),
// something else failed:
Err(ReadySetError::NotLeader) => Err(StatusCode::SERVICE_UNAVAILABLE),
Err(e) => Ok(Err(bincode::serialize(&e)?)),
Ok(Err(e)) | Err(e) => Ok(Err(bincode::serialize(&e)
.expect("Bincode serialization of ReadySetError should not fail"))),
}
};

Expand All @@ -1153,7 +1060,6 @@ async fn handle_controller_request(
if reply_tx.send(ret).is_err() {
warn!("client hung up");
}
Ok(())
}

#[cfg(test)]
Expand Down

0 comments on commit 120fb30

Please sign in to comment.