diff --git a/control_plane/attachment_service/src/node.rs b/control_plane/attachment_service/src/node.rs index 1f9dcef03374..27b03608fa25 100644 --- a/control_plane/attachment_service/src/node.rs +++ b/control_plane/attachment_service/src/node.rs @@ -1,6 +1,16 @@ -use pageserver_api::controller_api::{NodeAvailability, NodeSchedulingPolicy}; +use std::{str::FromStr, time::Duration}; + +use hyper::StatusCode; +use pageserver_api::{ + controller_api::{ + NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, TenantLocateResponseShard, + }, + shard::TenantShardId, +}; +use pageserver_client::mgmt_api; use serde::Serialize; -use utils::id::NodeId; +use tokio_util::sync::CancellationToken; +use utils::{backoff, id::NodeId}; use crate::persistence::NodePersistence; @@ -12,16 +22,29 @@ use crate::persistence::NodePersistence; /// implementation of serialization on this type is only for debug dumps. #[derive(Clone, Serialize)] pub(crate) struct Node { - pub(crate) id: NodeId, + id: NodeId, + + availability: NodeAvailability, + scheduling: NodeSchedulingPolicy, - pub(crate) availability: NodeAvailability, - pub(crate) scheduling: NodeSchedulingPolicy, + listen_http_addr: String, + listen_http_port: u16, - pub(crate) listen_http_addr: String, - pub(crate) listen_http_port: u16, + listen_pg_addr: String, + listen_pg_port: u16, - pub(crate) listen_pg_addr: String, - pub(crate) listen_pg_port: u16, + // This cancellation token means "stop any RPCs in flight to this node, and don't start + // any more". It is not related to process shutdown. + #[serde(skip)] + cancel: CancellationToken, +} + +/// When updating [`Node::availability`] we use this type to indicate to the caller +/// whether/how they changed it. +pub(crate) enum AvailabilityTransition { + ToActive, + ToOffline, + Unchanged, } impl Node { @@ -29,6 +52,71 @@ impl Node { format!("http://{}:{}", self.listen_http_addr, self.listen_http_port) } + pub(crate) fn get_id(&self) -> NodeId { + self.id + } + + pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) { + self.scheduling = scheduling + } + + /// Does this registration request match `self`? This is used when deciding whether a registration + /// request should be allowed to update an existing record with the same node ID. + pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool { + self.id == register_req.node_id + && self.listen_http_addr == register_req.listen_http_addr + && self.listen_http_port == register_req.listen_http_port + && self.listen_pg_addr == register_req.listen_pg_addr + && self.listen_pg_port == register_req.listen_pg_port + } + + /// For a shard located on this node, populate a response object + /// with this node's address information. + pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard { + TenantLocateResponseShard { + shard_id, + node_id: self.id, + listen_http_addr: self.listen_http_addr.clone(), + listen_http_port: self.listen_http_port, + listen_pg_addr: self.listen_pg_addr.clone(), + listen_pg_port: self.listen_pg_port, + } + } + + pub(crate) fn set_availability( + &mut self, + availability: NodeAvailability, + ) -> AvailabilityTransition { + use NodeAvailability::*; + let transition = match (self.availability, availability) { + (Offline, Active) => { + // Give the node a new cancellation token, effectively resetting it to un-cancelled. Any + // users of previously-cloned copies of the node will still see the old cancellation + // state. 
For example, Reconcilers in flight will have to complete and be spawned + // again to realize that the node has become available. + self.cancel = CancellationToken::new(); + AvailabilityTransition::ToActive + } + (Active, Offline) => { + // Fire the node's cancellation token to cancel any in-flight API requests to it + self.cancel.cancel(); + AvailabilityTransition::ToOffline + } + _ => AvailabilityTransition::Unchanged, + }; + self.availability = availability; + transition + } + + /// Whether we may send API requests to this node. + pub(crate) fn is_available(&self) -> bool { + // When we clone a node, [`Self::availability`] is a snapshot, but [`Self::cancel`] holds + // a reference to the original Node's cancellation status. Checking both of these results + // in a "pessimistic" check where we will consider a Node instance unavailable if it was unavailable + // when we cloned it, or if the original Node instance's cancellation token was fired. + matches!(self.availability, NodeAvailability::Active) && !self.cancel.is_cancelled() + } + /// Is this node elegible to have work scheduled onto it? pub(crate) fn may_schedule(&self) -> bool { match self.availability { @@ -44,6 +132,26 @@ impl Node { } } + pub(crate) fn new( + id: NodeId, + listen_http_addr: String, + listen_http_port: u16, + listen_pg_addr: String, + listen_pg_port: u16, + ) -> Self { + Self { + id, + listen_http_addr, + listen_http_port, + listen_pg_addr, + listen_pg_port, + scheduling: NodeSchedulingPolicy::Filling, + // TODO: we shouldn't really call this Active until we've heartbeated it. + availability: NodeAvailability::Active, + cancel: CancellationToken::new(), + } + } + pub(crate) fn to_persistent(&self) -> NodePersistence { NodePersistence { node_id: self.id.0 as i64, @@ -54,4 +162,96 @@ impl Node { listen_pg_port: self.listen_pg_port as i32, } } + + pub(crate) fn from_persistent(np: NodePersistence) -> Self { + Self { + id: NodeId(np.node_id as u64), + // At startup we consider a node offline until proven otherwise. + availability: NodeAvailability::Offline, + scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy) + .expect("Bad scheduling policy in DB"), + listen_http_addr: np.listen_http_addr, + listen_http_port: np.listen_http_port as u16, + listen_pg_addr: np.listen_pg_addr, + listen_pg_port: np.listen_pg_port as u16, + cancel: CancellationToken::new(), + } + } + + /// Wrapper for issuing requests to pageserver management API: takes care of generic + /// retry/backoff for retryable HTTP status codes. + /// + /// This will return None to indicate cancellation. Cancellation may happen from + /// the cancellation token passed in, or from Self's cancellation token (i.e. node + /// going offline). 
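An illustrative usage sketch, not part of the diff: how a caller might drive `with_client_retries`, mirroring the call sites added elsewhere in this change. The function name, retry counts and timeout below are placeholder choices, and the sketch assumes it lives inside the attachment service crate so `Node` is in scope.

```rust
use std::time::Duration;

use tokio_util::sync::CancellationToken;

// Hypothetical caller: list a node's shard locations, letting the wrapper handle
// retryable HTTP errors, the per-request timeout, and both cancellation tokens.
async fn scan_node_locations(node: &Node, jwt: &Option<String>, cancel: &CancellationToken) {
    let result = node
        .with_client_retries(
            |client| async move { client.list_location_config().await },
            jwt,
            1,                      // warn_threshold: warn after the first failed attempt
            5,                      // max_retries for non-fatal errors
            Duration::from_secs(5), // per-request timeout
            cancel,                 // caller-side token; the node's own token is checked internally
        )
        .await;

    match result {
        None => tracing::info!("cancelled before completing"),
        Some(Ok(listing)) => {
            tracing::info!("{} shard locations reported by {node}", listing.tenant_shards.len())
        }
        Some(Err(e)) => tracing::warn!("giving up on {node}: {e}"),
    }
}
```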
+ pub(crate) async fn with_client_retries( + &self, + mut op: O, + jwt: &Option, + warn_threshold: u32, + max_retries: u32, + timeout: Duration, + cancel: &CancellationToken, + ) -> Option> + where + O: FnMut(mgmt_api::Client) -> F, + F: std::future::Future>, + { + fn is_fatal(e: &mgmt_api::Error) -> bool { + use mgmt_api::Error::*; + match e { + ReceiveBody(_) | ReceiveErrorBody(_) => false, + ApiError(StatusCode::SERVICE_UNAVAILABLE, _) + | ApiError(StatusCode::GATEWAY_TIMEOUT, _) + | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false, + ApiError(_, _) => true, + Cancelled => true, + } + } + + backoff::retry( + || { + let http_client = reqwest::ClientBuilder::new() + .timeout(timeout) + .build() + .expect("Failed to construct HTTP client"); + + let client = + mgmt_api::Client::from_client(http_client, self.base_url(), jwt.as_deref()); + + let node_cancel_fut = self.cancel.cancelled(); + + let op_fut = op(client); + + async { + tokio::select! { + r = op_fut=> {r}, + _ = node_cancel_fut => { + Err(mgmt_api::Error::Cancelled) + }} + } + }, + is_fatal, + warn_threshold, + max_retries, + &format!( + "Call to node {} ({}:{}) management API", + self.id, self.listen_http_addr, self.listen_http_port + ), + cancel, + ) + .await + } +} + +impl std::fmt::Display for Node { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} ({})", self.id, self.listen_http_addr) + } +} + +impl std::fmt::Debug for Node { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} ({})", self.id, self.listen_http_addr) + } } diff --git a/control_plane/attachment_service/src/reconciler.rs b/control_plane/attachment_service/src/reconciler.rs index 0fa6e8e2f8d5..603da9bf022c 100644 --- a/control_plane/attachment_service/src/reconciler.rs +++ b/control_plane/attachment_service/src/reconciler.rs @@ -1,6 +1,5 @@ use crate::persistence::Persistence; use crate::service; -use pageserver_api::controller_api::NodeAvailability; use pageserver_api::models::{ LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, }; @@ -28,15 +27,16 @@ pub(super) struct Reconciler { pub(crate) shard: ShardIdentity, pub(crate) generation: Option, pub(crate) intent: TargetState, + + /// Nodes not referenced by [`Self::intent`], from which we should try + /// to detach this tenant shard. + pub(crate) detach: Vec, + pub(crate) config: TenantConfig, pub(crate) observed: ObservedState, pub(crate) service_config: service::Config, - /// A snapshot of the pageservers as they were when we were asked - /// to reconcile. - pub(crate) pageservers: Arc>, - /// A hook to notify the running postgres instances when we change the location /// of a tenant. Use this via [`Self::compute_notify`] to update our failure flag /// and guarantee eventual retries. @@ -67,29 +67,37 @@ pub(super) struct Reconciler { /// and the TargetState is just the instruction for a particular Reconciler run. 
#[derive(Debug)] pub(crate) struct TargetState { - pub(crate) attached: Option, - pub(crate) secondary: Vec, + pub(crate) attached: Option, + pub(crate) secondary: Vec, } impl TargetState { - pub(crate) fn from_intent(intent: &IntentState) -> Self { + pub(crate) fn from_intent(nodes: &HashMap, intent: &IntentState) -> Self { Self { - attached: *intent.get_attached(), - secondary: intent.get_secondary().clone(), - } - } - - fn all_pageservers(&self) -> Vec { - let mut result = self.secondary.clone(); - if let Some(node_id) = &self.attached { - result.push(*node_id); + attached: intent.get_attached().map(|n| { + nodes + .get(&n) + .expect("Intent attached referenced non-existent node") + .clone() + }), + secondary: intent + .get_secondary() + .iter() + .map(|n| { + nodes + .get(n) + .expect("Intent secondary referenced non-existent node") + .clone() + }) + .collect(), } - result } } #[derive(thiserror::Error, Debug)] pub(crate) enum ReconcileError { + #[error(transparent)] + Remote(#[from] mgmt_api::Error), #[error(transparent)] Notify(#[from] NotifyError), #[error("Cancelled")] @@ -101,45 +109,83 @@ pub(crate) enum ReconcileError { impl Reconciler { async fn location_config( &mut self, - node_id: NodeId, + node: &Node, config: LocationConfig, flush_ms: Option, lazy: bool, - ) -> anyhow::Result<()> { - let node = self - .pageservers - .get(&node_id) - .expect("Pageserver may not be removed while referenced"); - + ) -> Result<(), ReconcileError> { self.observed .locations - .insert(node.id, ObservedStateLocation { conf: None }); - - tracing::info!("location_config({}) calling: {:?}", node_id, config); - let client = - mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref()); - client - .location_config(self.tenant_shard_id, config.clone(), flush_ms, lazy) - .await?; - tracing::info!("location_config({}) complete: {:?}", node_id, config); + .insert(node.get_id(), ObservedStateLocation { conf: None }); + + // TODO: amend locations that use long-polling: they will hit this timeout. 
+ let timeout = Duration::from_secs(25); + + tracing::info!("location_config({node}) calling: {:?}", config); + let tenant_shard_id = self.tenant_shard_id; + let config_ref = &config; + match node + .with_client_retries( + |client| async move { + let config = config_ref.clone(); + client + .location_config(tenant_shard_id, config.clone(), flush_ms, lazy) + .await + }, + &self.service_config.jwt_token, + 1, + 3, + timeout, + &self.cancel, + ) + .await + { + Some(Ok(_)) => {} + Some(Err(e)) => return Err(e.into()), + None => return Err(ReconcileError::Cancel), + }; + tracing::info!("location_config({node}) complete: {:?}", config); self.observed .locations - .insert(node.id, ObservedStateLocation { conf: Some(config) }); + .insert(node.get_id(), ObservedStateLocation { conf: Some(config) }); Ok(()) } + fn get_node(&self, node_id: &NodeId) -> Option<&Node> { + if let Some(node) = self.intent.attached.as_ref() { + if node.get_id() == *node_id { + return Some(node); + } + } + + if let Some(node) = self + .intent + .secondary + .iter() + .find(|n| n.get_id() == *node_id) + { + return Some(node); + } + + if let Some(node) = self.detach.iter().find(|n| n.get_id() == *node_id) { + return Some(node); + } + + None + } + async fn maybe_live_migrate(&mut self) -> Result<(), ReconcileError> { - let destination = if let Some(node_id) = self.intent.attached { - match self.observed.locations.get(&node_id) { + let destination = if let Some(node) = &self.intent.attached { + match self.observed.locations.get(&node.get_id()) { Some(conf) => { // We will do a live migration only if the intended destination is not // currently in an attached state. match &conf.conf { Some(conf) if conf.mode == LocationConfigMode::Secondary => { // Fall through to do a live migration - node_id + node } None | Some(_) => { // Attached or uncertain: don't do a live migration, proceed @@ -152,7 +198,7 @@ impl Reconciler { None => { // Our destination is not attached: maybe live migrate if some other // node is currently attached. Fall through. - node_id + node } } } else { @@ -165,15 +211,13 @@ impl Reconciler { for (node_id, state) in &self.observed.locations { if let Some(observed_conf) = &state.conf { if observed_conf.mode == LocationConfigMode::AttachedSingle { - let node = self - .pageservers - .get(node_id) - .expect("Nodes may not be removed while referenced"); // We will only attempt live migration if the origin is not offline: this // avoids trying to do it while reconciling after responding to an HA failover. 
- if !matches!(node.availability, NodeAvailability::Offline) { - origin = Some(*node_id); - break; + if let Some(node) = self.get_node(node_id) { + if node.is_available() { + origin = Some(node.clone()); + break; + } } } } @@ -186,7 +230,7 @@ impl Reconciler { // We have an origin and a destination: proceed to do the live migration tracing::info!("Live migrating {}->{}", origin, destination); - self.live_migrate(origin, destination).await?; + self.live_migrate(origin, destination.clone()).await?; Ok(()) } @@ -194,13 +238,8 @@ impl Reconciler { async fn get_lsns( &self, tenant_shard_id: TenantShardId, - node_id: &NodeId, + node: &Node, ) -> anyhow::Result> { - let node = self - .pageservers - .get(node_id) - .expect("Pageserver may not be removed while referenced"); - let client = mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref()); @@ -211,19 +250,27 @@ impl Reconciler { .collect()) } - async fn secondary_download(&self, tenant_shard_id: TenantShardId, node_id: &NodeId) { - let node = self - .pageservers - .get(node_id) - .expect("Pageserver may not be removed while referenced"); - - let client = - mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref()); - - match client.tenant_secondary_download(tenant_shard_id).await { - Ok(()) => {} - Err(_) => { - tracing::info!(" (skipping, destination wasn't in secondary mode)") + async fn secondary_download( + &self, + tenant_shard_id: TenantShardId, + node: &Node, + ) -> Result<(), ReconcileError> { + match node + .with_client_retries( + |client| async move { client.tenant_secondary_download(tenant_shard_id).await }, + &self.service_config.jwt_token, + 1, + 1, + Duration::from_secs(60), + &self.cancel, + ) + .await + { + None => Err(ReconcileError::Cancel), + Some(Ok(_)) => Ok(()), + Some(Err(e)) => { + tracing::info!(" (skipping destination download: {})", e); + Ok(()) } } } @@ -231,17 +278,14 @@ impl Reconciler { async fn await_lsn( &self, tenant_shard_id: TenantShardId, - pageserver_id: &NodeId, + node: &Node, baseline: HashMap, ) -> anyhow::Result<()> { loop { - let latest = match self.get_lsns(tenant_shard_id, pageserver_id).await { + let latest = match self.get_lsns(tenant_shard_id, node).await { Ok(l) => l, Err(e) => { - println!( - "🕑 Can't get LSNs on pageserver {} yet, waiting ({e})", - pageserver_id - ); + tracing::info!("🕑 Can't get LSNs on node {node} yet, waiting ({e})",); std::thread::sleep(Duration::from_millis(500)); continue; } @@ -251,7 +295,7 @@ impl Reconciler { for (timeline_id, baseline_lsn) in &baseline { match latest.get(timeline_id) { Some(latest_lsn) => { - println!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}"); + tracing::info!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}"); if latest_lsn < baseline_lsn { any_behind = true; } @@ -266,7 +310,7 @@ impl Reconciler { } if !any_behind { - println!("✅ LSN caught up. Proceeding..."); + tracing::info!("✅ LSN caught up. 
Proceeding..."); break; } else { std::thread::sleep(Duration::from_millis(500)); @@ -278,11 +322,11 @@ impl Reconciler { pub async fn live_migrate( &mut self, - origin_ps_id: NodeId, - dest_ps_id: NodeId, - ) -> anyhow::Result<()> { + origin_ps: Node, + dest_ps: Node, + ) -> Result<(), ReconcileError> { // `maybe_live_migrate` is responsibble for sanity of inputs - assert!(origin_ps_id != dest_ps_id); + assert!(origin_ps.get_id() != dest_ps.get_id()); fn build_location_config( shard: &ShardIdentity, @@ -302,10 +346,7 @@ impl Reconciler { } } - tracing::info!( - "🔁 Switching origin pageserver {} to stale mode", - origin_ps_id - ); + tracing::info!("🔁 Switching origin node {origin_ps} to stale mode",); // FIXME: it is incorrect to use self.generation here, we should use the generation // from the ObservedState of the origin pageserver (it might be older than self.generation) @@ -316,26 +357,18 @@ impl Reconciler { self.generation, None, ); - self.location_config( - origin_ps_id, - stale_conf, - Some(Duration::from_secs(10)), - false, - ) - .await?; + self.location_config(&origin_ps, stale_conf, Some(Duration::from_secs(10)), false) + .await?; - let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps_id).await?); + let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps).await?); // If we are migrating to a destination that has a secondary location, warm it up first - if let Some(destination_conf) = self.observed.locations.get(&dest_ps_id) { + if let Some(destination_conf) = self.observed.locations.get(&dest_ps.get_id()) { if let Some(destination_conf) = &destination_conf.conf { if destination_conf.mode == LocationConfigMode::Secondary { - tracing::info!( - "🔁 Downloading latest layers to destination pageserver {}", - dest_ps_id, - ); - self.secondary_download(self.tenant_shard_id, &dest_ps_id) - .await; + tracing::info!("🔁 Downloading latest layers to destination node {dest_ps}",); + self.secondary_download(self.tenant_shard_id, &dest_ps) + .await?; } } } @@ -343,7 +376,7 @@ impl Reconciler { // Increment generation before attaching to new pageserver self.generation = Some( self.persistence - .increment_generation(self.tenant_shard_id, dest_ps_id) + .increment_generation(self.tenant_shard_id, dest_ps.get_id()) .await?, ); @@ -355,23 +388,23 @@ impl Reconciler { None, ); - tracing::info!("🔁 Attaching to pageserver {}", dest_ps_id); - self.location_config(dest_ps_id, dest_conf, None, false) + tracing::info!("🔁 Attaching to pageserver {dest_ps}"); + self.location_config(&dest_ps, dest_conf, None, false) .await?; if let Some(baseline) = baseline_lsns { tracing::info!("🕑 Waiting for LSN to catch up..."); - self.await_lsn(self.tenant_shard_id, &dest_ps_id, baseline) + self.await_lsn(self.tenant_shard_id, &dest_ps, baseline) .await?; } - tracing::info!("🔁 Notifying compute to use pageserver {}", dest_ps_id); + tracing::info!("🔁 Notifying compute to use pageserver {dest_ps}"); // During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach // the origin without notifying compute, we will render the tenant unavailable. 
while let Err(e) = self.compute_notify().await { match e { - NotifyError::Fatal(_) => return Err(anyhow::anyhow!(e)), + NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)), _ => { tracing::warn!( "Live migration blocked by compute notification error, retrying: {e}" @@ -389,22 +422,19 @@ impl Reconciler { None, Some(LocationConfigSecondary { warm: true }), ); - self.location_config(origin_ps_id, origin_secondary_conf.clone(), None, false) + self.location_config(&origin_ps, origin_secondary_conf.clone(), None, false) .await?; // TODO: we should also be setting the ObservedState on earlier API calls, in case we fail // partway through. In fact, all location conf API calls should be in a wrapper that sets // the observed state to None, then runs, then sets it to what we wrote. self.observed.locations.insert( - origin_ps_id, + origin_ps.get_id(), ObservedStateLocation { conf: Some(origin_secondary_conf), }, ); - println!( - "🔁 Switching to AttachedSingle mode on pageserver {}", - dest_ps_id - ); + tracing::info!("🔁 Switching to AttachedSingle mode on node {dest_ps}",); let dest_final_conf = build_location_config( &self.shard, &self.config, @@ -412,16 +442,61 @@ impl Reconciler { self.generation, None, ); - self.location_config(dest_ps_id, dest_final_conf.clone(), None, false) + self.location_config(&dest_ps, dest_final_conf.clone(), None, false) .await?; self.observed.locations.insert( - dest_ps_id, + dest_ps.get_id(), ObservedStateLocation { conf: Some(dest_final_conf), }, ); - println!("✅ Migration complete"); + tracing::info!("✅ Migration complete"); + + Ok(()) + } + + async fn maybe_refresh_observed(&mut self) -> Result<(), ReconcileError> { + // If the attached node has uncertain state, read it from the pageserver before proceeding: this + // is important to avoid spurious generation increments. + // + // We don't need to do this for secondary/detach locations because it's harmless to just PUT their + // location conf, whereas for attached locations it can interrupt clients if we spuriously destroy/recreate + // the `Timeline` object in the pageserver. + + let Some(attached_node) = self.intent.attached.as_ref() else { + // Nothing to do + return Ok(()); + }; + + if matches!( + self.observed.locations.get(&attached_node.get_id()), + Some(ObservedStateLocation { conf: None }) + ) { + let tenant_shard_id = self.tenant_shard_id; + let observed_conf = match attached_node + .with_client_retries( + |client| async move { client.get_location_config(tenant_shard_id).await }, + &self.service_config.jwt_token, + 1, + 1, + Duration::from_secs(5), + &self.cancel, + ) + .await + { + Some(Ok(observed)) => observed, + Some(Err(e)) => return Err(e.into()), + None => return Err(ReconcileError::Cancel), + }; + tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}"); + self.observed.locations.insert( + attached_node.get_id(), + ObservedStateLocation { + conf: observed_conf, + }, + ); + } Ok(()) } @@ -433,14 +508,14 @@ impl Reconciler { /// general case reconciliation where we walk through the intent by pageserver /// and call out to the pageserver to apply the desired state. pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> { - // TODO: if any of self.observed is None, call to remote pageservers - // to learn correct state. 
+ // Prepare: if we have uncertain `observed` state for our would-be attachement location, then refresh it + self.maybe_refresh_observed().await?; // Special case: live migration self.maybe_live_migrate().await?; // If the attached pageserver is not attached, do so now. - if let Some(node_id) = self.intent.attached { + if let Some(node) = self.intent.attached.as_ref() { // If we are in an attached policy, then generation must have been set (null generations // are only present when a tenant is initially loaded with a secondary policy) debug_assert!(self.generation.is_some()); @@ -451,10 +526,10 @@ impl Reconciler { }; let mut wanted_conf = attached_location_conf(generation, &self.shard, &self.config); - match self.observed.locations.get(&node_id) { + match self.observed.locations.get(&node.get_id()) { Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => { // Nothing to do - tracing::info!(%node_id, "Observed configuration already correct.") + tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.") } observed => { // In all cases other than a matching observed configuration, we will @@ -492,16 +567,21 @@ impl Reconciler { if increment_generation { let generation = self .persistence - .increment_generation(self.tenant_shard_id, node_id) + .increment_generation(self.tenant_shard_id, node.get_id()) .await?; self.generation = Some(generation); wanted_conf.generation = generation.into(); } - tracing::info!(%node_id, "Observed configuration requires update."); + tracing::info!(node_id=%node.get_id(), "Observed configuration requires update."); + + // Because `node` comes from a ref to &self, clone it before calling into a &mut self + // function: this could be avoided by refactoring the state mutated by location_config into + // a separate type to Self. + let node = node.clone(); + // Use lazy=true, because we may run many of Self concurrently, and do not want to // overload the pageserver with logical size calculations. - self.location_config(node_id, wanted_conf, None, true) - .await?; + self.location_config(&node, wanted_conf, None, true).await?; self.compute_notify().await?; } } @@ -510,33 +590,27 @@ impl Reconciler { // Configure secondary locations: if these were previously attached this // implicitly downgrades them from attached to secondary. let mut changes = Vec::new(); - for node_id in &self.intent.secondary { + for node in &self.intent.secondary { let wanted_conf = secondary_location_conf(&self.shard, &self.config); - match self.observed.locations.get(node_id) { + match self.observed.locations.get(&node.get_id()) { Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => { // Nothing to do - tracing::info!(%node_id, "Observed configuration already correct.") + tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.") } _ => { // In all cases other than a matching observed configuration, we will // reconcile this location. - tracing::info!(%node_id, "Observed configuration requires update."); - changes.push((*node_id, wanted_conf)) + tracing::info!(node_id=%node.get_id(), "Observed configuration requires update."); + changes.push((node.clone(), wanted_conf)) } } } // Detach any extraneous pageservers that are no longer referenced // by our intent. - let all_pageservers = self.intent.all_pageservers(); - for node_id in self.observed.locations.keys() { - if all_pageservers.contains(node_id) { - // We are only detaching pageservers that aren't used at all. 
- continue; - } - + for node in &self.detach { changes.push(( - *node_id, + node.clone(), LocationConfig { mode: LocationConfigMode::Detached, generation: None, @@ -549,11 +623,11 @@ impl Reconciler { )); } - for (node_id, conf) in changes { + for (node, conf) in changes { if self.cancel.is_cancelled() { return Err(ReconcileError::Cancel); } - self.location_config(node_id, conf, None, false).await?; + self.location_config(&node, conf, None, false).await?; } Ok(()) @@ -562,12 +636,12 @@ impl Reconciler { pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> { // Whenever a particular Reconciler emits a notification, it is always notifying for the intended // destination. - if let Some(node_id) = self.intent.attached { + if let Some(node) = &self.intent.attached { let result = self .compute_hook .notify( self.tenant_shard_id, - node_id, + node.get_id(), self.shard.stripe_size, &self.cancel, ) @@ -576,7 +650,7 @@ impl Reconciler { // It is up to the caller whether they want to drop out on this error, but they don't have to: // in general we should avoid letting unavailability of the cloud control plane stop us from // making progress. - tracing::warn!("Failed to notify compute of attached pageserver {node_id}: {e}"); + tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}"); // Set this flag so that in our ReconcileResult we will set the flag on the shard that it // needs to retry at some point. self.compute_notify_failure = true; diff --git a/control_plane/attachment_service/src/scheduler.rs b/control_plane/attachment_service/src/scheduler.rs index 87fce3df2577..26a2707e8d73 100644 --- a/control_plane/attachment_service/src/scheduler.rs +++ b/control_plane/attachment_service/src/scheduler.rs @@ -43,7 +43,7 @@ impl Scheduler { let mut scheduler_nodes = HashMap::new(); for node in nodes { scheduler_nodes.insert( - node.id, + node.get_id(), SchedulerNode { shard_count: 0, may_schedule: node.may_schedule(), @@ -68,7 +68,7 @@ impl Scheduler { let mut expect_nodes: HashMap = HashMap::new(); for node in nodes { expect_nodes.insert( - node.id, + node.get_id(), SchedulerNode { shard_count: 0, may_schedule: node.may_schedule(), @@ -156,7 +156,7 @@ impl Scheduler { pub(crate) fn node_upsert(&mut self, node: &Node) { use std::collections::hash_map::Entry::*; - match self.nodes.entry(node.id) { + match self.nodes.entry(node.get_id()) { Occupied(mut entry) => { entry.get_mut().may_schedule = node.may_schedule(); } @@ -255,7 +255,6 @@ impl Scheduler { pub(crate) mod test_utils { use crate::node::Node; - use pageserver_api::controller_api::{NodeAvailability, NodeSchedulingPolicy}; use std::collections::HashMap; use utils::id::NodeId; /// Test helper: synthesize the requested number of nodes, all in active state. 
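For orientation before the rewritten test helper below, this is roughly how the tenant_state tests elsewhere in this diff consume it now that `Node`'s fields are private: build nodes through the helper, seed a `Scheduler` from them, and push availability changes back in with `node_upsert`. A sketch assuming the crate's test context; node id 1 is an arbitrary pick.

```rust
use pageserver_api::controller_api::NodeAvailability;
use utils::id::NodeId;

use crate::scheduler::{test_utils::make_test_nodes, Scheduler};

// Seed a scheduler from synthesized nodes, then mark one offline and re-upsert it
// so the scheduler's may_schedule view stays consistent with the node map.
fn offline_one_node() {
    let mut nodes = make_test_nodes(3);
    let mut scheduler = Scheduler::new(nodes.values());

    nodes
        .get_mut(&NodeId(1))
        .unwrap()
        .set_availability(NodeAvailability::Offline);

    // After this upsert the scheduler will stop placing new shards on node 1.
    scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
}
```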
@@ -264,18 +263,17 @@ pub(crate) mod test_utils { pub(crate) fn make_test_nodes(n: u64) -> HashMap { (1..n + 1) .map(|i| { - ( - NodeId(i), - Node { - id: NodeId(i), - availability: NodeAvailability::Active, - scheduling: NodeSchedulingPolicy::Active, - listen_http_addr: format!("httphost-{i}"), - listen_http_port: 80 + i as u16, - listen_pg_addr: format!("pghost-{i}"), - listen_pg_port: 5432 + i as u16, - }, - ) + (NodeId(i), { + let node = Node::new( + NodeId(i), + format!("httphost-{i}"), + 80 + i as u16, + format!("pghost-{i}"), + 5432 + i as u16, + ); + assert!(node.is_available()); + node + }) }) .collect() } diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index d162ab5c65b3..f41c4f89b96b 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -16,9 +16,9 @@ use futures::{stream::FuturesUnordered, StreamExt}; use hyper::StatusCode; use pageserver_api::{ controller_api::{ - NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, - TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse, - TenantLocateResponseShard, TenantShardMigrateRequest, TenantShardMigrateResponse, + NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantCreateResponse, + TenantCreateResponseShard, TenantLocateResponse, TenantShardMigrateRequest, + TenantShardMigrateResponse, }, models::TenantConfigRequest, }; @@ -39,7 +39,6 @@ use pageserver_client::mgmt_api; use tokio_util::sync::CancellationToken; use tracing::instrument; use utils::{ - backoff, completion::Barrier, generation::Generation, http::error::ApiError, @@ -50,7 +49,7 @@ use utils::{ use crate::{ compute_hook::{self, ComputeHook}, - node::Node, + node::{AvailabilityTransition, Node}, persistence::{split_state::SplitState, DatabaseError, Persistence, TenantShardPersistence}, reconciler::attached_location_conf, scheduler::Scheduler, @@ -201,7 +200,8 @@ impl Service { async fn startup_reconcile(self: &Arc) { // For all tenant shards, a vector of observed states on nodes (where None means // indeterminate, same as in [`ObservedStateLocation`]) - let mut observed = HashMap::new(); + let mut observed: HashMap)>> = + HashMap::new(); let mut nodes_online = HashSet::new(); @@ -236,7 +236,8 @@ impl Service { nodes_online.insert(node_id); for (tenant_shard_id, conf_opt) in tenant_shards { - observed.insert(tenant_shard_id, (node_id, conf_opt)); + let shard_observations = observed.entry(tenant_shard_id).or_default(); + shard_observations.push((node_id, conf_opt)); } } @@ -252,27 +253,28 @@ impl Service { let mut new_nodes = (**nodes).clone(); for (node_id, node) in new_nodes.iter_mut() { if nodes_online.contains(node_id) { - node.availability = NodeAvailability::Active; + node.set_availability(NodeAvailability::Active); scheduler.node_upsert(node); } } *nodes = Arc::new(new_nodes); - for (tenant_shard_id, (node_id, observed_loc)) in observed { - let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else { - cleanup.push((tenant_shard_id, node_id)); - continue; - }; - - tenant_state - .observed - .locations - .insert(node_id, ObservedStateLocation { conf: observed_loc }); + for (tenant_shard_id, shard_observations) in observed { + for (node_id, observed_loc) in shard_observations { + let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else { + cleanup.push((tenant_shard_id, node_id)); + continue; + }; + tenant_state + .observed + .locations + .insert(node_id, 
ObservedStateLocation { conf: observed_loc }); + } } // Populate each tenant's intent state for (tenant_shard_id, tenant_state) in tenants.iter_mut() { - tenant_state.intent_from_observed(); + tenant_state.intent_from_observed(scheduler); if let Err(e) = tenant_state.schedule(scheduler) { // Non-fatal error: we are unable to properly schedule the tenant, perhaps because // not enough pageservers are available. The tenant may well still be available @@ -359,40 +361,19 @@ impl Service { for node in nodes.values() { node_list_futs.push({ async move { - let http_client = reqwest::ClientBuilder::new() - .timeout(Duration::from_secs(5)) - .build() - .expect("Failed to construct HTTP client"); - let client = mgmt_api::Client::from_client( - http_client, - node.base_url(), - self.config.jwt_token.as_deref(), - ); - - fn is_fatal(e: &mgmt_api::Error) -> bool { - use mgmt_api::Error::*; - match e { - ReceiveBody(_) | ReceiveErrorBody(_) => false, - ApiError(StatusCode::SERVICE_UNAVAILABLE, _) - | ApiError(StatusCode::GATEWAY_TIMEOUT, _) - | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false, - ApiError(_, _) => true, - } - } - - tracing::info!("Scanning shards on node {}...", node.id); - let description = format!("List locations on {}", node.id); - let response = backoff::retry( - || client.list_location_config(), - is_fatal, - 1, - 5, - &description, - &self.cancel, - ) - .await; - - (node.id, response) + tracing::info!("Scanning shards on node {node}..."); + let timeout = Duration::from_secs(5); + let response = node + .with_client_retries( + |client| async move { client.list_location_config().await }, + &self.config.jwt_token, + 1, + 5, + timeout, + &self.cancel, + ) + .await; + (node.get_id(), response) } }); } @@ -662,19 +643,9 @@ impl Service { .list_nodes() .await? .into_iter() - .map(|n| Node { - id: NodeId(n.node_id as u64), - // At startup we consider a node offline until proven otherwise. 
- availability: NodeAvailability::Offline, - scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy) - .expect("Bad scheduling policy in DB"), - listen_http_addr: n.listen_http_addr, - listen_http_port: n.listen_http_port as u16, - listen_pg_addr: n.listen_pg_addr, - listen_pg_port: n.listen_pg_port as u16, - }) + .map(Node::from_persistent) .collect::>(); - let nodes: HashMap = nodes.into_iter().map(|n| (n.id, n)).collect(); + let nodes: HashMap = nodes.into_iter().map(|n| (n.get_id(), n)).collect(); tracing::info!("Loaded {} nodes from database.", nodes.len()); tracing::info!("Loading shards from database..."); @@ -701,15 +672,13 @@ impl Service { } for node_id in node_ids { tracing::info!("Creating node {} in scheduler for tests", node_id); - let node = Node { - id: NodeId(node_id as u64), - availability: NodeAvailability::Active, - scheduling: NodeSchedulingPolicy::Active, - listen_http_addr: "".to_string(), - listen_http_port: 123, - listen_pg_addr: "".to_string(), - listen_pg_port: 123, - }; + let node = Node::new( + NodeId(node_id as u64), + "".to_string(), + 123, + "".to_string(), + 123, + ); scheduler.node_upsert(&node); } @@ -975,6 +944,12 @@ impl Service { // Ordering: we must persist generation number updates before making them visible in the in-memory state let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?; + tracing::info!( + node_id=%reattach_req.node_id, + "Incremented {} tenant shards' generations", + incremented_generations.len() + ); + // Apply the updated generation to our in-memory state let mut locked = self.inner.write().unwrap(); @@ -987,7 +962,6 @@ impl Service { id: tenant_shard_id, gen: new_gen.into().unwrap(), }); - // Apply the new generation number to our in-memory state let shard_state = locked.tenants.get_mut(&tenant_shard_id); let Some(shard_state) = shard_state else { @@ -1023,6 +997,14 @@ impl Service { if let Some(conf) = observed.conf.as_mut() { conf.generation = new_gen.into(); } + } else { + // This node has no observed state for the shard: perhaps it was offline + // when the pageserver restarted. Insert a None, so that the Reconciler + // will be prompted to learn the location's state before it makes changes. + shard_state + .observed + .locations + .insert(reattach_req.node_id, ObservedStateLocation { conf: None }); } // TODO: cancel/restart any running reconciliation for this tenant, it might be trying @@ -1685,7 +1667,7 @@ impl Service { .map_err(|e| { ApiError::InternalServerError(anyhow::anyhow!( "Error doing time travel recovery for shard {tenant_shard_id} on node {}: {e}", - node.id + node )) })?; } @@ -1739,10 +1721,7 @@ impl Service { // Secondary downloads are always advisory: if something fails, we nevertheless report success, so that whoever // is calling us will proceed with whatever migration they're doing, albeit with a slightly less warm cache // than they had hoped for. - tracing::warn!( - "Ignoring tenant secondary download error from pageserver {}: {e}", - node.id, - ); + tracing::warn!("Ignoring tenant secondary download error from pageserver {node}: {e}",); } Ok(()) @@ -1780,13 +1759,11 @@ impl Service { // surface immediately as an error to our caller. 
let status = client.tenant_delete(tenant_shard_id).await.map_err(|e| { ApiError::InternalServerError(anyhow::anyhow!( - "Error deleting shard {tenant_shard_id} on node {}: {e}", - node.id + "Error deleting shard {tenant_shard_id} on node {node}: {e}", )) })?; tracing::info!( - "Shard {tenant_shard_id} on node {}, delete returned {}", - node.id, + "Shard {tenant_shard_id} on node {node}, delete returned {}", status ); if status == StatusCode::ACCEPTED { @@ -1885,10 +1862,9 @@ impl Service { create_req: TimelineCreateRequest, ) -> Result { tracing::info!( - "Creating timeline on shard {}/{}, attached to node {}", + "Creating timeline on shard {}/{}, attached to node {node}", tenant_shard_id, create_req.new_timeline_id, - node.id ); let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref()); @@ -2012,10 +1988,7 @@ impl Service { jwt: Option, ) -> Result { tracing::info!( - "Deleting timeline on shard {}/{}, attached to node {}", - tenant_shard_id, - timeline_id, - node.id + "Deleting timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}", ); let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref()); @@ -2024,8 +1997,7 @@ impl Service { .await .map_err(|e| { ApiError::InternalServerError(anyhow::anyhow!( - "Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}", - node.id + "Error deleting timeline {timeline_id} on {tenant_shard_id} on node {node}: {e}", )) }) } @@ -2126,14 +2098,7 @@ impl Service { .get(&node_id) .expect("Pageservers may not be deleted while referenced"); - result.push(TenantLocateResponseShard { - shard_id: *tenant_shard_id, - node_id, - listen_http_addr: node.listen_http_addr.clone(), - listen_http_port: node.listen_http_port, - listen_pg_addr: node.listen_pg_addr.clone(), - listen_pg_port: node.listen_pg_port, - }); + result.push(node.shard_location(*tenant_shard_id)); match &shard_params { None => { @@ -2324,7 +2289,7 @@ impl Service { // populate the correct generation as part of its transaction, to protect us // against racing with changes in the state of the parent. generation: None, - generation_pageserver: Some(target.node.id.0 as i64), + generation_pageserver: Some(target.node.get_id().0 as i64), placement_policy: serde_json::to_string(&policy).unwrap(), // TODO: get the config out of the map config: serde_json::to_string(&TenantConfig::default()).unwrap(), @@ -2526,10 +2491,10 @@ impl Service { ))); }; - if node.availability != NodeAvailability::Active { + if !node.is_available() { // Warn but proceed: the caller may intend to manually adjust the placement of // a shard even if the node is down, e.g. if intervening during an incident. - tracing::warn!("Migrating to an unavailable node ({})", node.id); + tracing::warn!("Migrating to unavailable node {node}"); } let Some(shard) = tenants.get_mut(&tenant_shard_id) else { @@ -2784,11 +2749,7 @@ impl Service { if let Some(node) = locked.nodes.get(®ister_req.node_id) { // Note that we do not do a total equality of the struct, because we don't require // the availability/scheduling states to agree for a POST to be idempotent. 
- if node.listen_http_addr == register_req.listen_http_addr - && node.listen_http_port == register_req.listen_http_port - && node.listen_pg_addr == register_req.listen_pg_addr - && node.listen_pg_port == register_req.listen_pg_port - { + if node.registration_match(®ister_req) { tracing::info!( "Node {} re-registered with matching address", register_req.node_id @@ -2812,16 +2773,14 @@ impl Service { // Ordering: we must persist the new node _before_ adding it to in-memory state. // This ensures that before we use it for anything or expose it via any external // API, it is guaranteed to be available after a restart. - let new_node = Node { - id: register_req.node_id, - listen_http_addr: register_req.listen_http_addr, - listen_http_port: register_req.listen_http_port, - listen_pg_addr: register_req.listen_pg_addr, - listen_pg_port: register_req.listen_pg_port, - scheduling: NodeSchedulingPolicy::Filling, - // TODO: we shouldn't really call this Active until we've heartbeated it. - availability: NodeAvailability::Active, - }; + let new_node = Node::new( + register_req.node_id, + register_req.listen_http_addr, + register_req.listen_http_port, + register_req.listen_pg_addr, + register_req.listen_pg_port, + ); + // TODO: idempotency if the node already exists in the database self.persistence.insert_node(&new_node).await?; @@ -2866,29 +2825,14 @@ impl Service { )); }; - let mut offline_transition = false; - let mut active_transition = false; - - if let Some(availability) = &config_req.availability { - match (availability, &node.availability) { - (NodeAvailability::Offline, NodeAvailability::Active) => { - tracing::info!("Node {} transition to offline", config_req.node_id); - offline_transition = true; - } - (NodeAvailability::Active, NodeAvailability::Offline) => { - tracing::info!("Node {} transition to active", config_req.node_id); - active_transition = true; - } - _ => { - tracing::info!("Node {} no change during config", config_req.node_id); - // No change - } - }; - node.availability = *availability; - } + let availability_transition = if let Some(availability) = &config_req.availability { + node.set_availability(*availability) + } else { + AvailabilityTransition::Unchanged + }; if let Some(scheduling) = config_req.scheduling { - node.scheduling = scheduling; + node.set_scheduling(scheduling); // TODO: once we have a background scheduling ticker for fill/drain, kick it // to wake up and start working. 
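The next hunk replaces the `offline_transition`/`active_transition` booleans with a single match on the `AvailabilityTransition` returned by `Node::set_availability`. A trimmed sketch of that shape, with the per-tenant rescheduling and reconciler spawning elided; `handle_availability_change` is a hypothetical name.

```rust
use pageserver_api::controller_api::NodeAvailability;

use crate::node::{AvailabilityTransition, Node};

// Hypothetical, simplified version of the availability handling in node_configure.
fn handle_availability_change(node: &mut Node, wanted: Option<NodeAvailability>) {
    let transition = match wanted {
        Some(availability) => node.set_availability(availability),
        None => AvailabilityTransition::Unchanged,
    };

    match transition {
        AvailabilityTransition::ToOffline => {
            // The node's cancellation token has fired: mark its observed locations
            // unknown and reschedule attached shards elsewhere.
            tracing::info!("Node {} transition to offline", node.get_id());
        }
        AvailabilityTransition::ToActive => {
            // Fresh cancellation token: reconcile shards whose observed state on
            // this node is None so their configuration gets re-learned.
            tracing::info!("Node {} transition to active", node.get_id());
        }
        AvailabilityTransition::Unchanged => {
            tracing::info!("Node {} no change during config", node.get_id());
        }
    }
}
```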
@@ -2899,74 +2843,80 @@ impl Service { let new_nodes = Arc::new(new_nodes); - if offline_transition { - let mut tenants_affected: usize = 0; - for (tenant_shard_id, tenant_state) in tenants { - if let Some(observed_loc) = - tenant_state.observed.locations.get_mut(&config_req.node_id) - { - // When a node goes offline, we set its observed configuration to None, indicating unknown: we will - // not assume our knowledge of the node's configuration is accurate until it comes back online - observed_loc.conf = None; - } + match availability_transition { + AvailabilityTransition::ToOffline => { + tracing::info!("Node {} transition to offline", config_req.node_id); + let mut tenants_affected: usize = 0; + for (tenant_shard_id, tenant_state) in tenants { + if let Some(observed_loc) = + tenant_state.observed.locations.get_mut(&config_req.node_id) + { + // When a node goes offline, we set its observed configuration to None, indicating unknown: we will + // not assume our knowledge of the node's configuration is accurate until it comes back online + observed_loc.conf = None; + } - if tenant_state.intent.demote_attached(config_req.node_id) { - tenant_state.sequence = tenant_state.sequence.next(); - match tenant_state.schedule(scheduler) { - Err(e) => { - // It is possible that some tenants will become unschedulable when too many pageservers - // go offline: in this case there isn't much we can do other than make the issue observable. - // TODO: give TenantState a scheduling error attribute to be queried later. - tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id); - } - Ok(()) => { - if tenant_state - .maybe_reconcile( - result_tx.clone(), - &new_nodes, - &compute_hook, - &self.config, - &self.persistence, - &self.gate, - &self.cancel, - ) - .is_some() - { - tenants_affected += 1; - }; + if tenant_state.intent.demote_attached(config_req.node_id) { + tenant_state.sequence = tenant_state.sequence.next(); + match tenant_state.schedule(scheduler) { + Err(e) => { + // It is possible that some tenants will become unschedulable when too many pageservers + // go offline: in this case there isn't much we can do other than make the issue observable. + // TODO: give TenantState a scheduling error attribute to be queried later. + tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id); + } + Ok(()) => { + if tenant_state + .maybe_reconcile( + result_tx.clone(), + &new_nodes, + &compute_hook, + &self.config, + &self.persistence, + &self.gate, + &self.cancel, + ) + .is_some() + { + tenants_affected += 1; + }; + } } } } + tracing::info!( + "Launched {} reconciler tasks for tenants affected by node {} going offline", + tenants_affected, + config_req.node_id + ) } - tracing::info!( - "Launched {} reconciler tasks for tenants affected by node {} going offline", - tenants_affected, - config_req.node_id - ) - } - - if active_transition { - // When a node comes back online, we must reconcile any tenant that has a None observed - // location on the node. 
- for tenant_state in locked.tenants.values_mut() { - if let Some(observed_loc) = - tenant_state.observed.locations.get_mut(&config_req.node_id) - { - if observed_loc.conf.is_none() { - tenant_state.maybe_reconcile( - result_tx.clone(), - &new_nodes, - &compute_hook, - &self.config, - &self.persistence, - &self.gate, - &self.cancel, - ); + AvailabilityTransition::ToActive => { + tracing::info!("Node {} transition to active", config_req.node_id); + // When a node comes back online, we must reconcile any tenant that has a None observed + // location on the node. + for tenant_state in locked.tenants.values_mut() { + if let Some(observed_loc) = + tenant_state.observed.locations.get_mut(&config_req.node_id) + { + if observed_loc.conf.is_none() { + tenant_state.maybe_reconcile( + result_tx.clone(), + &new_nodes, + &compute_hook, + &self.config, + &self.persistence, + &self.gate, + &self.cancel, + ); + } } } - } - // TODO: in the background, we should balance work back onto this pageserver + // TODO: in the background, we should balance work back onto this pageserver + } + AvailabilityTransition::Unchanged => { + tracing::info!("Node {} no change during config", config_req.node_id); + } } locked.nodes = new_nodes; diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs index 33b7d578c7ca..ddb98665278d 100644 --- a/control_plane/attachment_service/src/tenant_state.rs +++ b/control_plane/attachment_service/src/tenant_state.rs @@ -1,7 +1,10 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; use crate::{metrics, persistence::TenantShardPersistence}; -use pageserver_api::controller_api::NodeAvailability; use pageserver_api::{ models::{LocationConfig, LocationConfigMode, TenantConfig}, shard::{ShardIdentity, TenantShardId}, @@ -370,7 +373,7 @@ impl TenantState { /// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next, /// to get an intent state that complies with placement policy. The overall goal is to do scheduling /// in a way that makes use of any configured locations that already exist in the outside world. - pub(crate) fn intent_from_observed(&mut self) { + pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) { // Choose an attached location by filtering observed locations, and then sorting to get the highest // generation let mut attached_locs = self @@ -395,7 +398,7 @@ impl TenantState { attached_locs.sort_by_key(|i| i.1); if let Some((node_id, _gen)) = attached_locs.into_iter().last() { - self.intent.attached = Some(*node_id); + self.intent.set_attached(scheduler, Some(*node_id)); } // All remaining observed locations generate secondary intents. This includes None @@ -406,7 +409,7 @@ impl TenantState { // will take care of promoting one of these secondaries to be attached. self.observed.locations.keys().for_each(|node_id| { if Some(*node_id) != self.intent.attached { - self.intent.secondary.push(*node_id); + self.intent.push_secondary(scheduler, *node_id); } }); } @@ -564,7 +567,9 @@ impl TenantState { } } - fn dirty(&self) -> bool { + fn dirty(&self, nodes: &Arc>) -> bool { + let mut dirty_nodes = HashSet::new(); + if let Some(node_id) = self.intent.attached { // Maybe panic: it is a severe bug if we try to attach while generation is null. 
let generation = self @@ -575,7 +580,7 @@ impl TenantState { match self.observed.locations.get(&node_id) { Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {} Some(_) | None => { - return true; + dirty_nodes.insert(node_id); } } } @@ -585,7 +590,7 @@ impl TenantState { match self.observed.locations.get(node_id) { Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {} Some(_) | None => { - return true; + dirty_nodes.insert(*node_id); } } } @@ -593,17 +598,18 @@ impl TenantState { for node_id in self.observed.locations.keys() { if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) { // We have observed state that isn't part of our intent: need to clean it up. - return true; + dirty_nodes.insert(*node_id); } } - // Even if there is no pageserver work to be done, if we have a pending notification to computes, - // wake up a reconciler to send it. - if self.pending_compute_notification { - return true; - } + dirty_nodes.retain(|node_id| { + nodes + .get(node_id) + .map(|n| n.is_available()) + .unwrap_or(false) + }); - false + !dirty_nodes.is_empty() } #[allow(clippy::too_many_arguments)] @@ -625,15 +631,20 @@ impl TenantState { let node = pageservers .get(node_id) .expect("Nodes may not be removed while referenced"); - if observed_loc.conf.is_none() - && !matches!(node.availability, NodeAvailability::Offline) - { + if observed_loc.conf.is_none() && node.is_available() { dirty_observed = true; break; } } - if !self.dirty() && !dirty_observed { + let active_nodes_dirty = self.dirty(pageservers); + + // Even if there is no pageserver work to be done, if we have a pending notification to computes, + // wake up a reconciler to send it. + let do_reconcile = + active_nodes_dirty || dirty_observed || self.pending_compute_notification; + + if !do_reconcile { tracing::info!("Not dirty, no reconciliation needed."); return None; } @@ -663,6 +674,21 @@ impl TenantState { } } + // Build list of nodes from which the reconciler should detach + let mut detach = Vec::new(); + for node_id in self.observed.locations.keys() { + if self.intent.get_attached() != &Some(*node_id) + && !self.intent.secondary.contains(node_id) + { + detach.push( + pageservers + .get(node_id) + .expect("Intent references non-existent pageserver") + .clone(), + ) + } + } + // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before // doing our sequence's work. 
let old_handle = self.reconciler.take(); @@ -677,14 +703,15 @@ impl TenantState { self.sequence = self.sequence.next(); let reconciler_cancel = cancel.child_token(); + let reconciler_intent = TargetState::from_intent(pageservers, &self.intent); let mut reconciler = Reconciler { tenant_shard_id: self.tenant_shard_id, shard: self.shard, generation: self.generation, - intent: TargetState::from_intent(&self.intent), + intent: reconciler_intent, + detach, config: self.config.clone(), observed: self.observed.clone(), - pageservers: pageservers.clone(), compute_hook: compute_hook.clone(), service_config: service_config.clone(), _gate_guard: gate_guard, @@ -819,7 +846,10 @@ impl TenantState { #[cfg(test)] pub(crate) mod tests { - use pageserver_api::shard::{ShardCount, ShardNumber}; + use pageserver_api::{ + controller_api::NodeAvailability, + shard::{ShardCount, ShardNumber}, + }; use utils::id::TenantId; use crate::scheduler::test_utils::make_test_nodes; @@ -878,7 +908,10 @@ pub(crate) mod tests { assert_eq!(tenant_state.intent.secondary.len(), 2); // Update the scheduler state to indicate the node is offline - nodes.get_mut(&attached_node_id).unwrap().availability = NodeAvailability::Offline; + nodes + .get_mut(&attached_node_id) + .unwrap() + .set_availability(NodeAvailability::Offline); scheduler.node_upsert(nodes.get(&attached_node_id).unwrap()); // Scheduling the node should promote the still-available secondary node to attached @@ -897,4 +930,54 @@ pub(crate) mod tests { Ok(()) } + + #[test] + fn intent_from_observed() -> anyhow::Result<()> { + let nodes = make_test_nodes(3); + let mut scheduler = Scheduler::new(nodes.values()); + + let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Double(1)); + + tenant_state.observed.locations.insert( + NodeId(3), + ObservedStateLocation { + conf: Some(LocationConfig { + mode: LocationConfigMode::AttachedMulti, + generation: Some(2), + secondary_conf: None, + shard_number: tenant_state.shard.number.0, + shard_count: tenant_state.shard.count.literal(), + shard_stripe_size: tenant_state.shard.stripe_size.0, + tenant_conf: TenantConfig::default(), + }), + }, + ); + + tenant_state.observed.locations.insert( + NodeId(2), + ObservedStateLocation { + conf: Some(LocationConfig { + mode: LocationConfigMode::AttachedStale, + generation: Some(1), + secondary_conf: None, + shard_number: tenant_state.shard.number.0, + shard_count: tenant_state.shard.count.literal(), + shard_stripe_size: tenant_state.shard.stripe_size.0, + tenant_conf: TenantConfig::default(), + }), + }, + ); + + tenant_state.intent_from_observed(&mut scheduler); + + // The highest generationed attached location gets used as attached + assert_eq!(tenant_state.intent.attached, Some(NodeId(3))); + // Other locations get used as secondary + assert_eq!(tenant_state.intent.secondary, vec![NodeId(2)]); + + scheduler.consistency_check(nodes.values(), [&tenant_state].into_iter())?; + + tenant_state.intent.clear(&mut scheduler); + Ok(()) + } } diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index 4dde7bdf0baf..732eb951c9fd 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -7,7 +7,7 @@ use utils::{ pub mod util; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Client { mgmt_api_endpoint: String, authorization_header: Option, @@ -24,6 +24,9 @@ pub enum Error { #[error("pageserver API: {1}")] ApiError(StatusCode, String), + + #[error("Cancelled")] + Cancelled, } pub type Result = std::result::Result; @@ -287,6 +290,21 @@ 
impl Client { .map_err(Error::ReceiveBody) } + pub async fn get_location_config( + &self, + tenant_shard_id: TenantShardId, + ) -> Result> { + let path = format!( + "{}/v1/location_config/{tenant_shard_id}", + self.mgmt_api_endpoint + ); + self.request(Method::GET, &path, ()) + .await? + .json() + .await + .map_err(Error::ReceiveBody) + } + pub async fn timeline_create( &self, tenant_shard_id: TenantShardId, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 6aaf1ab27e46..eafad9ab7383 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -14,6 +14,7 @@ use hyper::header; use hyper::StatusCode; use hyper::{Body, Request, Response, Uri}; use metrics::launch_timestamp::LaunchTimestamp; +use pageserver_api::models::LocationConfig; use pageserver_api::models::LocationConfigListResponse; use pageserver_api::models::ShardParameters; use pageserver_api::models::TenantDetails; @@ -1519,6 +1520,29 @@ async fn list_location_config_handler( json_response(StatusCode::OK, result) } +async fn get_location_config_handler( + request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + let state = get_state(&request); + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + let slot = state.tenant_manager.get(tenant_shard_id); + + let Some(slot) = slot else { + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant shard not found").into(), + )); + }; + + let result: Option = match slot { + TenantSlot::Attached(t) => Some(t.get_location_conf()), + TenantSlot::Secondary(s) => Some(s.get_location_conf()), + TenantSlot::InProgress(_) => None, + }; + + json_response(StatusCode::OK, result) +} + // Do a time travel recovery on the given tenant/tenant shard. Tenant needs to be detached // (from all pageservers) as it invalidates consistency assumptions. async fn tenant_time_travel_remote_storage_handler( @@ -2223,6 +2247,9 @@ pub fn make_router( .get("/v1/location_config", |r| { api_handler(r, list_location_config_handler) }) + .get("/v1/location_config/:tenant_id", |r| { + api_handler(r, get_location_config_handler) + }) .put( "/v1/tenant/:tenant_shard_id/time_travel_remote_storage", |r| api_handler(r, tenant_time_travel_remote_storage_handler), diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 06b61d4631a5..fc08b3c82e27 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1358,6 +1358,16 @@ impl TenantManager { } } + pub(crate) fn get(&self, tenant_shard_id: TenantShardId) -> Option { + let locked = self.tenants.read().unwrap(); + match &*locked { + TenantsMap::Initializing => None, + TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => { + map.get(&tenant_shard_id).cloned() + } + } + } + pub(crate) async fn delete_tenant( &self, tenant_shard_id: TenantShardId,