pageserver: wait for InProgress slots in page service
jcsp committed Oct 31, 2023
1 parent eb1af2d · commit a2dbce5
Showing 2 changed files with 111 additions and 68 deletions.
pageserver/src/page_service.rs (80 changes: 12 additions & 68 deletions)
@@ -14,7 +14,6 @@ use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use bytes::Bytes;
use futures::Stream;
use pageserver_api::models::TenantState;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
@@ -55,16 +54,18 @@ use crate::metrics;
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::mgr;
use crate::tenant::mgr::GetTenantError;
use crate::tenant::{Tenant, Timeline};
use crate::tenant::mgr::get_active_tenant_with_timeout;
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::Timeline;
use crate::trace::Tracer;

use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;

const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_secs(10);

/// Read the end of a tar archive.
///
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
@@ -389,7 +390,7 @@ impl PageServerHandler {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));

// Make request tracer if needed
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let tenant = mgr::get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT).await?;
let mut tracer = if tenant.get_trace_read_requests() {
let connection_id = ConnectionId::generate();
let path = tenant
@@ -525,7 +526,7 @@ impl PageServerHandler {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline
info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let tenant = get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT).await?;
let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
@@ -584,7 +585,7 @@ impl PageServerHandler {
debug_assert_current_span_has_tenant_and_timeline_id();
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));

let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
let timeline = get_active_tenant_timeline(tenant_id, timeline_id).await?;
let last_record_lsn = timeline.get_last_record_lsn();
if last_record_lsn != start_lsn {
return Err(QueryError::Other(
@@ -792,7 +793,7 @@ impl PageServerHandler {
let started = std::time::Instant::now();

// check that the timeline exists
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
let timeline = get_active_tenant_timeline(tenant_id, timeline_id).await?;
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
if let Some(lsn) = lsn {
// Backup was requested at a particular LSN. Wait for it to arrive.
@@ -1048,7 +1049,7 @@ where
.record("timeline_id", field::display(timeline_id));

self.check_permission(Some(tenant_id))?;
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
let timeline = get_active_tenant_timeline(tenant_id, timeline_id).await?;

let end_of_timeline = timeline.get_last_record_rlsn();

@@ -1232,7 +1233,7 @@ where

self.check_permission(Some(tenant_id))?;

let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let tenant = get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT).await?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"checkpoint_distance"),
RowDescriptor::int8_col(b"checkpoint_timeout"),
@@ -1278,21 +1279,6 @@
}
}

#[derive(thiserror::Error, Debug)]
enum GetActiveTenantError {
#[error(
"Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
)]
WaitForActiveTimeout {
latest_state: TenantState,
wait_time: Duration,
},
#[error(transparent)]
NotFound(GetTenantError),
#[error(transparent)]
WaitTenantActive(tenant::WaitToBecomeActiveError),
}

impl From<GetActiveTenantError> for QueryError {
fn from(e: GetActiveTenantError) -> Self {
match e {
@@ -1305,47 +1291,6 @@ impl From<GetActiveTenantError> for QueryError {
}
}

/// Get active tenant.
///
/// If the tenant is Loading, waits for it to become Active, for up to 30 s. That
/// ensures that queries don't fail immediately after pageserver startup, because
/// all tenants are still loading.
async fn get_active_tenant_with_timeout(
tenant_id: TenantId,
_ctx: &RequestContext, /* require get a context to support cancellation in the future */
) -> Result<Arc<Tenant>, GetActiveTenantError> {
let tenant = match mgr::get_tenant(tenant_id, false) {
Ok(tenant) => tenant,
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
Err(GetTenantError::NotActive(_)) => {
unreachable!("we're calling get_tenant with active_only=false")
}
Err(GetTenantError::Broken(_)) => {
unreachable!("we're calling get_tenant with active_only=false")
}
Err(GetTenantError::MapState(_)) => {
unreachable!("TenantManager is initialized before page service starts")
}
};
let wait_time = Duration::from_secs(30);
match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await {
Ok(Ok(())) => Ok(tenant),
// no .context(), the error message is good enough and some tests depend on it
Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
Err(_) => {
let latest_state = tenant.current_state();
if latest_state == TenantState::Active {
Ok(tenant)
} else {
Err(GetActiveTenantError::WaitForActiveTimeout {
latest_state,
wait_time,
})
}
}
}
}

#[derive(Debug, thiserror::Error)]
enum GetActiveTimelineError {
#[error(transparent)]
@@ -1367,9 +1312,8 @@ impl From<GetActiveTimelineError> for QueryError {
async fn get_active_tenant_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, GetActiveTimelineError> {
let tenant = get_active_tenant_with_timeout(tenant_id, ctx)
let tenant = get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT)
.await
.map_err(GetActiveTimelineError::Tenant)?;
let timeline = tenant
…
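
Taken together, the page_service.rs side of this commit is mechanical: the active-tenant helper and its error type move into `tenant::mgr`, the `RequestContext` parameter disappears, and the call sites shown above pass the new `ACTIVE_TENANT_TIMEOUT` constant instead. Below is a minimal sketch of the new call shape; the types are placeholders standing in for the real pageserver definitions, not the crate's actual API.

```rust
use std::sync::Arc;
use std::time::Duration;

// Placeholder types; only the call shape matters here, not the real definitions.
struct TenantId;
struct Tenant;
#[derive(Debug)]
struct GetActiveTenantError;

const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_secs(10);

// Stand-in with the post-commit signature: an explicit timeout, no RequestContext.
async fn get_active_tenant_with_timeout(
    _tenant_id: TenantId,
    _timeout: Duration,
) -> Result<Arc<Tenant>, GetActiveTenantError> {
    Ok(Arc::new(Tenant))
}

// A handler now resolves the tenant like this before touching any timeline.
async fn handle_request(tenant_id: TenantId) -> Result<(), GetActiveTenantError> {
    // Before this commit the equivalent call was:
    //     get_active_tenant_with_timeout(tenant_id, &ctx).await?
    let _tenant = get_active_tenant_with_timeout(tenant_id, ACTIVE_TENANT_TIMEOUT).await?;
    Ok(())
}

fn main() {
    // The handler above is illustrative; nothing is executed here.
    let _ = handle_request(TenantId);
}
```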
pageserver/src/tenant/mgr.rs (99 changes: 99 additions & 0 deletions)
@@ -7,6 +7,7 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::fs;

use anyhow::Context;
@@ -939,6 +940,104 @@ pub(crate) fn get_tenant(
}
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum GetActiveTenantError {
#[error(
"Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
)]
WaitForActiveTimeout {
latest_state: TenantState,
wait_time: Duration,
},
#[error(transparent)]
NotFound(#[from] GetTenantError),
#[error(transparent)]
WaitTenantActive(crate::tenant::WaitToBecomeActiveError),
}

/// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`]
/// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`],
/// then wait for up to `timeout` (minus however long we waited for the slot).
pub(crate) async fn get_active_tenant_with_timeout(
tenant_id: TenantId,
mut timeout: Duration,
) -> Result<Arc<Tenant>, GetActiveTenantError> {
enum WaitFor {
Barrier(utils::completion::Barrier),
Tenant(Arc<Tenant>),
}

let wait_for = {
let locked = TENANTS.read().unwrap();
let peek_slot =
tenant_map_peek_slot(&locked, &tenant_id, true).map_err(GetTenantError::MapState)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => {
match tenant.current_state() {
TenantState::Active => {
// Fast path: we don't need to do any async waiting.
return Ok(tenant.clone());
}
_ => WaitFor::Tenant(tenant.clone()),
}
}
Some(TenantSlot::Secondary) => {
return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
tenant_id,
)))
}
Some(TenantSlot::InProgress(barrier)) => WaitFor::Barrier(barrier.clone()),
None => {
return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
tenant_id,
)))
}
}
};

tracing::debug!("Waiting for tenant InProgress state to pass...");
let tenant = match wait_for {
WaitFor::Barrier(barrier) => {
let wait_start = Instant::now();
barrier.wait().await;
let wait_duration = Instant::now().duration_since(wait_start);
timeout -= wait_duration;
{
let locked = TENANTS.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, true)
.map_err(GetTenantError::MapState)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => tenant.clone(),
_ => {
return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
tenant_id,
)))
}
}
}
}
WaitFor::Tenant(tenant) => tenant,
};

tracing::debug!("Waiting for tenant to enter active state...");
match tokio::time::timeout(timeout, tenant.wait_to_become_active()).await {
Ok(Ok(())) => Ok(tenant),
// no .context(), the error message is good enough and some tests depend on it
Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
Err(_) => {
let latest_state = tenant.current_state();
if latest_state == TenantState::Active {
Ok(tenant)
} else {
Err(GetActiveTenantError::WaitForActiveTimeout {
latest_state,
wait_time: timeout,
})
}
}
}
}

pub(crate) async fn delete_tenant(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
…
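
The new `get_active_tenant_with_timeout` spends a single timeout budget across two waits: first on the `InProgress` slot's barrier, then on the tenant reaching `Active`, with the time spent on the barrier subtracted from the budget before the second wait. The sketch below is a minimal, self-contained illustration of that budgeting pattern; it uses a tokio `watch` channel as a stand-in for the slot barrier and tenant state machine, not the pageserver's actual `TenantSlot`/`Tenant` types.

```rust
// Assumes: tokio = { version = "1", features = ["full"] }
use std::time::{Duration, Instant};
use tokio::sync::watch;

#[derive(Clone, Copy, PartialEq, Debug)]
enum State {
    InProgress, // slot is being attached/detached; analogous to TenantSlot::InProgress
    Attached,   // slot holds a tenant that is not yet Active
    Active,     // tenant is ready to serve requests
}

async fn get_active_with_timeout(
    mut rx: watch::Receiver<State>,
    mut budget: Duration,
) -> Result<(), &'static str> {
    // Step 1: wait for the InProgress phase (the slot barrier) to pass,
    // charging whatever time that takes against the shared budget.
    let started = Instant::now();
    tokio::time::timeout(budget, rx.wait_for(|s| *s != State::InProgress))
        .await
        .map_err(|_| "timed out waiting for InProgress slot")?
        .map_err(|_| "slot holder dropped")?;
    budget = budget.saturating_sub(started.elapsed());

    // Step 2: spend the remaining budget waiting for the Active state.
    tokio::time::timeout(budget, rx.wait_for(|s| *s == State::Active))
        .await
        .map_err(|_| "timed out waiting for tenant to become active")?
        .map_err(|_| "tenant dropped before becoming active")?;
    Ok(())
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(State::InProgress);
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        tx.send(State::Attached).unwrap();
        tokio::time::sleep(Duration::from_millis(50)).await;
        tx.send(State::Active).unwrap();
    });
    println!("{:?}", get_active_with_timeout(rx, Duration::from_secs(10)).await);
}
```

The same structure appears in the diff above: `barrier.wait().await`, then `timeout -= wait_duration`, then `tokio::time::timeout(timeout, tenant.wait_to_become_active())`.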