Merge remote-tracking branch 'upstream/main' into jcsp/storcon-log-hygiene
jcsp committed Mar 19, 2024
2 parents 65d451e + 49be446 commit 654fc72
Showing 72 changed files with 3,251 additions and 891 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build_and_test.yml
@@ -461,6 +461,7 @@ jobs:

- name: Pytest regression tests
uses: ./.github/actions/run-python-test-set
timeout-minutes: 60
with:
build_type: ${{ matrix.build_type }}
test_selection: regress
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions clippy.toml
@@ -2,6 +2,8 @@ disallowed-methods = [
"tokio::task::block_in_place",
# Allow this for now, to deny it later once we stop using Handle::block_on completely
# "tokio::runtime::Handle::block_on",
# use tokio_epoll_uring_ext instead
"tokio_epoll_uring::thread_local_system",
]

disallowed-macros = [
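For context on what this entry enforces: clippy's `disallowed-methods` lint rejects every call site of a listed function unless the call carries an explicit allow. A generic sketch of the mechanism, using `std::process::exit` as a stand-in since the `tokio_epoll_uring` replacement API isn't shown here:

```rust
// With this line in clippy.toml:
//
//     disallowed-methods = ["std::process::exit"]
//
// `cargo clippy` flags every call site of the listed method:
fn shutdown() -> ! {
    std::process::exit(0) // error: use of a disallowed method `std::process::exit`
}

// Intentional exceptions must carry an explicit allow:
#[allow(clippy::disallowed_methods)]
fn forced_shutdown() -> ! {
    std::process::exit(1)
}

fn main() {
    if std::env::args().count() > 1 {
        forced_shutdown();
    }
    shutdown();
}
```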
8 changes: 5 additions & 3 deletions control_plane/attachment_service/src/http.rs
@@ -14,7 +14,7 @@ use tokio_util::sync::CancellationToken;
use utils::auth::{Scope, SwappableJwtAuth};
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::{auth_middleware, check_permission_with, request_span};
use utils::http::request::{must_get_query_param, parse_request_param};
use utils::http::request::{must_get_query_param, parse_query_param, parse_request_param};
use utils::id::{TenantId, TimelineId};

use utils::{
@@ -248,8 +248,10 @@ async fn handle_tenant_secondary_download(
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
service.tenant_secondary_download(tenant_id).await?;
json_response(StatusCode::OK, ())
let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);

let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
json_response(status, progress)
}

async fn handle_tenant_delete(
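A minimal standalone sketch of the new `wait_ms` handling (assuming, as the handler above does, that an absent query parameter parses to `None`):

```rust
use std::time::Duration;

// Mirrors `parse_query_param(&req, "wait_ms")?.map(Duration::from_millis)`:
// an absent parameter means "respond immediately", a present value is the
// number of milliseconds the endpoint may long-poll for.
fn wait_from_query(wait_ms: Option<u64>) -> Option<Duration> {
    wait_ms.map(Duration::from_millis)
}

fn main() {
    assert_eq!(wait_from_query(Some(20_000)), Some(Duration::from_secs(20)));
    assert_eq!(wait_from_query(None), None);
}
```

With `wait_ms` set, the endpoint can return the downstream status and progress instead of blocking until completion; the reconciler changes below treat 202 Accepted as "still in progress".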
93 changes: 76 additions & 17 deletions control_plane/attachment_service/src/reconciler.rs
@@ -8,7 +8,7 @@ use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_client::mgmt_api;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
@@ -258,22 +258,81 @@ impl Reconciler {
tenant_shard_id: TenantShardId,
node: &Node,
) -> Result<(), ReconcileError> {
match node
.with_client_retries(
|client| async move { client.tenant_secondary_download(tenant_shard_id).await },
&self.service_config.jwt_token,
1,
1,
Duration::from_secs(60),
&self.cancel,
)
.await
{
None => Err(ReconcileError::Cancel),
Some(Ok(_)) => Ok(()),
Some(Err(e)) => {
tracing::info!(" (skipping destination download: {})", e);
Ok(())
// This is not the timeout for a request, but the total amount of time we're willing to wait
// for a secondary location to get up to date before giving up and proceeding with the migration anyway.
const TOTAL_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(300);

// This is the long-polling interval for the secondary download requests we send to the destination pageserver
// during a migration.
const REQUEST_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(20);

let started_at = Instant::now();

loop {
let (status, progress) = match node
.with_client_retries(
|client| async move {
client
.tenant_secondary_download(
tenant_shard_id,
Some(REQUEST_DOWNLOAD_TIMEOUT),
)
.await
},
&self.service_config.jwt_token,
1,
3,
REQUEST_DOWNLOAD_TIMEOUT * 2,
&self.cancel,
)
.await
{
None => Err(ReconcileError::Cancel),
Some(Ok(v)) => Ok(v),
Some(Err(e)) => {
// Give up, but proceed: it's unfortunate if we couldn't freshen the destination before
// attaching, but we should not let an issue with a secondary location stop us proceeding
// with a live migration.
tracing::warn!("Failed to prepare by downloading layers on node {node}: {e})");
return Ok(());
}
}?;

if status == StatusCode::OK {
tracing::info!(
"Downloads to {} complete: {}/{} layers, {}/{} bytes",
node,
progress.layers_downloaded,
progress.layers_total,
progress.bytes_downloaded,
progress.bytes_total
);
return Ok(());
} else if status == StatusCode::ACCEPTED {
let total_runtime = started_at.elapsed();
if total_runtime > TOTAL_DOWNLOAD_TIMEOUT {
tracing::warn!("Timed out after {}ms downloading layers to {node}. Progress so far: {}/{} layers, {}/{} bytes",
total_runtime.as_millis(),
progress.layers_downloaded,
progress.layers_total,
progress.bytes_downloaded,
progress.bytes_total
);
// Give up, but proceed: an incompletely warmed destination doesn't prevent migration working,
// it just makes the I/O performance for users less good.
return Ok(());
}

// Log and proceed around the loop to retry. We don't sleep between requests, because our HTTP call
// to the pageserver is a long-poll.
tracing::info!(
"Downloads to {} not yet complete: {}/{} layers, {}/{} bytes",
node,
progress.layers_downloaded,
progress.layers_total,
progress.bytes_downloaded,
progress.bytes_total
);
}
}
}
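The control flow above reduces to a long-poll loop with an overall deadline. A standalone sketch of the pattern (names are illustrative, not the storage controller's real API):

```rust
use std::time::{Duration, Instant};

enum Download {
    Complete,   // the pageserver returned 200 OK
    InProgress, // the pageserver returned 202 Accepted
}

// Keep long-polling until the download completes or a total deadline elapses.
// There is deliberately no sleep between iterations: each poll() is itself a
// long-poll that blocks server-side for up to the per-request timeout.
fn poll_until_deadline(total: Duration, mut poll: impl FnMut() -> Download) -> bool {
    let started_at = Instant::now();
    loop {
        match poll() {
            Download::Complete => return true,
            Download::InProgress if started_at.elapsed() > total => return false,
            Download::InProgress => continue,
        }
    }
}

fn main() {
    let mut polls_remaining = 3;
    let done = poll_until_deadline(Duration::from_secs(300), || {
        polls_remaining -= 1;
        if polls_remaining == 0 {
            Download::Complete
        } else {
            Download::InProgress
        }
    });
    assert!(done);
}
```

Note that hitting the deadline is non-fatal by design: an incompletely warmed destination only costs I/O performance, so the reconciler logs a warning and proceeds with the migration.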
86 changes: 63 additions & 23 deletions control_plane/attachment_service/src/service.rs
@@ -18,7 +18,15 @@ use diesel::result::DatabaseErrorKind;
use futures::{stream::FuturesUnordered, StreamExt};
use hyper::StatusCode;
use pageserver_api::{
controller_api::UtilizationScore,
controller_api::{
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse,
TenantShardMigrateRequest, TenantShardMigrateResponse, UtilizationScore,
},
models::{SecondaryProgress, TenantConfigRequest},
};

use pageserver_api::{
models::{
self, LocationConfig, LocationConfigListResponse, LocationConfigMode,
PageserverUtilization, ShardParameters, TenantConfig, TenantCreateRequest,
@@ -32,14 +40,6 @@ use pageserver_api::{
ValidateResponse, ValidateResponseTenant,
},
};
use pageserver_api::{
controller_api::{
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse,
TenantShardMigrateRequest, TenantShardMigrateResponse,
},
models::TenantConfigRequest,
};
use pageserver_client::mgmt_api;
use tokio::sync::OwnedRwLockWriteGuard;
use tokio_util::sync::CancellationToken;
@@ -2098,7 +2098,8 @@ impl Service {
pub(crate) async fn tenant_secondary_download(
&self,
tenant_id: TenantId,
) -> Result<(), ApiError> {
wait: Option<Duration>,
) -> Result<(StatusCode, SecondaryProgress), ApiError> {
let _tenant_lock = self.tenant_op_locks.shared(tenant_id).await;

// Acquire lock and yield the collection of shard-node tuples which we will send requests onward to
@@ -2121,32 +2122,71 @@
targets
};

// TODO: this API, and the underlying pageserver API, should take a timeout argument so that for long running
// downloads, they can return a clean 202 response instead of the HTTP client timing out.

// Issue concurrent requests to all shards' locations
let mut futs = FuturesUnordered::new();
for (tenant_shard_id, node) in targets {
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
futs.push(async move {
let result = client.tenant_secondary_download(tenant_shard_id).await;
(result, node)
let result = client
.tenant_secondary_download(tenant_shard_id, wait)
.await;
(result, node, tenant_shard_id)
})
}

// Handle any errors returned by pageservers. This includes cases like this request racing with
// a scheduling operation, such that the tenant shard we're calling doesn't exist on that pageserver any more, as
// well as more general cases like 503s, 500s, or timeouts.
while let Some((result, node)) = futs.next().await {
let Err(e) = result else { continue };

// Secondary downloads are always advisory: if something fails, we nevertheless report success, so that whoever
// is calling us will proceed with whatever migration they're doing, albeit with a slightly less warm cache
// than they had hoped for.
tracing::warn!("Ignoring tenant secondary download error from pageserver {node}: {e}",);
let mut aggregate_progress = SecondaryProgress::default();
let mut aggregate_status: Option<StatusCode> = None;
let mut error: Option<mgmt_api::Error> = None;
while let Some((result, node, tenant_shard_id)) = futs.next().await {
match result {
Err(e) => {
// Secondary downloads are always advisory: if something fails, we nevertheless report success, so that whoever
// is calling us will proceed with whatever migration they're doing, albeit with a slightly less warm cache
// than they had hoped for.
tracing::warn!("Secondary download error from pageserver {node}: {e}",);
error = Some(e)
}
Ok((status_code, progress)) => {
tracing::info!(%tenant_shard_id, "Shard status={status_code} progress: {progress:?}");
aggregate_progress.layers_downloaded += progress.layers_downloaded;
aggregate_progress.layers_total += progress.layers_total;
aggregate_progress.bytes_downloaded += progress.bytes_downloaded;
aggregate_progress.bytes_total += progress.bytes_total;
aggregate_progress.heatmap_mtime =
std::cmp::max(aggregate_progress.heatmap_mtime, progress.heatmap_mtime);
aggregate_status = match aggregate_status {
None => Some(status_code),
Some(StatusCode::OK) => Some(status_code),
Some(cur) => {
// Other status codes (e.g. 202) -- do not overwrite.
Some(cur)
}
};
}
}
}

Ok(())
// If any of the shards return 202, indicate our result as 202.
match aggregate_status {
None => {
match error {
Some(e) => {
// No successes, and an error: surface it
Err(ApiError::Conflict(format!("Error from pageserver: {e}")))
}
None => {
// No shards found
Err(ApiError::NotFound(
anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
))
}
}
}
Some(aggregate_status) => Ok((aggregate_status, aggregate_progress)),
}
}

pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
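The status-aggregation rule above is easy to state in isolation: progress counters are summed across shards, `heatmap_mtime` takes the maximum, and any non-OK status (e.g. 202 Accepted) wins over OK, so the caller only sees 200 when every shard is done. A minimal sketch with plain integers standing in for `StatusCode`:

```rust
// None means no shard responded at all: the caller then reports NotFound,
// or surfaces a recorded per-shard error instead.
fn aggregate_status(statuses: &[u16]) -> Option<u16> {
    let mut aggregate: Option<u16> = None;
    for &status in statuses {
        aggregate = match aggregate {
            // First response, or current aggregate is 200 OK: take the new status.
            None | Some(200) => Some(status),
            // A non-OK aggregate (e.g. 202 Accepted) is sticky: do not overwrite it.
            Some(current) => Some(current),
        };
    }
    aggregate
}

fn main() {
    assert_eq!(aggregate_status(&[200, 200]), Some(200));
    assert_eq!(aggregate_status(&[200, 202, 200]), Some(202));
    assert_eq!(aggregate_status(&[]), None);
}
```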
7 changes: 0 additions & 7 deletions control_plane/src/pageserver.rs
@@ -568,13 +568,6 @@ impl PageServerNode {
Ok(self.http_client.list_timelines(*tenant_shard_id).await?)
}

pub async fn tenant_secondary_download(&self, tenant_id: &TenantShardId) -> anyhow::Result<()> {
Ok(self
.http_client
.tenant_secondary_download(*tenant_id)
.await?)
}

pub async fn timeline_create(
&self,
tenant_shard_id: TenantShardId,
