Skip to content

Commit

Permalink
Merge pull request tikv#18 from MyonKeminta/m/cdc-fix-late-commit-panic
Browse files Browse the repository at this point in the history
cdc: Fix potential panic on late arriving commit
  • Loading branch information
overvenus committed Dec 23, 2019
2 parents 55897ad + e8b5ef7 commit fc06dcc
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
12 changes: 12 additions & 0 deletions components/cdc/tests/integrations/test_cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,18 @@ fn test_cdc_basic() {

// Commit
let commit_ts = suite.cluster.pd_client.get_tso().wait().unwrap();
let mut counter = 0;
loop {
let event = receive_event(true);
// Even if there is no write,
// resolved ts should be advanced regularly.
if let Event_oneof_event::ResolvedTs(_) = event {
counter += 1;
if counter > 5 {
break;
}
}
}
suite.must_kv_commit(vec![k.clone().into_bytes()], start_ts, commit_ts);
let event = receive_event(false);
match event {
Expand Down
17 changes: 13 additions & 4 deletions components/resolved_ts/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,20 @@ impl Resolver {
/// 1. later commit_ts must be great than the min_ts.
pub fn resolve(&mut self, min_ts: TimeStamp) -> Option<TimeStamp> {
self.resolved_ts?;
let min_start_ts = *self.locks.keys().next().unwrap_or(&min_ts);
let min_lock = self.locks.keys().next().cloned();
let has_lock = min_lock.is_some();
let min_start_ts = min_lock.unwrap_or(min_ts);
let new_resolved_ts = cmp::min(min_start_ts, min_ts);
if let Some(old_resolved_ts) = self.resolved_ts {
self.resolved_ts = Some(cmp::max(old_resolved_ts, new_resolved_ts));
} else {
self.resolved_ts = Some(new_resolved_ts);
}
if let Some(old_min_ts) = self.min_ts {
self.min_ts = Some(cmp::max(old_min_ts, min_ts));
let old_min_ts = self.min_ts.unwrap_or_else(TimeStamp::zero);
if has_lock {
self.min_ts = Some(cmp::max(old_min_ts, cmp::min(min_start_ts, min_ts)))
} else {
self.min_ts = Some(min_ts);
self.min_ts = Some(cmp::max(old_min_ts, min_ts));
}
self.resolved_ts
}
Expand Down Expand Up @@ -153,6 +156,12 @@ mod tests {
Event::Unlock(2, None, Key::from_raw(b"a")),
Event::Resolve(3, 3),
],
vec![
Event::Lock(2, Key::from_raw(b"a")),
Event::Resolve(4, 2),
Event::Unlock(2, Some(3), Key::from_raw(b"a")),
Event::Resolve(5, 5),
],
];

for (i, case) in cases.into_iter().enumerate() {
Expand Down

0 comments on commit fc06dcc

Please sign in to comment.