New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
storage: reduce scheduler msgs #3583
Conversation
d844298
to
639701c
Compare
71ae317
to
4a657df
Compare
89d9178
to
77886c5
Compare
4a657df
to
e7870a9
Compare
f7b2d38
to
49cf999
Compare
src/storage/txn/scheduler.rs
Outdated
Msg::WriteFinished { cid, .. } => write!(f, "WriteFinished [cid={}]", cid), | ||
Msg::ReadFinished { ref task, .. } => write!(f, "ReadFinished [cid={}]", task.cid), | ||
Msg::WriteFinished { ref task, .. } => write!(f, "WriteFinished [cid={}]", task.cid), | ||
Msg::FinishedWithErr { cid, .. } => write!(f, "WriteFinished [cid={}]", cid), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrong format message
src/storage/txn/process.rs
Outdated
false | ||
} | ||
Err(e) => { | ||
panic!("schedule WriteFinished msg failed, err:{:?}", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrong message
src/storage/txn/process.rs
Outdated
@@ -619,6 +684,31 @@ impl ThreadContext for SchedContext { | |||
} | |||
} | |||
|
|||
pub struct Channels<E: Engine> { | |||
scheduler: Option<worker::Scheduler<Msg>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe you should add some comments
src/storage/txn/process.rs
Outdated
result, | ||
}, | ||
); | ||
if success { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems that it can be combined into previous one
src/storage/txn/process.rs
Outdated
@@ -619,6 +684,31 @@ impl ThreadContext for SchedContext { | |||
} | |||
} | |||
|
|||
pub struct Channels<E: Engine> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe Executor
is a better name?
src/storage/txn/process.rs
Outdated
|
||
Ok(()) | ||
fn handle_schedule(scheduler: worker::Scheduler<Msg>, msg: Msg) -> bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe send_msg_to_scheduler is a better name. Does this function can be a member of Channels?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or notify_scheduler?
src/storage/txn/process.rs
Outdated
|
||
impl<E: Engine> Scheduler<E> { | ||
// Start the task. | ||
pub fn start(&mut self) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/start/pre_start
/run-all-tests |
src/storage/txn/process.rs
Outdated
let tag = cmd.tag(); | ||
let pool = channels.take_pool(); | ||
if readcmd { | ||
let pool = executor.take_pool(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is more better to move following codes into Executor::exec, so the Executor is more like a executor.
src/storage/txn/process.rs
Outdated
} | ||
} | ||
} | ||
|
||
/// Delivers a command to a worker thread for processing. | ||
pub fn process_by_worker(&mut self, cid: u64, cb_ctx: CbContext, snapshot: E::Snap) { | ||
fn process_by_worker<E: Engine>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe process_by_executor is more better.
src/storage/txn/process.rs
Outdated
let scheduler = self.scheduler.clone(); | ||
if readcmd { | ||
worker_pool.execute(move |ctx: &mut SchedContext| { | ||
let pool = executor.take_pool(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to wrap following codes into Executor.
src/storage/txn/scheduler.rs
Outdated
let task = Task::new(cid, cmd); | ||
|
||
// TODO: enqueue_task should return an reference of the entity. | ||
self.enqueue_task(task, entity); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is weird that enqueue_task
has a entity
arg.
src/storage/txn/scheduler.rs
Outdated
} | ||
|
||
/// Scheduler which schedules the execution of `storage::Command`s. | ||
pub struct Scheduler<E: Engine> { | ||
engine: E, | ||
|
||
// cid -> Task | ||
pub cmd_ctxs: HashMap<u64, Task>, | ||
pending_task: HashMap<u64, Task>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pending_task -> pending_tasks
src/storage/txn/scheduler.rs
Outdated
if ok { | ||
ctx.latch_timer.take(); | ||
ctx.slow_timer = Some(SlowTimer::new()); | ||
entity.on_schedule(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why call on_schedule
in acquire_lock
instead of schedule
?
src/storage/txn/scheduler.rs
Outdated
} else { | ||
execute_callback(cb, pr); | ||
execute_callback(entity.cb, pr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not call the callback inside thread pool?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How?
src/storage/txn/scheduler.rs
Outdated
); | ||
} | ||
// TODO: Moves SchedEntity to Task once we adopt futures in the Engine trait. | ||
struct SchedEntity { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Entity
doesn't seem to be a good name, it makes more sense that task itself is an entity as it contains the actual task data. How about naming it as TaskTracker
just like RequestTracker
in endpoint.rs?
src/storage/txn/scheduler.rs
Outdated
} | ||
|
||
fn dequeue_schedule_entity(&mut self, cid: u64) -> SchedEntity { | ||
let entity = self.schedule_entities.remove(&cid).unwrap(); | ||
fn find_task_context(&mut self, cid: u64) -> TaskContext { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/find_task_context/take_task_context
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
e7870a9
to
f9eb37f
Compare
Signed-off-by: Neil Shen <overvenus@gmail.com>
8d3235d
203ea3f
to
8d3235d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
/run-all-tests |
/run-all-tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Signed-off-by: Neil Shen <overvenus@gmail.com>
What have you changed? (mandatory)
This PR reduces scheduler msgs, also offloads some functions from scheduler to workers.
What are the type of the changes? (mandatory)
How has this PR been tested? (mandatory)
Unit tests and integration tests.
Does this PR affect documentation (docs/docs-cn) update? (mandatory)
No.
Does this PR affect tidb-ansible update? (mandatory)
No.
Refer to a related PR or issue link (optional)
#3581 , #3551 and #3582
Benchmark result if necessary (optional)
Combining these changes #3581, #3551 and #3582, QPS is improved from 40672.1 to 49144.5, and avg latnecy is reduced from 100.6ms to 83.5ms.