Skip to content

Commit

Permalink
pageserver: simplify GetActiveTenantError & add Cancelled case
Browse files Browse the repository at this point in the history
  • Loading branch information
jcsp committed Nov 2, 2023
1 parent a972ba1 commit 19467ad
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 115 deletions.
3 changes: 1 addition & 2 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1313,8 +1313,7 @@ impl From<GetActiveTenantError> for QueryError {
GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
),
GetActiveTenantError::WaitTenantActive(e) => QueryError::Other(anyhow::Error::new(e)),
GetActiveTenantError::NotFound(e) => QueryError::Other(anyhow::Error::new(e)),
e => QueryError::Other(anyhow::anyhow!(e)),
}
}
}
Expand Down
45 changes: 7 additions & 38 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
use self::metadata::LoadMetadataError;
use self::metadata::TimelineMetadata;
use self::mgr::GetActiveTenantError;
use self::mgr::GetTenantError;
use self::mgr::TenantsMap;
use self::remote_timeline_client::RemoteTimelineClient;
use self::timeline::uninit::TimelineUninitMark;
Expand Down Expand Up @@ -365,34 +367,6 @@ impl Debug for SetStoppingError {
}
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum WaitToBecomeActiveError {
WillNotBecomeActive {
tenant_id: TenantId,
state: TenantState,
},
TenantDropped {
tenant_id: TenantId,
},
}

impl std::fmt::Display for WaitToBecomeActiveError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WaitToBecomeActiveError::WillNotBecomeActive { tenant_id, state } => {
write!(
f,
"Tenant {} will not become active. Current state: {:?}",
tenant_id, state
)
}
WaitToBecomeActiveError::TenantDropped { tenant_id } => {
write!(f, "Tenant {tenant_id} will not become active (dropped)")
}
}
}
}

#[derive(thiserror::Error, Debug)]
pub enum CreateTimelineError {
#[error("a timeline with the given ID already exists")]
Expand Down Expand Up @@ -2028,30 +2002,25 @@ impl Tenant {
self.state.subscribe()
}

pub(crate) async fn wait_to_become_active(&self) -> Result<(), WaitToBecomeActiveError> {
pub(crate) async fn wait_to_become_active(&self) -> Result<(), GetActiveTenantError> {
let mut receiver = self.state.subscribe();
loop {
let current_state = receiver.borrow_and_update().clone();
match current_state {
TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
// in these states, there's a chance that we can reach ::Active
receiver.changed().await.map_err(
|_e: tokio::sync::watch::error::RecvError| {
WaitToBecomeActiveError::TenantDropped {
tenant_id: self.tenant_id,
}
},
|_e: tokio::sync::watch::error::RecvError|
// Tenant existed but was dropped: report it as non-existent
GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_id))
)?;
}
TenantState::Active { .. } => {
return Ok(());
}
TenantState::Broken { .. } | TenantState::Stopping { .. } => {
// There's no chance the tenant can transition back into ::Active
return Err(WaitToBecomeActiveError::WillNotBecomeActive {
tenant_id: self.tenant_id,
state: current_state,
});
return Err(GetActiveTenantError::WillNotBecomeActive(current_state));
}
}
}
Expand Down
34 changes: 18 additions & 16 deletions pageserver/src/tenant/mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -915,17 +915,27 @@ pub(crate) fn get_tenant(

#[derive(thiserror::Error, Debug)]
pub(crate) enum GetActiveTenantError {
/// We may time out either while TenantSlot is InProgress, or while the Tenant
/// is in a non-Active state
#[error(
"Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
)]
WaitForActiveTimeout {
latest_state: TenantState,
latest_state: Option<TenantState>,
wait_time: Duration,
},

/// The TenantSlot is absent, or in secondary mode
#[error(transparent)]
NotFound(#[from] GetTenantError),
#[error(transparent)]
WaitTenantActive(crate::tenant::WaitToBecomeActiveError),

/// Cancellation token fired while we were waiting
#[error("cancelled")]
Cancelled,

/// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
#[error("will not become active. Current state: {0}")]
WillNotBecomeActive(TenantState),
}

