Skip to content

Commit

Permalink
dube: timeout individual layer evictions, log progress and record met…
Browse files Browse the repository at this point in the history
…rics (#6131)

Because of bugs, evictions could hang and pause the disk usage eviction task.
One such bug is known and fixed in #6928. Guard each layer eviction with a
modest timeout, treating timed-out evictions as failures, to be
conservative.

In addition, add logging and metrics recording on each eviction
iteration:
- log collection completion with duration and number of layers
    - per-tenant collection time is observed in a new histogram
    - per-tenant layer count is observed in a new histogram
- record metrics for the collected, selected and evicted layer counts
- log if eviction takes more than 10s
- log eviction completion with eviction duration

Additionally, remove dead code for which no dead-code warnings appeared in an
earlier PR.

Follow-up to: #6060.
  • Loading branch information
koivunej committed Feb 29, 2024
1 parent 502b69b commit ee93700
Show file tree
Hide file tree
Showing 8 changed files with 493 additions and 105 deletions.
145 changes: 106 additions & 39 deletions pageserver/src/disk_usage_eviction_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ use utils::{completion, id::TimelineId};

use crate::{
config::PageServerConf,
metrics::disk_usage_based_eviction::METRICS,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
self,
mgr::TenantManager,
remote_timeline_client::LayerFileMetadata,
secondary::SecondaryTenant,
storage_layer::{AsLayerDesc, EvictionError, Layer, LayerFileName},
Timeline,
},
};

Expand Down Expand Up @@ -409,13 +409,23 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
"running disk usage based eviction due to pressure"
);

let candidates =
let (candidates, collection_time) = {
let started_at = std::time::Instant::now();
match collect_eviction_candidates(tenant_manager, eviction_order, cancel).await? {
EvictionCandidates::Cancelled => {
return Ok(IterationOutcome::Cancelled);
}
EvictionCandidates::Finished(partitioned) => partitioned,
};
EvictionCandidates::Finished(partitioned) => (partitioned, started_at.elapsed()),
}
};

METRICS.layers_collected.inc_by(candidates.len() as u64);

tracing::info!(
elapsed_ms = collection_time.as_millis(),
total_layers = candidates.len(),
"collection completed"
);

// Debug-log the list of candidates
let now = SystemTime::now();
Expand Down Expand Up @@ -446,9 +456,10 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
// usage at that point, in 'usage_planned_min_resident_size_respecting'.

let selection = select_victims(&candidates, usage_pre);
let (evicted_amount, usage_planned) =
select_victims(&candidates, usage_pre).into_amount_and_planned();

let (evicted_amount, usage_planned) = selection.into_amount_and_planned();
METRICS.layers_selected.inc_by(evicted_amount as u64);

// phase2: evict layers

