-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
cdc: check leadership before subscribing #6691
Conversation
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
components/cdc/src/endpoint.rs
Outdated
@@ -101,6 +101,7 @@ impl fmt::Debug for Task { | |||
} | |||
|
|||
pub struct Endpoint { | |||
// TODO: add RaftRouter here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add it now?
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
I will submit another PR for reorganizing the tests of cdc after this one is landed. |
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM
components/cdc/src/endpoint.rs
Outdated
capture_regions: HashMap<u64, Delegate>, | ||
scheduler: Scheduler<Task>, | ||
apply_router: ApplyRouter, | ||
casual_router: T, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you name it to raft_router? casual_router sounds confusing.
components/cdc/src/endpoint.rs
Outdated
@@ -257,6 +269,11 @@ impl Endpoint { | |||
fn on_region_ready(&mut self, region_id: u64, resolver: Resolver, region: Region) { | |||
if let Some(delegate) = self.capture_regions.get_mut(®ion_id) { | |||
delegate.on_region_ready(resolver, region); | |||
println!("region ready: has_failed({})", delegate.has_failed()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove this line.
components/cdc/src/endpoint.rs
Outdated
cb: Callback::Read(Box::new(move |resp: ReadResponse<_>| { | ||
} | ||
}; | ||
if let Err(e) = self.casual_router.send( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you log the error?
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM
} | ||
self.ctx | ||
.apply_router | ||
.schedule_task(self.region_id(), ApplyTask::Change { cmd, cb }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that the leader changes after this but before the cmd is executed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but role change events will be caught by the cdc observer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
/run-all-tests |
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
Signed-off-by: 5kbpers tangminghua@pingcap.com
What have you changed?
Add
CasualMessage::CaptureChange
for checking leadership before starting to subscribe changes of a region.What is the type of the changes?
How is the PR tested?
Does this PR affect documentation (docs) or should it be mentioned in the release notes?
No.
Does this PR affect
tidb-ansible
?No.