concurrency-limit initial logical size calculation [v2] #6000

Merged · 6 commits · Dec 4, 2023
Changes from all commits
29 changes: 1 addition & 28 deletions pageserver/src/bin/pageserver.rs
@@ -402,15 +402,11 @@ fn start_pageserver(
let (init_remote_done_tx, init_remote_done_rx) = utils::completion::channel();
let (init_done_tx, init_done_rx) = utils::completion::channel();

let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel();

let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel();

let order = pageserver::InitializationOrder {
initial_tenant_load_remote: Some(init_done_tx),
initial_tenant_load: Some(init_remote_done_tx),
initial_logical_size_can_start: init_done_rx.clone(),
initial_logical_size_attempt: Some(init_logical_size_done_tx),
background_jobs_can_start: background_jobs_barrier.clone(),
};

@@ -464,34 +460,14 @@ fn start_pageserver(
});

let WaitForPhaseResult {
timeout_remaining: timeout,
timeout_remaining: _timeout,
skipped: init_load_skipped,
} = wait_for_phase("initial_tenant_load", init_load_done, timeout).await;

// initial logical sizes can now start, as they were waiting on init_done_rx.

scopeguard::ScopeGuard::into_inner(guard);

let guard = scopeguard::guard_on_success((), |_| {
tracing::info!("Cancelled before initial logical sizes completed")
});

let logical_sizes_done = std::pin::pin!(async {
init_logical_size_done_rx.wait().await;
startup_checkpoint(
started_startup_at,
"initial_logical_sizes",
"Initial logical sizes completed",
);
});

let WaitForPhaseResult {
timeout_remaining: _,
skipped: logical_sizes_skipped,
} = wait_for_phase("initial_logical_sizes", logical_sizes_done, timeout).await;

scopeguard::ScopeGuard::into_inner(guard);

// allow background jobs to start: we either completed prior stages, or they reached timeout
// and were skipped. It is important that we do not let them block background jobs indefinitely,
// because things like consumption metrics for billing are blocked by this barrier.
@@ -514,9 +490,6 @@ fn start_pageserver(
if let Some(f) = init_load_skipped {
f.await;
}
if let Some(f) = logical_sizes_skipped {
f.await;
}
scopeguard::ScopeGuard::into_inner(guard);

startup_checkpoint(started_startup_at, "complete", "Startup complete");
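The hunks above delete the dedicated `initial_logical_sizes` startup phase: startup now waits only for the tenant-load phases before releasing background jobs. For readers unfamiliar with the pattern, here is a minimal sketch of how a completion/barrier pair and a timed phase wait fit together, with tokio primitives standing in for Neon's `utils::completion`; the helper names are illustrative, not the actual crate API.

```rust
use std::time::Duration;
use tokio::sync::watch;

/// Dropping a `Completion` releases every clone of the paired `Barrier`.
struct Completion(#[allow(dead_code)] watch::Sender<()>);

#[derive(Clone)]
struct Barrier(watch::Receiver<()>);

fn completion_channel() -> (Completion, Barrier) {
    let (tx, rx) = watch::channel(());
    (Completion(tx), Barrier(rx))
}

impl Barrier {
    async fn wait(mut self) {
        // `changed` errors out once the sender side is dropped; that is
        // the completion signal, since no value is ever sent.
        while self.0.changed().await.is_ok() {}
    }
}

/// Wait for a startup phase, but never longer than `timeout`: a timed-out
/// phase keeps running in the background instead of blocking startup.
async fn wait_for_phase(name: &str, barrier: Barrier, timeout: Duration) {
    if tokio::time::timeout(timeout, barrier.wait()).await.is_err() {
        println!("phase {name} timed out; continuing startup without it");
    }
}

#[tokio::main]
async fn main() {
    let (done, barrier) = completion_channel();
    tokio::spawn(async move {
        // ... initial tenant load ...
        drop(done); // signal completion
    });
    wait_for_phase("initial_tenant_load", barrier, Duration::from_secs(10)).await;
    println!("background jobs can start");
}
```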
7 changes: 6 additions & 1 deletion pageserver/src/consumption_metrics/metrics.rs
@@ -351,7 +351,12 @@ impl TimelineSnapshot

let current_exact_logical_size = {
let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
let size = span.in_scope(|| t.get_current_logical_size(ctx));
let size = span.in_scope(|| {
t.get_current_logical_size(
crate::tenant::timeline::GetLogicalSizePriority::Background,
ctx,
)
});
match size {
// Only send timeline logical size when it is fully calculated.
CurrentLogicalSize::Exact(ref size) => Some(size.into()),
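The new argument threads a priority into the size query; the HTTP handler in `routes.rs` below makes the same call with `GetLogicalSizePriority::User`. The match that follows only reports a fully computed size. A condensed sketch of that caller-side pattern — the `Approximate` variant is assumed, since only `Exact` is visible in this diff:

```rust
/// Condensed from the call site above; `Approximate` is an assumed variant.
enum CurrentLogicalSize {
    Exact(u64),
    Approximate(u64),
}

fn billing_sample(size: CurrentLogicalSize) -> Option<u64> {
    match size {
        // Only send timeline logical size when it is fully calculated.
        CurrentLogicalSize::Exact(bytes) => Some(bytes),
        // A partial value could misreport usage; skip this round instead.
        CurrentLogicalSize::Approximate(_) => None,
    }
}
```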
3 changes: 2 additions & 1 deletion pageserver/src/http/routes.rs
@@ -338,7 +338,8 @@ async fn build_timeline_info_common(
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
};
let current_logical_size = timeline.get_current_logical_size(ctx);
let current_logical_size =
timeline.get_current_logical_size(tenant::timeline::GetLogicalSizePriority::User, ctx);
let current_physical_size = Some(timeline.layer_size_sum().await);
let state = timeline.current_state();
let remote_consistent_lsn_projected = timeline
7 changes: 0 additions & 7 deletions pageserver/src/lib.rs
@@ -186,13 +186,6 @@ pub struct InitializationOrder {
/// Each initial tenant load task carries this until completion.
pub initial_tenant_load: Option<utils::completion::Completion>,

/// Barrier for when we can start initial logical size calculations.
pub initial_logical_size_can_start: utils::completion::Barrier,

/// Each timeline owns a clone of this to be consumed on the initial logical size calculation
/// attempt. It is important to drop this once the attempt has completed.
pub initial_logical_size_attempt: Option<utils::completion::Completion>,

/// Barrier for when we can start any background jobs.
///
/// This can be broken up later on, but right now there is just one class of a background job.
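After these deletions the struct keeps only the tenant-load completions and the background-jobs barrier. Reconstructed from the hunks above (doc comments abridged):

```rust
pub struct InitializationOrder {
    /// Each initial tenant load task carries this until the remote part completes.
    pub initial_tenant_load_remote: Option<utils::completion::Completion>,

    /// Each initial tenant load task carries this until completion.
    pub initial_tenant_load: Option<utils::completion::Completion>,

    /// Barrier for when we can start any background jobs.
    pub background_jobs_can_start: utils::completion::Barrier,
}
```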
27 changes: 15 additions & 12 deletions pageserver/src/metrics.rs
@@ -407,16 +407,14 @@ pub(crate) mod initial_logical_size {
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
use once_cell::sync::Lazy;

use crate::task_mgr::TaskKind;

pub(crate) struct StartCalculation(IntCounterVec);
pub(crate) static START_CALCULATION: Lazy<StartCalculation> = Lazy::new(|| {
StartCalculation(
register_int_counter_vec!(
"pageserver_initial_logical_size_start_calculation",
"Incremented each time we start an initial logical size calculation attempt. \
The `task_kind` label is for the task kind that caused this attempt.",
&["attempt", "task_kind"]
The `circumstances` label provides some additional details.",
&["attempt", "circumstances"]
)
.unwrap(),
)
@@ -464,19 +462,24 @@
inc_drop_calculation: Option<IntCounter>,
}

#[derive(strum_macros::IntoStaticStr)]
pub(crate) enum StartCircumstances {
EmptyInitial,
SkippedConcurrencyLimiter,
AfterBackgroundTasksRateLimit,
}

impl StartCalculation {
pub(crate) fn first(&self, causing_task_kind: Option<TaskKind>) -> OngoingCalculationGuard {
let task_kind_label: &'static str =
causing_task_kind.map(|k| k.into()).unwrap_or_default();
self.0.with_label_values(&["first", task_kind_label]);
pub(crate) fn first(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
let circumstances_label: &'static str = circumstances.into();
self.0.with_label_values(&["first", circumstances_label]);
OngoingCalculationGuard {
inc_drop_calculation: Some(DROP_CALCULATION.first.clone()),
}
}
pub(crate) fn retry(&self, causing_task_kind: Option<TaskKind>) -> OngoingCalculationGuard {
let task_kind_label: &'static str =
causing_task_kind.map(|k| k.into()).unwrap_or_default();
self.0.with_label_values(&["retry", task_kind_label]);
pub(crate) fn retry(&self, circumstances: StartCircumstances) -> OngoingCalculationGuard {
let circumstances_label: &'static str = circumstances.into();
self.0.with_label_values(&["retry", circumstances_label]);
OngoingCalculationGuard {
inc_drop_calculation: Some(DROP_CALCULATION.retry.clone()),
}
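The label change swaps the causing `task_kind` for an explicit `circumstances` enum; `strum`'s `IntoStaticStr` derive turns each variant name into a `&'static str` that can be used directly as a Prometheus label value. A self-contained sketch of the pattern, using the `prometheus` and `once_cell` crates directly (the repo's own `metrics` crate re-exports equivalent macros and types):

```rust
use once_cell::sync::Lazy;
use prometheus::{register_int_counter_vec, IntCounterVec};

#[derive(strum_macros::IntoStaticStr)]
enum StartCircumstances {
    EmptyInitial,
    SkippedConcurrencyLimiter,
    AfterBackgroundTasksRateLimit,
}

static START_CALCULATION: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "pageserver_initial_logical_size_start_calculation",
        "Incremented each time an initial logical size calculation starts.",
        &["attempt", "circumstances"]
    )
    .unwrap()
});

fn record_start(attempt: &'static str, circumstances: StartCircumstances) {
    // The derive maps e.g. `SkippedConcurrencyLimiter` to the string
    // "SkippedConcurrencyLimiter", usable directly as a label value.
    let label: &'static str = circumstances.into();
    START_CALCULATION.with_label_values(&[attempt, label]).inc();
}
```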
8 changes: 8 additions & 0 deletions pageserver/src/pgdatadir_mapping.rs
@@ -282,6 +282,10 @@ impl Timeline {
}

/// Get a list of all existing relations in given tablespace and database.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn list_rels(
&self,
spcnode: Oid,
@@ -630,6 +634,10 @@ impl Timeline {
///
/// Only relation blocks are counted currently. That excludes metadata,
/// SLRUs, twophase files etc.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn get_current_logical_size_non_incremental(
&self,
lsn: Lsn,
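The new `# Cancel-Safety` sections document a contract rather than a behavior change: callers may drop these futures at an await point, for example when racing them against shutdown in a `tokio::select!`. A sketch of why that matters (names illustrative):

```rust
use tokio_util::sync::CancellationToken;

// Stand-in for a documented cancel-safe method such as
// `get_current_logical_size_non_incremental`.
async fn compute_size() -> u64 {
    42
}

async fn size_or_shutdown(cancel: &CancellationToken) -> Option<u64> {
    tokio::select! {
        // Dropping the unfinished branch here is only sound because the
        // method promises cancellation safety.
        size = compute_size() => Some(size),
        _ = cancel.cancelled() => None,
    }
}
```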
63 changes: 11 additions & 52 deletions pageserver/src/tenant.rs
@@ -472,7 +472,6 @@ impl Tenant {
index_part: Option<IndexPart>,
metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
init_order: Option<&InitializationOrder>,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_shard_id;
@@ -482,7 +481,6 @@
&metadata,
ancestor.clone(),
resources,
init_order,
CreateTimelineCause::Load,
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
@@ -683,10 +681,6 @@ impl Tenant {
// as we are no longer loading, signal completion by dropping
// the completion while we resume deletion
drop(_completion);
// do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
let _ = init_order
.as_mut()
.and_then(|x| x.initial_logical_size_attempt.take());
let background_jobs_can_start =
init_order.as_ref().map(|x| &x.background_jobs_can_start);
if let Some(background) = background_jobs_can_start {
@@ -700,7 +694,6 @@
&tenant_clone,
preload,
tenants,
init_order,
&ctx,
)
.await
@@ -713,7 +706,7 @@
}
}

match tenant_clone.attach(init_order, preload, &ctx).await {
match tenant_clone.attach(preload, &ctx).await {
Ok(()) => {
info!("attach finished, activating");
tenant_clone.activate(broker_client, None, &ctx);
@@ -776,7 +769,6 @@ impl Tenant {
///
async fn attach(
self: &Arc<Tenant>,
init_order: Option<InitializationOrder>,
preload: Option<TenantPreload>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -789,7 +781,7 @@ impl Tenant {
None => {
// Deprecated dev mode: load from local disk state instead of remote storage
// https://github.com/neondatabase/neon/issues/5624
return self.load_local(init_order, ctx).await;
return self.load_local(ctx).await;
}
};

@@ -884,7 +876,6 @@ impl Tenant {
&index_part.metadata,
Some(remote_timeline_client),
self.deletion_queue_client.clone(),
None,
)
.await
.context("resume_deletion")
@@ -1009,10 +1000,6 @@ impl Tenant {
None
};

// we can load remote timelines during init, but they are assumed to be so rare that
// initialization order is not passed to here.
let init_order = None;

// timeline loading after attach expects to find metadata file for each metadata
save_metadata(
self.conf,
@@ -1030,7 +1017,6 @@ impl Tenant {
Some(index_part),
remote_metadata,
ancestor,
init_order,
ctx,
)
.await
@@ -1272,11 +1258,7 @@ impl Tenant {
/// files on disk. Used at pageserver startup.
///
/// No background tasks are started as part of this routine.
async fn load_local(
self: &Arc<Tenant>,
init_order: Option<InitializationOrder>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
async fn load_local(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
span::debug_assert_current_span_has_tenant_id();

debug!("loading tenant task");
@@ -1302,7 +1284,7 @@ impl Tenant {
// Process loadable timelines first
for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
if let Err(e) = self
.load_local_timeline(timeline_id, local_metadata, init_order.as_ref(), ctx, false)
.load_local_timeline(timeline_id, local_metadata, ctx, false)
.await
{
match e {
@@ -1336,13 +1318,7 @@ impl Tenant {
}
Some(local_metadata) => {
if let Err(e) = self
.load_local_timeline(
timeline_id,
local_metadata,
init_order.as_ref(),
ctx,
true,
)
.load_local_timeline(timeline_id, local_metadata, ctx, true)
.await
{
match e {
@@ -1370,12 +1346,11 @@ impl Tenant {
/// Subroutine of `load_tenant`, to load an individual timeline
///
/// NB: The parent is assumed to be already loaded!
#[instrument(skip(self, local_metadata, init_order, ctx))]
#[instrument(skip(self, local_metadata, ctx))]
async fn load_local_timeline(
self: &Arc<Self>,
timeline_id: TimelineId,
local_metadata: TimelineMetadata,
init_order: Option<&InitializationOrder>,
ctx: &RequestContext,
found_delete_mark: bool,
) -> Result<(), LoadLocalTimelineError> {
@@ -1392,7 +1367,6 @@ impl Tenant {
&local_metadata,
None,
self.deletion_queue_client.clone(),
init_order,
)
.await
.context("resume deletion")
@@ -1409,17 +1383,9 @@ impl Tenant {
None
};

self.timeline_init_and_sync(
timeline_id,
resources,
None,
local_metadata,
ancestor,
init_order,
ctx,
)
.await
.map_err(LoadLocalTimelineError::Load)
self.timeline_init_and_sync(timeline_id, resources, None, local_metadata, ancestor, ctx)
.await
.map_err(LoadLocalTimelineError::Load)
}

pub(crate) fn tenant_id(&self) -> TenantId {
@@ -2314,7 +2280,6 @@ impl Tenant {
new_metadata: &TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
resources: TimelineResources,
init_order: Option<&InitializationOrder>,
cause: CreateTimelineCause,
) -> anyhow::Result<Arc<Timeline>> {
let state = match cause {
@@ -2329,9 +2294,6 @@ impl Tenant {
CreateTimelineCause::Delete => TimelineState::Stopping,
};

let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);

let pg_version = new_metadata.pg_version();

let timeline = Timeline::new(
@@ -2345,8 +2307,6 @@ impl Tenant {
Arc::clone(&self.walredo_mgr),
resources,
pg_version,
initial_logical_size_can_start.cloned(),
initial_logical_size_attempt.cloned().flatten(),
state,
self.cancel.child_token(),
);
@@ -3168,7 +3128,6 @@ impl Tenant {
new_metadata,
ancestor,
resources,
None,
CreateTimelineCause::Load,
)
.context("Failed to create timeline data structure")?;
@@ -3843,7 +3802,7 @@ pub(crate) mod harness {
match mode {
LoadMode::Local => {
tenant
.load_local(None, ctx)
.load_local(ctx)
.instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
}
@@ -3853,7 +3812,7 @@ pub(crate) mod harness {
.instrument(info_span!("try_load_preload", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
tenant
.attach(None, Some(preload), ctx)
.attach(Some(preload), ctx)
.instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
}
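The remaining `tenant.rs` hunks are plumbing: with the startup barrier gone, `init_order` no longer threads through tenant and timeline construction. The replacement limiter itself is not part of this excerpt; going by the PR title and the `SkippedConcurrencyLimiter` metric label above, a plausible minimal shape is a fixed-size semaphore that each initial size calculation must acquire — permit count and names illustrative, not the PR's actual implementation:

```rust
use once_cell::sync::Lazy;
use tokio::sync::Semaphore;

// Timelines contend for a bounded pool of permits instead of waiting
// behind a global startup barrier. Permit count is illustrative.
static CONCURRENT_INITIAL_LOGICAL_SIZE_CALCULATIONS: Lazy<Semaphore> =
    Lazy::new(|| Semaphore::new(8));

async fn initial_logical_size() -> u64 {
    let _permit = CONCURRENT_INITIAL_LOGICAL_SIZE_CALCULATIONS
        .acquire()
        .await
        .expect("semaphore is never closed");
    // ... sum relation sizes for the timeline while holding the permit ...
    0
}
```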