Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pageserver: circuit breakers on pathological amplification, repeated compaction failures #6734

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions control_plane/src/pageserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,11 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'lazy_slru_download' as bool")?,
enforce_circuit_breakers: settings
.remove("enforce_circuit_breakers")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'enforce_circuit_breakers' as bool")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
Expand Down Expand Up @@ -505,6 +510,11 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'lazy_slru_download' as bool")?,
enforce_circuit_breakers: settings
.remove("enforce_circuit_breakers")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'enforce_circuit_breakers' as bool")?,
}
};

Expand Down
1 change: 1 addition & 0 deletions libs/pageserver_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ pub struct TenantConfig {
pub gc_feedback: Option<bool>,
pub heatmap_period: Option<String>,
pub lazy_slru_download: Option<bool>,
pub enforce_circuit_breakers: Option<bool>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
Expand Down
90 changes: 90 additions & 0 deletions libs/utils/src/circuit_breaker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::time::{Duration, Instant};

/// Circuit breakers are for operations that are expensive and fallible: if they fail repeatedly,
/// we will stop attempting them for some period of time, to avoid denial-of-service from retries, and
/// to mitigate the log spam from repeated failures.
///
/// Intended usage: call [`CircuitBreaker::is_broken`] before attempting the operation and skip it
/// if that returns `true`; afterwards report the outcome with [`CircuitBreaker::fail`] or
/// [`CircuitBreaker::success`].
#[derive(Debug)]
pub struct CircuitBreaker {
    /// Consecutive failures since last success
    fail_count: usize,

    /// How many consecutive failures before we break the circuit
    fail_threshold: usize,

    /// If circuit is broken, when was it broken?
    broken_at: Option<Instant>,

    /// If set, we will auto-reset the circuit this long after it was broken. If None, broken
    /// circuits stay broken forever, or until success() is called.
    reset_period: Option<Duration>,

    /// If this is true, no actual circuit-breaking happens. This is for overriding a circuit breaker
    /// to permit something to keep running even if it would otherwise have tripped it.
    short_circuit: bool,
}

impl CircuitBreaker {
    /// Create a closed (not broken) circuit breaker.
    ///
    /// The circuit breaks once `fail_threshold` consecutive failures have been reported via
    /// [`CircuitBreaker::fail`]. A threshold of `0` behaves like `1`: the first failure breaks
    /// the circuit. If `reset_period` is `Some`, a broken circuit is re-closed the next time
    /// [`CircuitBreaker::is_broken`] is called after more than that duration has elapsed.
    pub fn new(fail_threshold: usize, reset_period: Option<Duration>) -> Self {
        Self {
            fail_count: 0,
            fail_threshold,
            broken_at: None,
            reset_period,
            short_circuit: false,
        }
    }

    /// Create a circuit breaker that never breaks: [`CircuitBreaker::fail`] is a no-op and
    /// [`CircuitBreaker::is_broken`] always returns `false`.
    pub fn short_circuit() -> Self {
        Self {
            fail_threshold: 0,
            fail_count: 0,
            broken_at: None,
            reset_period: None,
            short_circuit: true,
        }
    }

    /// Call this after an operation has failed.
    ///
    /// Increments the consecutive-failure counter and breaks the circuit when the counter
    /// reaches the threshold. If the circuit is already broken, the time at which it was
    /// broken is left unchanged.
    pub fn fail(&mut self) {
        if self.short_circuit {
            return;
        }

        // Saturate rather than overflow: a plain `+= 1` would panic in debug builds and wrap
        // around in release builds if the counter ever reached usize::MAX.
        self.fail_count = self.fail_count.saturating_add(1);
        if self.broken_at.is_none() && self.fail_count >= self.fail_threshold {
            self.break_circuit();
        }
    }

    /// Call this after successfully executing an operation
    ///
    /// Clears the failure counter and closes the circuit if it was broken.
    pub fn success(&mut self) {
        self.reset_circuit();
    }

    /// Call this before attempting an operation, and skip the operation if we are currently broken.
    ///
    /// Takes `&mut self` because a broken circuit whose reset period has elapsed is re-closed
    /// (and its failure counter cleared) as a side effect of this call.
    pub fn is_broken(&mut self) -> bool {
        if self.short_circuit {
            return false;
        }

        match (self.broken_at, self.reset_period) {
            // Never broken, or already reset.
            (None, _) => false,
            // Broken, but strictly more than the reset period has passed: close the circuit again.
            (Some(broken_at), Some(reset_period)) if broken_at.elapsed() > reset_period => {
                self.reset_circuit();
                false
            }
            // Broken, and either still within the reset period or there is no auto-reset.
            (Some(_), _) => true,
        }
    }

    /// Record the current time as the moment the circuit was broken.
    fn break_circuit(&mut self) {
        self.broken_at = Some(Instant::now())
    }

    /// Close the circuit and forget all previously counted failures.
    fn reset_circuit(&mut self) {
        self.broken_at = None;
        self.fail_count = 0;
    }
}
2 changes: 2 additions & 0 deletions libs/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ pub mod failpoint_support;

pub mod yielding_loop;

pub mod circuit_breaker;

/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:
Expand Down
81 changes: 79 additions & 2 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::backoff;
use utils::circuit_breaker::CircuitBreaker;
use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use utils::failpoint_support;
Expand Down Expand Up @@ -293,6 +294,10 @@ pub struct Tenant {

eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,

/// Track repeated failures to compact, so that we can back off.
/// Overhead of mutex is acceptable because compaction is done with a multi-second period.
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,

/// If the tenant is in Activating state, notify this to encourage it
/// to proceed to Active as soon as possible, rather than waiting for lazy
/// background warmup.
Expand Down Expand Up @@ -1936,11 +1941,68 @@ impl Tenant {
timelines_to_compact
};

// Before doing any I/O work, check our circuit breaker
if self.compaction_circuit_breaker.lock().unwrap().is_broken() {
return Ok(());
}

let mut total_physical = 0;
for (timeline_id, timeline) in &timelines_to_compact {
timeline
let timeline_result = timeline
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await?;
.await;

if let Some(remote_client) = &timeline.remote_client {
total_physical += remote_client.get_remote_physical_size();
}

if timeline_result.is_err() {
self.compaction_circuit_breaker.lock().unwrap().fail();
}

timeline_result?;
}

self.compaction_circuit_breaker.lock().unwrap().success();

// Circuit breaker: if a timeline's statistics indicate a pathological storage issue, such
// as extremely high write inflation, then we will stop ingesting data for that timeline. This
// reduces the blast radius of postgres/walingest bugs that might enable one tenant to generate
// an extremely large storage size, and thereby interfere with other tenants on the same pageserver.
let synthetic_size = self.cached_synthetic_tenant_size.load(Ordering::Relaxed);
if synthetic_size > 0 {
let amplification = total_physical as f64 / synthetic_size as f64;

// We only try to evaluate amplification once synthetic size reaches some threshold, to avoid
// noisy results on very small/new tenants.
const SIZE_THRESHOLD_FOR_AMPLIFICATION_CHECK: u64 = 1000000000;

// Typical storage amplification is something like 3x-10x. 100x would be really extreme.
// 1000x is unthinkable: if we see an amplification this extreme, then something bad and
// dangerous is going on.
const PATHOLOGICAL_AMPLIFICATION_FACTOR: f64 = 1000.0;

if synthetic_size > SIZE_THRESHOLD_FOR_AMPLIFICATION_CHECK
&& amplification > PATHOLOGICAL_AMPLIFICATION_FACTOR
{
tracing::error!("Pathological storage amplification detected (synthetic size {synthetic_size}, physical size {total_physical}): shutting down ingest");
if self.get_enforce_circuit_breakers() {
for (timeline_id, timeline) in timelines_to_compact {
if tokio::time::timeout(
Duration::from_secs(5),
timeline.kill_wal_receiver(),
)
.await
.is_err()
{
tracing::error!(
"Timed out shutting down WAL intest on timeline {timeline_id}"
);
}
}
}
}
}

Ok(())
Expand Down Expand Up @@ -2593,6 +2655,16 @@ impl Tenant {
}
}

/// Returns the effective `enforce_circuit_breakers` setting for this tenant: the tenant's own
/// value when it is set, otherwise the value from `self.conf.default_tenant_conf`.
pub(crate) fn get_enforce_circuit_breakers(&self) -> bool {
    // Copy the optional per-tenant override out so the read guard is released immediately.
    let tenant_override = {
        let guard = self.tenant_conf.read().unwrap();
        guard.tenant_conf.enforce_circuit_breakers
    };

    match tenant_override {
        Some(enforce) => enforce,
        None => self.conf.default_tenant_conf.enforce_circuit_breakers,
    }
}

pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
// Don't hold self.timelines.lock() during the notifies.
Expand Down Expand Up @@ -2752,6 +2824,10 @@ impl Tenant {
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
compaction_circuit_breaker: std::sync::Mutex::new(CircuitBreaker::new(
5,
Some(Duration::from_secs(3600)),
)),
activate_now_sem: tokio::sync::Semaphore::new(0),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
cancel: CancellationToken::default(),
Expand Down Expand Up @@ -3962,6 +4038,7 @@ pub(crate) mod harness {
gc_feedback: Some(tenant_conf.gc_feedback),
heatmap_period: Some(tenant_conf.heatmap_period),
lazy_slru_download: Some(tenant_conf.lazy_slru_download),
enforce_circuit_breakers: Some(tenant_conf.enforce_circuit_breakers),
}
}
}
Expand Down
15 changes: 15 additions & 0 deletions pageserver/src/tenant/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub mod defaults {
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";

pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;

pub const DEFAULT_ENFORCE_CIRCUIT_BREAKERS: bool = false;
}

