Add free() for raft node #423
How should a test be added for this, or is one not needed?
src/tracker/inflights.rs
Outdated
/// free buffer memory
#[inline]
pub fn shrink(&mut self) {
    self.buffer.shrink_to_fit();
After shrink, the capacity will change, so the size used to determine full() will be affected. Is that fine?
It's wrong. It should be able to reallocate when new messages need to be sent.
src/raft.rs
Outdated
@@ -2678,4 +2678,27 @@ impl<T: Storage> Raft<T> {
    pub fn uncommitted_size(&self) -> usize {
        self.uncommitted_state.uncommitted_size
    }

    /// Free resources for all the raft node
    pub fn free_resources_for_all(&mut self) {
Just this method is enough. Choosing target id is too verbose.
Tests should be added to check that flow control still works as expected before and after calling shrink.
src/tracker/inflights.rs
Outdated
pub fn free_mem(&mut self) {
    let mut v = Vec::with_capacity(self.count);
    for i in 0..self.count {
        v.push(self.buffer[self.start + i])
I don't get it. `v` doesn't seem to be used, and why append the elements from buffer to `v`? On the other hand, `self.buffer` should be dropped to free memory.
I am trying to re-create a `buffer` with `length` and `capacity` equal to `count`, in order to decrease the allocated memory and keep only the elements from `start` to `start + count - 1`.
> On the other hand, `buffer` should be dropped to free memory
If `count` is zero, `buffer` will be a `Vec` with capacity 0. I am not sure what the implementation does internally, but I guess no memory is allocated?
If `count` is positive, we should at least keep a `count`-sized Vec (?
As for the `shrink_to_fit` mentioned before, I think it will do nothing once the `Vec` has been filled up.
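The two `Vec` behaviors raised in this thread can be checked with a small sketch. It assumes `u64` elements and an initial capacity of 8, both chosen only for illustration; the helper name `capacity_demo` is hypothetical and is not part of raft-rs.

```rust
/// Returns (capacity of an empty Vec, capacity of a full Vec before
/// `shrink_to_fit`, capacity of the same Vec after `shrink_to_fit`).
pub fn capacity_demo() -> (usize, usize, usize) {
    // A zero-capacity Vec of a non-zero-sized type holds no heap allocation.
    let empty: Vec<u64> = Vec::with_capacity(0);

    // Fill a Vec until its length equals its capacity.
    let mut full: Vec<u64> = Vec::with_capacity(8);
    while full.len() < full.capacity() {
        full.push(0);
    }
    let before = full.capacity();

    // With len == capacity there is no spare room to give back.
    full.shrink_to_fit();

    (empty.capacity(), before, full.capacity())
}
```

Calling `capacity_demo()` should yield a zero capacity for the empty `Vec` and identical before and after values for the full one, which is why `shrink_to_fit` alone cannot help once every slot has been written.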
What is the remaining issue related to the memory problem? Is it #10156?
What is 10156?
Sorry, it is tikv/tikv#10156. I found it under tikv/rfcs#59.
/cc @hicqu
The remaining issue is unchanged for raft-rs: the memory usage of `Inflight` is too large if there are lots of raft peers.
@BusyJay do you mean `free_mem` should only free empty inflights, but keep non-empty inflights unchanged?
I have discussed this with Jay. He thinks the current implementation is OK. BTW, how about using `vec.extend_from_slice` instead?
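A minimal sketch of the `extend_from_slice` suggestion follows. It assumes the live window may wrap around the end of a fully populated ring buffer, which the snippet under review does not handle; the function name `compact_window` and the free-standing signature are hypothetical, chosen so the example stands alone.

```rust
/// Copies the `count` live entries of a ring buffer, beginning at `start`,
/// into a new Vec whose capacity is requested to match `count`.
pub fn compact_window(buffer: &[u64], start: usize, count: usize) -> Vec<u64> {
    assert!(start <= buffer.len() && count <= buffer.len());
    let mut v = Vec::with_capacity(count);

    // First run: from `start` up to the window end or the end of the buffer.
    let end = (start + count).min(buffer.len());
    v.extend_from_slice(&buffer[start..end]);

    // Second run: whatever part of the window wrapped around to the front.
    let wrapped = count - (end - start);
    v.extend_from_slice(&buffer[..wrapped]);

    v
}
```

Compared with pushing in a loop, each `extend_from_slice` call copies a contiguous run in one step, and the wrap-around case is made explicit instead of indexing past the end of the buffer.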
How about lazily allocate memory instead of adding
How about this solution:
Or this:
Any suggestions @BusyJay
src/tracker/inflights.rs
Outdated
/// reallocate buffer memory
#[inline]
pub fn reallocate(&mut self) {
Progress should reallocate when necessary.
Currently, the buffer is reallocated when you call this function. In what other situations do we need reallocation?
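One way to read "reallocate when necessary" is to allocate lazily inside `add`, so no separate `reallocate` call is needed. The sketch below is a simplified, hypothetical stand-in (`LazyInflights` is not the raft-rs type): removal and ring wrap-around are omitted, and the logical limit `cap` is kept in its own field so that `full()` does not depend on `buffer.capacity()`.

```rust
/// Hypothetical, simplified stand-in for an inflights window.
#[derive(Debug)]
pub struct LazyInflights {
    /// Logical limit on in-flight entries, used by `full()`.
    pub cap: usize,
    /// Backing storage; stays unallocated until the first `add`.
    pub buffer: Vec<u64>,
}

impl LazyInflights {
    pub fn new(cap: usize) -> Self {
        // Vec::new() does not allocate.
        LazyInflights { cap, buffer: Vec::new() }
    }

    /// Flow control is decided by `cap`, not by the allocated capacity.
    pub fn full(&self) -> bool {
        self.buffer.len() == self.cap
    }

    pub fn add(&mut self, inflight: u64) {
        assert!(!self.full(), "cannot add into a full inflights");
        // Allocate (or re-allocate after a free) only when a message is sent.
        if self.buffer.capacity() == 0 {
            self.buffer = Vec::with_capacity(self.cap);
        }
        self.buffer.push(inflight);
    }

    /// Number of element slots currently allocated.
    pub fn allocated(&self) -> usize {
        self.buffer.capacity()
    }
}
```

With this shape, a freshly created or previously freed window costs no heap memory, and freeing the buffer never changes what `full()` reports.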
@jayzhan211 we can try this solution:
@BusyJay please help to confirm whether it fits with your design or not.
@jayzhan211 any updates?
Does it fit @BusyJay's design?
LGTM, thanks!
I have difficulty adding a test (I am not familiar with the APIs).
let len = self.buffer.len();
for _ in self.start + self.count..len {
    self.buffer.pop();
}
We initialize the buffer with a fixed capacity, so we need to remove elements for `shrink_to_fit` to reduce the allocated memory. Are there any better ways?
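As one possible answer to the question above, `Vec::truncate` drops the tail in a single call instead of popping element by element. This is a sketch only; the helper name `trim_tail` and the `keep` parameter (standing in for `start + count`) are hypothetical.

```rust
/// Drops every element at index `keep` or beyond, then asks the allocator
/// to release the capacity that is no longer needed.
pub fn trim_tail(buffer: &mut Vec<u64>, keep: usize) {
    // `truncate` is a no-op when `keep >= buffer.len()`.
    buffer.truncate(keep);
    buffer.shrink_to_fit();
}
```

This still keeps the slots before `start`, so it only reclaims the tail of the buffer, not the unused prefix.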
How about not touching it if it's not empty? That way the resource can be freed later, and no allocations are introduced.
> How about not touching it if it's not empty?
I am not sure how frequently the buffer is empty. If it happens a lot, this approach is good enough.
If the buffer is non-empty but small most of the time, then memory might be wasted (0..start - 1).
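The "free only when empty" idea can be sketched as follows. `SketchInflights` and `free_if_empty` are hypothetical names used for illustration, not the raft-rs API, and the fields are public only so the example can be exercised directly.

```rust
/// Hypothetical stand-in for an inflights ring buffer.
#[derive(Debug)]
pub struct SketchInflights {
    pub start: usize,
    pub count: usize,
    pub buffer: Vec<u64>,
}

impl SketchInflights {
    /// Releases the backing buffer only when nothing is in flight.
    /// Returns true if the buffer was released.
    pub fn free_if_empty(&mut self) -> bool {
        if self.count == 0 {
            self.start = 0;
            // Replacing the Vec drops the old one and frees its allocation;
            // Vec::new() itself allocates nothing.
            self.buffer = Vec::new();
            true
        } else {
            // Non-empty windows are left untouched: no copy, no allocation.
            false
        }
    }
}
```

A test for this behavior is only meaningful if it compares `buffer.capacity()` before and after the call; asserting on flow control alone would also pass for a function that does nothing.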
// .expect("");
// r.read_messages();
// }
//
After this line, inflights 2 (follower) is filled with 10 elements and inflights 1 (leader) is empty. So it seems that we need to transfer leader from 2 to 1, then transfer it from 1 to 2 again, so that peer 2 will call `free_mem`?
Signed-off-by: qupeng <qupeng@pingcap.com>
@jayzhan211 I have sent a patch PR to your code. Could you take a look?
Signed-off-by: qupeng <qupeng@pingcap.com>
r.step(resp).unwrap();

// Test `maybe_free_inflight_buffers` works as expected.
r.maybe_free_inflight_buffers();
It seems the case can pass even if the function does nothing.
src/tracker/inflights.rs
Outdated
@@ -35,6 +38,7 @@ impl Clone for Inflights {
    start: self.start,
    count: self.count,
    buffer,
    cap: self.cap,
L36 can also be removed.
> The `buffer` must have its capacity set correctly on clone, normally it does not.
If that is not true, I think we can use `derive(Clone)` for inflights.
Now that the capacity is maintained explicitly, the statement is no longer valid. `derive(Clone)` is a good idea!
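The reasoning here can be illustrated with a small sketch. Cloning a `Vec` is not guaranteed to carry over spare capacity, which is why a hand-written `Clone` was needed while the limit lived in `buffer.capacity()`. Once the limit is a plain field, a derived `Clone` copies it along with everything else. `ClonableInflights` below is a hypothetical stand-in, not the raft-rs type.

```rust
/// Hypothetical stand-in with the logical limit stored in its own field.
#[derive(Clone, Debug, PartialEq)]
pub struct ClonableInflights {
    pub start: usize,
    pub count: usize,
    /// Explicit limit; survives `clone()` because it is ordinary data.
    pub cap: usize,
    pub buffer: Vec<u64>,
}

/// Builds a window with limit 256 holding one entry, and a clone of it.
pub fn clone_demo() -> (ClonableInflights, ClonableInflights) {
    let mut buffer = Vec::with_capacity(256);
    buffer.push(2);
    let original = ClonableInflights { start: 0, count: 1, cap: 256, buffer };
    let copy = original.clone();
    (original, copy)
}
```

The clone compares equal to the original and reports the same `cap`, regardless of how much spare capacity its `buffer` happens to have.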
/*
1: cap=0/start=0/count=0/buffer=[]
2: cap=256/start=0/count=1/buffer=[2]
count should be 1?
I am not sure why `MsgPropose` sends `2` to others, but it did.
resp.index = r.raft_log.last_index();
r.step(resp).unwrap();

for (&id, pr) in r.prs().iter() {
The loop is not necessary if it only tests `id == 2`.
I will use buffer.capacity, not cap(), because I found that we initialize buffer with
@hicqu PTAL
@jayzhan211 it seems some commits break the DCO check. Could you rebase and squash the PR like this:
git checkout free -b backup-free
git branch -D free
git checkout master -b free
git merge backup-free --no-commit --no-log --squash
git commit -s # then clean up the squashed commit logs
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
@hicqu you may need a method to monitor the memory correctly.
/// Number of inflight messages.
#[inline]
pub fn count(&self) -> usize {
How about making it doc hidden, as it's only used in tests?
@BusyJay Do you mean we need
Add a function that return
Yes, something like that.
the buffer size is
@jayzhan211 we need to add a new method for
+
+    /// Get the inflight buffer size.
+    pub fn inflight_buffers_size(&self) -> usize {
+        let mut total_size = 0;
+        for (_, pr) in self.prs().iter() {
+            total_size += pr.ins.buffer.capacity();
+        }
+        total_size
+    }
to get the allocated buffer size for a raft peer.
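For monitoring purposes it may help to note that `Vec::capacity()` counts element slots, not bytes. The following stand-alone sketch shows the same sum as the proposed method plus a byte conversion. `PeerProgress`, its `inflight_buffer` field, and `inflight_buffers_bytes` are hypothetical names for illustration; only `inflight_buffers_size` mirrors the method proposed above.

```rust
use std::mem::size_of;

/// Hypothetical stand-in for one peer's progress entry.
pub struct PeerProgress {
    pub inflight_buffer: Vec<u64>,
}

/// Total number of allocated element slots across all peers.
pub fn inflight_buffers_size(peers: &[PeerProgress]) -> usize {
    peers.iter().map(|p| p.inflight_buffer.capacity()).sum()
}

/// The same total expressed in bytes, assuming `u64` entries.
pub fn inflight_buffers_bytes(peers: &[PeerProgress]) -> usize {
    inflight_buffers_size(peers) * size_of::<u64>()
}
```

Summing `capacity()` rather than `len()` is what makes the number reflect allocated memory, so a freed buffer contributes zero while a nearly empty but allocated one is still counted in full.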
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Thanks!
Close #419 and #448.
Add two APIs
Signed-off-by: jayzhan <jayzhan211@gmail.com>