Gather Schedulers
Signed-off-by: Neil Shen <overvenus@gmail.com>
overvenus committed Sep 11, 2018
1 parent 82ef9f1 commit 77886c5
Showing 2 changed files with 54 additions and 26 deletions.
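
In short: the commit gathers the two handles that were previously threaded through `Task`'s methods as separate parameters, the thread-pool scheduler (`threadpool::Scheduler<SchedContext<E>>`) and the message scheduler (`worker::Scheduler<Msg>`), into a single `Channels<E>` struct that owns both and hands each one out exactly once.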
src/storage/txn/process.rs (43 additions, 17 deletions)
@@ -30,9 +30,7 @@ use storage::{
 };
 use storage::{Key, KvPair, MvccInfo, Value};
 use util::collections::HashMap;
-use util::threadpool::{
-    Context as ThreadContext, ContextFactory as ThreadContextFactory, Scheduler as PoolScheduler,
-};
+use util::threadpool::{self, Context as ThreadContext, ContextFactory as ThreadContextFactory};
 use util::time::SlowTimer;
 use util::worker::{self, ScheduleError};

@@ -162,8 +160,7 @@ impl Task {
         self,
         cb_ctx: CbContext,
         snapshot: EngineResult<E::Snap>,
-        pool_scheduler: PoolScheduler<SchedContext<E>>,
-        scheduler: worker::Scheduler<Msg>,
+        mut channels: Channels<E>,
     ) {
         debug!(
             "receive snapshot finish msg for cid={}, cb_ctx={:?}",
@@ -176,15 +173,15 @@
                     .with_label_values(&[self.tag, "snapshot_ok"])
                     .inc();

-                self.process_by_worker(cb_ctx, snapshot, pool_scheduler, scheduler);
+                self.process_by_worker(cb_ctx, snapshot, channels);
             }
             Err(err) => {
                 error!("get snapshot failed for cid={}, error {:?}", self.cid, err);
                 SCHED_STAGE_COUNTER_VEC
                     .with_label_values(&[self.tag, "snapshot_err"])
                     .inc();
                 handle_schedule(
-                    scheduler,
+                    channels.take_scheduler(),
                     Msg::FinishedWithErr {
                         cid: self.cid,
                         err: Error::from(err),
@@ -199,8 +196,7 @@ impl Task {
         mut self,
         cb_ctx: CbContext,
         snapshot: E::Snap,
-        pool_scheduler: PoolScheduler<SchedContext<E>>,
-        scheduler: worker::Scheduler<Msg>,
+        mut channels: Channels<E>,
     ) {
         SCHED_STAGE_COUNTER_VEC
             .with_label_values(&[self.tag, "process"])
@@ -215,28 +211,29 @@ impl Task {
         }
         let readcmd = cmd.readonly();
         let tag = cmd.tag();
+        let pool = channels.take_pool();
         if readcmd {
-            pool_scheduler.schedule(move |ctx: &mut SchedContext<E>| {
+            pool.schedule(move |ctx: &mut SchedContext<E>| {
                 fail_point!("scheduler_async_snapshot_finish");

                 let _processing_read_timer = ctx
                     .processing_read_duration
                     .with_label_values(&[tag])
                     .start_coarse_timer();

-                let s = self.process_read(ctx, cmd, scheduler, snapshot);
+                let s = self.process_read(ctx, cmd, snapshot, channels);
                 ctx.add_statistics(tag, &s);
             });
         } else {
-            pool_scheduler.schedule(move |ctx: &mut SchedContext<E>| {
+            pool.schedule(move |ctx: &mut SchedContext<E>| {
                 fail_point!("scheduler_async_snapshot_finish");

                 let _processing_write_timer = ctx
                     .processing_write_duration
                     .with_label_values(&[tag])
                     .start_coarse_timer();

-                let s = self.process_write(cmd, scheduler, snapshot, ctx);
+                let s = self.process_write(ctx, cmd, snapshot, channels);
                 ctx.add_statistics(tag, &s);
             });
         }
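
Note the ordering in this hunk: `take_pool()` runs before `pool.schedule(...)` because `channels` itself is then moved into the scheduled closure, carrying the remaining msg-scheduler handle into the worker thread for `process_read`/`process_write` to take later.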
@@ -248,8 +245,8 @@
         self,
         sched_ctx: &mut SchedContext<E>,
         cmd: Command,
-        scheduler: worker::Scheduler<Msg>,
         snapshot: E::Snap,
+        mut channels: Channels<E>,
     ) -> Statistics {
         fail_point!("txn_before_process_read");
         debug!("process read cmd(cid={}) in worker pool", self.cid);
@@ -258,23 +255,27 @@
             Err(e) => ProcessResult::Failed { err: e.into() },
             Ok(pr) => pr,
         };
-        handle_schedule(scheduler, Msg::ReadFinished { task: self, pr });
+        handle_schedule(
+            channels.take_scheduler(),
+            Msg::ReadFinished { task: self, pr },
+        );
         statistics
     }

     /// Processes a write command within a worker thread, then posts either a `WritePrepareFinished`
     /// message if successful or a `WritePrepareFailed` message back to the event loop.
     fn process_write<E: Engine>(
         self,
+        sched_ctx: &SchedContext<E>,
         cmd: Command,
-        scheduler: worker::Scheduler<Msg>,
         snapshot: E::Snap,
-        sched_ctx: &SchedContext<E>,
+        mut channels: Channels<E>,
     ) -> Statistics {
         fail_point!("txn_before_process_write");
         let cid = self.cid;
         let mut statistics = Statistics::default();
         let cmd_tag = cmd.tag();
+        let scheduler = channels.take_scheduler();
         let msg = match process_write_impl(cmd, snapshot, &mut statistics) {
             // Initiates an async write operation on the storage engine, there'll be a `WriteFinished`
             // message when it finishes.
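
Since `Channels` yields its msg scheduler only once, `process_write` now takes it out up front and binds it to a local named `scheduler`, so the match arms below that build and post the result message keep working unchanged.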
@@ -683,6 +684,31 @@ impl<E: Engine> ThreadContext for SchedContext<E> {
     }
 }

+pub struct Channels<E: Engine> {
+    scheduler: Option<worker::Scheduler<Msg>>,
+    pool: Option<threadpool::Scheduler<SchedContext<E>>>,
+}
+
+impl<E: Engine> Channels<E> {
+    pub fn new(
+        scheduler: worker::Scheduler<Msg>,
+        pool: threadpool::Scheduler<SchedContext<E>>,
+    ) -> Self {
+        Channels {
+            scheduler: Some(scheduler),
+            pool: Some(pool),
+        }
+    }
+
+    fn take_scheduler(&mut self) -> worker::Scheduler<Msg> {
+        self.scheduler.take().unwrap()
+    }
+
+    fn take_pool(&mut self) -> threadpool::Scheduler<SchedContext<E>> {
+        self.pool.take().unwrap()
+    }
+}
+
 // Make clippy happy.
 type MultipleReturnValue = (Option<MvccLock>, Vec<(u64, Write)>, Vec<(u64, Value)>);
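
`Channels` wraps each handle in an `Option` and surrenders it through `Option::take().unwrap()`: a handle can be taken exactly once, and a second take panics immediately instead of silently reusing a stale handle. The following is a minimal, self-contained sketch of that take-once pattern; the `Bundle` type and its `String` payload are illustrative stand-ins, not part of this commit.

// A minimal sketch of the take-once pattern used by `Channels`.
// `Bundle` and its `String` payload are hypothetical stand-ins for
// the real scheduler handles.
struct Bundle {
    handle: Option<String>,
}

impl Bundle {
    fn new(handle: String) -> Self {
        Bundle {
            handle: Some(handle),
        }
    }

    // Moves the handle out of the bundle; a second call panics,
    // surfacing a double-take bug at the call site.
    fn take_handle(&mut self) -> String {
        self.handle.take().unwrap()
    }
}

fn main() {
    let mut bundle = Bundle::new("msg-scheduler".to_owned());
    let handle = bundle.take_handle(); // first take succeeds
    println!("took: {}", handle);
    // bundle.take_handle(); // a second take would panic: the Option is now None
}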

src/storage/txn/scheduler.rs (11 additions, 9 deletions)
@@ -45,7 +45,9 @@ use util::worker::{self, Runnable};

 use super::super::metrics::*;
 use super::latch::{Latches, Lock};
-use super::process::{execute_callback, ProcessResult, SchedContext, SchedContextFactory, Task};
+use super::process::{
+    execute_callback, Channels, ProcessResult, SchedContext, SchedContextFactory, Task,
+};
 use super::Error;

 pub const CMD_BATCH_SIZE: usize = 256;
@@ -214,11 +216,14 @@ impl<E: Engine> Scheduler<E> {
         entity
     }

-    pub fn fetch_worker_pool(&self, priority: CommandPri) -> &ThreadPool<SchedContext<E>> {
-        match priority {
+    pub fn fetch_channels(&self, priority: CommandPri) -> Channels<E> {
+        let pool = match priority {
             CommandPri::Low | CommandPri::Normal => &self.worker_pool,
             CommandPri::High => &self.high_priority_pool,
-        }
+        };
+        let pool_scheduler = pool.scheduler();
+        let scheduler = self.scheduler.clone();
+        Channels::new(scheduler, pool_scheduler)
     }

     /// Event handler for new command.
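
Design note: `fetch_worker_pool` returned `&ThreadPool<SchedContext<E>>`, a borrow of `self`, whereas `fetch_channels` returns an owned `Channels<E>` built from the pool's scheduler handle plus a clone of the msg scheduler, so the caller can move the bundle into the snapshot callback without tying that callback's lifetime to the `Scheduler`.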
@@ -289,14 +294,11 @@ impl<E: Engine> Scheduler<E> {
         let mut task = self.dequeue_task(cid);
         let tag = task.tag;
         let ctx = task.cmd.as_ref().unwrap().get_context().clone();
-        let scheduler = self.scheduler.clone();
-        let pool_scheduler = self
-            .fetch_worker_pool(task.cmd.as_ref().unwrap().priority())
-            .scheduler();
+        let channels = self.fetch_channels(task.cmd.as_ref().unwrap().priority());

         task.start();
         let cb = box move |(cb_ctx, snapshot)| {
-            task.on_snapshot_finished(cb_ctx, snapshot, pool_scheduler, scheduler);
+            task.on_snapshot_finished(cb_ctx, snapshot, channels);
         };
         if let Err(e) = self.engine.async_snapshot(&ctx, cb) {
             error!("engine async_snapshot failed, err: {:?}", e);
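
At the call site a single `channels` value now flows through the whole chain, moved into the boxed snapshot callback, then into `on_snapshot_finished`, and finally into the worker-pool closure, replacing the former `pool_scheduler`/`scheduler` argument pair.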