#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
Expand Down Expand Up @@ -348,6 +350,10 @@ pub struct TenantConf {

/// If true, SLRU segments are downloaded on demand; if false, SLRU segments are included in basebackup.
pub lazy_slru_download: bool,

/// If true, then the tenant will automatically shut off external APIs (e.g. wal ingest, page service) in
/// response to high failure rates that likely indicate a bug.
pub enforce_circuit_breakers: bool,
}

/// Same as TenantConf, but this struct preserves the information about
Expand Down Expand Up @@ -437,6 +443,10 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub lazy_slru_download: Option<bool>,

#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub enforce_circuit_breakers: Option<bool>,
}

impl TenantConfOpt {
Expand Down Expand Up @@ -485,6 +495,9 @@ impl TenantConfOpt {
lazy_slru_download: self
.lazy_slru_download
.unwrap_or(global_conf.lazy_slru_download),
enforce_circuit_breakers: self
.enforce_circuit_breakers
.unwrap_or(global_conf.enforce_circuit_breakers),
}
}
}
Expand Down Expand Up @@ -524,6 +537,7 @@ impl Default for TenantConf {
gc_feedback: false,
heatmap_period: Duration::ZERO,
lazy_slru_download: false,
enforce_circuit_breakers: false,
}
}
}
Expand Down Expand Up @@ -596,6 +610,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
gc_feedback: value.gc_feedback,
heatmap_period: value.heatmap_period.map(humantime),
lazy_slru_download: value.lazy_slru_download,
enforce_circuit_breakers: value.enforce_circuit_breakers,
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1655,6 +1655,17 @@ impl Timeline {
));
}

/// Stops WAL ingestion for this timeline without tearing down the rest of the Timeline, so
/// reads of already ingested data should still work.
///
/// Asks `task_mgr` to shut down the `WalReceiverManager` tasks scoped to this tenant shard and
/// timeline, and returns once that call completes.
pub(super) async fn kill_wal_receiver(&self) {
    let task_kind = Some(TaskKind::WalReceiverManager);
    let tenant_shard = Some(self.tenant_shard_id);
    let timeline = Some(self.timeline_id);

    task_mgr::shutdown_tasks(task_kind, tenant_shard, timeline).await;
}

/// Initialize with an empty layer map. Used when creating a new timeline.
pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
let mut layers = self.layers.try_write().expect(
Expand Down
1 change: 1 addition & 0 deletions test_runner/regress/test_attach_tenant_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"pitr_interval": "1m",
"lagging_wal_timeout": "23m",
"lazy_slru_download": True,
"enforce_circuit_breakers": True,
"max_lsn_wal_lag": 230000,
"min_resident_size_override": 23,
"trace_read_requests": True,
Expand Down
Loading