-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
store: cache region heartbeat #2701
Conversation
// After ttl, there must be exact one heartbeat. | ||
sleep(Duration::from_millis(heartbeat_interval_ms * heartbeat_ttl)); | ||
assert_eq!(pd_client.get_region_heartbeat_count(), count + 1); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need a test that heartbeat is sent when ttl is not expired and heartbeat is updated.
Can we make default region heartbeat interval shorter? (to make updating regions update faster) |
Also need to handle read statistics at https://github.com/pingcap/tikv/blob/master/src/pd/pd.rs#L511. |
d52b4b9
to
0616758
Compare
0616758
to
d2921d8
Compare
d2921d8
to
23f794c
Compare
A friendly ping. |
In tests, the mock pd-server(TestPdClient::Cluster) does not push region heartbeat responses. This PR try to simulate pushing by polling responses in the background. It's part of region heartbeat cache #2701, for fixing tests.
23f794c
to
b54cd7b
Compare
b54cd7b
to
120c296
Compare
src/pd/pd.rs
Outdated
let cache = self.heartbeat_cache | ||
.entry(region_id) | ||
.or_insert_with(RegionHeartbeatCache::default); | ||
info!("debug: before merge {:?}\n{:?}", cache, hb_task); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove these debug logs.
src/pd/pd.rs
Outdated
self.store_stat | ||
.region_bytes_written | ||
.observe(region_stat.written_bytes as f64); | ||
.observe(cache.written_bytes_delta as f64); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it mean that the previous statistics are wrong?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we are both right, the written_bytes
is a delta. See line 555 https://github.com/pingcap/tikv/pull/2701/files#diff-a2b393fbe5f436bb88e05e100af8c1b4L555
src/pd/pd.rs
Outdated
self.store_stat | ||
.region_keys_read | ||
.observe(region_stat.read_keys as f64); | ||
.observe(cache.read_keys_delta as f64); | ||
|
||
// Now we use put region protocol for heartbeat. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wha is "put region protocol"?
PTAL |
src/pd/pd.rs
Outdated
fn handle_reconnect(&mut self) { | ||
if self.is_handle_reconnect_scheduled { | ||
if self.invalidate_cache.swap(false, Ordering::Relaxed) { | ||
info!("invalidate region heartbeat cache"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what situation will the runner be re-run?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And we need to reset is_handle_reconnect_scheduled
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the client connects to a new PD leader.
src/pd/pd.rs
Outdated
self.last_written_bytes = total_written_bytes; | ||
self.valid = false; | ||
} | ||
// TODO: what if last_written_bytes > total_written_bytes? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why could it happend?
A friendly ping. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
A friendly ping. |
src/pd/pd.rs
Outdated
@@ -486,20 +591,36 @@ impl<T: PdClient> Runner<T> { | |||
self.is_hb_receiver_scheduled = true; | |||
} | |||
|
|||
fn handle_reconnect(&mut self) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function does two things:
- If
is_handle_reconnect_scheduled
is false, it means that the reconnect handler in pd client hasn't been initialized, we should set it. - If
is_handle_reconnect_scheduled
is true, it means that we have initialized the reconnect handler, then we should check if a reconnect has happened, and invalidate the cache if it did.
Am I right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I prefer that keep things together.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it is very confusing, it takes me a while to figure this.
I think you can just initialize the reconnect handler somewhere, and just handle reconnect here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need comments here.
src/pd/pd.rs
Outdated
self.store_stat | ||
.region_bytes_written | ||
.observe(region_stat.written_bytes as f64); | ||
.observe(cache.written_bytes_delta as f64); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if written_bytes_delta
is not changed between multi handle_heartbeat
?
@@ -957,8 +957,8 @@ impl Peer { | |||
} | |||
self.mut_store().apply_state = res.apply_state.clone(); | |||
self.mut_store().applied_index_term = res.applied_index_term; | |||
self.peer_stat.written_keys += res.metrics.written_keys; | |||
self.peer_stat.written_bytes += res.metrics.written_bytes; | |||
self.peer_stat.total_written_keys += res.metrics.written_keys; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to update peer_stat
for followers and learners? And, I think we also need to clear peer_stat
in on_role_changed
is we do so.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we change peer_stat
's behavior, it may incompatible with PD and it becomes a breaking change.
rest LGTM. |
@@ -486,20 +595,29 @@ impl<T: PdClient> Runner<T> { | |||
self.is_hb_receiver_scheduled = true; | |||
} | |||
|
|||
fn maybe_handle_reconnect(&mut self) { | |||
if self.invalidate_cache.swap(false, Ordering::Relaxed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer Ordering::SeqCst
to save other's brain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Relaxed
means that there is no memory ordering problem, so I think Relaxed
is more clear.
🙄 The build failed because of the Rust version upgrade having a weird Rustfmt incompatability. Please trigger a rebuild without the cache @overvenus . |
@Hoverbear Never mind, we are not going to merge this PR since PD works smoothly even if there are many region heartbeats. |
@overvenus So should we close this then? Or is it just delayed for now? |
Let's close it. |
In tests, the mock pd-server(TestPdClient::Cluster) does not push region heartbeat responses. This PR try to simulate pushing by polling responses in the background. It's part of region heartbeat cache tikv#2701, for fixing tests.
Caching region heartbeats if the current heartbeat is same as the previous one, it should reduce a large number of unnecessary heartbeats.