Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Layer): metric regression with too many canceled evictions #7363

Merged
merged 10 commits into from
Apr 18, 2024
51 changes: 46 additions & 5 deletions libs/utils/src/sync/heavier_once_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ impl<T> OnceCell<T> {
}
}

/// Like [`Guard::take_and_deinit`], but will return `None` if this OnceCell was never
/// initialized.
pub fn take_and_deinit(&mut self) -> Option<(T, InitPermit)> {
let inner = self.inner.get_mut().unwrap();
koivunej marked this conversation as resolved.
Show resolved Hide resolved

inner.take_and_deinit()
}

/// Return the number of [`Self::get_or_init`] calls waiting for initialization to complete.
pub fn initializer_count(&self) -> usize {
self.initializers.load(Ordering::Relaxed)
Expand Down Expand Up @@ -246,15 +254,23 @@ impl<'a, T> Guard<'a, T> {
/// The permit will be on a semaphore part of the new internal value, and any following
/// [`OnceCell::get_or_init`] will wait on it to complete.
pub fn take_and_deinit(mut self) -> (T, InitPermit) {
self.0
.take_and_deinit()
.expect("guard is not created unless value has been initialized")
}
}

impl<T> Inner<T> {
pub fn take_and_deinit(&mut self) -> Option<(T, InitPermit)> {
let value = self.value.take()?;

let mut swapped = Inner::default();
let sem = swapped.init_semaphore.clone();
// acquire and forget right away, moving the control over to InitPermit
sem.try_acquire().expect("we just created this").forget();
std::mem::swap(&mut *self.0, &mut swapped);
swapped
.value
.map(|v| (v, InitPermit(sem)))
.expect("guard is not created unless value has been initialized")
let permit = InitPermit(sem);
std::mem::swap(self, &mut swapped);
Some((value, permit))
}
}

Expand All @@ -263,6 +279,13 @@ impl<'a, T> Guard<'a, T> {
/// On drop, this type will return the permit.
pub struct InitPermit(Arc<tokio::sync::Semaphore>);

impl std::fmt::Debug for InitPermit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let ptr = Arc::as_ptr(&self.0) as *const ();
f.debug_tuple("InitPermit").field(&ptr).finish()
}
}

impl Drop for InitPermit {
fn drop(&mut self) {
assert_eq!(
Expand Down Expand Up @@ -559,4 +582,22 @@ mod tests {

assert_eq!(*target.get().unwrap(), 11);
}

#[tokio::test]
async fn take_and_deinit_on_mut() {
use std::convert::Infallible;

let mut target = OnceCell::<u32>::default();
assert!(target.take_and_deinit().is_none());

target
.get_or_init(|permit| async move { Ok::<_, Infallible>((42, permit)) })
.await
.unwrap();

let again = target.take_and_deinit();
assert!(matches!(again, Some((42, _))), "{again:?}");

assert!(target.take_and_deinit().is_none());
}
}
16 changes: 12 additions & 4 deletions pageserver/src/tenant/storage_layer/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,9 +604,17 @@ enum Status {

impl Drop for LayerInner {
fn drop(&mut self) {
// if there was a pending cancellation, mark it cancelled here to balance metrics
koivunej marked this conversation as resolved.
Show resolved Hide resolved
if let Some((ResidentOrWantedEvicted::WantedEvicted(..), _)) = self.inner.take_and_deinit()
{
// eviction has already been started
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);

// eviction request is intentionally not honored as no one is present to wait for it
// and we could be delaying shutdown for nothing.
}

if !*self.wanted_deleted.get_mut() {
// should we try to evict if the last wish was for eviction? seems more like a hazard
// than a clear win.
return;
}

Expand Down Expand Up @@ -1552,8 +1560,8 @@ impl Drop for DownloadedLayer {
if let Some(owner) = self.owner.upgrade() {
owner.on_downloaded_layer_drop(self.version);
} else {
// no need to do anything, we are shutting down
LAYER_IMPL_METRICS.inc_eviction_cancelled(EvictionCancelled::LayerGone);
// Layer::drop will handle cancelling the eviction; because of drop order and
// `DownloadedLayer` never leaking, we cannot know here if eviction was requested.
}
}
}
Expand Down
97 changes: 97 additions & 0 deletions pageserver/src/tenant/storage_layer/layer/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,103 @@ async fn evict_and_wait_does_not_wait_for_download() {
layer.evict_and_wait(FOREVER).await.unwrap();
}

/// Asserts that there is no miscalculation when Layer is dropped while it is being kept resident,
/// which is the last value.
///
/// Also checks that the same does not happen on a non-evicted layer (regression test).
#[tokio::test(start_paused = true)]
async fn eviction_cancellation_on_drop() {
use crate::repository::Value;
use bytes::Bytes;

// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();

let h = TenantHarness::create("eviction_cancellation_on_drop").unwrap();
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
let (tenant, ctx) = h.load().await;

let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await
.unwrap();

{
// create_test_timeline wrote us one layer, write another
let mut writer = timeline.writer().await;
writer
.put(
Key::from_i128(5),
Lsn(0x20),
&Value::Image(Bytes::from_static(b"this does not matter either")),
&ctx,
)
.await
.unwrap();

writer.finish_write(Lsn(0x20));
}

timeline.freeze_and_flush().await.unwrap();

// wait for the upload to complete so our Arc::strong_count assertion holds
timeline
.remote_client
.as_ref()
.unwrap()
.wait_completion()
.await
.unwrap();

let (evicted_layer, not_evicted) = {
let mut layers = {
let mut guard = timeline.layers.write().await;
let layers = guard.likely_resident_layers().collect::<Vec<_>>();
// remove the layers from layermap
guard.finish_gc_timeline(&layers);

layers
};

assert_eq!(layers.len(), 2);

(layers.pop().unwrap(), layers.pop().unwrap())
};

let victims = [(evicted_layer, true), (not_evicted, false)];

for (victim, evict) in victims {
let resident = victim.keep_resident().await.unwrap();
drop(victim);

assert_eq!(Arc::strong_count(&resident.owner.0), 1);

if evict {
let evict_and_wait = resident.owner.evict_and_wait(FOREVER);

// drive the future to await on the status channel, and then drop it
tokio::time::timeout(ADVANCE, evict_and_wait)
.await
.expect_err("should had been a timeout since we are holding the layer resident");
}

// 1 == we only evict one of the layers
assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());

drop(resident);

// run any spawned
tokio::time::sleep(ADVANCE).await;

SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;

assert_eq!(
1,
LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
);
}
}

#[test]
fn layer_size() {
assert_eq!(std::mem::size_of::<LayerAccessStats>(), 2040);
Expand Down
Loading