Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

throttling: exclude throttled time from basebackup (fixup of #6953) #7072

Merged
merged 1 commit into from
Mar 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion libs/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ pub mod launch_timestamp;
mod wrappers;
pub use wrappers::{CountedReader, CountedWriter};
mod hll;
pub mod metric_vec_duration;
pub use hll::{HyperLogLog, HyperLogLogVec};
#[cfg(target_os = "linux")]
pub mod more_process_metrics;
Expand Down
23 changes: 0 additions & 23 deletions libs/metrics/src/metric_vec_duration.rs

This file was deleted.

63 changes: 58 additions & 5 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use enum_map::EnumMap;
use metrics::metric_vec_duration::DurationResultObserver;
use metrics::{
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
Expand Down Expand Up @@ -1283,11 +1282,65 @@ pub(crate) static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|
})
});

impl DurationResultObserver for BasebackupQueryTime {
fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration) {
pub(crate) struct BasebackupQueryTimeOngoingRecording<'a, 'c> {
parent: &'a BasebackupQueryTime,
ctx: &'c RequestContext,
start: std::time::Instant,
}

impl BasebackupQueryTime {
pub(crate) fn start_recording<'c: 'a, 'a>(
&'a self,
ctx: &'c RequestContext,
) -> BasebackupQueryTimeOngoingRecording<'_, '_> {
let start = Instant::now();
match ctx.micros_spent_throttled.open() {
Ok(()) => (),
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!(error, "error opening micros_spent_throttled; this message is logged at a global rate limit");
});
}
}
BasebackupQueryTimeOngoingRecording {
parent: self,
ctx,
start,
}
}
}

impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
pub(crate) fn observe<T, E>(self, res: &Result<T, E>) {
let elapsed = self.start.elapsed();
let ex_throttled = self
.ctx
.micros_spent_throttled
.close_and_checked_sub_from(elapsed);
let ex_throttled = match ex_throttled {
Ok(ex_throttled) => ex_throttled,
Err(error) => {
use utils::rate_limit::RateLimit;
static LOGGED: Lazy<Mutex<RateLimit>> =
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
let mut rate_limit = LOGGED.lock().unwrap();
rate_limit.call(|| {
warn!(error, "error deducting time spent throttled; this message is logged at a global rate limit");
});
elapsed
}
};
let label_value = if res.is_ok() { "ok" } else { "error" };
let metric = self.0.get_metric_with_label_values(&[label_value]).unwrap();
metric.observe(duration.as_secs_f64());
let metric = self
.parent
.0
.get_metric_with_label_values(&[label_value])
.unwrap();
metric.observe(ex_throttled.as_secs_f64());
}
}

Expand Down
50 changes: 25 additions & 25 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,7 @@ impl PageServerHandler {
prev_lsn: Option<Lsn>,
full_backup: bool,
gzip: bool,
ctx: RequestContext,
ctx: &RequestContext,
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
Expand All @@ -1214,7 +1214,7 @@ impl PageServerHandler {
if let Some(lsn) = lsn {
// Backup was requested at a particular LSN. Wait for it to arrive.
info!("waiting for {}", lsn);
timeline.wait_lsn(lsn, &ctx).await?;
timeline.wait_lsn(lsn, ctx).await?;
timeline
.check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
.context("invalid basebackup lsn")?;
Expand All @@ -1236,7 +1236,7 @@ impl PageServerHandler {
lsn,
prev_lsn,
full_backup,
&ctx,
ctx,
)
.await?;
} else {
Expand All @@ -1257,7 +1257,7 @@ impl PageServerHandler {
lsn,
prev_lsn,
full_backup,
&ctx,
ctx,
)
.await?;
// shutdown the encoder to ensure the gzip footer is written
Expand All @@ -1269,7 +1269,7 @@ impl PageServerHandler {
lsn,
prev_lsn,
full_backup,
&ctx,
ctx,
)
.await?;
}
Expand Down Expand Up @@ -1449,25 +1449,25 @@ where
false
};

::metrics::metric_vec_duration::observe_async_block_duration_by_result(
&*metrics::BASEBACKUP_QUERY_TIME,
async move {
self.handle_basebackup_request(
pgb,
tenant_id,
timeline_id,
lsn,
None,
false,
gzip,
ctx,
)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
Result::<(), QueryError>::Ok(())
},
)
.await?;
let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx);
let res = async {
self.handle_basebackup_request(
pgb,
tenant_id,
timeline_id,
lsn,
None,
false,
gzip,
&ctx,
)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
Result::<(), QueryError>::Ok(())
}
.await;
metric_recording.observe(&res);
res?;
}
// return pair of prev_lsn and last_lsn
else if query_string.starts_with("get_last_record_rlsn ") {
Expand Down Expand Up @@ -1563,7 +1563,7 @@ where
prev_lsn,
true,
false,
ctx,
&ctx,
)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
Expand Down