fix: on-demand downloads can outlive timeline shutdown (#7051)
## Problem

Before this PR, it was possible that on-demand downloads were started
after `Timeline::shutdown()`.

For example, we have observed a walreceiver-connection-handler-initiated
on-demand download that was started after `Timeline::shutdown()`'s final
`task_mgr::shutdown_tasks()` call.

The underlying issue is that `task_mgr::shutdown_tasks()` isn't sticky,
i.e., new tasks can be spawned during or after
`task_mgr::shutdown_tasks()`.
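
To make the failure mode concrete, here is a minimal sketch of a non-sticky shutdown. `TaskRegistry` and its methods are hypothetical stand-ins, not the actual `task_mgr` API: cancelling everything registered so far does nothing to stop a task that registers afterwards.

```rust
// Hypothetical toy registry, *not* the actual task_mgr code: it only
// illustrates why a non-sticky shutdown is racy.
use std::sync::Mutex;

use tokio_util::sync::CancellationToken;

#[derive(Default)]
struct TaskRegistry {
    tasks: Mutex<Vec<CancellationToken>>,
}

impl TaskRegistry {
    /// Register a task and hand it a token to watch.
    fn spawn_task(&self) -> CancellationToken {
        let token = CancellationToken::new();
        self.tasks.lock().unwrap().push(token.clone());
        token
    }

    /// Cancel everything registered *so far*. Nothing prevents later
    /// `spawn_task` calls from succeeding: the shutdown is not sticky.
    fn shutdown_tasks(&self) {
        for token in self.tasks.lock().unwrap().drain(..) {
            token.cancel();
        }
    }
}

fn main() {
    let registry = TaskRegistry::default();

    let before = registry.spawn_task();
    registry.shutdown_tasks();
    assert!(before.is_cancelled());

    // Spawned after shutdown_tasks(): never sees the cancellation.
    let after = registry.spawn_task();
    assert!(!after.is_cancelled());
}
```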

Cc: #4175 in lieu of a more specific issue for task_mgr. We already
decided we want to get rid of it anyway.

Original investigation:
https://neondb.slack.com/archives/C033RQ5SPDH/p1709824952465949

## Changes

- enter gate while downloading
- use timeline cancellation token for cancelling download

Thereby fixes #7054.

Entering the gate might also resolve the recent "kept the gate from
closing" warnings seen in staging.
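
For illustration, a minimal sketch of the shape of the fix, using `tokio_util`'s `CancellationToken` and `TaskTracker` as stand-ins for the timeline's cancellation token and gate (`start_download`/`shutdown` are hypothetical names, not the pageserver API):

```rust
// Minimal sketch only: CancellationToken + TaskTracker stand in for the
// timeline's cancellation token and gate.
use std::time::Duration;

use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;

async fn start_download(
    tracker: &TaskTracker,
    cancel: &CancellationToken,
) -> Result<(), &'static str> {
    // Refuse to start new downloads once shutdown has begun.
    if tracker.is_closed() {
        return Err("shutting down");
    }
    let cancel = cancel.clone();
    // The tracker keeps shutdown waiting until this task finishes.
    tracker.spawn(async move {
        tokio::select! {
            _ = cancel.cancelled() => { /* stop promptly on shutdown */ }
            _ = tokio::time::sleep(Duration::from_secs(5)) => { /* stand-in for the download */ }
        }
    });
    Ok(())
}

async fn shutdown(tracker: &TaskTracker, cancel: &CancellationToken) {
    cancel.cancel(); // tell in-flight downloads to stop
    tracker.close(); // refuse new downloads from now on
    tracker.wait().await; // wait for in-flight downloads to drain
}

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();
    let cancel = CancellationToken::new();

    start_download(&tracker, &cancel).await.unwrap();
    shutdown(&tracker, &cancel).await;

    // After shutdown, new downloads are refused instead of outliving it.
    assert!(start_download(&tracker, &cancel).await.is_err());
}
```

Unlike the real gate, the `is_closed()` check here is not atomic with the spawn; in the actual change, `gate.enter()` either hands back a guard or fails once shutdown has started.
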
koivunej authored Mar 9, 2024
1 parent 74d2458 commit b09d686
Showing 5 changed files with 31 additions and 31 deletions.
26 changes: 16 additions & 10 deletions libs/remote_storage/tests/test_real_s3.rs
@@ -17,6 +17,7 @@ use remote_storage::{
 };
 use test_context::test_context;
 use test_context::AsyncTestContext;
+use tokio::io::AsyncBufReadExt;
 use tokio_util::sync::CancellationToken;
 use tracing::info;
 
@@ -484,32 +485,33 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
     ))
     .unwrap();
 
-    let len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
+    let file_len = upload_large_enough_file(&ctx.client, &path, &cancel).await;
 
     {
-        let mut stream = ctx
+        let stream = ctx
            .client
            .download(&path, &cancel)
            .await
            .expect("download succeeds")
            .download_stream;
 
-        let first = stream
-            .next()
-            .await
-            .expect("should have the first blob")
-            .expect("should have succeeded");
+        let mut reader = std::pin::pin!(tokio_util::io::StreamReader::new(stream));
+
+        let first = reader.fill_buf().await.expect("should have the first blob");
 
-        tracing::info!(len = first.len(), "downloaded first chunk");
+        let len = first.len();
+        tracing::info!(len, "downloaded first chunk");
 
         assert!(
-            first.len() < len,
+            first.len() < file_len,
            "uploaded file is too small, we downloaded all on first chunk"
         );
 
+        reader.consume(len);
+
         cancel.cancel();
 
-        let next = stream.next().await.expect("stream should have more");
+        let next = reader.fill_buf().await;
 
         let e = next.expect_err("expected an error, but got a chunk?");
 
@@ -520,6 +522,10 @@ async fn download_is_cancelled(ctx: &mut MaybeEnabledStorage) {
            .is_some_and(|e| matches!(e, DownloadError::Cancelled)),
            "{inner:?}"
         );
+
+        let e = DownloadError::from(e);
+
+        assert!(matches!(e, DownloadError::Cancelled), "{e:?}");
     }
 
     let cancel = CancellationToken::new();
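
For context on the test rewrite above: the download stream is now wrapped in `tokio_util::io::StreamReader`, which exposes chunks through `AsyncBufReadExt::fill_buf`/`consume` instead of polling the stream directly. A self-contained sketch of that pattern, with in-memory chunks standing in for the real S3 stream (the example data is an assumption, not part of the test):

```rust
// Self-contained sketch; the in-memory chunks replace the real S3 stream.
use bytes::Bytes;
use futures::stream;
use tokio::io::AsyncBufReadExt;
use tokio_util::io::StreamReader;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // StreamReader wants a Stream of Result<impl Buf, E: Into<io::Error>>.
    let chunks = stream::iter(vec![
        Ok::<_, std::io::Error>(Bytes::from_static(b"first chunk")),
        Ok(Bytes::from_static(b"second chunk")),
    ]);
    let mut reader = std::pin::pin!(StreamReader::new(chunks));

    // fill_buf() exposes the current chunk without copying it out.
    let first = reader.fill_buf().await?;
    assert_eq!(first, b"first chunk".as_slice());
    let len = first.len();

    // consume() marks those bytes as read so the next fill_buf() advances.
    reader.consume(len);
    assert_eq!(reader.fill_buf().await?, b"second chunk".as_slice());
    Ok(())
}
```
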
3 changes: 0 additions & 3 deletions pageserver/src/task_mgr.rs
@@ -272,9 +272,6 @@ pub enum TaskKind {
     // Task that uploads a file to remote storage
     RemoteUploadTask,
 
-    // Task that downloads a file from remote storage
-    RemoteDownloadTask,
-
     // task that handles the initial downloading of all tenants
     InitialLoad,
 
27 changes: 10 additions & 17 deletions pageserver/src/tenant/storage_layer/layer.rs
@@ -880,23 +880,18 @@ impl LayerInner {
     ) -> Result<heavier_once_cell::InitPermit, DownloadError> {
         debug_assert_current_span_has_tenant_and_timeline_id();
 
-        let task_name = format!("download layer {}", self);
-
         let (tx, rx) = tokio::sync::oneshot::channel();
 
-        // this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
-        // block tenant::mgr::remove_tenant_from_memory.
-
         let this: Arc<Self> = self.clone();
 
-        crate::task_mgr::spawn(
-            &tokio::runtime::Handle::current(),
-            crate::task_mgr::TaskKind::RemoteDownloadTask,
-            Some(self.desc.tenant_shard_id),
-            Some(self.desc.timeline_id),
-            &task_name,
-            false,
-            async move {
+        let guard = timeline
+            .gate
+            .enter()
+            .map_err(|_| DownloadError::DownloadCancelled)?;
+
+        tokio::task::spawn(async move {
+            let _guard = guard;
+
             let client = timeline
                 .remote_client
@@ -906,7 +901,7 @@ impl LayerInner {
             let result = client.download_layer_file(
                 &this.desc.filename(),
                 &this.metadata(),
-                &crate::task_mgr::shutdown_token()
+                &timeline.cancel
             )
             .await;
 
@@ -929,7 +924,6 @@ impl LayerInner {
 
                 tokio::select! {
                     _ = tokio::time::sleep(backoff) => {},
-                    _ = crate::task_mgr::shutdown_token().cancelled_owned() => {},
                     _ = timeline.cancel.cancelled() => {},
                 };
 
@@ -959,11 +953,10 @@ impl LayerInner {
                 }
            }
         }
-
-            Ok(())
        }
        .in_current_span(),
     );
 
     match rx.await {
         Ok((Ok(()), permit)) => {
             if let Some(reason) = self
2 changes: 2 additions & 0 deletions test_runner/regress/test_tenant_delete.py
@@ -190,6 +190,8 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
             # So by ignoring these instead of waiting for empty upload queue
             # we execute more distinct code paths.
             '.*stopping left-over name="remote upload".*',
+            # an on-demand is cancelled by shutdown
+            ".*initial size calculation failed: downloading failed, possibly for shutdown",
         ]
     )
 
4 changes: 3 additions & 1 deletion test_runner/regress/test_timeline_delete.py
@@ -213,7 +213,9 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
             # This happens when timeline remains are cleaned up during loading
             ".*Timeline dir entry become invalid.*",
             # In one of the branches we poll for tenant to become active. Polls can generate this log message:
-            f".*Tenant {env.initial_tenant} is not active*",
+            f".*Tenant {env.initial_tenant} is not active.*",
+            # an on-demand is cancelled by shutdown
+            ".*initial size calculation failed: downloading failed, possibly for shutdown",
         ]
     )
 

1 comment on commit b09d686

@github-actions

2576 tests run: 2441 passed, 0 failed, 135 skipped (full report)


Flaky tests (4)

Postgres 15

  • test_compute_pageserver_connection_stress: release
  • test_wal_restore_initdb: debug
  • test_wal_restore: debug

Postgres 14

  • test_sharding_split_smoke: debug

Code coverage* (full report)

  • functions: 28.8% (7033 of 24442 functions)
  • lines: 47.6% (43455 of 91358 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
b09d686 at 2024-03-09T14:12:08.951Z :recycle:
