Skip to content

Commit

Permalink
Merge branch 'master' into raft-engine-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
tabokie committed Nov 22, 2021
2 parents 6db61ce + b6a118c commit bfe4ce4
Show file tree
Hide file tree
Showing 33 changed files with 692 additions and 188 deletions.
8 changes: 4 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ COPY Cargo.lock ./Cargo.lock

COPY --from=prepare /output/ ./

RUN mkdir -p ./cmd/src/bin && \
echo 'fn main() {}' > ./cmd/src/bin/tikv-ctl.rs && \
echo 'fn main() {}' > ./cmd/src/bin/tikv-server.rs && \
RUN mkdir -p ./cmd/tikv-ctl/src ./cmd/tikv-server/src && \
echo 'fn main() {}' > ./cmd/tikv-ctl/src/main.rs && \
echo 'fn main() {}' > ./cmd/tikv-server/src/main.rs && \
for cargotoml in $(find . -type f -name "Cargo.toml"); do \
sed -i '/fuzz/d' ${cargotoml} && \
sed -i '/profiler/d' ${cargotoml} ; \
Expand Down Expand Up @@ -107,4 +107,4 @@ COPY --from=builder /tikv/target/release/tikv-ctl /tikv-ctl

EXPOSE 20160 20180

ENTRYPOINT ["/tikv-server"]
ENTRYPOINT ["/tikv-server"]
184 changes: 139 additions & 45 deletions components/batch-system/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::router::Router;
use crossbeam::channel::{self, after, SendError};
use file_system::{set_io_type, IOType};
use std::borrow::Cow;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
Expand Down Expand Up @@ -82,11 +83,43 @@ macro_rules! impl_sched {
impl_sched!(NormalScheduler, FsmTypes::Normal, Fsm = N);
impl_sched!(ControlScheduler, FsmTypes::Control, Fsm = C);

pub struct NormalFsm<N> {
fsm: Box<N>,
timer: Instant,
policy: Option<ReschedulePolicy>,
}

impl<N> NormalFsm<N> {
#[inline]
fn new(fsm: Box<N>) -> NormalFsm<N> {
NormalFsm {
fsm,
timer: Instant::now_coarse(),
policy: None,
}
}
}

impl<N> Deref for NormalFsm<N> {
type Target = N;

#[inline]
fn deref(&self) -> &N {
&self.fsm
}
}

impl<N> DerefMut for NormalFsm<N> {
#[inline]
fn deref_mut(&mut self) -> &mut N {
&mut self.fsm
}
}

/// A basic struct for a round of polling.
#[allow(clippy::vec_box)]
pub struct Batch<N, C> {
normals: Vec<Box<N>>,
timers: Vec<Instant>,
normals: Vec<Option<NormalFsm<N>>>,
control: Option<Box<C>>,
}

Expand All @@ -95,16 +128,14 @@ impl<N: Fsm, C: Fsm> Batch<N, C> {
pub fn with_capacity(cap: usize) -> Batch<N, C> {
Batch {
normals: Vec::with_capacity(cap),
timers: Vec::with_capacity(cap),
control: None,
}
}

fn push(&mut self, fsm: FsmTypes<N, C>) -> bool {
match fsm {
FsmTypes::Normal(n) => {
self.normals.push(n);
self.timers.push(Instant::now_coarse());
self.normals.push(Some(NormalFsm::new(n)));
}
FsmTypes::Control(c) => {
assert!(self.control.is_none());
Expand All @@ -121,7 +152,6 @@ impl<N: Fsm, C: Fsm> Batch<N, C> {

fn clear(&mut self) {
self.normals.clear();
self.timers.clear();
self.control.take();
}

Expand All @@ -130,20 +160,19 @@ impl<N: Fsm, C: Fsm> Batch<N, C> {
/// Only when channel length is larger than `checked_len` will trigger
/// further notification. This function may fail if channel length is
/// larger than the given value before FSM is released.
pub fn release(&mut self, index: usize, checked_len: usize) {
let mut fsm = self.normals.swap_remove(index);
fn release(&mut self, mut fsm: NormalFsm<N>, checked_len: usize) -> Option<NormalFsm<N>> {
let mailbox = fsm.take_mailbox().unwrap();
mailbox.release(fsm);
mailbox.release(fsm.fsm);
if mailbox.len() == checked_len {
self.timers.swap_remove(index);
None
} else {
match mailbox.take_fsm() {
None => (),
// It's rescheduled by other thread.
None => None,
Some(mut s) => {
s.set_mailbox(Cow::Owned(mailbox));
let last_index = self.normals.len();
self.normals.push(s);
self.normals.swap(index, last_index);
fsm.fsm = s;
Some(fsm)
}
}
}
Expand All @@ -154,25 +183,49 @@ impl<N: Fsm, C: Fsm> Batch<N, C> {
/// This method should only be called when the FSM is stopped.
/// If there are still messages in channel, the FSM is untouched and
/// the function will return false to let caller to keep polling.
pub fn remove(&mut self, index: usize) {
let mut fsm = self.normals.swap_remove(index);
fn remove(&mut self, mut fsm: NormalFsm<N>) -> Option<NormalFsm<N>> {
let mailbox = fsm.take_mailbox().unwrap();
if mailbox.is_empty() {
mailbox.release(fsm);
self.timers.swap_remove(index);
// It will be removed only when it's already closed, so no new messages can
// be scheduled, hence don't need to consider rescheduling.
mailbox.release(fsm.fsm);
None
} else {
fsm.set_mailbox(Cow::Owned(mailbox));
let last_index = self.normals.len();
self.normals.push(fsm);
self.normals.swap(index, last_index);
Some(fsm)
}
}

/// Schedule the normal FSM located at `index`.
pub fn reschedule(&mut self, router: &BatchRouter<N, C>, index: usize) {
let fsm = self.normals.swap_remove(index);
self.timers.swap_remove(index);
router.normal_scheduler.schedule(fsm);
///
/// If `inplace`, the relative position of all fsm will not be changed; otherwise, the fsm
/// will be popped and the last fsm will be swap in to reduce memory copy.
pub fn schedule(&mut self, router: &BatchRouter<N, C>, index: usize, inplace: bool) {
let to_schedule = match self.normals[index].take() {
Some(f) => f,
None => {
if !inplace {
self.normals.swap_remove(index);
}
return;
}
};
let mut res = match to_schedule.policy {
Some(ReschedulePolicy::Release(l)) => self.release(to_schedule, l),
Some(ReschedulePolicy::Remove) => self.remove(to_schedule),
Some(ReschedulePolicy::Schedule) => {
router.normal_scheduler.schedule(to_schedule.fsm);
None
}
None => Some(to_schedule),
};
if let Some(f) = &mut res {
// failed to reschedule
f.policy.take();
self.normals[index] = res;
} else if !inplace {
self.normals.swap_remove(index);
}
}

/// Same as `release`, but working on control FSM.
Expand Down Expand Up @@ -201,6 +254,27 @@ impl<N: Fsm, C: Fsm> Batch<N, C> {
}
}

/// The result for `PollHandler::handle_control`.
pub enum HandleResult {
/// The Fsm still needs to be processed.
KeepProcessing,
/// The Fsm should stop at the progress.
StopAt {
/// The count of messages that have been acknowleged by handler. The fsm should be
/// released until new messages arrive.
progress: usize,
/// Whether the fsm should be released before `end`.
skip_end: bool,
},
}

impl HandleResult {
#[inline]
pub fn stop_at(progress: usize, skip_end: bool) -> HandleResult {
HandleResult::StopAt { progress, skip_end }
}
}

/// A handler that poll all FSM in ready.
///
/// A General process works like following:
Expand Down Expand Up @@ -233,10 +307,14 @@ pub trait PollHandler<N, C> {
/// This function is called when handling readiness for normal FSM.
///
/// The returned value is handled in the same way as `handle_control`.
fn handle_normal(&mut self, normal: &mut N) -> Option<usize>;
fn handle_normal(&mut self, normal: &mut impl DerefMut<Target = N>) -> HandleResult;

/// This function is called after `handle_normal` is called for all fsm and before calling
/// `end`. The function is expected to run lightweight work.
fn light_end(&mut self, _batch: &mut [Option<impl DerefMut<Target = N>>]) {}

/// This function is called at the end of every round.
fn end(&mut self, batch: &mut [Box<N>]);
fn end(&mut self, batch: &mut [Option<impl DerefMut<Target = N>>]);

/// This function is called when batch system is going to sleep.
fn pause(&mut self) {}
Expand Down Expand Up @@ -302,6 +380,7 @@ impl<N: Fsm, C: Fsm, Handler: PollHandler<N, C>> Poller<N, C, Handler> {
fn poll(&mut self) {
let mut batch = Batch::with_capacity(self.max_batch_size);
let mut reschedule_fsms = Vec::with_capacity(self.max_batch_size);
let mut to_skip_end = Vec::with_capacity(self.max_batch_size);

// Fetch batch after every round is finished. It's helpful to protect regions
// from becoming hungry if some regions are hot points. Since we fetch new fsm every time
Expand All @@ -325,24 +404,32 @@ impl<N: Fsm, C: Fsm, Handler: PollHandler<N, C>> Poller<N, C, Handler> {

let mut hot_fsm_count = 0;
for (i, p) in batch.normals.iter_mut().enumerate() {
let len = self.handler.handle_normal(p);
let p = p.as_mut().unwrap();
let res = self.handler.handle_normal(p);
if p.is_stopped() {
reschedule_fsms.push((i, ReschedulePolicy::Remove));
p.policy = Some(ReschedulePolicy::Remove);
reschedule_fsms.push(i);
} else if p.get_priority() != self.handler.get_priority() {
reschedule_fsms.push((i, ReschedulePolicy::Schedule));
p.policy = Some(ReschedulePolicy::Schedule);
reschedule_fsms.push(i);
} else {
if batch.timers[i].saturating_elapsed() >= self.reschedule_duration {
if p.timer.saturating_elapsed() >= self.reschedule_duration {
hot_fsm_count += 1;
// We should only reschedule a half of the hot regions, otherwise,
// it's possible all the hot regions are fetched in a batch the
// next time.
if hot_fsm_count % 2 == 0 {
reschedule_fsms.push((i, ReschedulePolicy::Schedule));
p.policy = Some(ReschedulePolicy::Schedule);
reschedule_fsms.push(i);
continue;
}
}
if let Some(l) = len {
reschedule_fsms.push((i, ReschedulePolicy::Release(l)));
if let HandleResult::StopAt { progress, skip_end } = res {
p.policy = Some(ReschedulePolicy::Release(progress));
reschedule_fsms.push(i);
if skip_end {
to_skip_end.push(i);
}
}
}
}
Expand All @@ -357,24 +444,31 @@ impl<N: Fsm, C: Fsm, Handler: PollHandler<N, C>> Poller<N, C, Handler> {
if !run || fsm_cnt >= batch.normals.len() {
break;
}
let len = self.handler.handle_normal(&mut batch.normals[fsm_cnt]);
if batch.normals[fsm_cnt].is_stopped() {
reschedule_fsms.push((fsm_cnt, ReschedulePolicy::Remove));
} else if let Some(l) = len {
reschedule_fsms.push((fsm_cnt, ReschedulePolicy::Release(l)));
let p = batch.normals[fsm_cnt].as_mut().unwrap();
let res = self.handler.handle_normal(p);
if p.is_stopped() {
p.policy = Some(ReschedulePolicy::Remove);
reschedule_fsms.push(fsm_cnt);
} else if let HandleResult::StopAt { progress, skip_end } = res {
p.policy = Some(ReschedulePolicy::Release(progress));
reschedule_fsms.push(fsm_cnt);
if skip_end {
to_skip_end.push(fsm_cnt);
}
}
fsm_cnt += 1;
}
self.handler.light_end(&mut batch.normals);
for offset in &to_skip_end {
batch.schedule(&self.router, *offset, true);
}
to_skip_end.clear();
self.handler.end(&mut batch.normals);

// Because release use `swap_remove` internally, so using pop here
// to remove the correct FSM.
while let Some((r, mark)) = reschedule_fsms.pop() {
match mark {
ReschedulePolicy::Release(l) => batch.release(r, l),
ReschedulePolicy::Remove => batch.remove(r),
ReschedulePolicy::Schedule => batch.reschedule(&self.router, r),
}
while let Some(r) = reschedule_fsms.pop() {
batch.schedule(&self.router, r, false);
}
}
batch.clear();
Expand Down
4 changes: 3 additions & 1 deletion components/batch-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ mod router;
#[cfg(feature = "test-runner")]
pub mod test_runner;

pub use self::batch::{create_system, BatchRouter, BatchSystem, HandlerBuilder, PollHandler};
pub use self::batch::{
create_system, BatchRouter, BatchSystem, HandleResult, HandlerBuilder, PollHandler,
};
pub use self::config::Config;
pub use self::fsm::{Fsm, Priority};
pub use self::mailbox::{BasicMailbox, Mailbox};
Expand Down
14 changes: 8 additions & 6 deletions components/batch-system/src/test_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use crate::*;
use derive_more::{Add, AddAssign};
use std::borrow::Cow;
use std::ops::DerefMut;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use tikv_util::mpsc;
Expand Down Expand Up @@ -84,7 +85,7 @@ pub struct Handler {
}

impl Handler {
fn handle(&mut self, r: &mut Runner) -> Option<usize> {
fn handle(&mut self, r: &mut Runner) {
for _ in 0..16 {
match r.recv.try_recv() {
Ok(Message::Loop(count)) => {
Expand All @@ -98,7 +99,6 @@ impl Handler {
Err(_) => break,
}
}
Some(0)
}

pub fn get_priority(&self) -> Priority {
Expand All @@ -113,15 +113,17 @@ impl PollHandler<Runner, Runner> for Handler {

fn handle_control(&mut self, control: &mut Runner) -> Option<usize> {
self.local.control += 1;
self.handle(control)
self.handle(control);
Some(0)
}

fn handle_normal(&mut self, normal: &mut Runner) -> Option<usize> {
fn handle_normal(&mut self, normal: &mut impl DerefMut<Target = Runner>) -> HandleResult {
self.local.normal += 1;
self.handle(normal)
self.handle(normal);
HandleResult::stop_at(0, false)
}

fn end(&mut self, _normals: &mut [Box<Runner>]) {
fn end(&mut self, _normals: &mut [Option<impl DerefMut<Target = Runner>>]) {
let mut c = self.metrics.lock().unwrap();
*c += self.local;
self.local = HandleMetrics::default();
Expand Down
1 change: 1 addition & 0 deletions components/cdc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,5 @@ required-features = ["failpoints"]

[[bench]]
name = "cdc_event"
path = "benches/cdc_event.rs"
harness = false
File renamed without changes.
2 changes: 2 additions & 0 deletions components/engine_panic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,7 @@ pub mod perf_context;
pub use crate::perf_context::*;
pub mod flow_control_factors;
pub use crate::flow_control_factors::*;
pub mod table_properties;
pub use crate::table_properties::*;

mod raft_engine;
Loading

0 comments on commit bfe4ce4

Please sign in to comment.