Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raftstore: use future pool for snap-generator pool #15079

Merged
merged 7 commits into from
Jul 10, 2023
29 changes: 16 additions & 13 deletions components/raftstore/src/store/worker/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,11 @@ use raft::eraftpb::Snapshot as RaftSnapshot;
use tikv_util::{
box_err, box_try,
config::VersionTrack,
defer, error, info, thd_name,
defer, error, info,
time::{Instant, UnixSecs},
warn,
worker::{Runnable, RunnableWithTimer},
};
use yatp::{
pool::{Builder, ThreadPool},
task::future::TaskCell,
yatp_pool::{DefaultTicker, FuturePool, YatpPoolBuilder},
};

use super::metrics::*;
Expand Down Expand Up @@ -373,7 +370,7 @@ where
coprocessor_host: CoprocessorHost<EK>,
router: R,
pd_client: Option<Arc<T>>,
pool: ThreadPool<TaskCell>,
pool: FuturePool,
}

impl<EK, R, T> Runner<EK, R, T>
Expand Down Expand Up @@ -407,8 +404,13 @@ where
coprocessor_host,
router,
pd_client,
pool: Builder::new(thd_name!("snap-generator"))
.max_thread_count(cfg.value().snap_generator_pool_size)
pool: YatpPoolBuilder::new(DefaultTicker::default())
.name_prefix("snap-generator")
.thread_count(
1,
cfg.value().snap_generator_pool_size,
cfg.value().snap_generator_pool_size,
)
.build_future_pool(),
}
}
Expand Down Expand Up @@ -858,7 +860,12 @@ where
for_balance,
allow_multi_files_snapshot,
);
});
}).unwrap_or_else(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need notify raftstore that snapshot will not be generated. Otherwise, leader will not retry generate snapshot.

Copy link
Contributor Author

@nolouch nolouch Jul 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think once the notifier is dropped(after failing), the generator can be retried.

Err(TryRecvError::Disconnected) | Ok(_) => {
*snap_state = SnapState::Relax;
warn!(
"failed to try generating snapshot";
"region_id" => self.region.get_id(),
"peer_id" => self.peer_id,
"times" => *tried_cnt,
"request_peer" => to,
);
}
}
}

|e| {
error!("failed to generate snapshot"; "region_id" => region_id, "err" => ?e);
SNAP_COUNTER.generate.fail.inc();
},
);
}
task @ Task::Apply { .. } => {
fail_point!("on_region_worker_apply", true, |_| {});
Expand Down Expand Up @@ -887,10 +894,6 @@ where
}
}
}

fn shutdown(&mut self) {
self.pool.shutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you needs to impl it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think no place to use the shutdown current.

}
}

impl<EK, R, T> RunnableWithTimer for Runner<EK, R, T>
Expand Down