Skip to content

Commit

Permalink
log_backup: make a more rusty CallbackWaitGroup (#16740) (#16755)
Browse files Browse the repository at this point in the history
close #16739

This make `CallbackWaitGroup` returns an equivalent future of the `BoxFuture` returned by `wait`. Also this fixed where a stale notify may also resolve the future.

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
Signed-off-by: Yu Juncen <yu745514916@live.com>
Signed-off-by: hillium <yujuncen@pingcap.com>

Co-authored-by: 山岚 <36239017+YuJuncen@users.noreply.github.com>
Co-authored-by: Yu Juncen <yu745514916@live.com>
Co-authored-by: hillium <yujuncen@pingcap.com>
  • Loading branch information
4 people committed Apr 3, 2024
1 parent 84bb9e0 commit fcbd162
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 50 deletions.
4 changes: 2 additions & 2 deletions components/backup-stream/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use crate::{
subscription_manager::{RegionSubscriptionManager, ResolvedRegions},
subscription_track::{Ref, RefMut, ResolveResult, SubscriptionTracer},
try_send,
utils::{self, CallbackWaitGroup, StopWatch, Work},
utils::{self, FutureWaitGroup, StopWatch, Work},
};

const SLOW_EVENT_THRESHOLD: f64 = 120.0;
Expand Down Expand Up @@ -1082,7 +1082,7 @@ where
}

pub fn do_backup(&self, events: Vec<CmdBatch>) {
let wg = CallbackWaitGroup::new();
let wg = FutureWaitGroup::new();
for batch in events {
self.backup_batch(batch, wg.clone().work());
}
Expand Down
18 changes: 10 additions & 8 deletions components/backup-stream/src/subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
router::{Router, TaskSelector},
subscription_track::{CheckpointType, ResolveResult, SubscriptionTracer},
try_send,
utils::{self, CallbackWaitGroup, Work},
utils::{self, FutureWaitGroup, Work},
Task,
};

Expand Down Expand Up @@ -296,7 +296,7 @@ pub struct RegionSubscriptionManager<S, R, PDC> {

messenger: Sender<ObserveOp>,
scan_pool_handle: Arc<ScanPoolHandle>,
scans: Arc<CallbackWaitGroup>,
scans: Arc<FutureWaitGroup>,
}

impl<S, R, PDC> Clone for RegionSubscriptionManager<S, R, PDC>
Expand All @@ -317,7 +317,7 @@ where
subs: self.subs.clone(),
messenger: self.messenger.clone(),
scan_pool_handle: self.scan_pool_handle.clone(),
scans: CallbackWaitGroup::new(),
scans: FutureWaitGroup::new(),
}
}
}
Expand Down Expand Up @@ -373,7 +373,7 @@ where
subs: initial_loader.tracing,
messenger: tx,
scan_pool_handle: Arc::new(scan_pool_handle),
scans: CallbackWaitGroup::new(),
scans: FutureWaitGroup::new(),
};
let fut = op.clone().region_operator_loop(rx, leader_checker);
(op, fut)
Expand All @@ -390,8 +390,10 @@ where
}

