diff --git a/components/backup-stream/src/checkpoint_manager.rs b/components/backup-stream/src/checkpoint_manager.rs index 47ec34d2113..cb7538b4cb1 100644 --- a/components/backup-stream/src/checkpoint_manager.rs +++ b/components/backup-stream/src/checkpoint_manager.rs @@ -499,6 +499,161 @@ pub mod tests { r } +<<<<<<< HEAD +======= + #[derive(Clone)] + pub struct MockSink(Arc>); + + impl MockSink { + fn with_fail_once(code: RpcStatusCode) -> Self { + let mut failed = false; + let inner = MockSinkInner { + items: Vec::default(), + closed: false, + on_error: Box::new(move || { + if failed { + RpcStatusCode::OK + } else { + failed = true; + code + } + }), + }; + Self(Arc::new(Mutex::new(inner))) + } + + fn trivial() -> Self { + let inner = MockSinkInner { + items: Vec::default(), + closed: false, + on_error: Box::new(|| RpcStatusCode::OK), + }; + Self(Arc::new(Mutex::new(inner))) + } + + #[allow(clippy::unused_async)] + pub async fn fail(&self, status: RpcStatus) -> crate::errors::Result<()> { + panic!("failed in a case should never fail: {}", status); + } + } + + struct MockSinkInner { + items: Vec, + closed: bool, + on_error: Box grpcio::RpcStatusCode + Send>, + } + + impl Sink<(SubscribeFlushEventResponse, grpcio::WriteFlags)> for MockSink { + type Error = grpcio::Error; + + fn poll_ready( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Ok(()).into() + } + + fn start_send( + self: std::pin::Pin<&mut Self>, + item: (SubscribeFlushEventResponse, grpcio::WriteFlags), + ) -> Result<(), Self::Error> { + let mut guard = self.0.lock().unwrap(); + let code = (guard.on_error)(); + if code != RpcStatusCode::OK { + return Err(grpcio::Error::RpcFailure(RpcStatus::new(code))); + } + guard.items.push(item.0); + Ok(()) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Ok(()).into() + } + + fn poll_close( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let mut guard = self.0.lock().unwrap(); + guard.closed = true; + Ok(()).into() + } + } + + fn simple_resolve_result() -> ResolveResult { + let mut region = Region::new(); + region.set_id(42); + ResolveResult { + region, + checkpoint: 42.into(), + checkpoint_type: CheckpointType::MinTs, + } + } + + #[test] + fn test_rpc_sub() { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + let mut mgr = super::CheckpointManager::default(); + rt.spawn(mgr.spawn_subscription_mgr()); + + let trivial_sink = MockSink::trivial(); + rt.block_on(mgr.add_subscriber(trivial_sink.clone())) + .unwrap(); + + mgr.resolve_regions(vec![simple_resolve_result()]); + mgr.flush(); + mgr.sync_with_subs_mgr(|_| {}); + assert_eq!(trivial_sink.0.lock().unwrap().items.len(), 1); + } + + #[test] + fn test_rpc_failure() { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + let mut mgr = super::CheckpointManager::default(); + rt.spawn(mgr.spawn_subscription_mgr()); + + let error_sink = MockSink::with_fail_once(RpcStatusCode::INTERNAL); + rt.block_on(mgr.add_subscriber(error_sink.clone())).unwrap(); + + mgr.resolve_regions(vec![simple_resolve_result()]); + mgr.flush(); + assert_eq!(mgr.sync_with_subs_mgr(|item| { item.subscribers.len() }), 0); + let sink = error_sink.0.lock().unwrap(); + assert_eq!(sink.items.len(), 0); + // The stream shouldn't be closed when exit by a failure. + assert_eq!(sink.closed, false); + } + + #[test] + fn test_flush() { + let mut mgr = super::CheckpointManager::default(); + mgr.do_update(region(1, 32, 8), TimeStamp::new(8)); + mgr.do_update(region(2, 34, 8), TimeStamp::new(15)); + mgr.do_update(region(2, 35, 8), TimeStamp::new(16)); + mgr.do_update(region(2, 35, 8), TimeStamp::new(14)); + let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32)); + assert_matches::assert_matches!(r, GetCheckpointResult::NotFound { .. }); + + mgr.flush(); + let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32)); + assert_matches::assert_matches!(r, GetCheckpointResult::Ok { checkpoint , .. } if checkpoint.into_inner() == 8); + let r = mgr.get_from_region(RegionIdWithVersion::new(2, 35)); + assert_matches::assert_matches!(r, GetCheckpointResult::Ok { checkpoint , .. } if checkpoint.into_inner() == 16); + mgr.flush(); + let r = mgr.get_from_region(RegionIdWithVersion::new(1, 32)); + assert_matches::assert_matches!(r, GetCheckpointResult::NotFound { .. }); + } + +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) #[test] fn test_mgr() { let mut mgr = super::CheckpointManager::default(); diff --git a/components/backup-stream/src/router.rs b/components/backup-stream/src/router.rs index ead124c103a..86adccbc818 100644 --- a/components/backup-stream/src/router.rs +++ b/components/backup-stream/src/router.rs @@ -871,8 +871,13 @@ impl StreamTaskInfo { // copying. #[allow(clippy::map_entry)] if !w.contains_key(&key) { +<<<<<<< HEAD let path = self.temp_dir.join(key.temp_file_name()); let val = Mutex::new(DataFile::new(path, self.compression_type).await?); +======= + let path = key.temp_file_name(); + let val = Mutex::new(DataFile::new(path, &self.temp_file_pool)?); +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) w.insert(key, val); } @@ -1354,7 +1359,11 @@ impl MetadataInfo { impl DataFile { /// create and open a logfile at the path. /// Note: if a file with same name exists, would truncate it. +<<<<<<< HEAD async fn new(local_path: impl AsRef, compression_type: CompressionType) -> Result { +======= + fn new(local_path: impl AsRef, files: &Arc) -> Result { +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) let sha256 = Hasher::new(MessageDigest::sha256()) .map_err(|err| Error::Other(box_err!("openssl hasher failed to init: {}", err)))?; let inner = @@ -2289,9 +2298,21 @@ mod tests { let mut f = File::create(file_path.clone()).await?; f.write_all("test-data".as_bytes()).await?; +<<<<<<< HEAD let data_file = DataFile::new(file_path, CompressionType::Zstd) .await .unwrap(); +======= + let file_name = format!("{}", uuid::Uuid::new_v4()); + let file_path = Path::new(&file_name); + let tempfile = TempDir::new().unwrap(); + let cfg = make_tempfiles_cfg(tempfile.path()); + let pool = Arc::new(TempFilePool::new(cfg).unwrap()); + let mut f = pool.open_for_write(file_path).unwrap(); + f.write_all(b"test-data").await?; + f.done().await?; + let mut data_file = DataFile::new(file_path, &pool).unwrap(); +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) let info = DataFileInfo::new(); let mut meta = MetadataInfo::with_capacity(1); diff --git a/components/backup-stream/src/subscription_manager.rs b/components/backup-stream/src/subscription_manager.rs index a31a43980b5..d89d9947223 100644 --- a/components/backup-stream/src/subscription_manager.rs +++ b/components/backup-stream/src/subscription_manager.rs @@ -457,7 +457,7 @@ where let now = Instant::now(); let timedout = self.wait(Duration::from_secs(30)).await; if timedout { - warn!("waiting for initial scanning done timed out, forcing progress!"; + warn!("waiting for initial scanning done timed out, forcing progress!"; "take" => ?now.saturating_elapsed(), "timedout" => %timedout); } let regions = leader_checker @@ -468,17 +468,128 @@ where // If there isn't any region observed, the `min_ts` can be used as resolved ts // safely. let rts = min_region.map(|rs| rs.checkpoint).unwrap_or(min_ts); +<<<<<<< HEAD info!("getting checkpoint"; "defined_by_region" => ?min_region); self.subs.warn_if_gap_too_huge(rts); callback(ResolvedRegions::new( rts, cps.into_iter().map(|r| (r.region, r.checkpoint)).collect(), )); +======= + if min_region + .map(|mr| mr.checkpoint_type != CheckpointType::MinTs) + .unwrap_or(false) + { + info!("getting non-trivial checkpoint"; "defined_by_region" => ?min_region); + } + callback(ResolvedRegions::new(rts, cps)); + } + ObserveOp::HighMemUsageWarning { region_id } => { + self.on_high_memory_usage(region_id); +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) } } } } +<<<<<<< HEAD +======= + async fn on_observe_result( + &mut self, + region: Region, + handle: ObserveHandle, + err: Option>, + ) { + let err = match err { + None => { + self.failure_count.remove(®ion.id); + let sub = self.subs.get_subscription_of(region.id); + if let Some(mut sub) = sub { + if sub.value().handle.id == handle.id { + sub.value_mut().resolver.phase_one_done(); + } + } + return; + } + Some(err) => { + if !should_retry(&err) { + self.failure_count.remove(®ion.id); + self.subs + .deregister_region_if(®ion, |sub, _| sub.handle.id == handle.id); + return; + } + err + } + }; + + let region_id = region.id; + match self.retry_observe(region.clone(), handle).await { + Ok(has_resent_req) => { + if !has_resent_req { + self.failure_count.remove(®ion_id); + } + } + Err(e) => { + self.issue_fatal_of( + ®ion, + e.context(format_args!( + "retry encountered error, origin error is {}", + err + )), + ); + self.failure_count.remove(®ion_id); + } + } + } + + fn on_high_memory_usage(&mut self, inconsistent_region_id: u64) { + let mut lame_region = Region::new(); + lame_region.set_id(inconsistent_region_id); + let mut act_region = None; + self.subs.deregister_region_if(&lame_region, |act, _| { + act_region = Some(act.meta.clone()); + true + }); + let delay = OOM_BACKOFF_BASE + + Duration::from_secs(rand::thread_rng().gen_range(0..OOM_BACKOFF_JITTER_SECS)); + info!("log backup triggering high memory usage."; + "region" => %inconsistent_region_id, + "mem_usage" => %self.memory_manager.used_ratio(), + "mem_max" => %self.memory_manager.capacity()); + if let Some(region) = act_region { + self.schedule_start_observe(delay, region, None); + } + } + + fn schedule_start_observe( + &self, + backoff: Duration, + region: Region, + handle: Option, + ) { + let tx = self.messenger.upgrade(); + let region_id = region.id; + if tx.is_none() { + warn!( + "log backup subscription manager: cannot upgrade self-sender, are we shutting down?" + ); + return; + } + let tx = tx.unwrap(); + // tikv_util::Instant cannot be converted to std::time::Instant :( + let start = std::time::Instant::now(); + let scheduled = async move { + tokio::time::sleep_until((start + backoff).into()).await; + let handle = handle.unwrap_or_else(|| ObserveHandle::new()); + if let Err(err) = tx.send(ObserveOp::Start { region, handle }).await { + warn!("log backup failed to schedule start observe."; "err" => %err); + } + }; + tokio::spawn(root!("scheduled_subscription"; scheduled; "after" = ?backoff, region_id)); + } + + #[instrument(skip_all, fields(id = region.id))] +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) async fn refresh_resolver(&self, region: &Region) { let need_refresh_all = !self.subs.try_update_region(region); @@ -654,6 +765,17 @@ where ) { self.subs .register_region(region, handle.clone(), Some(last_checkpoint)); +<<<<<<< HEAD +======= + let feedback_channel = match self.messenger.upgrade() { + Some(ch) => ch, + None => { + warn!("log backup subscription manager is shutting down, aborting new scan."; + utils::slog_region(region), "handle" => ?handle.id); + return; + } + }; +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) self.spawn_scan(ScanCmd { region: region.clone(), handle, diff --git a/components/backup/src/softlimit.rs b/components/backup/src/softlimit.rs index c3a2fc7c796..6afd1f5b2a6 100644 --- a/components/backup/src/softlimit.rs +++ b/components/backup/src/softlimit.rs @@ -38,7 +38,7 @@ impl SoftLimit { Ok(()) } - async fn grant_tokens(&self, n: usize) { + fn grant_tokens(&self, n: usize) { self.0.semaphore.add_permits(n); } @@ -53,9 +53,9 @@ impl SoftLimit { /// Grows the tasks can be executed concurrently by n #[cfg(test)] - pub async fn grow(&self, n: usize) { + pub fn grow(&self, n: usize) { self.0.cap.fetch_add(n, Ordering::SeqCst); - self.grant_tokens(n).await; + self.grant_tokens(n); } /// resize the tasks available concurrently. @@ -66,7 +66,7 @@ impl SoftLimit { self.take_tokens(current - target).await?; } CmpOrder::Less => { - self.grant_tokens(target - current).await; + self.grant_tokens(target - current); } _ => {} } @@ -304,7 +304,7 @@ mod softlimit_test { ) .await; - limit_cloned.grow(1).await; + limit_cloned.grow(1); let working_cloned = working.clone(); should_satisfy_in( Duration::from_secs(10), @@ -314,7 +314,7 @@ mod softlimit_test { .await; let working_cloned = working.clone(); - limit_cloned.grow(2).await; + limit_cloned.grow(2); should_satisfy_in( Duration::from_secs(10), "waiting for worker grow to 4", diff --git a/components/resolved_ts/src/scanner.rs b/components/resolved_ts/src/scanner.rs index 7877de718ba..010b15d9f25 100644 --- a/components/resolved_ts/src/scanner.rs +++ b/components/resolved_ts/src/scanner.rs @@ -49,9 +49,47 @@ pub struct ScanTask { pub mode: ScanMode, pub region: Region, pub checkpoint_ts: TimeStamp, +<<<<<<< HEAD pub is_cancelled: IsCancelledCallback, pub send_entries: OnEntriesCallback, pub on_error: Option, +======= + pub backoff: Option, + pub cancelled: Receiver<()>, + pub scheduler: Scheduler, +} + +impl ScanTask { + fn send_entries(&self, entries: ScanEntries, apply_index: u64) { + let task = Task::ScanLocks { + region_id: self.region.get_id(), + observe_id: self.handle.id, + entries, + apply_index, + }; + if let Err(e) = self.scheduler.schedule(task) { + warn!("resolved_ts scheduler send entries failed"; "err" => ?e); + } + } + + fn is_cancelled(&mut self) -> bool { + matches!(self.cancelled.try_recv(), Err(_) | Ok(Some(_))) + } + + fn on_error(&self, err: Error) { + if let Err(e) = self.scheduler.schedule(Task::ReRegisterRegion { + region_id: self.region.get_id(), + observe_id: self.handle.id, + cause: err, + }) { + warn!("schedule re-register task failed"; + "region_id" => self.region.get_id(), + "observe_id" => ?self.handle.id, + "error" => ?e); + } + RTS_SCAN_TASKS.with_label_values(&["abort"]).inc(); + } +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) } #[derive(Debug)] @@ -171,10 +209,18 @@ impl, E: KvEngine> ScannerPool { entries.push(ScanEntry::Lock(locks)); } } +<<<<<<< HEAD +======= + task.send_entries(ScanEntries::Lock(locks), apply_index); +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) } entries.push(ScanEntry::None); RTS_SCAN_DURATION_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64()); +<<<<<<< HEAD (task.send_entries)(entries, apply_index); +======= + task.send_entries(ScanEntries::None, apply_index); +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) }; self.workers.spawn(fut); } diff --git a/components/resource_control/src/future.rs b/components/resource_control/src/future.rs index 8027a27b394..cb027a023d4 100644 --- a/components/resource_control/src/future.rs +++ b/components/resource_control/src/future.rs @@ -44,3 +44,292 @@ impl Future for ControlledFuture { res } } +<<<<<<< HEAD +======= + +#[cfg(not(test))] +fn get_thread_io_bytes_stats() -> Result { + file_system::get_thread_io_bytes_total() +} + +#[cfg(test)] +fn get_thread_io_bytes_stats() -> Result { + use std::cell::Cell; + + fail::fail_point!("failed_to_get_thread_io_bytes_stats", |_| { + Err("get_thread_io_bytes_total failed".into()) + }); + thread_local! { + static TOTAL_BYTES: Cell = Cell::new(IoBytes::default()); + } + + let mut new_bytes = TOTAL_BYTES.get(); + new_bytes.read += 100; + new_bytes.write += 50; + TOTAL_BYTES.set(new_bytes); + Ok(new_bytes) +} + +// `LimitedFuture` wraps a Future with ResourceLimiter, it will automically +// statistics the cpu time and io bytes consumed by the future, and do async +// waiting according the configuration of the ResourceLimiter. +#[pin_project] +pub struct LimitedFuture { + #[pin] + f: F, + // `pre_delay` and `post_delay` is used to delay this task, at any time, at most one of the two + // is valid. A future can only be polled once in one round, so we uses two field here to + // workaround this restriction of the rust compiler. + #[pin] + pre_delay: OptionalFuture>, + #[pin] + post_delay: OptionalFuture>, + resource_limiter: Arc, + // if the future is first polled, we need to let it consume a 0 value + // to compensate the debt of previously finished tasks. + is_first_poll: bool, +} + +impl LimitedFuture { + pub fn new(f: F, resource_limiter: Arc) -> Self { + Self { + f, + pre_delay: None.into(), + post_delay: None.into(), + resource_limiter, + is_first_poll: true, + } + } +} + +impl Future for LimitedFuture { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + if *this.is_first_poll { + debug_assert!(this.pre_delay.finished && this.post_delay.finished); + *this.is_first_poll = false; + let wait_dur = this + .resource_limiter + .consume(Duration::ZERO, IoBytes::default(), true) + .min(MAX_WAIT_DURATION); + if wait_dur > Duration::ZERO { + *this.pre_delay = Some( + GLOBAL_TIMER_HANDLE + .delay(std::time::Instant::now() + wait_dur) + .compat(), + ) + .into(); + } + } + if !this.post_delay.finished { + assert!(this.pre_delay.finished); + std::mem::swap(&mut *this.pre_delay, &mut *this.post_delay); + } + if !this.pre_delay.finished { + let res = this.pre_delay.poll(cx); + if res.is_pending() { + return Poll::Pending; + } + } + // get io stats is very expensive, so we only do so if only io control is + // enabled. + let mut last_io_bytes = None; + if this + .resource_limiter + .get_limiter(ResourceType::Io) + .get_rate_limit() + .is_finite() + { + match get_thread_io_bytes_stats() { + Ok(b) => { + last_io_bytes = Some(b); + } + Err(e) => { + warn!("load thread io bytes failed"; "err" => e); + } + } + } + let start = Instant::now(); + let res = this.f.poll(cx); + let dur = start.saturating_elapsed(); + let io_bytes = if let Some(last_io_bytes) = last_io_bytes { + match get_thread_io_bytes_stats() { + Ok(io_bytes) => io_bytes - last_io_bytes, + Err(e) => { + warn!("load thread io bytes failed"; "err" => e); + IoBytes::default() + } + } + } else { + IoBytes::default() + }; + let mut wait_dur = this + .resource_limiter + .consume(dur, io_bytes, res.is_pending()); + if wait_dur == Duration::ZERO || res.is_ready() { + return res; + } + if wait_dur > MAX_WAIT_DURATION { + warn!("limiter future wait too long"; "wait" => ?wait_dur, "io_read" => io_bytes.read, "io_write" => io_bytes.write, "cpu" => ?dur); + wait_dur = MAX_WAIT_DURATION; + } + *this.post_delay = Some( + GLOBAL_TIMER_HANDLE + .delay(std::time::Instant::now() + wait_dur) + .compat(), + ) + .into(); + _ = this.post_delay.poll(cx); + Poll::Pending + } +} + +/// `OptionalFuture` is similar to futures::OptionFuture, but provide an extra +/// `finished` flag to determine if the future requires poll. +#[pin_project] +struct OptionalFuture { + #[pin] + f: Option, + finished: bool, +} + +impl OptionalFuture { + fn new(f: Option) -> Self { + let finished = f.is_none(); + Self { f, finished } + } +} + +impl From> for OptionalFuture { + fn from(f: Option) -> Self { + Self::new(f) + } +} + +impl Future for OptionalFuture { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.f.as_pin_mut() { + Some(x) => x.poll(cx).map(|r| { + *this.finished = true; + Some(r) + }), + None => Poll::Ready(None), + } + } +} + +pub async fn with_resource_limiter( + f: F, + limiter: Option>, +) -> F::Output { + if let Some(limiter) = limiter { + LimitedFuture::new(f, limiter).await + } else { + f.await + } +} + +#[cfg(test)] +mod tests { + use std::sync::mpsc::{channel, Sender}; + + use tikv_util::yatp_pool::{DefaultTicker, FuturePool, YatpPoolBuilder}; + + use super::*; + use crate::resource_limiter::{GroupStatistics, ResourceType::Io}; + + #[pin_project] + struct NotifyFuture { + #[pin] + f: F, + sender: Sender<()>, + } + + impl NotifyFuture { + fn new(f: F, sender: Sender<()>) -> Self { + Self { f, sender } + } + } + + impl Future for NotifyFuture { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + this.f.poll(cx).map(|r| { + this.sender.send(()).unwrap(); + r + }) + } + } + + #[allow(clippy::unused_async)] + async fn empty() {} + + #[test] + fn test_limited_future() { + let pool = YatpPoolBuilder::new(DefaultTicker::default()) + .thread_count(1, 1, 1) + .name_prefix("test") + .build_future_pool(); + + let resource_limiter = Arc::new(ResourceLimiter::new( + "".into(), + f64::INFINITY, + 1000.0, + 0, + true, + )); + + fn spawn_and_wait(pool: &FuturePool, f: F, limiter: Arc) + where + F: Future + Send + 'static, + ::Output: Send, + { + let (sender, receiver) = channel::<()>(); + let fut = NotifyFuture::new(LimitedFuture::new(f, limiter), sender); + pool.spawn(fut).unwrap(); + receiver.recv().unwrap(); + } + + let mut i = 0; + let mut stats: GroupStatistics; + // consume the remain free limit quota. + loop { + i += 1; + spawn_and_wait(&pool, empty(), resource_limiter.clone()); + stats = resource_limiter.get_limit_statistics(Io); + assert_eq!(stats.total_consumed, i * 150); + if stats.total_wait_dur_us > 0 { + break; + } + } + + let start = Instant::now(); + spawn_and_wait(&pool, empty(), resource_limiter.clone()); + let new_stats = resource_limiter.get_limit_statistics(Io); + let delta = new_stats - stats; + let dur = start.saturating_elapsed(); + assert_eq!(delta.total_consumed, 150); + assert!(delta.total_wait_dur_us >= 140_000 && delta.total_wait_dur_us <= 160_000); + assert!(dur >= Duration::from_millis(150) && dur <= Duration::from_millis(160)); + + // fetch io bytes failed, consumed value is 0. + #[cfg(feature = "failpoints")] + { + fail::cfg("failed_to_get_thread_io_bytes_stats", "1*return").unwrap(); + spawn_and_wait(&pool, empty(), resource_limiter.clone()); + assert_eq!( + resource_limiter.get_limit_statistics(Io).total_consumed, + new_stats.total_consumed + ); + fail::remove("failed_to_get_thread_io_bytes_stats"); + } + } +} +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) diff --git a/components/tikv_util/src/yatp_pool/future_pool.rs b/components/tikv_util/src/yatp_pool/future_pool.rs index f010b508aaa..4960cc291fd 100644 --- a/components/tikv_util/src/yatp_pool/future_pool.rs +++ b/components/tikv_util/src/yatp_pool/future_pool.rs @@ -13,6 +13,7 @@ use std::{ use fail::fail_point; use futures::channel::oneshot::{self, Canceled}; +use futures_util::future::FutureExt; use prometheus::{IntCounter, IntGauge}; use tracker::TrackedFuture; use yatp::{queue::Extras, task::future}; @@ -163,11 +164,13 @@ impl PoolInner { metrics_running_task_count.inc(); - let f = async move { - let _ = future.await; + // NB: Prefer FutureExt::map to async block, because an async block + // doubles memory usage. + // See https://github.com/rust-lang/rust/issues/59087 + let f = future.map(move |_| { metrics_handled_task_count.inc(); metrics_running_task_count.dec(); - }; + }); if let Some(extras) = extras { self.pool.spawn(future::TaskCell::new(f, extras)); @@ -192,12 +195,14 @@ impl PoolInner { let (tx, rx) = oneshot::channel(); metrics_running_task_count.inc(); - self.pool.spawn(async move { - let res = future.await; + // NB: Prefer FutureExt::map to async block, because an async block + // doubles memory usage. + // See https://github.com/rust-lang/rust/issues/59087 + self.pool.spawn(future.map(move |res| { metrics_handled_task_count.inc(); metrics_running_task_count.dec(); let _ = tx.send(res); - }); + })); Ok(rx) } } diff --git a/scripts/clippy b/scripts/clippy index 7685cddfeeb..2766f8ebb41 100755 --- a/scripts/clippy +++ b/scripts/clippy @@ -48,6 +48,36 @@ CLIPPY_LINTS=( -D clippy::disallowed_methods \ -D rust-2018-idioms \ -D clippy::assertions_on_result_states \ +<<<<<<< HEAD +======= + -A clippy::non_canonical_partial_ord_impl \ + -A clippy::arc_with_non_send_sync \ +) + +# TODO: Enables `clippy::needless_return_with_question_mark` after +# https://github.com/rust-lang/rust-clippy/issues/11982 is fixed. +CLIPPY_LINTS+=( + -A clippy::needless_return_with_question_mark \ +) + +# We should be pedantic about writing async code, as it's easy to write +# suboptimal or even bloat code. See: +# - https://github.com/rust-lang/rust/issues/69826 +# - https://github.com/rust-lang/rust/issues/69663 +# - https://github.com/rust-lang/rust/issues/71407 +CLIPPY_LINTS+=( + -D clippy::redundant_async_block \ + -D clippy::unused_async \ + -D clippy::manual_async_fn \ + -D clippy::large_futures \ +) + +# Allow let_underscore_future temporary due to lots of counterexamples in +# tests. +# TODO: deny it. +CLIPPY_LINTS+=( + -A clippy::let_underscore_future \ +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) ) cargo clippy --workspace \ diff --git a/src/read_pool.rs b/src/read_pool.rs index f3f0c5d8250..90298423f9a 100644 --- a/src/read_pool.rs +++ b/src/read_pool.rs @@ -7,8 +7,16 @@ use std::{ }; use file_system::{set_io_type, IoType}; +<<<<<<< HEAD use futures::{channel::oneshot, future::TryFutureExt}; use kvproto::kvrpcpb::CommandPri; +======= +use futures::{ + channel::oneshot, + future::{FutureExt, TryFutureExt}, +}; +use kvproto::{errorpb, kvrpcpb::CommandPri}; +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) use online_config::{ConfigChange, ConfigManager, ConfigValue, Result as CfgResult}; use prometheus::{IntCounter, IntGauge}; use resource_control::{ControlledFuture, ResourceController}; @@ -156,6 +164,7 @@ impl ReadPoolHandle { extras.set_metadata(group_meta.clone()); let task_cell = if let Some(resource_ctl) = resource_ctl { TaskCell::new( +<<<<<<< HEAD TrackedFuture::new(ControlledFuture::new( async move { f.await; @@ -163,15 +172,25 @@ impl ReadPoolHandle { }, resource_ctl.clone(), group_meta, +======= + TrackedFuture::new(with_resource_limiter( + ControlledFuture::new( + f.map(move |_| { + running_tasks.dec(); + }), + resource_ctl.clone(), + group_name, + ), + resource_limiter, +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) )), extras, ) } else { TaskCell::new( - TrackedFuture::new(async move { - f.await; + TrackedFuture::new(f.map(move |_| { running_tasks.dec(); - }), + })), extras, ) }; @@ -194,10 +213,9 @@ impl ReadPoolHandle { { let (tx, rx) = oneshot::channel::(); let res = self.spawn( - async move { - let res = f.await; + f.map(move |res| { let _ = tx.send(res); - }, + }), priority, task_id, group_meta, diff --git a/src/server/status_server/mod.rs b/src/server/status_server/mod.rs index ad7779b121c..bce12fc0cad 100644 --- a/src/server/status_server/mod.rs +++ b/src/server/status_server/mod.rs @@ -129,6 +129,7 @@ where }) } +<<<<<<< HEAD fn list_heap_prof(_req: Request) -> hyper::Result> { let profiles = match list_heap_profiles() { Ok(s) => s, @@ -195,6 +196,9 @@ where #[allow(dead_code)] async fn dump_heap_prof_to_resp(req: Request) -> hyper::Result> { +======= + fn dump_heap_prof_to_resp(req: Request) -> hyper::Result> { +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) let query = req.uri().query().unwrap_or(""); let query_pairs: HashMap<_, _> = url::form_urlencoded::parse(query.as_bytes()).collect(); @@ -245,7 +249,7 @@ where } } - async fn get_config( + fn get_config( req: Request, cfg_controller: &ConfigController, ) -> hyper::Result> { @@ -277,6 +281,86 @@ where }) } +<<<<<<< HEAD +======= + fn get_cmdline(_req: Request) -> hyper::Result> { + let args = args().fold(String::new(), |mut a, b| { + a.push_str(&b); + a.push('\x00'); + a + }); + let response = Response::builder() + .header("Content-Type", mime::TEXT_PLAIN.to_string()) + .header("X-Content-Type-Options", "nosniff") + .body(args.into()) + .unwrap(); + Ok(response) + } + + fn get_symbol_count(req: Request) -> hyper::Result> { + assert_eq!(req.method(), Method::GET); + // We don't know how many symbols we have, but we + // do have symbol information. pprof only cares whether + // this number is 0 (no symbols available) or > 0. + let text = "num_symbols: 1\n"; + let response = Response::builder() + .header("Content-Type", mime::TEXT_PLAIN.to_string()) + .header("X-Content-Type-Options", "nosniff") + .header("Content-Length", text.len()) + .body(text.into()) + .unwrap(); + Ok(response) + } + + // The request and response format follows pprof remote server + // https://gperftools.github.io/gperftools/pprof_remote_servers.html + // Here is the go pprof implementation: + // https://github.com/golang/go/blob/3857a89e7eb872fa22d569e70b7e076bec74ebbb/src/net/http/pprof/pprof.go#L191 + async fn get_symbol(req: Request) -> hyper::Result> { + assert_eq!(req.method(), Method::POST); + let mut text = String::new(); + let body_bytes = hyper::body::to_bytes(req.into_body()).await?; + let body = String::from_utf8(body_bytes.to_vec()).unwrap(); + + // The request body is a list of addr to be resolved joined by '+'. + // Resolve addrs with addr2line and write the symbols each per line in + // response. + for pc in body.split('+') { + let addr = usize::from_str_radix(pc.trim_start_matches("0x"), 16).unwrap_or(0); + if addr == 0 { + info!("invalid addr: {}", addr); + continue; + } + + // Would be multiple symbols if inlined. + let mut syms = vec![]; + backtrace::resolve(addr as *mut std::ffi::c_void, |sym| { + let name = sym + .name() + .unwrap_or_else(|| backtrace::SymbolName::new(b"")); + syms.push(name.to_string()); + }); + + if !syms.is_empty() { + // join inline functions with '--' + let f = syms.join("--"); + // should be + text.push_str(format!("{:#x} {}\n", addr, f).as_str()); + } else { + info!("can't resolve mapped addr: {:#x}", addr); + text.push_str(format!("{:#x} ??\n", addr).as_str()); + } + } + let response = Response::builder() + .header("Content-Type", mime::TEXT_PLAIN.to_string()) + .header("X-Content-Type-Options", "nosniff") + .header("Content-Length", text.len()) + .body(text.into()) + .unwrap(); + Ok(response) + } + +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) async fn update_config( cfg_controller: ConfigController, req: Request, @@ -319,7 +403,7 @@ where }) } - async fn update_config_from_toml_file( + fn update_config_from_toml_file( cfg_controller: ConfigController, _req: Request, ) -> hyper::Result> { @@ -411,6 +495,19 @@ where } } +<<<<<<< HEAD +======= + fn get_engine_type(cfg_controller: &ConfigController) -> hyper::Result> { + let engine_type = cfg_controller.get_engine_type(); + let response = Response::builder() + .header("Content-Type", mime::TEXT_PLAIN.to_string()) + .header("Content-Length", engine_type.len()) + .body(engine_type.into()) + .unwrap(); + Ok(response) + } + +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) pub fn stop(self) { let _ = self.tx.send(()); self.thread_pool.shutdown_timeout(Duration::from_secs(3)); @@ -428,6 +525,50 @@ impl StatusServer where R: 'static + Send + RaftExtension + Clone, { +<<<<<<< HEAD +======= + fn dump_async_trace() -> hyper::Result> { + Ok(make_response( + StatusCode::OK, + tracing_active_tree::layer::global().fmt_bytes_with(|t, buf| { + t.traverse_with(FormatFlat::new(buf)).unwrap_or_else(|err| { + error!("failed to format tree, unreachable!"; "err" => %err); + }) + }), + )) + } + + fn handle_pause_grpc( + mut grpc_service_mgr: GrpcServiceManager, + ) -> hyper::Result> { + if let Err(err) = grpc_service_mgr.pause() { + return Ok(make_response( + StatusCode::INTERNAL_SERVER_ERROR, + format!("fails to pause grpc: {}", err), + )); + } + Ok(make_response( + StatusCode::OK, + "Successfully pause grpc service", + )) + } + + fn handle_resume_grpc( + mut grpc_service_mgr: GrpcServiceManager, + ) -> hyper::Result> { + if let Err(err) = grpc_service_mgr.resume() { + return Ok(make_response( + StatusCode::INTERNAL_SERVER_ERROR, + format!("fails to resume grpc: {}", err), + )); + } + Ok(make_response( + StatusCode::OK, + "Successfully resume grpc service", + )) + } + +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) pub async fn dump_region_meta(req: Request, router: R) -> hyper::Result> { lazy_static! { static ref REGION: Regex = Regex::new(r"/region/(?P\d+)").unwrap(); @@ -584,22 +725,38 @@ where (Method::GET, "/debug/pprof/heap_deactivate") => { Self::deactivate_heap_prof(req) } +<<<<<<< HEAD // (Method::GET, "/debug/pprof/heap") => { // Self::dump_heap_prof_to_resp(req).await // } +======= + (Method::GET, "/debug/pprof/heap") => { + Self::dump_heap_prof_to_resp(req) + } + (Method::GET, "/debug/pprof/cmdline") => Self::get_cmdline(req), + (Method::GET, "/debug/pprof/symbol") => { + Self::get_symbol_count(req) + } + (Method::POST, "/debug/pprof/symbol") => Self::get_symbol(req).await, +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) (Method::GET, "/config") => { - Self::get_config(req, &cfg_controller).await + Self::get_config(req, &cfg_controller) } (Method::POST, "/config") => { Self::update_config(cfg_controller.clone(), req).await } +<<<<<<< HEAD +======= + (Method::GET, "/engine_type") => { + Self::get_engine_type(&cfg_controller) + } +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) // This interface is used for configuration file hosting scenarios, // TiKV will not update configuration files, and this interface will // silently ignore configration items that cannot be updated online, // hand it over to the hosting platform for processing. (Method::PUT, "/config/reload") => { Self::update_config_from_toml_file(cfg_controller.clone(), req) - .await } (Method::GET, "/debug/pprof/profile") => { Self::dump_cpu_prof_to_resp(req).await @@ -619,8 +776,33 @@ where (Method::GET, "/resource_groups") => { Self::handle_get_all_resource_groups(resource_manager.as_ref()) } +<<<<<<< HEAD _ => Ok(make_response(StatusCode::NOT_FOUND, "path not found")), } +======= + (Method::PUT, "/pause_grpc") => { + Self::handle_pause_grpc(grpc_service_mgr) + } + (Method::PUT, "/resume_grpc") => { + Self::handle_resume_grpc(grpc_service_mgr) + } + (Method::GET, "/async_tasks") => Self::dump_async_trace(), + _ => { + is_unknown_path = true; + Ok(make_response(StatusCode::NOT_FOUND, "path not found")) + }, + }; + // Using "unknown" for unknown paths to void creating high cardinality. + let path_label = if is_unknown_path { + "unknown".to_owned() + } else { + path + }; + STATUS_REQUEST_DURATION + .with_label_values(&[method.as_str(), &path_label]) + .observe(start.elapsed().as_secs_f64()); + res +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) } })) } diff --git a/src/storage/txn/sched_pool.rs b/src/storage/txn/sched_pool.rs index 0cff9d51d41..bd51ca1103d 100644 --- a/src/storage/txn/sched_pool.rs +++ b/src/storage/txn/sched_pool.rs @@ -57,6 +57,7 @@ impl PoolTicker for SchedTicker { #[derive(Clone)] pub enum SchedPool { // separated thread pools for different priority commands +<<<<<<< HEAD Vanilla { high_worker_pool: FuturePool, worker_pool: FuturePool, @@ -66,6 +67,100 @@ pub enum SchedPool { worker_pool: FuturePool, resource_ctl: Arc, }, +======= + Vanilla, + // automatically switch between the `single-queue pool` and `priority-queue pool` based on the + // resource group settings, only used when the resource control feature is enabled. + Dynamic, +} + +#[derive(Clone)] +struct VanillaQueue { + high_worker_pool: FuturePool, + worker_pool: FuturePool, +} + +impl VanillaQueue { + fn spawn( + &self, + priority_level: CommandPri, + f: impl futures::Future + Send + 'static, + ) -> Result<(), Full> { + if priority_level == CommandPri::High { + self.high_worker_pool.spawn(f) + } else { + self.worker_pool.spawn(f) + } + } + + fn scale_pool_size(&self, pool_size: usize) { + self.high_worker_pool + .scale_pool_size(std::cmp::max(1, pool_size / 2)); + self.worker_pool.scale_pool_size(pool_size); + } + + fn get_pool_size(&self, priority_level: CommandPri) -> usize { + if priority_level == CommandPri::High { + self.high_worker_pool.get_pool_size() + } else { + self.worker_pool.get_pool_size() + } + } +} + +#[derive(Clone)] +struct PriorityQueue { + worker_pool: FuturePool, + resource_ctl: Arc, + resource_mgr: Arc, +} + +impl PriorityQueue { + fn spawn( + &self, + metadata: TaskMetadata<'_>, + priority_level: CommandPri, + f: impl futures::Future + Send + 'static, + ) -> Result<(), Full> { + let fixed_level = match priority_level { + CommandPri::High => Some(0), + CommandPri::Normal => None, + CommandPri::Low => Some(2), + }; + // TODO: maybe use a better way to generate task_id + let task_id = rand::random::(); + let group_name = metadata.group_name().to_owned(); + let resource_limiter = self.resource_mgr.get_resource_limiter( + unsafe { std::str::from_utf8_unchecked(&group_name) }, + "", + metadata.override_priority() as u64, + ); + let mut extras = Extras::new_multilevel(task_id, fixed_level); + extras.set_metadata(metadata.to_vec()); + self.worker_pool.spawn_with_extras( + with_resource_limiter( + ControlledFuture::new(f, self.resource_ctl.clone(), group_name), + resource_limiter, + ), + extras, + ) + } + + fn scale_pool_size(&self, pool_size: usize) { + self.worker_pool.scale_pool_size(pool_size); + } + + fn get_pool_size(&self) -> usize { + self.worker_pool.get_pool_size() + } +} + +#[derive(Clone)] +pub struct SchedPool { + vanilla: VanillaQueue, + priority: Option, + queue_type: QueueType, +>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541)) } impl SchedPool {