tikv_util: introduce future channel #13407
Conversation
It's a superset of the batch channel. It can be used as a batch channel or just as a future channel. Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
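To illustrate the "batch channel or plain future channel" idea, here is a hedged sketch using the `futures` crate's mpsc channel as a stand-in; the actual types introduced by this PR differ.

```rust
use futures::channel::mpsc;
use futures::stream::StreamExt;

// Plain mode: the receiver is just a Stream; consume one item at a time.
async fn plain(mut rx: mpsc::UnboundedReceiver<u64>) -> u64 {
    let mut sum = 0;
    while let Some(v) = rx.next().await {
        sum += v;
    }
    sum
}

// Batch mode: drain whatever is ready, up to 4 items per chunk.
async fn batched(rx: mpsc::UnboundedReceiver<u64>) -> Vec<Vec<u64>> {
    rx.ready_chunks(4).collect().await
}
```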
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review. The full list of commands accepted by this bot can be found here. Reviewers can indicate their review by submitting an approval review.
/release
    return Poll::Ready(Some(t));
}
queue.waker.register(cx.waker());
if let Some(t) = queue.queue.pop() {
add a comment for why it tries to pop twice
if let Some(t) = queue.queue.pop() {
    return Poll::Ready(Some(t));
}
queue.waker.register(cx.waker());
What if the pop below consumes the element? Then the next wake would be unnecessary.
I don't get it. What do you mean by "consume the element"? The user is expected to keep calling `poll_next` until `None` is returned.
I mean, should it be:

```rust
if let Some(t) = queue.queue.pop() {
    return Poll::Ready(Some(t));
}
if let Some(t) = queue.queue.pop() {
    return Poll::Ready(Some(t));
}
queue.waker.register(cx.waker());
```
It's for edge cases only, which should hardly ever happen. If that happens, clearing the waker may cause unnecessary contention.
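For readers following along, here is a minimal, self-contained sketch of the pop/register/pop-again pattern under discussion, assuming a crossbeam `SegQueue` and a futures `AtomicWaker`; the PR's actual type and field names may differ.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crossbeam::queue::SegQueue;
use futures::stream::Stream;
use futures::task::AtomicWaker;

struct Queue<T> {
    queue: SegQueue<T>,
    waker: AtomicWaker,
}

impl<T> Queue<T> {
    fn push(&self, t: T) {
        self.queue.push(t);
        // Wake after the push so the receiver sees the element either in
        // its second pop or on the next poll.
        self.waker.wake();
    }
}

struct Receiver<T> {
    queue: Arc<Queue<T>>,
}

impl<T> Stream for Receiver<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Fast path: an element is already queued.
        if let Some(t) = self.queue.queue.pop() {
            return Poll::Ready(Some(t));
        }
        // Register the waker, then pop again. A sender may have pushed
        // between the first pop and the registration; without the second
        // pop, that element could sit unnoticed until another wakeup.
        self.queue.waker.register(cx.waker());
        if let Some(t) = self.queue.queue.pop() {
            return Poll::Ready(Some(t));
        }
        Poll::Pending
    }
}
```

Note that this simplified sketch never returns `Poll::Ready(None)`; a real channel would also track sender disconnection.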
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
Benched with sysbench read_write, insert, update index, and point select; the differences in throughput and latency are within 3%. PTAL
let elem = ctx.elem.take();
if let Some(m) = received {
    let collection = ctx.elem.get_or_insert_with(&ctx.initializer);
    let _received = ctx.collector.collect(collection, m);
why collect again
Otherwise `received` will be dropped and the message is lost.
The logic has been simplified; now it won't collect again.
rest LGTM
    }
}

unsafe impl<T: Send> Send for Sender<T> {}
What about also `Sync`? Sometimes non-`Sync` types may cause difficulties when `Sync` bounds are required.
    break false;
}
count += 1;
if count >= ctx.max_batch_size {
If `max_batch_size` belongs to `BatchReceiver`, in which case do we expect `BatchCollector::collect` to return `Some` and split?
`max_batch_size` is for the number of messages; `BatchCollector` can add other limits like data size.
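As an illustration of that design, here is a hedged sketch of a collector that caps a batch by payload size; returning `Some(msg)` hands the overflowing message back so the caller can split and start a new batch. The trait signature is inferred from the `BatchRespCollector` quoted later in this thread, and `SizeLimitedCollector` and `MAX_BATCH_BYTES` are illustrative names, not from the PR.

```rust
// Inferred from BatchRespCollector below: return None if the item was
// absorbed into the batch, or Some(item) to hand it back.
trait BatchCollector<V, E> {
    fn collect(&mut self, v: &mut V, e: E) -> Option<E>;
}

const MAX_BATCH_BYTES: usize = 1024 * 1024;

struct SizeLimitedCollector;

impl BatchCollector<Vec<Vec<u8>>, Vec<u8>> for SizeLimitedCollector {
    fn collect(&mut self, batch: &mut Vec<Vec<u8>>, msg: Vec<u8>) -> Option<Vec<u8>> {
        let batch_bytes: usize = batch.iter().map(|m| m.len()).sum();
        // Hand the message back once the size budget would be exceeded,
        // unless the batch is empty (an oversized message still ships alone).
        if !batch.is_empty() && batch_bytes + msg.len() > MAX_BATCH_BYTES {
            return Some(msg);
        }
        batch.push(msg);
        None
    }
}
```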
The limit was for the raft client in the past. Now the raft client uses its own batching algorithm, so the API is unnecessary. I removed it for simplicity.
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
LGTM
LGTM anyway
/// `initializer` is used to generate an initial value, and `collector`
/// will collect raw items (at most `max_batch_size` of them) into the
/// batched value.
pub fn new(rx: Receiver<T>, max_batch_size: usize, initializer: I, collector: C) -> Self {
What's the use case for the collector? Isn't `Vec` adequate?
Check lines 2357 to 2369 in 8e311cd:
struct BatchRespCollector;
impl BatchCollector<MeasuredBatchResponse, MeasuredSingleResponse> for BatchRespCollector {
    fn collect(
        &mut self,
        v: &mut MeasuredBatchResponse,
        mut e: MeasuredSingleResponse,
    ) -> Option<MeasuredSingleResponse> {
        v.batch_resp.mut_request_ids().push(e.id);
        v.batch_resp.mut_responses().push(e.resp.consume());
        v.measures.push(e.measure);
        None
    }
}
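A hedged sketch of how such a collector plugs into `BatchReceiver::new` per the doc comment above; the channel constructor, the batch size of 128, and the use of `MeasuredBatchResponse::default` as the initializer are assumptions for illustration, not the PR's actual call site.

```rust
// Hypothetical wiring of the pieces discussed above.
let (tx, rx) = unbounded();
let batch_rx = BatchReceiver::new(
    rx,
    128,                            // max_batch_size: cap on raw items per batch
    MeasuredBatchResponse::default, // initializer: produces a fresh batched value
    BatchRespCollector,             // collector: folds single responses into it
);
```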
/merge
@BusyJay: It seems you want to merge this PR, so I will help you trigger all the tests: /run-all-tests. If you have any questions about the PR merge process, please refer to pr process. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.
This pull request has been accepted and is ready to merge. Commit hash: 74d4d82
@BusyJay: Your PR was out of date, so I automatically updated it for you. At the same time, I will trigger all tests for you: /run-all-tests. If a CI test fails, just re-trigger the failed test and the bot will merge the PR for you after CI passes. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.
This reverts commit 7382d4e. Signed-off-by: tabokie <xy.tao@outlook.com>
ref tikv#13394, ref tikv#13407 Signed-off-by: tabokie <xy.tao@outlook.com>
What is changed and how it works?
Issue Number: Ref #12842
What's Changed:
I benched it against the old implementation; the results are highly unstable because if the receiver goes into a sleep state, it takes more time to be woken up. Anyway, I post the results below. There is no big difference between the implementations.
Check List
Tests
Release note