enum TimeoutCancellableError {
Expand Down Expand Up @@ -1004,13 +1014,11 @@ pub(crate) async fn get_active_tenant_with_timeout(
.map_err(|e| match e {
TimeoutCancellableError::Timeout => {
GetActiveTenantError::WaitForActiveTimeout {
latest_state: TenantState::Loading,
latest_state: None,
wait_time: wait_start.elapsed(),
}
}
TimeoutCancellableError::Cancelled => {
GetActiveTenantError::NotFound(GetTenantError::NotFound(tenant_id))
}
TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled,
})?;
let wait_duration = Instant::now().duration_since(wait_start);
timeout -= wait_duration;
Expand All @@ -1034,25 +1042,19 @@ pub(crate) async fn get_active_tenant_with_timeout(
tracing::debug!("Waiting for tenant to enter active state...");
match timeout_cancellable(timeout, tenant.wait_to_become_active(), cancel).await {
Ok(Ok(())) => Ok(tenant),
// no .context(), the error message is good enough and some tests depend on it
Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
Ok(Err(e)) => Err(e),
Err(TimeoutCancellableError::Timeout) => {
let latest_state = tenant.current_state();
if latest_state == TenantState::Active {
Ok(tenant)
} else {
Err(GetActiveTenantError::WaitForActiveTimeout {
latest_state,
latest_state: Some(latest_state),
wait_time: timeout,
})
}
}
Err(TimeoutCancellableError::Cancelled) => {
Err(GetActiveTenantError::WaitForActiveTimeout {
latest_state: TenantState::Loading,
wait_time: timeout,
})
}
Err(TimeoutCancellableError::Cancelled) => Err(GetActiveTenantError::Cancelled),
}
}

Expand Down
78 changes: 19 additions & 59 deletions test_runner/regress/test_tenant_detach.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@
from fixtures.utils import query_scalar, wait_until
from prometheus_client.samples import Sample

# In tests that overlap endpoint activity with tenant attach/detach, there are
# a variety of warnings that the page service may emit when it cannot acquire
# an active tenant to serve a request
PERMIT_PAGE_SERVICE_ERRORS = [
".*page_service.*Tenant .* not found",
".*page_service.*Tenant .* is not active",
".*page_service.*cancelled",
".*page_service.*will not become active.*",
]


def do_gc_target(
pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
Expand Down Expand Up @@ -60,12 +70,7 @@ def test_tenant_reattach(
# create new nenant
tenant_id, timeline_id = env.neon_cli.create_tenant()

# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)

with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
with endpoint.cursor() as cur:
Expand Down Expand Up @@ -235,10 +240,7 @@ async def reattach_while_busy(

# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)

endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)

Expand All @@ -259,7 +261,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()

env.pageserver.allowed_errors.append(".*NotFound: Tenant .*")
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)

# first check for non existing tenant
tenant_id = TenantId.generate()
Expand All @@ -271,19 +273,9 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):

assert excinfo.value.status_code == 404

# the error will be printed to the log too
env.pageserver.allowed_errors.append(".*NotFound: tenant *")

# create new nenant
tenant_id, timeline_id = env.neon_cli.create_tenant()

# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)

# assert tenant exists on disk
assert env.pageserver.tenant_dir(tenant_id).exists()

Expand Down Expand Up @@ -345,12 +337,7 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv):
# create a new tenant
tenant_id, _ = env.neon_cli.create_tenant()

# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)

# assert tenant exists on disk
assert env.pageserver.tenant_dir(tenant_id).exists()
Expand Down Expand Up @@ -401,12 +388,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
# create a new tenant
tenant_id, _ = env.neon_cli.create_tenant()

# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)

# assert tenant exists on disk
assert env.pageserver.tenant_dir(tenant_id).exists()
Expand Down Expand Up @@ -453,12 +435,7 @@ def test_detach_while_attaching(
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)

# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer
Expand Down Expand Up @@ -593,12 +570,7 @@ def test_ignored_tenant_download_missing_layers(neon_env_builder: NeonEnvBuilder
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)

data_id = 1
data_secret = "very secret secret"
Expand Down Expand Up @@ -649,12 +621,7 @@ def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder):

tenant_id = env.initial_tenant

# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)

env.pageserver.allowed_errors.append(".*tenant .*? already exists, state:.*")
with pytest.raises(
Expand Down Expand Up @@ -693,14 +660,7 @@ def test_ignore_while_attaching(
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found")
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} is not active")
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)

data_id = 1
data_secret = "very secret secret"
Expand Down

0 comments on commit 19467ad

Please sign in to comment.