

Summary

This RFC proposes a new thread pool that supports async operations, called ReadPool. It is designed specifically for read operations such as KV Get, KV Scan and Coprocessor Read, in order to improve performance.


Motivation

Currently, all KV operations inside Storage are executed on the same thread pool. This leads to poor operation isolation, i.e. heavy writes cause slow reads. In fact, these read operations can be executed on other threads so that they are not blocked by writes. For Coprocessor operations, we have an end-point worker thread to handle async snapshots. This becomes a bottleneck when there are heavy coprocessor requests.

By introducing ReadPool, a standalone thread pool that supports async operations, this RFC fixes both problems. For KV operations, reads and writes will be executed on different thread pools, so they won't affect each other. For Coprocessor operations, since ReadPool supports async operations, the end-point worker is no longer necessary and the bottleneck disappears.

In this RFC, there will be a ReadPool for KV operations and another ReadPool for Coprocessor operations, so that they won't block each other. In the future, it may be possible to merge them together.

Detailed design

The ReadPool provides these features:

  1. Support different priorities (high, normal and low).

  2. Support contexts and periodical ticking so that we can use local metrics.

  3. Support max task threshold as a load control mechanism.

The ReadPool can be implemented in two levels. The lower level is a new facility called util::FuturePool, which is a simple encapsulation over CpuPool that supports feature 2. The higher level is server::ReadPool, which assembles multiple FuturePools and supports features 1 and 3. In this way, FuturePool becomes a common facility that can be reused in the future to run other kinds of async operations that need contexts, not only the scenario described in this RFC.


To support periodical ticking, this RFC defines:

pub trait Context: fmt::Debug + Send {
    fn on_tick(&mut self) {}
}

The FuturePool user can provide customized impl Context so that some code can be executed periodically within the context of the thread pool's thread.
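As an illustration of how a user-provided Context might look, the following is a minimal sketch that buffers a counter locally and flushes it in on_tick. The LocalMetrics type, its fields, and the record_read and example functions are hypothetical names introduced only for this sketch; the flushed_reads field merely stands in for a real metrics sink.

```rust
use std::fmt;

// Mirrors the trait defined in this RFC.
pub trait Context: fmt::Debug + Send {
    fn on_tick(&mut self) {}
}

// Hypothetical context: accumulates a counter locally and flushes it on tick.
#[derive(Debug, Default)]
pub struct LocalMetrics {
    pending_reads: u64, // accumulated since the last tick
    flushed_reads: u64, // stand-in for a global metrics sink (assumption)
}

impl LocalMetrics {
    // Called from task code running on the pool thread.
    pub fn record_read(&mut self) {
        self.pending_reads += 1;
    }
}

impl Context for LocalMetrics {
    fn on_tick(&mut self) {
        // Move the locally buffered value into the "sink" and reset the buffer.
        self.flushed_reads += self.pending_reads;
        self.pending_reads = 0;
    }
}

// Returns (flushed, pending) after two reads, one tick, and one more read.
pub fn example() -> (u64, u64) {
    let mut ctx = LocalMetrics::default();
    ctx.record_read();
    ctx.record_read();
    ctx.on_tick();
    ctx.record_read();
    (ctx.flushed_reads, ctx.pending_reads)
}
```

Because each pool thread owns its own context, updates like record_read need no synchronization; only the periodic flush would touch shared state.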

The FuturePool is defined as follows:

pub struct FuturePool<T: Context + 'static> {
    pool: CpuPool,
    context: Arc<HashMap<thread::ThreadId, T>>,
    running_task_count: Arc<AtomicUsize>,
}

We decide whether to call on_tick each time a task finishes running: if the time elapsed since the last tick is longer than the tick interval, we call on_tick. To do this cleanly, we can wrap the logic in a ContextDelegator:

struct ContextDelegator<T: Context> {
    tick_interval: Duration,
    inner: RefCell<T>,
    last_tick: Cell<Option<Instant>>,
}

impl<T: Context> ContextDelegator<T> {
    fn new(context: T, tick_interval: Duration) -> ContextDelegator<T> {
        // ...
    }

    fn on_task_finish(&self) {
        // check and optionally call `self.inner.borrow_mut().on_tick()`
    }
}
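The elided bodies could be filled in roughly as follows. This is only a sketch under stated assumptions: the helper on_task_finish_at (which takes the current time as a parameter so the behaviour is deterministic), the TickCounter context, and the example function are hypothetical, and the choice to let the first finished task merely start the clock is an assumption rather than something this RFC specifies.

```rust
use std::cell::{Cell, RefCell};
use std::fmt;
use std::time::{Duration, Instant};

pub trait Context: fmt::Debug + Send {
    fn on_tick(&mut self) {}
}

struct ContextDelegator<T: Context> {
    tick_interval: Duration,
    inner: RefCell<T>,
    last_tick: Cell<Option<Instant>>,
}

impl<T: Context> ContextDelegator<T> {
    fn new(context: T, tick_interval: Duration) -> ContextDelegator<T> {
        ContextDelegator {
            tick_interval,
            inner: RefCell::new(context),
            last_tick: Cell::new(None),
        }
    }

    // Hypothetical helper: same logic as `on_task_finish`, with an injected clock.
    fn on_task_finish_at(&self, now: Instant) {
        match self.last_tick.get() {
            // Assumption: the first finished task only starts the clock.
            None => self.last_tick.set(Some(now)),
            Some(last) => {
                if now.duration_since(last) > self.tick_interval {
                    self.last_tick.set(Some(now));
                    self.inner.borrow_mut().on_tick();
                }
            }
        }
    }

    fn on_task_finish(&self) {
        self.on_task_finish_at(Instant::now());
    }
}

// Hypothetical context that counts how often it was ticked.
#[derive(Debug, Default)]
struct TickCounter {
    ticks: u32,
}

impl Context for TickCounter {
    fn on_tick(&mut self) {
        self.ticks += 1;
    }
}

// Simulates four task completions against a 1 second tick interval.
fn example() -> u32 {
    let d = ContextDelegator::new(TickCounter::default(), Duration::from_secs(1));
    let t0 = Instant::now();
    d.on_task_finish_at(t0); // starts the clock
    d.on_task_finish_at(t0 + Duration::from_millis(500)); // interval not yet elapsed
    d.on_task_finish_at(t0 + Duration::from_millis(1500)); // interval elapsed: tick
    d.on_task_finish_at(t0 + Duration::from_millis(1600)); // only 100 ms since last tick
    let ticks = d.inner.borrow().ticks;
    ticks
}
```

Cell and RefCell are sufficient here because each delegator is only ever touched from the pool thread it belongs to.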

FuturePool users pass a Future to the FuturePool to execute, just as with CpuPool. However, to give the Future access to the Context, the Future must be produced by a closure that receives access to the Context as its parameter. In the future, the parameter may live across multiple Futures, which may run on different threads and thus with different Contexts. Therefore, the parameter is a Context accessor rather than a specific Context.

As a result, this RFC further introduces the following struct, which provides access to the current Context based on the running thread:

pub struct ContextDelegators<T: Context> {
    // ...
}

impl<T: Context> ContextDelegators<T> {
    pub fn current_thread_context_mut(&self) -> RefMut<T> {
        // ...
    }
}
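The per-thread lookup can be sketched with the standard library alone. Note the deliberate substitution: this sketch stores each context in a Mutex and returns a MutexGuard instead of the RefMut shown above, so that the map can be shared across threads without unsafe code. The new constructor, the delegators field, and the example function are hypothetical names for illustration, and a plain u64 stands in for a real Context.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::{self, ThreadId};

// Accessor that resolves "the context of the thread I am running on".
pub struct ContextDelegators<T> {
    delegators: HashMap<ThreadId, Mutex<T>>,
}

impl<T> ContextDelegators<T> {
    pub fn new(delegators: HashMap<ThreadId, Mutex<T>>) -> Self {
        ContextDelegators { delegators }
    }

    // Assumption: Mutex/MutexGuard replace the RefCell/RefMut of the RFC.
    pub fn current_thread_context_mut(&self) -> MutexGuard<'_, T> {
        let id = thread::current().id();
        self.delegators
            .get(&id)
            .expect("called from a thread outside the pool")
            .lock()
            .unwrap()
    }
}

// Two workers each bump only their own context; returns the final values.
pub fn example() -> Vec<u64> {
    let mut handles = Vec::new();
    let mut senders = Vec::new();
    let mut map = HashMap::new();
    for i in 0..2u64 {
        let (tx, rx) = mpsc::channel::<Arc<ContextDelegators<u64>>>();
        let handle = thread::spawn(move || {
            // Wait until the accessor covering all worker threads is built.
            let ctxs = rx.recv().unwrap();
            // Worker `i` updates its own context `i + 1` times.
            for _ in 0..=i {
                *ctxs.current_thread_context_mut() += 1;
            }
        });
        map.insert(handle.thread().id(), Mutex::new(0u64));
        handles.push(handle);
        senders.push(tx);
    }
    let ctxs = Arc::new(ContextDelegators::new(map));
    for tx in &senders {
        tx.send(Arc::clone(&ctxs)).unwrap();
    }
    let ids: Vec<ThreadId> = handles.iter().map(|h| h.thread().id()).collect();
    for h in handles {
        h.join().unwrap();
    }
    ids.iter()
        .map(|id| *ctxs.delegators[id].lock().unwrap())
        .collect()
}
```

The same accessor value is handed to every worker, yet each call resolves to a different context depending on the calling thread, which is why the parameter is an accessor rather than a specific Context.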

The FuturePool provides a spawn interface to execute a Future while providing ContextDelegators:

impl<T: Context + 'static> FuturePool<T> {
    pub fn spawn<F, R>(&self, future_factory: R) -> CpuFuture<F::Item, F::Error>
    where
        R: FnOnce(ContextDelegators<T>) -> F + Send + 'static,
        F: Future + Send + 'static,
        F::Item: Send + 'static,
        F::Error: Send + 'static,
    {
        // ...
    }
}


ReadPool is implemented over FuturePool as follows:

pub struct ReadPool<T: Context + 'static> {
    pool_high: FuturePool<T>,
    pool_normal: FuturePool<T>,
    pool_low: FuturePool<T>,
    max_tasks_high: usize,
    max_tasks_normal: usize,
    max_tasks_low: usize,
}

It also provides an interface similar to FuturePool::spawn, which additionally takes a priority and returns an error when the FuturePool for that priority is full:

pub enum Priority {
    High,
    Normal,
    Low,
}

impl<T: futurepool::Context + 'static> ReadPool<T> {
    pub fn future_execute<F, R>(
        &self,
        priority: Priority,
        future_factory: R,
    ) -> Result<CpuFuture<F::Item, F::Error>, Full>
    where
        R: FnOnce(ContextDelegators<T>) -> F + Send + 'static,
        F: Future + Send + 'static,
        F::Item: Send + 'static,
        F::Error: Send + 'static,
    {
        let pool = self.get_pool_by_priority(priority);
        let max_tasks = self.get_max_tasks_by_priority(priority);
        let current_tasks = pool.get_running_task_count();
        if current_tasks >= max_tasks {
            // Reject the task: the pool for this priority is at its threshold.
            Err(Full {
                // ...
            })
        } else {
            Ok(pool.spawn(future_factory))
        }
    }
}
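The admission check can be exercised in isolation with a standard-library-only model. In this sketch, CountingPool is a hypothetical stand-in for FuturePool that tracks only the running task count, ReadPoolSketch, try_admit and finish are hypothetical names, and the two fields of Full are an assumption made for illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Clone, Copy, Debug)]
pub enum Priority {
    High,
    Normal,
    Low,
}

// Assumed shape of the error; the field names are illustrative only.
#[derive(Debug, PartialEq)]
pub struct Full {
    pub current_tasks: usize,
    pub max_tasks: usize,
}

// Stand-in for a FuturePool: it only tracks how many tasks are running.
#[derive(Default)]
struct CountingPool {
    running: AtomicUsize,
}

pub struct ReadPoolSketch {
    pools: [CountingPool; 3],
    max_tasks: [usize; 3],
}

impl ReadPoolSketch {
    pub fn new(max_high: usize, max_normal: usize, max_low: usize) -> Self {
        ReadPoolSketch {
            pools: [
                CountingPool::default(),
                CountingPool::default(),
                CountingPool::default(),
            ],
            max_tasks: [max_high, max_normal, max_low],
        }
    }

    fn index(priority: Priority) -> usize {
        match priority {
            Priority::High => 0,
            Priority::Normal => 1,
            Priority::Low => 2,
        }
    }

    // Admit a task into the pool for `priority`, or reject it when full.
    pub fn try_admit(&self, priority: Priority) -> Result<(), Full> {
        let i = Self::index(priority);
        let max_tasks = self.max_tasks[i];
        let current_tasks = self.pools[i].running.load(Ordering::SeqCst);
        if current_tasks >= max_tasks {
            Err(Full { current_tasks, max_tasks })
        } else {
            self.pools[i].running.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    // Called when a previously admitted task completes.
    pub fn finish(&self, priority: Priority) {
        self.pools[Self::index(priority)].running.fetch_sub(1, Ordering::SeqCst);
    }
}

pub fn example() -> Vec<Result<(), Full>> {
    let pool = ReadPoolSketch::new(2, 1, 1);
    let mut out = Vec::new();
    out.push(pool.try_admit(Priority::Normal)); // admitted
    out.push(pool.try_admit(Priority::Normal)); // rejected: normal pool is full
    out.push(pool.try_admit(Priority::High)); // admitted: pools are independent
    pool.finish(Priority::Normal);
    out.push(pool.try_admit(Priority::Normal)); // admitted again after a task finished
    out
}
```

As in the pseudocode above, the check and the increment are separate steps, so under concurrency the threshold is a soft limit that can be exceeded slightly; that seems acceptable for a load control mechanism.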


Drawbacks

The on_tick implementation in FuturePool is not perfect. It is driven by task completion, which means it will not be executed while no task is running. This is acceptable since we only intend to use on_tick to report metrics.


Alternatives

We can implement our own future pool instead of utilizing CpuPool. In this way, we can have a true on_tick. However, this is complex, and a true on_tick is not a feature we need.

We can implement our own async model instead of utilizing Future. However, there are no benefits compared to the current solution.

Unresolved questions