Skip to content

Commit

Permalink
Gather related functions
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Sep 10, 2018
1 parent 9b977b5 commit 71ae317
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 78 deletions.
74 changes: 37 additions & 37 deletions src/storage/txn/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,20 @@ pub struct Task {
pub slow_timer: Option<SlowTimer>,
}

/// Emits a slow-query log line when the task is destroyed, but only if the
/// task ever started executing (i.e. `slow_timer` was set in `acquire_lock`).
impl Drop for Task {
    fn drop(&mut self) {
        let timer = match self.slow_timer {
            Some(ref mut t) => t,
            None => return,
        };
        slow_log!(
            timer,
            "[region {}] scheduler handle command: {}, ts: {}",
            self.region_id,
            self.tag,
            self.ts
        );
    }
}

impl Task {
/// Creates a task for a running command.
pub fn new(cid: u64, cmd: Command, lock: Lock, cb: StorageCb) -> Task {
Expand Down Expand Up @@ -141,20 +155,6 @@ impl Task {
}
}

impl Drop for Task {
    /// Logs how long the scheduler spent handling this command.
    ///
    /// `slow_timer` is only `Some` once the task actually began execution
    /// (it is set in `acquire_lock`), so tasks that never ran log nothing.
    fn drop(&mut self) {
        if let Some(ref mut timer) = self.slow_timer {
            // NOTE(review): `slow_log!` presumably only emits when the elapsed
            // time exceeds a threshold — confirm against the macro definition.
            slow_log!(
                timer,
                "[region {}] scheduler handle command: {}, ts: {}",
                self.region_id,
                self.tag,
                self.ts
            );
        }
    }
}

impl<E: Engine> Scheduler<E> {
/// Event handler for the completion of get snapshot.
///
Expand Down Expand Up @@ -257,6 +257,29 @@ fn process_read<E: Engine>(
statistics
}

/// Processes a write command within a worker thread, then posts either a `WritePrepareFinished`
/// message if successful or a `WritePrepareFailed` message back to the event loop.
fn process_write<E: Engine>(
    cid: u64,
    cmd: Command,
    scheduler: worker::Scheduler<Msg<E>>,
    snapshot: E::Snap,
) -> Statistics {
    fail_point!("txn_before_process_write");
    debug!("process write cmd(cid={} {}) in worker pool", cid, cmd);
    let mut statistics = Statistics::default();
    match process_write_impl(cid, cmd, scheduler.clone(), snapshot, &mut statistics) {
        Ok(_) => {}
        Err(e) => {
            // Bounce the failure back to the event loop so it can run the
            // callback and release the command's latches.
            let msg = Msg::WritePrepareFailed { cid, err: e };
            if let Err(err) = scheduler.schedule(msg) {
                // Todo: if this happens, lock will hold for ever
                panic!(
                    "schedule WritePrepareFailed msg failed. cid={}, err={:?}",
                    cid, err
                );
            }
        }
    }
    statistics
}

fn process_read_impl<E: Engine>(
sched_ctx: &mut SchedContext,
mut cmd: Command,
Expand Down Expand Up @@ -399,29 +422,6 @@ fn process_read_impl<E: Engine>(
}
}

/// Processes a write command within a worker thread, then posts either a `WritePrepareFinished`
/// message if successful or a `WritePrepareFailed` message back to the event loop.
fn process_write<E: Engine>(
cid: u64,
cmd: Command,
scheduler: worker::Scheduler<Msg<E>>,
snapshot: E::Snap,
) -> Statistics {
fail_point!("txn_before_process_write");
debug!("process write cmd(cid={} {}) in worker pool", cid, cmd);
let mut statistics = Statistics::default();
if let Err(e) = process_write_impl(cid, cmd, scheduler.clone(), snapshot, &mut statistics) {
if let Err(err) = scheduler.schedule(Msg::WritePrepareFailed { cid, err: e }) {
// Todo: if this happens, lock will hold for ever
panic!(
"schedule WritePrepareFailed msg failed. cid={}, err={:?}",
cid, err
);
}
}
statistics
}

fn process_write_impl<E: Engine>(
cid: u64,
cmd: Command,
Expand Down
90 changes: 49 additions & 41 deletions src/storage/txn/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,23 +245,6 @@ impl<E: Engine> Scheduler<E> {
}
}

/// Calls the callback with an error.
///
/// Removes the command's context, delivers `err` to the stored callback as a
/// `ProcessResult::Failed`, and finally releases the latches the command held
/// (which may wake up queued commands).
pub fn finish_with_err(&mut self, cid: u64, err: Error) {
    debug!("command cid={}, finished with error", cid);
    SCHED_STAGE_COUNTER_VEC
        .with_label_values(&[self.get_ctx_tag(cid), "error"])
        .inc();

    // The callback is moved out of the context before invocation; it is
    // expected to still be present here, hence the `unwrap`.
    let mut ctx = self.remove_ctx(cid);
    let cb = ctx.callback.take().unwrap();
    let pr = ProcessResult::Failed {
        err: StorageError::from(err),
    };
    execute_callback(cb, pr);

    self.release_lock(&ctx.lock, cid);
}

/// Extracts the context of a command.
fn extract_context(&self, cid: u64) -> &Context {
let ctx = &self.cmd_ctxs[&cid];
Expand All @@ -288,12 +271,20 @@ impl<E: Engine> Scheduler<E> {
.inc();
let cid = self.gen_id();
debug!("received new command, cid={}, cmd={}", cid, cmd);
let lock = gen_command_lock(&self.latches, &cmd);
let lock = self.gen_lock(&cmd);
let ctx = Task::new(cid, cmd, lock, callback);
self.insert_ctx(ctx);
self.lock_and_get_snapshot(cid);
}

/// Tries to acquire all the necessary latches. If all the necessary latches are acquired,
/// the method initiates a get snapshot operation for further processing.
fn lock_and_get_snapshot(&mut self, cid: u64) {
    // If some latches are still held by other commands, nothing happens now;
    // `release_lock` re-invokes this method when the command is woken up.
    if self.acquire_lock(cid) {
        self.get_snapshot(cid);
    }
}

fn too_busy(&self) -> bool {
fail_point!("txn_scheduler_busy", |_| true);
self.running_write_bytes >= self.sched_pending_write_threshold
Expand All @@ -316,20 +307,6 @@ impl<E: Engine> Scheduler<E> {
self.schedule_command(cmd, callback);
}

/// Tries to acquire all the required latches for a command.
///
/// Returns true if successful; returns false otherwise. On success the latch
/// wait timer is stopped and the execution (`slow_timer`) timer is started.
fn acquire_lock(&mut self, cid: u64) -> bool {
    // `get_mut(..).unwrap()` already yields `&mut Task`; the previous extra
    // `&mut` created a needless borrow of the temporary (clippy: needless_borrow).
    let ctx = self.cmd_ctxs.get_mut(&cid).unwrap();
    assert_eq!(ctx.cid, cid);
    let ok = self.latches.acquire(&mut ctx.lock, cid);
    if ok {
        // Latch wait is over: drop the latch timer, start timing execution.
        ctx.latch_timer.take();
        ctx.slow_timer = Some(SlowTimer::new());
    }
    ok
}

/// Initiates an async operation to get a snapshot from the storage engine, then posts a
/// `SnapshotFinished` message back to the event loop when it finishes.
fn get_snapshot(&mut self, cid: u64) {
Expand All @@ -355,6 +332,23 @@ impl<E: Engine> Scheduler<E> {
}
}

/// Calls the callback with an error.
///
/// The command's context is removed, its callback fires with a
/// `ProcessResult::Failed` wrapping `err`, and its latches are released.
pub fn finish_with_err(&mut self, cid: u64, err: Error) {
    debug!("command cid={}, finished with error", cid);
    SCHED_STAGE_COUNTER_VEC
        .with_label_values(&[self.get_ctx_tag(cid), "error"])
        .inc();

    let mut task_ctx = self.remove_ctx(cid);
    let callback = task_ctx.callback.take().unwrap();
    execute_callback(
        callback,
        ProcessResult::Failed {
            err: StorageError::from(err),
        },
    );

    self.release_lock(&task_ctx.lock, cid);
}

/// Event handler for the success of read.
///
/// If a next command is present, continues to execute; otherwise, delivers the result to the
Expand Down Expand Up @@ -444,21 +438,35 @@ impl<E: Engine> Scheduler<E> {
self.release_lock(&ctx.lock, cid);
}

/// Builds the latch `Lock` a command must hold before it may run.
///
/// Read-only commands need no latches; write commands latch on hashes of the
/// keys they reference.
fn gen_lock(&self, cmd: &Command) -> Lock {
    // Delegates to the free function so the logic stays shared.
    let latches = &self.latches;
    gen_command_lock(latches, cmd)
}

/// Tries to acquire all the required latches for a command.
///
/// Returns true if successful; returns false otherwise. On success the latch
/// wait timer is stopped and the execution (`slow_timer`) timer is started.
fn acquire_lock(&mut self, cid: u64) -> bool {
    // `get_mut(..).unwrap()` already yields `&mut Task`; the previous extra
    // `&mut` created a needless borrow of the temporary (clippy: needless_borrow).
    let ctx = self.cmd_ctxs.get_mut(&cid).unwrap();
    assert_eq!(ctx.cid, cid);
    let ok = self.latches.acquire(&mut ctx.lock, cid);
    if ok {
        // Latch wait is over: drop the latch timer, start timing execution.
        ctx.latch_timer.take();
        ctx.slow_timer = Some(SlowTimer::new());
    }
    ok
}

/// Releases all the latches held by a command.
///
/// Every command woken by this release retries its own latch acquisition
/// and, on success, proceeds to fetch a snapshot.
fn release_lock(&mut self, lock: &Lock, cid: u64) {
    for waiting_cid in self.latches.release(lock, cid) {
        self.lock_and_get_snapshot(waiting_cid);
    }
}

/// Tries to acquire all the necessary latches. If all the necessary latches are acquired,
/// the method initiates a get snapshot operation for furthur processing.
fn lock_and_get_snapshot(&mut self, cid: u64) {
if self.acquire_lock(cid) {
self.get_snapshot(cid);
}
}
}

impl<E: Engine> Runnable<Msg<E>> for Scheduler<E> {
Expand Down Expand Up @@ -511,7 +519,7 @@ impl<E: Engine> Runnable<Msg<E>> for Scheduler<E> {
///
/// Basically, read-only commands require no latches, write commands require latches hashed
/// by the referenced keys.
pub fn gen_command_lock(latches: &Latches, cmd: &Command) -> Lock {
fn gen_command_lock(latches: &Latches, cmd: &Command) -> Lock {
match *cmd {
Command::Prewrite { ref mutations, .. } => {
let keys: Vec<&Key> = mutations.iter().map(|x| x.key()).collect();
Expand Down

0 comments on commit 71ae317

Please sign in to comment.