storage controller: robustness improvements (#7027)
## Problem


Closes: #6847
Closes: #7006

## Summary of changes

- Pageserver API calls are wrapped in timeout/retry logic: this prevents
a reconciler from hanging on a stuck pageserver API call, and avoids
restarting an entire reconciliation when a single API call returns a
retryable error (e.g. 503). See the usage sketch after this summary.
- Add a cancellation token to `Node`, so that when we mark a node
offline we cancel any API calls already in flight to it and avoid
issuing any new calls to that offline node.
- If the dirty locations of a shard are all on offline nodes, then don't
spawn a reconciler.
- In re-attach, if we have no observed state object for a tenant, then
construct one with `conf: None` (which means "unknown"). Then, in
`Reconciler`, implement a TODO for scanning such locations before running,
so that we avoid spuriously incrementing a generation in the case
of a node that was offline while we started (this is the case that
tripped up #7006).
- Refactoring: make `Node` contents private, thereby guaranteeing that
updates to availability reliably update the cancellation token.
- Refactoring: don't pass the whole map of nodes into `Reconciler`,
thereby removing a bunch of `.expect()` calls.

Some of this was discovered/tested with a new failure injection test
that will come in a separate PR, once it is stable enough for CI.
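
For illustration only (this snippet is not part of the commit's diff): a minimal
caller-side sketch of the new retry/cancellation wrapper on `Node`. The helper name
`configure_location`, the `location_config` call and its exact signature, and the
concrete retry counts and timeout are assumptions for the example, not necessarily
what the Reconciler uses.

```rust
use std::time::Duration;

use pageserver_api::{models::LocationConfig, shard::TenantShardId};
use pageserver_client::mgmt_api;
use tokio_util::sync::CancellationToken;

use crate::node::Node;

/// Hypothetical helper: issue one pageserver management API call through the
/// node's retry/backoff wrapper. `None` means the call was cancelled, either
/// because `cancel` fired or because the node was marked offline.
async fn configure_location(
    node: &Node,
    tenant_shard_id: TenantShardId,
    wanted: LocationConfig,
    jwt: &Option<String>,
    cancel: &CancellationToken,
) -> Option<mgmt_api::Result<()>> {
    node.with_client_retries(
        |client| {
            // Clone per attempt so the closure can be called again on retry (FnMut).
            let config = wanted.clone();
            async move { client.location_config(tenant_shard_id, config, None).await }
        },
        jwt,
        1,                       // warn_threshold: log a warning after the first failure
        3,                       // max_retries: give up after three attempts
        Duration::from_secs(30), // per-request HTTP timeout
        cancel,                  // caller-level cancellation, e.g. reconciler shutdown
    )
    .await
}
```

A `Some(Err(_))` result corresponds to errors that the wrapper's `is_fatal` classifier
treats as non-retryable; retryable statuses (503, 504, 408) are retried with backoff
until `max_retries` is reached.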
jcsp committed Mar 7, 2024
1 parent 871977f commit d5a6a2a
Showing 8 changed files with 747 additions and 387 deletions.
218 changes: 209 additions & 9 deletions control_plane/attachment_service/src/node.rs
@@ -1,6 +1,16 @@
use pageserver_api::controller_api::{NodeAvailability, NodeSchedulingPolicy};
use std::{str::FromStr, time::Duration};

use hyper::StatusCode;
use pageserver_api::{
controller_api::{
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, TenantLocateResponseShard,
},
shard::TenantShardId,
};
use pageserver_client::mgmt_api;
use serde::Serialize;
use utils::id::NodeId;
use tokio_util::sync::CancellationToken;
use utils::{backoff, id::NodeId};

use crate::persistence::NodePersistence;

@@ -12,23 +22,101 @@ use crate::persistence::NodePersistence;
/// implementation of serialization on this type is only for debug dumps.
#[derive(Clone, Serialize)]
pub(crate) struct Node {
pub(crate) id: NodeId,
id: NodeId,

availability: NodeAvailability,
scheduling: NodeSchedulingPolicy,

pub(crate) availability: NodeAvailability,
pub(crate) scheduling: NodeSchedulingPolicy,
listen_http_addr: String,
listen_http_port: u16,

pub(crate) listen_http_addr: String,
pub(crate) listen_http_port: u16,
listen_pg_addr: String,
listen_pg_port: u16,

pub(crate) listen_pg_addr: String,
pub(crate) listen_pg_port: u16,
// This cancellation token means "stop any RPCs in flight to this node, and don't start
// any more". It is not related to process shutdown.
#[serde(skip)]
cancel: CancellationToken,
}

/// When updating [`Node::availability`] we use this type to indicate to the caller
/// whether/how they changed it.
pub(crate) enum AvailabilityTransition {
ToActive,
ToOffline,
Unchanged,
}

impl Node {
pub(crate) fn base_url(&self) -> String {
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
}

pub(crate) fn get_id(&self) -> NodeId {
self.id
}

pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
self.scheduling = scheduling
}

/// Does this registration request match `self`? This is used when deciding whether a registration
/// request should be allowed to update an existing record with the same node ID.
pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
self.id == register_req.node_id
&& self.listen_http_addr == register_req.listen_http_addr
&& self.listen_http_port == register_req.listen_http_port
&& self.listen_pg_addr == register_req.listen_pg_addr
&& self.listen_pg_port == register_req.listen_pg_port
}

/// For a shard located on this node, populate a response object
/// with this node's address information.
pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard {
TenantLocateResponseShard {
shard_id,
node_id: self.id,
listen_http_addr: self.listen_http_addr.clone(),
listen_http_port: self.listen_http_port,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port,
}
}

pub(crate) fn set_availability(
&mut self,
availability: NodeAvailability,
) -> AvailabilityTransition {
use NodeAvailability::*;
let transition = match (self.availability, availability) {
(Offline, Active) => {
// Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
// users of previously-cloned copies of the node will still see the old cancellation
// state. For example, Reconcilers in flight will have to complete and be spawned
// again to realize that the node has become available.
self.cancel = CancellationToken::new();
AvailabilityTransition::ToActive
}
(Active, Offline) => {
// Fire the node's cancellation token to cancel any in-flight API requests to it
self.cancel.cancel();
AvailabilityTransition::ToOffline
}
_ => AvailabilityTransition::Unchanged,
};
self.availability = availability;
transition
}

/// Whether we may send API requests to this node.
pub(crate) fn is_available(&self) -> bool {
// When we clone a node, [`Self::availability`] is a snapshot, but [`Self::cancel`] holds
// a reference to the original Node's cancellation status. Checking both of these results
// in a "pessimistic" check where we will consider a Node instance unavailable if it was unavailable
// when we cloned it, or if the original Node instance's cancellation token was fired.
matches!(self.availability, NodeAvailability::Active) && !self.cancel.is_cancelled()
}

/// Is this node eligible to have work scheduled onto it?
pub(crate) fn may_schedule(&self) -> bool {
match self.availability {
@@ -44,6 +132,26 @@ impl Node {
}
}

pub(crate) fn new(
id: NodeId,
listen_http_addr: String,
listen_http_port: u16,
listen_pg_addr: String,
listen_pg_port: u16,
) -> Self {
Self {
id,
listen_http_addr,
listen_http_port,
listen_pg_addr,
listen_pg_port,
scheduling: NodeSchedulingPolicy::Filling,
// TODO: we shouldn't really call this Active until we've heartbeated it.
availability: NodeAvailability::Active,
cancel: CancellationToken::new(),
}
}

pub(crate) fn to_persistent(&self) -> NodePersistence {
NodePersistence {
node_id: self.id.0 as i64,
@@ -54,4 +162,96 @@ impl Node {
listen_pg_port: self.listen_pg_port as i32,
}
}

pub(crate) fn from_persistent(np: NodePersistence) -> Self {
Self {
id: NodeId(np.node_id as u64),
// At startup we consider a node offline until proven otherwise.
availability: NodeAvailability::Offline,
scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy)
.expect("Bad scheduling policy in DB"),
listen_http_addr: np.listen_http_addr,
listen_http_port: np.listen_http_port as u16,
listen_pg_addr: np.listen_pg_addr,
listen_pg_port: np.listen_pg_port as u16,
cancel: CancellationToken::new(),
}
}

/// Wrapper for issuing requests to pageserver management API: takes care of generic
/// retry/backoff for retryable HTTP status codes.
///
/// This will return None to indicate cancellation. Cancellation may happen from
/// the cancellation token passed in, or from Self's cancellation token (i.e. node
/// going offline).
pub(crate) async fn with_client_retries<T, O, F>(
&self,
mut op: O,
jwt: &Option<String>,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
cancel: &CancellationToken,
) -> Option<mgmt_api::Result<T>>
where
O: FnMut(mgmt_api::Client) -> F,
F: std::future::Future<Output = mgmt_api::Result<T>>,
{
fn is_fatal(e: &mgmt_api::Error) -> bool {
use mgmt_api::Error::*;
match e {
ReceiveBody(_) | ReceiveErrorBody(_) => false,
ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
| ApiError(StatusCode::GATEWAY_TIMEOUT, _)
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
Cancelled => true,
}
}

backoff::retry(
|| {
let http_client = reqwest::ClientBuilder::new()
.timeout(timeout)
.build()
.expect("Failed to construct HTTP client");

let client =
mgmt_api::Client::from_client(http_client, self.base_url(), jwt.as_deref());

let node_cancel_fut = self.cancel.cancelled();

let op_fut = op(client);

async {
tokio::select! {
r = op_fut => {r},
_ = node_cancel_fut => {
Err(mgmt_api::Error::Cancelled)
}}
}
},
is_fatal,
warn_threshold,
max_retries,
&format!(
"Call to node {} ({}:{}) management API",
self.id, self.listen_http_addr, self.listen_http_port
),
cancel,
)
.await
}
}

impl std::fmt::Display for Node {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", self.id, self.listen_http_addr)
}
}

impl std::fmt::Debug for Node {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", self.id, self.listen_http_addr)
}
}
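
To make the cancellation semantics above concrete, here is a hypothetical usage sketch
(not part of this diff) exercising the `Node` API defined in this file; it assumes it
runs inside the storage controller crate, where these `pub(crate)` items are visible.

```rust
use pageserver_api::controller_api::NodeAvailability;
use utils::id::NodeId;

use crate::node::{AvailabilityTransition, Node};

fn availability_example() {
    // A freshly constructed node starts out Active (see the TODO about heartbeats).
    let mut node = Node::new(
        NodeId(1),
        "ps-1.local".to_string(),
        9898,
        "ps-1.local".to_string(),
        6400,
    );
    assert!(node.is_available());

    // Reconcilers hold clones of Node; a clone shares the original's cancellation token.
    let seen_by_reconciler = node.clone();

    // Marking the node offline fires the token: in-flight `with_client_retries` calls
    // on any clone return `None`, and `is_available()` turns false on those clones too.
    let transition = node.set_availability(NodeAvailability::Offline);
    assert!(matches!(transition, AvailabilityTransition::ToOffline));
    assert!(!seen_by_reconciler.is_available());

    // Returning to Active installs a *new* token: the old clone stays "cancelled" until
    // the service re-clones the node, e.g. when spawning the next reconciler.
    let transition = node.set_availability(NodeAvailability::Active);
    assert!(matches!(transition, AvailabilityTransition::ToActive));
    assert!(node.is_available());
    assert!(!seen_by_reconciler.is_available());
}
```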

1 comment on commit d5a6a2a

@github-actions

2570 tests run: 2436 passed, 0 failed, 134 skipped (full report)


Flaky tests (2)

Postgres 16

  • test_timeline_size_quota_on_startup: release

Postgres 14

  • test_compute_pageserver_connection_stress: release

Code coverage* (full report)

  • functions: 28.7% (7018 of 24415 functions)
  • lines: 47.5% (43371 of 91240 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
d5a6a2a at 2024-03-07T18:06:06.909Z :recycle:
