diff --git a/readyset-server/src/controller/inner.rs b/readyset-server/src/controller/inner.rs index 0a6251678e..3bd9aac1e2 100644 --- a/readyset-server/src/controller/inner.rs +++ b/readyset-server/src/controller/inner.rs @@ -37,7 +37,7 @@ use tokio::time::sleep; use tracing::{debug, error, info, warn}; use crate::controller::state::{DfState, DfStateHandle}; -use crate::controller::{ControllerRequest, ControllerState, Worker, WorkerIdentifier}; +use crate::controller::{ControllerState, Worker, WorkerIdentifier}; use crate::coordination::DomainDescriptor; use crate::worker::WorkerRequestKind; @@ -761,26 +761,3 @@ impl Leader { } } } - -/// Helper method to distinguish if the given [`ControllerRequest`] actually -/// requires modifying the dataflow graph state. -pub(super) fn request_type(req: &ControllerRequest) -> ControllerRequestType { - match (&req.method, req.path.as_ref()) { - (&Method::GET, "/flush_partial") - | (&Method::GET | &Method::POST, "/controller_uri") - | (&Method::POST, "/extend_recipe") - | (&Method::POST, "/remove_query") - | (&Method::POST, "/remove_all_queries") - | (&Method::POST, "/set_replication_offset") - | (&Method::POST, "/replicate_readers") - | (&Method::POST, "/remove_node") => ControllerRequestType::Write, - (&Method::POST, "/dry_run") => ControllerRequestType::DryRun, - _ => ControllerRequestType::Read, - } -} - -pub(super) enum ControllerRequestType { - Write, - Read, - DryRun, -} diff --git a/readyset-server/src/controller/mod.rs b/readyset-server/src/controller/mod.rs index 7eca577931..c0ae7d0274 100644 --- a/readyset-server/src/controller/mod.rs +++ b/readyset-server/src/controller/mod.rs @@ -28,11 +28,11 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender}; use tokio::sync::{Notify, RwLock, RwLockReadGuard, RwLockWriteGuard}; -use tracing::{debug, error, info, info_span, warn}; +use tracing::{error, info, info_span, warn}; use tracing_futures::Instrument; use url::Url; -use crate::controller::inner::{ControllerRequestType, Leader}; +use crate::controller::inner::Leader; use crate::controller::migrate::Migration; use crate::controller::sql::Recipe; use crate::controller::state::DfState; @@ -563,29 +563,6 @@ impl Controller { .instrument(info_span!("authority")), ); - let (writer_tx, writer_rx) = tokio::sync::mpsc::channel(16); - tokio::spawn( - crate::controller::controller_req_processing_runner( - writer_rx, - self.authority.clone(), - self.inner.clone(), - self.shutdown_rx.clone(), - self.leader_ready.clone(), - ) - .instrument(info_span!("write_processing")), - ); - let (dry_run_tx, dry_run_rx) = tokio::sync::mpsc::channel(16); - tokio::spawn( - crate::controller::controller_req_processing_runner( - dry_run_rx, - self.authority.clone(), - self.inner.clone(), - self.shutdown_rx.clone(), - self.leader_ready.clone(), - ) - .instrument(info_span!("dry_run_processing")), - ); - let leader_ready = self.leader_ready.clone(); loop { select! { @@ -600,50 +577,13 @@ impl Controller { } req = self.http_rx.recv() => { if let Some(req) = req { - // Check if the request is a write request, dry run request, or read - // request. - // If it's a read request, then we can handle the request on this thread, - // since it will just read the current dataflow state. - // If it is a write request, we pass the request to the write processing - // task, which will also handle the request in the same way, but on a - // different thread. This is how we avoid blocking reads. - // Likewise if the request is a dry run request we handle the request on a - // dedicated dry run thread. This is to avoid blocking migrations - match crate::controller::inner::request_type(&req) { - ControllerRequestType::Read => { - let leader_ready = leader_ready.load(Ordering::Acquire); - crate::controller::handle_controller_request( - req, - self.authority.clone(), - self.inner.clone(), - leader_ready - ).await?; - } - ControllerRequestType::Write => { - if writer_tx.send(req).await.is_err() { - if self.shutdown_rx.signal_received() { - // If we've encountered an error but the shutdown signal has been received, the - // error probably occurred because the server is shutting down - info!("Controller shutting down after shutdown signal received"); - break; - } else { - internal!("write processing handle hung up but no shutdown signal was received!") - } - } - } - ControllerRequestType::DryRun => { - if dry_run_tx.send(req).await.is_err() { - if self.shutdown_rx.signal_received() { - // If we've encountered an error but the shutdown signal has been received, the - // error probably occurred because the server is shutting down - info!("Controller shutting down after shutdown signal received"); - break; - } else { - internal!("dry run processing handle hung up but no shutdown signal was received!") - } - } - } - } + let leader_ready = leader_ready.load(Ordering::Acquire); + tokio::spawn(handle_controller_request( + req, + self.authority.clone(), + self.inner.clone(), + leader_ready + )); } else { info!("Controller shutting down after HTTP handle dropped"); @@ -1066,45 +1006,12 @@ pub(crate) async fn authority_runner( Ok(()) } -/// Designed to be spun up in a task that handles [`ControllerRequest`]s of various types. -async fn controller_req_processing_runner( - mut request_rx: Receiver, - authority: Arc, - leader_handle: Arc, - mut shutdown_rx: ShutdownReceiver, - leader_ready: Arc, -) -> anyhow::Result<()> { - loop { - select! { - request = request_rx.recv() => { - if let Some(req) = request { - let leader_ready = leader_ready.load(Ordering::Acquire); - crate::controller::handle_controller_request( - req, - authority.clone(), - leader_handle.clone(), - leader_ready - ).await?; - } else { - debug!("Controller shutting down after write processing handle dropped"); - break; - } - }, - _ = shutdown_rx.recv() => { - debug!("Write processing task shutting down after shutdown signal received"); - break; - } - } - } - Ok(()) -} - async fn handle_controller_request( req: ControllerRequest, authority: Arc, leader_handle: Arc, leader_ready: bool, -) -> ReadySetResult<()> { +) { let ControllerRequest { method, path, @@ -1131,10 +1038,10 @@ async fn handle_controller_request( Ok(Ok(r)) => Ok(Ok(r)), Ok(Err(ReadySetError::NoQuorum)) => Err(StatusCode::SERVICE_UNAVAILABLE), Ok(Err(ReadySetError::UnknownEndpoint)) => Err(StatusCode::NOT_FOUND), - Ok(Err(e)) => Ok(Err(bincode::serialize(&e)?)), // something else failed: Err(ReadySetError::NotLeader) => Err(StatusCode::SERVICE_UNAVAILABLE), - Err(e) => Ok(Err(bincode::serialize(&e)?)), + Ok(Err(e)) | Err(e) => Ok(Err(bincode::serialize(&e) + .expect("Bincode serialization of ReadySetError should not fail"))), } }; @@ -1153,7 +1060,6 @@ async fn handle_controller_request( if reply_tx.send(ret).is_err() { warn!("client hung up"); } - Ok(()) } #[cfg(test)]