Expand Down Expand Up @@ -477,9 +488,15 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
if let Some(next) = next {
match next {
Ok(Ok(file_size)) => {
METRICS.layers_evicted.inc();
usage_assumed.add_available_bytes(file_size);
}
Ok(Err((file_size, EvictionError::NotFound | EvictionError::Downloaded))) => {
Ok(Err((
file_size,
EvictionError::NotFound
| EvictionError::Downloaded
| EvictionError::Timeout,
))) => {
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
Expand All @@ -495,19 +512,26 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(

// calling again when consumed_all is fine as evicted is fused.
let Some((_partition, candidate)) = evicted.next() else {
consumed_all = true;
if !consumed_all {
tracing::info!("all evictions started, waiting");
consumed_all = true;
}
continue;
};

match candidate.layer {
EvictionLayer::Attached(layer) => {
let file_size = layer.layer_desc().file_size;
js.spawn(async move {
layer
.evict_and_wait()
.await
.map(|()| file_size)
.map_err(|e| (file_size, e))
// have a low eviction waiting timeout because our LRU calculations go stale fast;
// also individual layer evictions could hang because of bugs and we do not want to
// pause disk_usage_based_eviction for such.
let timeout = std::time::Duration::from_secs(5);

match layer.evict_and_wait(timeout).await {
Ok(()) => Ok(file_size),
Err(e) => Err((file_size, e)),
}
});
}
EvictionLayer::Secondary(layer) => {
Expand All @@ -529,6 +553,30 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
(usage_assumed, evictions_failed)
};

let started_at = std::time::Instant::now();

let evict_layers = async move {
let mut evict_layers = std::pin::pin!(evict_layers);

let maximum_expected = std::time::Duration::from_secs(10);

let res = tokio::time::timeout(maximum_expected, &mut evict_layers).await;
let tuple = if let Ok(tuple) = res {
tuple
} else {
let elapsed = started_at.elapsed();
tracing::info!(elapsed_ms = elapsed.as_millis(), "still ongoing");
evict_layers.await
};

let elapsed = started_at.elapsed();
tracing::info!(elapsed_ms = elapsed.as_millis(), "completed");
tuple
};

let evict_layers =
evict_layers.instrument(tracing::info_span!("evict_layers", layers=%evicted_amount));

let (usage_assumed, evictions_failed) = tokio::select! {
tuple = evict_layers => { tuple },
_ = cancel.cancelled() => {
Expand Down Expand Up @@ -763,6 +811,8 @@ async fn collect_eviction_candidates(
eviction_order: EvictionOrder,
cancel: &CancellationToken,
) -> anyhow::Result<EvictionCandidates> {
const LOG_DURATION_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(10);

// get a snapshot of the list of tenants
let tenants = tenant::mgr::list_tenants()
.await
Expand Down Expand Up @@ -791,6 +841,8 @@ async fn collect_eviction_candidates(
continue;
}

let started_at = std::time::Instant::now();

// collect layers from all timelines in this tenant
//
// If one of the timelines becomes `!is_active()` during the iteration,
Expand All @@ -805,6 +857,7 @@ async fn collect_eviction_candidates(
}
let info = tl.get_local_layers_for_disk_usage_eviction().await;
debug!(tenant_id=%tl.tenant_shard_id.tenant_id, shard_id=%tl.tenant_shard_id.shard_slug(), timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());

tenant_candidates.extend(info.resident_layers.into_iter());
max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));

Expand Down Expand Up @@ -870,7 +923,25 @@ async fn collect_eviction_candidates(
(partition, candidate)
});

METRICS
.tenant_layer_count
.observe(tenant_candidates.len() as f64);

candidates.extend(tenant_candidates);

let elapsed = started_at.elapsed();
METRICS
.tenant_collection_time
.observe(elapsed.as_secs_f64());

if elapsed > LOG_DURATION_THRESHOLD {
tracing::info!(
tenant_id=%tenant.tenant_shard_id().tenant_id,
shard_id=%tenant.tenant_shard_id().shard_slug(),
elapsed_ms = elapsed.as_millis(),
"collection took longer than threshold"
);
}
}

// Note: the same tenant ID might be hit twice, if it transitions from attached to
Expand All @@ -885,18 +956,20 @@ async fn collect_eviction_candidates(
},
);

for secondary_tenant in secondary_tenants {
for tenant in secondary_tenants {
// for secondary tenants we use a sum of on_disk layers and already evicted layers. this is
// to prevent repeated disk usage based evictions from completely draining less often
// updating secondaries.
let (mut layer_info, total_layers) = secondary_tenant.get_layers_for_eviction();
let (mut layer_info, total_layers) = tenant.get_layers_for_eviction();

debug_assert!(
total_layers >= layer_info.resident_layers.len(),
"total_layers ({total_layers}) must be at least the resident_layers.len() ({})",
layer_info.resident_layers.len()
);

let started_at = std::time::Instant::now();

layer_info
.resident_layers
.sort_unstable_by_key(|layer_info| std::cmp::Reverse(layer_info.last_activity_ts));
Expand All @@ -918,9 +991,27 @@ async fn collect_eviction_candidates(
)
});

METRICS
.tenant_layer_count
.observe(tenant_candidates.len() as f64);
candidates.extend(tenant_candidates);

tokio::task::yield_now().await;

let elapsed = started_at.elapsed();

METRICS
.tenant_collection_time
.observe(elapsed.as_secs_f64());

if elapsed > LOG_DURATION_THRESHOLD {
tracing::info!(
tenant_id=%tenant.tenant_shard_id().tenant_id,
shard_id=%tenant.tenant_shard_id().shard_slug(),
elapsed_ms = elapsed.as_millis(),
"collection took longer than threshold"
);
}
}

debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
Expand Down Expand Up @@ -997,30 +1088,6 @@ impl<U: Usage> VictimSelection<U> {
}
}

/// Hash-map key wrapper giving a [`Timeline`] *identity* semantics: two keys
/// compare equal (and hash equal) iff they wrap the very same `Arc`
/// allocation, regardless of the timeline's contents.
struct TimelineKey(Arc<Timeline>);

impl PartialEq for TimelineKey {
    fn eq(&self, other: &Self) -> bool {
        // Pointer identity of the shared allocation, not structural equality.
        Arc::ptr_eq(&self.0, &other.0)
    }
}

impl Eq for TimelineKey {}

impl std::hash::Hash for TimelineKey {
    fn hash<H: std::hash::Hasher>(&self, hasher: &mut H) {
        // Consistent with `eq` above: hash the allocation's address.
        Arc::as_ptr(&self.0).hash(hasher);
    }
}

impl std::ops::Deref for TimelineKey {
    type Target = Timeline;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

/// A totally ordered f32 subset we can use with sorting functions.
pub(crate) mod finite_f32 {

Expand Down
59 changes: 59 additions & 0 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2474,6 +2474,64 @@ pub(crate) mod tenant_throttling {
}
}

/// Metrics for the disk usage based eviction task: per-tenant collection
/// histograms and running layer counters. Registered in the default
/// prometheus registry on first access of [`METRICS`].
pub(crate) mod disk_usage_based_eviction {
    use super::*;

    pub(crate) struct Metrics {
        /// Seconds spent collecting eviction candidates from a single tenant.
        pub(crate) tenant_collection_time: Histogram,
        /// Number of candidate layers gathered from a single tenant.
        pub(crate) tenant_layer_count: Histogram,
        /// Total layers collected as candidates, across all iterations.
        pub(crate) layers_collected: IntCounter,
        /// Total layers selected for eviction, across all iterations.
        pub(crate) layers_selected: IntCounter,
        /// Total layers successfully evicted, across all iterations.
        pub(crate) layers_evicted: IntCounter,
    }

    impl Default for Metrics {
        fn default() -> Self {
            let tenant_collection_time = register_histogram!(
                "pageserver_disk_usage_based_eviction_tenant_collection_seconds",
                "Time spent collecting layers from a tenant -- not normalized by collected layer amount",
                vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0]
            )
            .unwrap();

            let tenant_layer_count = register_histogram!(
                "pageserver_disk_usage_based_eviction_tenant_collected_layers",
                "Amount of layers gathered from a tenant",
                // coarse powers-of-ten style buckets; per-tenant layer counts
                // span several orders of magnitude
                vec![5.0, 50.0, 500.0, 5000.0, 50000.0]
            )
            .unwrap();

            let layers_collected = register_int_counter!(
                "pageserver_disk_usage_based_eviction_collected_layers_total",
                "Amount of layers collected"
            )
            .unwrap();

            // NOTE(review): "select_layers" is inconsistent with the sibling
            // metrics' past tense ("collected"/"evicted"); renaming would break
            // any existing consumers of the metric, so it is left as-is.
            let layers_selected = register_int_counter!(
                "pageserver_disk_usage_based_eviction_select_layers_total",
                "Amount of layers selected"
            )
            .unwrap();

            let layers_evicted = register_int_counter!(
                "pageserver_disk_usage_based_eviction_evicted_layers_total",
                "Amount of layers successfully evicted"
            )
            .unwrap();

            Self {
                tenant_collection_time,
                tenant_layer_count,
                layers_collected,
                layers_selected,
                layers_evicted,
            }
        }
    }

    /// Lazily-initialized global instance; forced eagerly by
    /// `preinitialize_metrics` so the series exist before first eviction run.
    pub(crate) static METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
}

pub fn preinitialize_metrics() {
// Python tests need these and on some we do alerting.
//
Expand Down Expand Up @@ -2508,6 +2566,7 @@ pub fn preinitialize_metrics() {
Lazy::force(&TENANT_MANAGER);

Lazy::force(&crate::tenant::storage_layer::layer::LAYER_IMPL_METRICS);
Lazy::force(&disk_usage_based_eviction::METRICS);

// countervecs
[&BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT]
Expand Down
Loading

1 comment on commit ee93700

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2525 tests run: 2391 passed, 0 failed, 134 skipped (full report)


Code coverage* (full report)

  • functions: 28.8% (6933 of 24084 functions)
  • lines: 47.4% (42573 of 89817 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
ee93700 at 2024-02-29T21:48:34.866Z :recycle:

Please sign in to comment.