concurrency-limit low-priority initial logical size calculation [v2] (#6000)

Problem
-------

Before this PR, there was no concurrency limit on initial logical size
computations.

While logical size computations are lazy in theory, in practice
(production), they happen in a short timeframe after restart.

This means that on a PS with 20k tenants, we'd have up to 20k concurrent
initial logical size calculation requests.

This is self-inflicted needless overload.

This hasn't been a problem so far because the `.await` points on the
logical size calculation path never return `Pending`, hence we have a
natural concurrency limit of the number of executor threads.
But, as soon as we return `Pending` somewhere in the logical size
calculation path, other concurrent tasks get scheduled by tokio.
If these other tasks are also logical size calculations, they eventually
pound on the same bottleneck.
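
To illustrate the scheduling effect, here is a standalone toy (not pageserver
code): on a multi-threaded tokio runtime, tasks whose bodies never actually
yield can only overlap up to the number of worker threads, but as soon as each
task hits an `.await` that returns `Pending`, all of them are in flight at once.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::time::Duration;

#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let tasks: Vec<_> = (0..1000)
        .map(|_| {
            let in_flight = Arc::clone(&in_flight);
            let peak = Arc::clone(&peak);
            tokio::spawn(async move {
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                // This await point parks the task, so tokio schedules another
                // one; remove it and `peak` stays at roughly the number of
                // worker threads instead of approaching 1000.
                tokio::time::sleep(Duration::from_millis(50)).await;
                in_flight.fetch_sub(1, Ordering::SeqCst);
            })
        })
        .collect();

    for t in tasks {
        t.await.unwrap();
    }
    println!("peak concurrency: {}", peak.load(Ordering::SeqCst));
}
```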

For example, in #5479 we want to switch the VirtualFile descriptor cache
to a `tokio::sync::RwLock`, which makes us return `Pending`. Without
measures like this patch, the VirtualFile descriptor cache thrashes
heavily for about 2 hours after PS restart, until all the logical size
calculations have completed and the degree of concurrency / number of
concurrent VirtualFile operations is back down to regular levels.
See the *Experiment* section below for details.

Background
----------

Before this PR, initial logical size calculation was spawned lazily on
first call to `Timeline::get_current_logical_size()`.

In practice (prod), the lazy calculation is triggered by
`WalReceiverConnectionHandler` if the timeline is active according to
storage broker, or by the first iteration of consumption metrics worker
after restart (`MetricsCollection`).

The spawns by walreceiver are high-priority because logical size is
needed by Safekeepers (via walreceiver `PageserverFeedback`) to enforce
the project logical size limit.
The spawns by metrics collection are not on the user-critical path and
hence low-priority. [^consumption_metrics_slo]

[^consumption_metrics_slo]: We can't delay metrics collection
indefinitely because there are TBD internal SLOs tied to metrics
collection happening in a timely manner
(neondatabase/cloud#7408). But let's ignore
that here.

The ratio of walreceiver-initiated spawns vs
consumption-metrics-initiated spawns can be reconstructed from logs
(`spawning logical size computation from context of task kind {:?}`).
PRs #5995 and #6018 add metrics for this.

A first investigation of the ratio led to the discovery that walreceiver
spawns 75% of initial logical size computations.
That's because of two bugs:
- In Safekeepers: #5993
- In interaction between Pageservers and Safekeepers:
#5962

The safekeeper bug is likely the primary cause, but we don't have the
data yet. The metrics will hopefully provide some insights.

When assessing production-readiness of this PR, please assume that
neither of these bugs is fixed yet.


Changes In This PR
------------------

With this PR, initial logical size calculation is reworked as follows:

First, all initial logical size calculation task_mgr tasks are started
early, as part of timeline activation, and run a retry loop with long
back-off until success. This removes the lazy computation; it was
needless complexity because in practice we compute all logical sizes
anyway, since consumption metrics collects them.

Second, within the initial logical size calculation task, each attempt
queues behind the background loop concurrency limiter semaphore. This
fixes the performance issue that we pointed out in the "Problem" section
earlier.
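
A minimal sketch of the per-timeline task described above (the permit
count, back-off constants, and function names are illustrative, not the
identifiers used in the tree):

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

async fn initial_logical_size_task(limiter: Arc<Semaphore>) {
    let mut backoff = Duration::from_secs(1);
    loop {
        {
            // Each attempt queues behind the shared background-loop semaphore,
            // so only a bounded number of calculations run at any one time.
            let _permit = limiter.acquire().await.expect("limiter is never closed");
            match try_calculate_logical_size().await {
                Ok(size) => {
                    // publish `size` to the timeline's logical-size state here
                    println!("initial logical size: {size}");
                    return;
                }
                Err(err) => eprintln!("attempt failed: {err}, will retry"),
            }
            // the permit is released here, before we back off
        }
        tokio::time::sleep(backoff).await;
        backoff = (backoff * 2).min(Duration::from_secs(600));
    }
}

// Placeholder for the real calculation, which walks the timeline's data.
async fn try_calculate_logical_size() -> Result<u64, &'static str> {
    Ok(42)
}

#[tokio::main]
async fn main() {
    let limiter = Arc::new(Semaphore::new(4)); // illustrative permit count
    let handles: Vec<_> = (0..16)
        .map(|_| tokio::spawn(initial_logical_size_task(Arc::clone(&limiter))))
        .collect();
    for h in handles {
        h.await.unwrap();
    }
}
```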

Third, there is a twist to queuing behind the background loop
concurrency limiter semaphore. Logical size is needed by Safekeepers
(via walreceiver `PageserverFeedback`) to enforce the project logical
size limit. However, we currently do open walreceiver connections even
before we have an exact logical size. That's bad, and I'll build on top
of this PR to fix that
(#5963). But, for the
purposes of this PR, we don't want to introduce a regression, i.e., we
don't want to provide an exact value later than before this PR. The
solution is to introduce a priority-boosting mechanism
(`GetLogicalSizePriority`), allowing callers of
`Timeline::get_current_logical_size` to specify how urgently they need
an exact value. The effect of specifying high urgency is that the
initial logical size calculation task for the timeline will skip the
concurrency limiting semaphore. This should yield effectively the same
behavior as we had before this PR with lazy spawning.
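
One way to picture the priority boost (the `GetLogicalSizePriority`
variants and the `StartCircumstances` labels below match this PR; the
`Notify`-based wiring is my own simplification, not the exact plumbing in
the tree):

```rust
use std::sync::Arc;
use tokio::sync::{Notify, Semaphore};

pub enum GetLogicalSizePriority {
    User,       // caller needs an exact value urgently (e.g. walreceiver path)
    Background, // consumption metrics: fine to wait behind the limiter
}

struct TimelineSizeState {
    priority_boost: Notify, // signalled by high-priority callers
}

impl TimelineSizeState {
    fn get_current_logical_size(&self, priority: GetLogicalSizePriority) {
        if matches!(priority, GetLogicalSizePriority::User) {
            // Ask the initial-size task to stop waiting for a permit.
            self.priority_boost.notify_one();
        }
        // ...return the currently known Approximate/Exact value here...
    }
}

async fn initial_size_attempt(state: Arc<TimelineSizeState>, limiter: Arc<Semaphore>) {
    // Queue for a background permit, but leave the queue the moment a
    // high-priority caller shows up.
    tokio::select! {
        permit = limiter.acquire() => {
            let _permit = permit.expect("limiter is never closed");
            // circumstances = AfterBackgroundTasksRateLimit
            // ...run the calculation while holding the permit...
        }
        _ = state.priority_boost.notified() => {
            // circumstances = SkippedConcurrencyLimiter
            // ...run the calculation immediately, without a permit...
        }
    }
}

#[tokio::main]
async fn main() {
    let state = Arc::new(TimelineSizeState { priority_boost: Notify::new() });
    let limiter = Arc::new(Semaphore::new(0)); // exhausted limiter for the demo
    let task = tokio::spawn(initial_size_attempt(Arc::clone(&state), limiter));

    // A user-facing caller arrives: the attempt skips the (exhausted) limiter.
    state.get_current_logical_size(GetLogicalSizePriority::User);
    task.await.unwrap();
}
```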

Last, the priority-boosting mechanism obsoletes the `init_order`'s grace
period for initial logical size calculations. It's a separate commit to
reduce the churn during review. We can drop that commit if people think
it's too much churn, and commit it later once we know this PR here
worked as intended.

Experiment With #5479 
---------------------

I validated this PR combined with #5479 to assess whether we're making
forward progress towards asyncification.

The setup is an `i3en.3xlarge` instance with 20k tenants, each with one
timeline that has 9 layers.
All tenants are inactive, i.e., not known to SKs or the storage broker.
This means all initial logical size calculations are spawned by the
consumption metrics `MetricsCollection` task kind.
The consumption metrics worker starts requesting logical sizes at low
priority immediately after restart. This is achieved by deleting the
consumption metrics cache file on disk before starting
PS.[^consumption_metrics_cache_file]

[^consumption_metrics_cache_file]: The consumption metrics worker
persists its reporting interval across PS restarts; deleting the state
file on disk gives predictable (and, I believe, worst-case in terms of
concurrency during PS restart) behavior.

Before this patch, all of these timelines would do their initial
logical size calculation in parallel, leading to extreme thrashing in the
page cache and the virtual file cache.

With this patch, the virtual file cache thrashing is reduced
significantly (from 80k `open`-system-calls/second to ~500
`open`-system-calls/second during loading).


### Critique

The obvious critique of the above experiment is that there's no skipping
of the semaphore, i.e., the priority-boosting aspect of this PR is not
exercised.

If even just 1% of our 20k tenants in the setup were active in
SK/storage_broker, then 200 logical size calculations would skip the
limiting semaphore immediately after restart and run concurrently.

Further critique: given the two bugs wrt timeline inactive vs active
state that were mentioned in the Background section, we could have 75%
of our 20k tenants being (falsely) active on restart.

So... (next section)

This Doesn't Make Us Ready For Async VirtualFile
------------------------------------------------

This PR is a step towards asynchronous `VirtualFile`, aka, #5479 or even
#4744.

But it doesn't yet enable us to ship #5479.

The reason is that this PR doesn't limit the number of high-priority
logical size computations.
If many high-priority logical size calculations are requested, we'll fall
over just as we would if #5479 were applied without this PR.
And currently, at the very least due to the bugs mentioned in the Background
section, we run thousands of high-priority logical size calculations on
PS startup in prod.

So, at a minimum, we need to fix these bugs.

Then we can ship #5479 and #4744, and things will likely be fine under
normal operation.

But in high-traffic situations, overload problems will still be more
likely to happen, e.g., VirtualFile descriptor cache thrashing.
The solution candidates for that are orthogonal to this PR though:
* global concurrency limiting
* per-tenant rate limiting => #5899
* load shedding
* scaling bottleneck resources (fd cache size (neondatabase/cloud#8351),
page cache size (neondatabase/cloud#8351), spread load across more PSes,
etc.)

Conclusion
----------

Even with the remarks in the previous section, we should merge this
PR because:
1. it's an improvement over the status quo (esp. if the aforementioned
bugs wrt timeline active / inactive are fixed)
2. it prepares the way for
#6010
3. it gets us close to shipping #5479 and #4744
problame committed Dec 4, 2023
1 parent 7403d55 commit c7f1143
Showing 16 changed files with 301 additions and 285 deletions.
29 changes: 1 addition & 28 deletions pageserver/src/bin/pageserver.rs
@@ -402,15 +402,11 @@ fn start_pageserver(
let (init_remote_done_tx, init_remote_done_rx) = utils::completion::channel();
let (init_done_tx, init_done_rx) = utils::completion::channel();

let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel();

let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel();

let order = pageserver::InitializationOrder {
initial_tenant_load_remote: Some(init_done_tx),
initial_tenant_load: Some(init_remote_done_tx),
initial_logical_size_can_start: init_done_rx.clone(),
initial_logical_size_attempt: Some(init_logical_size_done_tx),
background_jobs_can_start: background_jobs_barrier.clone(),
};

@@ -464,34 +460,14 @@ fn start_pageserver(
});

let WaitForPhaseResult {
timeout_remaining: timeout,
timeout_remaining: _timeout,
skipped: init_load_skipped,
} = wait_for_phase("initial_tenant_load", init_load_done, timeout).await;

// initial logical sizes can now start, as they were waiting on init_done_rx.

scopeguard::ScopeGuard::into_inner(guard);

let guard = scopeguard::guard_on_success((), |_| {
tracing::info!("Cancelled before initial logical sizes completed")
});

let logical_sizes_done = std::pin::pin!(async {
init_logical_size_done_rx.wait().await;
startup_checkpoint(
started_startup_at,
"initial_logical_sizes",
"Initial logical sizes completed",
);
});

let WaitForPhaseResult {
timeout_remaining: _,
skipped: logical_sizes_skipped,
} = wait_for_phase("initial_logical_sizes", logical_sizes_done, timeout).await;

scopeguard::ScopeGuard::into_inner(guard);

// allow background jobs to start: we either completed prior stages, or they reached timeout
// and were skipped. It is important that we do not let them block background jobs indefinitely,
// because things like consumption metrics for billing are blocked by this barrier.
@@ -514,9 +490,6 @@ fn start_pageserver(
if let Some(f) = init_load_skipped {
f.await;
}
if let Some(f) = logical_sizes_skipped {
f.await;
}
scopeguard::ScopeGuard::into_inner(guard);

startup_checkpoint(started_startup_at, "complete", "Startup complete");
7 changes: 6 additions & 1 deletion pageserver/src/consumption_metrics/metrics.rs
@@ -351,7 +351,12 @@ impl TimelineSnapshot {

let current_exact_logical_size = {
let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
let size = span.in_scope(|| t.get_current_logical_size(ctx));
let size = span.in_scope(|| {
t.get_current_logical_size(
crate::tenant::timeline::GetLogicalSizePriority::Background,
ctx,
)
});
match size {
// Only send timeline logical size when it is fully calculated.
CurrentLogicalSize::Exact(ref size) => Some(size.into()),
3 changes: 2 additions & 1 deletion pageserver/src/http/routes.rs
@@ -338,7 +338,8 @@ async fn build_timeline_info_common(
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
};
let current_logical_size = timeline.get_current_logical_size(ctx);
let current_logical_size =
timeline.get_current_logical_size(tenant::timeline::GetLogicalSizePriority::User, ctx);
let current_physical_size = Some(timeline.layer_size_sum().await);
let state = timeline.current_state();
let remote_consistent_lsn_projected = timeline
7 changes: 0 additions & 7 deletions pageserver/src/lib.rs
@@ -186,13 +186,6 @@ pub struct InitializationOrder {
/// Each initial tenant load task carries this until completion.
pub initial_tenant_load: Option<utils::completion::Completion>,

/// Barrier for when we can start initial logical size calculations.
pub initial_logical_size_can_start: utils::completion::Barrier,

/// Each timeline owns a clone of this to be consumed on the initial logical size calculation
/// attempt. It is important to drop this once the attempt has completed.
pub initial_logical_size_attempt: Option<utils::completion::Completion>,

/// Barrier for when we can start any background jobs.
///
/// This can be broken up later on, but right now there is just one class of a background job.
27 changes: 15 additions & 12 deletions pageserver/src/metrics.rs
@@ -407,16 +407,14 @@ pub(crate) mod initial_logical_size {
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
use once_cell::sync::Lazy;

use crate::task_mgr::TaskKind;

pub(crate) struct StartCalculation(IntCounterVec);
pub(crate) static START_CALCULATION: Lazy<StartCalculation> = Lazy::new(|| {
StartCalculation(
register_int_counter_vec!(
"pageserver_initial_logical_size_start_calculation",
"Incremented each time we start an initial logical size calculation attempt. \
The `task_kind` label is for the task kind that caused this attempt.",
&["attempt", "task_kind"]
The `circumstances` label provides some additional details.",
&["attempt", "circumstances"]
)
.unwrap(),
)
@@ -464,19 +462,24 @@ pub(crate) mod initial_logical_size {
inc_drop_calculation: Option<IntCounter>,
}

#[derive(strum_macros::IntoStaticStr)]
pub(crate) enum StartCircumstances {
EmptyInitial,
SkippedConcurrencyLimiter,
AfterBackgroundTasksRateLimit,
}

impl StartCalculation {
pub(crate) fn first(&self, causing_task_kind: Option<TaskKind>) -> OngoingCalculationGuard {
let task_kind_label: &'static str =
causing_task_kind.map(|k| k.into()).unwrap_or_default();
self.0.with_label_values(&["first", task_kind_label]);
pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
let circumstances_label: &'static str = circumstances.into();
self.0.with_label_values(&["first", circumstances_label]);
OngoingCalculationGuard {
inc_drop_calculation: Some(DROP_CALCULATION.first.clone()),
}
}
pub(crate) fn retry(&self, causing_task_kind: Option<TaskKind>) -> OngoingCalculationGuard {
let task_kind_label: &'static str =
causing_task_kind.map(|k| k.into()).unwrap_or_default();
self.0.with_label_values(&["retry", task_kind_label]);
pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
let circumstances_label: &'static str = circumstances.into();
self.0.with_label_values(&["retry", circumstances_label]);
OngoingCalculationGuard {
inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()),
}
8 changes: 8 additions & 0 deletions pageserver/src/pgdatadir_mapping.rs
@@ -282,6 +282,10 @@ impl Timeline {
}

/// Get a list of all existing relations in given tablespace and database.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn list_rels(
&self,
spcnode: Oid,
@@ -630,6 +634,10 @@ impl Timeline {
///
/// Only relation blocks are counted currently. That excludes metadata,
/// SLRUs, twophase files etc.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn get_current_logical_size_non_incremental(
&self,
lsn: Lsn,
63 changes: 11 additions & 52 deletions pageserver/src/tenant.rs
@@ -472,7 +472,6 @@ impl Tenant {
index_part: Option<IndexPart>,
metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
init_order: Option<&InitializationOrder>,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_shard_id;
@@ -482,7 +481,6 @@
&metadata,
ancestor.clone(),
resources,
init_order,
CreateTimelineCause::Load,
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
Expand Down Expand Up @@ -683,10 +681,6 @@ impl Tenant {
// as we are no longer loading, signal completion by dropping
// the completion while we resume deletion
drop(_completion);
// do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
let _ = init_order
.as_mut()
.and_then(|x| x.initial_logical_size_attempt.take());
let background_jobs_can_start =
init_order.as_ref().map(|x| &x.background_jobs_can_start);
if let Some(background) = background_jobs_can_start {
@@ -700,7 +694,6 @@ impl Tenant {
&tenant_clone,
preload,
tenants,
init_order,
&ctx,
)
.await
@@ -713,7 +706,7 @@ impl Tenant {
}
}

match tenant_clone.attach(init_order, preload, &ctx).await {
match tenant_clone.attach(preload, &ctx).await {
Ok(()) => {
info!("attach finished, activating");
tenant_clone.activate(broker_client, None, &ctx);
@@ -776,7 +769,6 @@ impl Tenant {
///
async fn attach(
self: &Arc<Tenant>,
init_order: Option<InitializationOrder>,
preload: Option<TenantPreload>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -789,7 +781,7 @@ impl Tenant {
None => {
// Deprecated dev mode: load from local disk state instead of remote storage
// https://github.com/neondatabase/neon/issues/5624
return self.load_local(init_order, ctx).await;
return self.load_local(ctx).await;
}
};

@@ -884,7 +876,6 @@ impl Tenant {
&index_part.metadata,
Some(remote_timeline_client),
self.deletion_queue_client.clone(),
None,
)
.await
.context("resume_deletion")
@@ -1009,10 +1000,6 @@ impl Tenant {
None
};

// we can load remote timelines during init, but they are assumed to be so rare that
// initialization order is not passed to here.
let init_order = None;

// timeline loading after attach expects to find metadata file for each metadata
save_metadata(
self.conf,
@@ -1030,7 +1017,6 @@ impl Tenant {
Some(index_part),
remote_metadata,
ancestor,
init_order,
ctx,
)
.await
@@ -1272,11 +1258,7 @@ impl Tenant {
/// files on disk. Used at pageserver startup.
///
/// No background tasks are started as part of this routine.
async fn load_local(
self: &Arc<Tenant>,
init_order: Option<InitializationOrder>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
async fn load_local(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();

debug!("loading tenant task");
@@ -1302,7 +1284,7 @@ impl Tenant {
// Process loadable timelines first
for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
if let Err(e) = self
.load_local_timeline(timeline_id, local_metadata, init_order.as_ref(), ctx, false)
.load_local_timeline(timeline_id, local_metadata, ctx, false)
.await
{
match e {
@@ -1336,13 +1318,7 @@ impl Tenant {
}
Some(local_metadata) => {
if let Err(e) = self
.load_local_timeline(
timeline_id,
local_metadata,
init_order.as_ref(),
ctx,
true,
)
.load_local_timeline(timeline_id, local_metadata, ctx, true)
.await
{
match e {
@@ -1370,12 +1346,11 @@ impl Tenant {
/// Subroutine of `load_tenant`, to load an individual timeline
///
/// NB: The parent is assumed to be already loaded!
#[instrument(skip(self, local_metadata, init_order, ctx))]
#[instrument(skip(self, local_metadata, ctx))]
async fn load_local_timeline(
self: &Arc<Self>,
timeline_id: TimelineId,
local_metadata: TimelineMetadata,
init_order: Option<&InitializationOrder>,
ctx: &RequestContext,
found_delete_mark: bool,
) -> Result<(), LoadLocalTimelineError> {
@@ -1392,7 +1367,6 @@ impl Tenant {
&local_metadata,
None,
self.deletion_queue_client.clone(),
init_order,
)
.await
.context("resume deletion")
@@ -1409,17 +1383,9 @@ impl Tenant {
None
};

self.timeline_init_and_sync(
timeline_id,
resources,
None,
local_metadata,
ancestor,
init_order,
ctx,
)
.await
.map_err(LoadLocalTimelineError::Load)
self.timeline_init_and_sync(timeline_id, resources, None, local_metadata, ancestor, ctx)
.await
.map_err(LoadLocalTimelineError::Load)
}

pub(crate) fn tenant_id(&self) -> TenantId {
@@ -2314,7 +2280,6 @@ impl Tenant {
new_metadata: &TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
resources: TimelineResources,
init_order: Option<&InitializationOrder>,
cause: CreateTimelineCause,
) -> anyhow::Result<Arc<Timeline>> {
let state = match cause {
@@ -2329,9 +2294,6 @@ impl Tenant {
CreateTimelineCause::Delete => TimelineState::Stopping,
};

let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);

let pg_version = new_metadata.pg_version();

let timeline = Timeline::new(
@@ -2345,8 +2307,6 @@ impl Tenant {
Arc::clone(&self.walredo_mgr),
resources,
pg_version,
initial_logical_size_can_start.cloned(),
initial_logical_size_attempt.cloned().flatten(),
state,
self.cancel.child_token(),
);
@@ -3168,7 +3128,6 @@ impl Tenant {
new_metadata,
ancestor,
resources,
None,
CreateTimelineCause::Load,
)
.context("Failed to create timeline data structure")?;
@@ -3843,7 +3802,7 @@ pub(crate) mod harness {
match mode {
LoadMode::Local => {
tenant
.load_local(None, ctx)
.load_local(ctx)
.instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
}
@@ -3853,7 +3812,7 @@ pub(crate) mod harness {
.instrument(info_span!("try_load_preload", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
tenant
.attach(None, Some(preload), ctx)
.attach(Some(preload), ctx)
.instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
}

1 comment on commit c7f1143

@github-actions


2504 tests run: 2401 passed, 0 failed, 103 skipped (full report)


Flaky tests (5)

Postgres 16

  • test_branching_with_pgbench[cascade-1-10]: debug
  • test_pitr_gc: debug
  • test_emergency_relocate_with_branches_slow_replay[local_fs]: debug

Postgres 15

  • test_branching_with_pgbench[cascade-1-10]: debug
  • test_empty_branch_remote_storage_upload: debug

Code coverage (full report)

  • functions: 54.6% (9297 of 17022 functions)
  • lines: 82.1% (54035 of 65781 lines)

The comment gets automatically updated with the latest test results
c7f1143 at 2023-12-04T18:06:02.715Z :recycle:
