Commit

This is an automated cherry-pick of tikv#16541
close tikv#16540

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
overvenus authored and ti-chi-bot committed Mar 18, 2024
1 parent 4e32aff commit 81bea72
Showing 11 changed files with 193 additions and 58 deletions.
1 change: 1 addition & 0 deletions components/backup-stream/src/checkpoint_manager.rs
@@ -613,6 +613,7 @@ pub mod tests {
Self(Arc::new(Mutex::new(inner)))
}

#[allow(clippy::unused_async)]
pub async fn fail(&self, status: RpcStatus) -> crate::errors::Result<()> {
panic!("failed in a case should never fail: {}", status);
}
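The new `clippy::unused_async` lint (denied in scripts/clippy later in this diff) would flag this test stub because its body never awaits; the `#[allow]` keeps the async signature its callers expect instead of changing it. A minimal sketch of the same situation, with hypothetical names:

#[allow(clippy::unused_async)]
async fn fail_stub(status: String) -> Result<(), String> {
    // The signature must stay `async` so it can stand in for a real RPC
    // handler, but the body itself has nothing to await.
    panic!("failed in a case that should never fail: {}", status);
}
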
6 changes: 3 additions & 3 deletions components/backup-stream/src/router.rs
@@ -938,7 +938,7 @@ impl StreamTaskInfo {
#[allow(clippy::map_entry)]
if !w.contains_key(&key) {
let path = key.temp_file_name();
let val = Mutex::new(DataFile::new(path, &self.temp_file_pool).await?);
let val = Mutex::new(DataFile::new(path, &self.temp_file_pool)?);
w.insert(key, val);
}

@@ -1442,7 +1442,7 @@ impl MetadataInfo {
impl DataFile {
/// create and open a logfile at the path.
/// Note: if a file with same name exists, would truncate it.
async fn new(local_path: impl AsRef<Path>, files: &Arc<TempFilePool>) -> Result<Self> {
fn new(local_path: impl AsRef<Path>, files: &Arc<TempFilePool>) -> Result<Self> {
let sha256 = Hasher::new(MessageDigest::sha256())
.map_err(|err| Error::Other(box_err!("openssl hasher failed to init: {}", err)))?;
let inner = files.open_for_write(local_path.as_ref())?;
@@ -2432,7 +2432,7 @@ mod tests {
let mut f = pool.open_for_write(file_path).unwrap();
f.write_all(b"test-data").await?;
f.done().await?;
let mut data_file = DataFile::new(&file_path, &pool).await.unwrap();
let mut data_file = DataFile::new(file_path, &pool).unwrap();
let info = DataFileInfo::new();

let mut meta = MetadataInfo::with_capacity(1);
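DataFile::new only does synchronous work (initializing the SHA-256 hasher and opening a file from the temp-file pool), so the commit drops `async` from the constructor and the `.await` from its callers. A rough sketch of the same conversion, using illustrative stand-in types rather than the real TiKV ones:

use std::{fs::File, io, path::Path};

struct DataFile {
    inner: File,
}

impl DataFile {
    // Previously `async fn new(...)`; nothing in the body awaits, so a
    // plain fn avoids generating (and later storing) a needless future.
    fn new(path: impl AsRef<Path>) -> io::Result<Self> {
        Ok(Self {
            inner: File::create(path)?,
        })
    }
}

Callers then write `DataFile::new(path)?` instead of `DataFile::new(path).await?`, as in the test above.
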
117 changes: 116 additions & 1 deletion components/backup-stream/src/subscription_manager.rs
@@ -475,7 +475,7 @@ where
let now = Instant::now();
let timedout = self.wait(Duration::from_secs(5)).await;
if timedout {
warn!("waiting for initial scanning done timed out, forcing progress!";
warn!("waiting for initial scanning done timed out, forcing progress!";
"take" => ?now.saturating_elapsed(), "timedout" => %timedout);
}
let regions = resolver.resolve(self.subs.current_regions(), min_ts).await;
@@ -492,10 +492,114 @@ where
}
callback(ResolvedRegions::new(rts, cps));
}
<<<<<<< HEAD
=======
ObserveOp::HighMemUsageWarning { region_id } => {
self.on_high_memory_usage(region_id);
}
>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541))
}
}
}

<<<<<<< HEAD
=======
async fn on_observe_result(
&mut self,
region: Region,
handle: ObserveHandle,
err: Option<Box<Error>>,
) {
let err = match err {
None => {
self.failure_count.remove(&region.id);
let sub = self.subs.get_subscription_of(region.id);
if let Some(mut sub) = sub {
if sub.value().handle.id == handle.id {
sub.value_mut().resolver.phase_one_done();
}
}
return;
}
Some(err) => {
if !should_retry(&err) {
self.failure_count.remove(&region.id);
self.subs
.deregister_region_if(&region, |sub, _| sub.handle.id == handle.id);
return;
}
err
}
};

let region_id = region.id;
match self.retry_observe(region.clone(), handle).await {
Ok(has_resent_req) => {
if !has_resent_req {
self.failure_count.remove(&region_id);
}
}
Err(e) => {
self.issue_fatal_of(
&region,
e.context(format_args!(
"retry encountered error, origin error is {}",
err
)),
);
self.failure_count.remove(&region_id);
}
}
}

fn on_high_memory_usage(&mut self, inconsistent_region_id: u64) {
let mut lame_region = Region::new();
lame_region.set_id(inconsistent_region_id);
let mut act_region = None;
self.subs.deregister_region_if(&lame_region, |act, _| {
act_region = Some(act.meta.clone());
true
});
let delay = OOM_BACKOFF_BASE
+ Duration::from_secs(rand::thread_rng().gen_range(0..OOM_BACKOFF_JITTER_SECS));
info!("log backup triggering high memory usage.";
"region" => %inconsistent_region_id,
"mem_usage" => %self.memory_manager.used_ratio(),
"mem_max" => %self.memory_manager.capacity());
if let Some(region) = act_region {
self.schedule_start_observe(delay, region, None);
}
}

fn schedule_start_observe(
&self,
backoff: Duration,
region: Region,
handle: Option<ObserveHandle>,
) {
let tx = self.messenger.upgrade();
let region_id = region.id;
if tx.is_none() {
warn!(
"log backup subscription manager: cannot upgrade self-sender, are we shutting down?"
);
return;
}
let tx = tx.unwrap();
// tikv_util::Instant cannot be converted to std::time::Instant :(
let start = std::time::Instant::now();
let scheduled = async move {
tokio::time::sleep_until((start + backoff).into()).await;
let handle = handle.unwrap_or_else(|| ObserveHandle::new());
if let Err(err) = tx.send(ObserveOp::Start { region, handle }).await {
warn!("log backup failed to schedule start observe."; "err" => %err);
}
};
tokio::spawn(root!("scheduled_subscription"; scheduled; "after" = ?backoff, region_id));
}

#[instrument(skip_all, fields(id = region.id))]
>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541))
async fn refresh_resolver(&self, region: &Region) {
let need_refresh_all = !self.subs.try_update_region(region);

@@ -722,6 +826,17 @@ where
) {
self.subs
.register_region(region, handle.clone(), Some(last_checkpoint));
<<<<<<< HEAD
=======
let feedback_channel = match self.messenger.upgrade() {
Some(ch) => ch,
None => {
warn!("log backup subscription manager is shutting down, aborting new scan.";
utils::slog_region(region), "handle" => ?handle.id);
return;
}
};
>>>>>>> 66847e9c5a (*: remove unnecessary async blocks to save memory (#16541))
self.spawn_scan(ScanCmd {
region: region.clone(),
handle,
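The cherry-picked `schedule_start_observe` re-sends a Start command to the manager's own channel after a randomized backoff. A condensed sketch of that delayed-resend pattern, assuming a tokio runtime; the `Cmd` enum and channel are illustrative stand-ins for `ObserveOp` and the manager's sender:

use std::time::{Duration, Instant};
use tokio::sync::mpsc;

enum Cmd {
    Start { region_id: u64 },
}

fn schedule_retry(tx: mpsc::Sender<Cmd>, region_id: u64, backoff: Duration) {
    let start = Instant::now();
    tokio::spawn(async move {
        // tokio's sleep_until takes a tokio::time::Instant, hence the `.into()`.
        tokio::time::sleep_until((start + backoff).into()).await;
        if tx.send(Cmd::Start { region_id }).await.is_err() {
            // Receiver dropped: the manager is shutting down, drop the command.
        }
    });
}
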
12 changes: 6 additions & 6 deletions components/backup/src/softlimit.rs
@@ -38,7 +38,7 @@ impl SoftLimit {
Ok(())
}

async fn grant_tokens(&self, n: usize) {
fn grant_tokens(&self, n: usize) {
self.0.semaphore.add_permits(n);
}

@@ -53,9 +53,9 @@ impl SoftLimit {

/// Grows the tasks can be executed concurrently by n
#[cfg(test)]
pub async fn grow(&self, n: usize) {
pub fn grow(&self, n: usize) {
self.0.cap.fetch_add(n, Ordering::SeqCst);
self.grant_tokens(n).await;
self.grant_tokens(n);
}

/// resize the tasks available concurrently.
@@ -66,7 +66,7 @@ impl SoftLimit {
self.take_tokens(current - target).await?;
}
CmpOrder::Less => {
self.grant_tokens(target - current).await;
self.grant_tokens(target - current);
}
_ => {}
}
@@ -304,7 +304,7 @@ mod softlimit_test {
)
.await;

limit_cloned.grow(1).await;
limit_cloned.grow(1);
let working_cloned = working.clone();
should_satisfy_in(
Duration::from_secs(10),
@@ -314,7 +314,7 @@
.await;

let working_cloned = working.clone();
limit_cloned.grow(2).await;
limit_cloned.grow(2);
should_satisfy_in(
Duration::from_secs(10),
"waiting for worker grow to 4",
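`grant_tokens` and `grow` lose their `async` because adding permits to the underlying semaphore is a plain synchronous call; only acquiring permits needs to be awaited. A minimal sketch using tokio's semaphore as a stand-in for whatever SoftLimit wraps internally:

use std::sync::Arc;
use tokio::sync::{AcquireError, Semaphore};

struct Limit {
    sem: Arc<Semaphore>,
}

impl Limit {
    // Adding permits is a plain synchronous call, so growing needs no .await.
    fn grow(&self, n: usize) {
        self.sem.add_permits(n);
    }

    // Shrinking has to wait until permits are actually returned, so it stays async.
    async fn shrink(&self, n: u32) -> Result<(), AcquireError> {
        let permits = self.sem.acquire_many(n).await?;
        permits.forget();
        Ok(())
    }
}
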
7 changes: 3 additions & 4 deletions components/resolved_ts/src/scanner.rs
@@ -43,7 +43,7 @@ pub struct ScanTask {
}

impl ScanTask {
async fn send_entries(&self, entries: ScanEntries, apply_index: u64) {
fn send_entries(&self, entries: ScanEntries, apply_index: u64) {
let task = Task::ScanLocks {
region_id: self.region.get_id(),
observe_id: self.handle.id,
@@ -159,11 +159,10 @@ impl<T: 'static + CdcHandle<E>, E: KvEngine> ScannerPool<T, E> {
if has_remaining {
start_key = Some(locks.last().unwrap().0.clone())
}
task.send_entries(ScanEntries::Lock(locks), apply_index)
.await;
task.send_entries(ScanEntries::Lock(locks), apply_index);
}
RTS_SCAN_DURATION_HISTOGRAM.observe(start.saturating_elapsed().as_secs_f64());
task.send_entries(ScanEntries::None, apply_index).await;
task.send_entries(ScanEntries::None, apply_index);
};
self.workers.spawn(fut);
}
1 change: 1 addition & 0 deletions components/resource_control/src/future.rs
@@ -275,6 +275,7 @@ mod tests {
}
}

#[allow(clippy::unused_async)]
async fn empty() {}

#[test]
17 changes: 11 additions & 6 deletions components/tikv_util/src/yatp_pool/future_pool.rs
@@ -13,6 +13,7 @@ use std::{

use fail::fail_point;
use futures::channel::oneshot::{self, Canceled};
use futures_util::future::FutureExt;
use prometheus::{IntCounter, IntGauge};
use tracker::TrackedFuture;
use yatp::{queue::Extras, task::future};
@@ -216,11 +217,13 @@ impl PoolInner {

metrics_running_task_count.inc();

let f = async move {
let _ = future.await;
// NB: Prefer FutureExt::map to async block, because an async block
// doubles memory usage.
// See https://github.com/rust-lang/rust/issues/59087
let f = future.map(move |_| {
metrics_handled_task_count.inc();
metrics_running_task_count.dec();
};
});

if let Some(extras) = extras {
self.pool.spawn(future::TaskCell::new(f, extras));
@@ -246,12 +249,14 @@

let (tx, rx) = oneshot::channel();
metrics_running_task_count.inc();
self.pool.spawn(async move {
let res = future.await;
// NB: Prefer FutureExt::map to async block, because an async block
// doubles memory usage.
// See https://github.com/rust-lang/rust/issues/59087
self.pool.spawn(future.map(move |res| {
metrics_handled_task_count.inc();
metrics_running_task_count.dec();
let _ = tx.send(res);
});
}));
Ok(rx)
}
}
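The NB comment above is the heart of this commit: wrapping an existing future in an `async move { ... }` block stores that future twice in the generated state machine (once as a capture, once in the await state, see rust-lang/rust#59087), while `FutureExt::map` only adds a thin wrapper. A self-contained sketch that makes the difference visible; it assumes the futures-util crate, and exact byte counts vary by compiler version:

use futures_util::{future, FutureExt};
use std::mem::size_of_val;

// A future whose state machine keeps a 4 KiB buffer alive across an await.
async fn payload() -> u64 {
    let buf = [0u8; 4096];
    future::ready(()).await;
    buf.len() as u64
}

fn main() {
    let plain = payload();
    let via_map = payload().map(|n| n + 1);
    let inner = payload();
    let via_block = async move { inner.await + 1 };
    println!("plain future   : {} bytes", size_of_val(&plain));
    println!("FutureExt::map : {} bytes", size_of_val(&via_map)); // ~ plain + closure
    println!("async block    : {} bytes", size_of_val(&via_block)); // ~ twice plain
}
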
27 changes: 24 additions & 3 deletions scripts/clippy
@@ -21,8 +21,6 @@ fi
# - `derive_partial_eq_without_eq` has compilation overhead.
# - Blocking issue for enabling `result_large_err` is the protobuf messages.
# - Blocking issue for clippy::large_enum_variant is the raftstore peer message.
# - Enables `clippy::needless_return_with_question_mark` after
# https://github.com/rust-lang/rust-clippy/issues/11982 is fixed.
CLIPPY_LINTS=(
-A clippy::module_inception \
-A clippy::result_large_err \
@@ -50,9 +48,32 @@ CLIPPY_LINTS=(
-D clippy::disallowed_methods \
-D rust-2018-idioms \
-D clippy::assertions_on_result_states \
-A clippy::needless_return_with_question_mark \
-A clippy::non_canonical_partial_ord_impl \
-A clippy::arc_with_non_send_sync \
)

# TODO: Enables `clippy::needless_return_with_question_mark` after
# https://github.com/rust-lang/rust-clippy/issues/11982 is fixed.
CLIPPY_LINTS+=(
-A clippy::needless_return_with_question_mark \
)

# We should be pedantic about writing async code, as it's easy to write
# suboptimal or even bloat code. See:
# - https://github.com/rust-lang/rust/issues/69826
# - https://github.com/rust-lang/rust/issues/69663
# - https://github.com/rust-lang/rust/issues/71407
CLIPPY_LINTS+=(
-D clippy::redundant_async_block \
-D clippy::unused_async \
-D clippy::manual_async_fn \
-D clippy::large_futures \
)

# Allow let_underscore_future temporary due to lots of counterexamples in
# tests.
# TODO: deny it.
CLIPPY_LINTS+=(
-A clippy::let_underscore_future \
)

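For reference, minimal hypothetical examples of the patterns the newly denied async lints reject (not TiKV code):

// clippy::unused_async: the body never awaits, so the `async` only adds a
// pointless state machine; make it a plain fn, or #[allow] it when the
// signature must stay async, as in checkpoint_manager.rs above.
async fn sum(a: u64, b: u64) -> u64 {
    a + b
}

// clippy::redundant_async_block: `async { fut.await }` re-wraps a future for
// no benefit (and roughly doubles its size); return `fut` directly instead.
fn passthrough(fut: impl std::future::Future<Output = ()>) -> impl std::future::Future<Output = ()> {
    async move { fut.await }
}
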
20 changes: 10 additions & 10 deletions src/read_pool.rs
@@ -12,7 +12,10 @@ use std::{
};

use file_system::{set_io_type, IoType};
use futures::{channel::oneshot, future::TryFutureExt};
use futures::{
channel::oneshot,
future::{FutureExt, TryFutureExt},
};
use kvproto::{errorpb, kvrpcpb::CommandPri};
use online_config::{ConfigChange, ConfigManager, ConfigValue, Result as CfgResult};
use prometheus::{core::Metric, Histogram, IntCounter, IntGauge};
@@ -172,10 +175,9 @@ impl ReadPoolHandle {
TaskCell::new(
TrackedFuture::new(with_resource_limiter(
ControlledFuture::new(
async move {
f.await;
f.map(move |_| {
running_tasks.dec();
},
}),
resource_ctl.clone(),
group_name,
),
@@ -185,10 +187,9 @@
)
} else {
TaskCell::new(
TrackedFuture::new(async move {
f.await;
TrackedFuture::new(f.map(move |_| {
running_tasks.dec();
}),
})),
extras,
)
};
@@ -212,10 +213,9 @@
{
let (tx, rx) = oneshot::channel::<T>();
let res = self.spawn(
async move {
let res = f.await;
f.map(move |res| {
let _ = tx.send(res);
},
}),
priority,
task_id,
metadata,
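The same `FutureExt::map` rewrite is applied to the read pool's spawn paths: the completion side effects (decrementing the running-task gauge, sending the result over a oneshot channel) become a closure attached with `map` instead of an enclosing async block. A stripped-down sketch of that pattern; the pool handle and crates here are assumptions for illustration, not the TiKV API:

use futures::channel::oneshot;
use futures_util::FutureExt;
use std::future::Future;

fn spawn_handled<F, T>(handle: &tokio::runtime::Handle, f: F) -> oneshot::Receiver<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    // `map` attaches the completion callback without nesting `f` inside a
    // second state machine, so the spawned future stays close to f's size.
    handle.spawn(f.map(move |res| {
        let _ = tx.send(res);
    }));
    rx
}
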