/// wait initial scanning get finished.
pub fn wait(&self, timeout: Duration) -> future![bool] {
tokio::time::timeout(timeout, self.scans.wait()).map(|result| result.is_err())
pub async fn wait(&self, timeout: Duration) -> bool {
tokio::time::timeout(timeout, self.scans.wait())
.map(move |result| result.is_err())
.await
}

/// the handler loop.
Expand Down Expand Up @@ -754,7 +756,7 @@ mod test {
use std::time::Duration;

use super::ScanCmd;
use crate::{subscription_manager::spawn_executors, utils::CallbackWaitGroup};
use crate::{subscription_manager::spawn_executors, utils::FutureWaitGroup};

fn should_finish_in(f: impl FnOnce() + Send + 'static, d: std::time::Duration) {
let (tx, rx) = futures::channel::oneshot::channel();
Expand All @@ -771,7 +773,7 @@ mod test {
}

let pool = spawn_executors(NoopInitialScan, 1);
let wg = CallbackWaitGroup::new();
let wg = FutureWaitGroup::new();
fail::cfg("execute_scan_command_sleep_100", "return").unwrap();
for _ in 0..100 {
let wg = wg.clone();
Expand Down
94 changes: 54 additions & 40 deletions components/backup-stream/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,21 @@ use std::{
borrow::Borrow,
cell::RefCell,
collections::{hash_map::RandomState, BTreeMap, HashMap},
future::Future,
ops::{Bound, RangeBounds},
path::Path,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::Context,
task::{Context, Waker},
time::Duration,
};

use async_compression::{tokio::write::ZstdEncoder, Level};
use engine_rocks::ReadPerfInstant;
use engine_traits::{CfName, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE};
use futures::{ready, task::Poll, FutureExt};
use futures::{ready, task::Poll};
use kvproto::{
brpb::CompressionType,
metapb::Region,
Expand All @@ -37,13 +38,12 @@ use tikv_util::{
use tokio::{
fs::File,
io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter},
sync::{oneshot, Mutex, RwLock},
sync::{Mutex, RwLock},
};
use txn_types::{Key, Lock, LockType};

use crate::{
errors::{Error, Result},
metadata::store::BoxFuture,
router::TaskSelector,
Task,
};
Expand Down Expand Up @@ -378,9 +378,43 @@ pub fn should_track_lock(l: &Lock) -> bool {
}
}

pub struct CallbackWaitGroup {
pub struct FutureWaitGroup {
running: AtomicUsize,
on_finish_all: std::sync::Mutex<Vec<Box<dyn FnOnce() + Send + 'static>>>,
wakers: std::sync::Mutex<Vec<Waker>>,
}

pub struct Work(Arc<FutureWaitGroup>);

impl Drop for Work {
fn drop(&mut self) {
self.0.work_done();
}
}

pub struct WaitAll<'a>(&'a FutureWaitGroup);

impl<'a> Future for WaitAll<'a> {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Fast path: nothing to wait.
let running = self.0.running.load(Ordering::SeqCst);
if running == 0 {
return Poll::Ready(());
}

// <1>
let mut callbacks = self.0.wakers.lock().unwrap();
callbacks.push(cx.waker().clone());
let running = self.0.running.load(Ordering::SeqCst);
// Unlikely path: if all background tasks finish at <1>, there will be a long
// period that nobody will wake the `wakers` even the condition is ready.
// We need to help ourselves here.
if running == 0 {
callbacks.drain(..).for_each(|w| w.wake());
}
Poll::Pending
}
}

/// A shortcut for making an opaque future type for return type or argument
Expand All @@ -392,42 +426,26 @@ macro_rules! future {
($t:ty) => { impl core::future::Future<Output = $t> + Send + 'static };
}

impl CallbackWaitGroup {
impl FutureWaitGroup {
pub fn new() -> Arc<Self> {
Arc::new(Self {
running: AtomicUsize::new(0),
on_finish_all: std::sync::Mutex::default(),
wakers: Default::default(),
})
}

fn work_done(&self) {
let last = self.running.fetch_sub(1, Ordering::SeqCst);
if last == 1 {
self.on_finish_all
.lock()
.unwrap()
.drain(..)
.for_each(|x| x())
self.wakers.lock().unwrap().drain(..).for_each(|x| {
x.wake();
})
}
}

/// wait until all running tasks done.
pub fn wait(&self) -> BoxFuture<()> {
// Fast path: no uploading.
if self.running.load(Ordering::SeqCst) == 0 {
return Box::pin(futures::future::ready(()));
}

let (tx, rx) = oneshot::channel();
self.on_finish_all.lock().unwrap().push(Box::new(move || {
// The waiter may timed out.
let _ = tx.send(());
}));
// try to acquire the lock again.
if self.running.load(Ordering::SeqCst) == 0 {
return Box::pin(futures::future::ready(()));
}
Box::pin(rx.map(|_| ()))
pub fn wait(&self) -> WaitAll<'_> {
WaitAll(self)
}

/// make a work, as long as the return value held, mark a work in the group
Expand All @@ -438,14 +456,6 @@ impl CallbackWaitGroup {
}
}

pub struct Work(Arc<CallbackWaitGroup>);

impl Drop for Work {
fn drop(&mut self) {
self.0.work_done();
}
}

struct ReadThroughputRecorder {
// The system tool set.
ins: Option<OsInspector>,
Expand Down Expand Up @@ -805,7 +815,7 @@ mod test {
use kvproto::metapb::{Region, RegionEpoch};
use tokio::io::{AsyncWriteExt, BufReader};

use crate::utils::{is_in_range, CallbackWaitGroup, SegmentMap};
use crate::utils::{is_in_range, FutureWaitGroup, SegmentMap};

#[test]
fn test_redact() {
Expand Down Expand Up @@ -914,8 +924,8 @@ mod test {
}

fn run_case(c: Case) {
let wg = FutureWaitGroup::new();
for i in 0..c.repeat {
let wg = CallbackWaitGroup::new();
let cnt = Arc::new(AtomicUsize::new(c.bg_task));
for _ in 0..c.bg_task {
let cnt = cnt.clone();
Expand All @@ -926,7 +936,7 @@ mod test {
});
}
block_on(tokio::time::timeout(Duration::from_secs(20), wg.wait())).unwrap();
assert_eq!(cnt.load(Ordering::SeqCst), 0, "{:?}@{}", c, i);
assert_eq!(cnt.load(Ordering::SeqCst), 0, "{:?}@{}", c, i,);
}
}

Expand All @@ -943,6 +953,10 @@ mod test {
bg_task: 512,
repeat: 1,
},
Case {
bg_task: 16,
repeat: 10000,
},
Case {
bg_task: 2,
repeat: 100000,
Expand Down

0 comments on commit fcbd162

Please sign in to comment.