Skip to content

Commit

Permalink
cdc: pick some fixes from master (#11632)
Browse files Browse the repository at this point in the history
* resolved_ts: return early after regions already had quorum (#11352)

* close #11351

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* fix tests

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* fix tests

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* Apply suggestions from code review

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

Co-authored-by: Zixiong Liu <liuzixiong@pingcap.com>

* address comments

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* make clippy happy

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

* close #11400

Signed-off-by: 5kbpers <tangminghua@pingcap.com>

Co-authored-by: qupeng <qupeng@pingcap.com>
Co-authored-by: Zixiong Liu <liuzixiong@pingcap.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>

* Support to iterate table properties for KvEngine  (#11388)

* support to iterate table properties for KvEngine
* close #11387

Signed-off-by: qupeng <qupeng@pingcap.com>

* cdc: introduce TsFilter into incremental scan (#11385)

* close #11384

Signed-off-by: qupeng <qupeng@pingcap.com>

* support to get approximate keys and size for a given range from UserCollectedProperties (#11465)

Signed-off-by: qupeng <qupeng@pingcap.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>

* cherry pick pr #11615

Signed-off-by: qupeng <qupeng@pingcap.com>

Co-authored-by: 5kbpers <tangminghua@pingcap.com>
Co-authored-by: Zixiong Liu <liuzixiong@pingcap.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
4 people committed Dec 10, 2021
1 parent 6c14247 commit b3a6200
Show file tree
Hide file tree
Showing 32 changed files with 1,058 additions and 630 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions components/backup/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ mod tests {
use std::path::Path;
use tempfile::TempDir;
use tikv::storage::TestEngineBuilder;
use txn_types::OldValue;

type CfKvs<'a> = (engine_traits::CfName, &'a [(&'a [u8], &'a [u8])]);

Expand Down Expand Up @@ -475,7 +476,7 @@ mod tests {
vec![TxnEntry::Commit {
default: (vec![], vec![]),
write: (vec![b'a'], vec![b'a']),
old_value: None,
old_value: OldValue::None,
}]
.into_iter(),
false,
Expand Down Expand Up @@ -515,12 +516,12 @@ mod tests {
TxnEntry::Commit {
default: (vec![b'a'], vec![b'a']),
write: (vec![b'a'], vec![b'a']),
old_value: None,
old_value: OldValue::None,
},
TxnEntry::Commit {
default: (vec![], vec![]),
write: (vec![b'b'], vec![]),
old_value: None,
old_value: OldValue::None,
},
]
.into_iter(),
Expand Down
2 changes: 2 additions & 0 deletions components/cdc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ failpoints = ["tikv/failpoints"]
[dependencies]
bitflags = "1.0"
crossbeam = "0.8"
engine_rocks = { path = "../engine_rocks", default-features = false }
engine_traits = { path = "../engine_traits", default-features = false }
futures = "0.3"
grpcio = { version = "0.9", default-features = false, features = ["openssl-vendored"] }
kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false }
keys = { path = "../keys" }
pd_client = { path = "../pd_client", default-features = false }
raft = { version = "0.6.0-alpha", default-features = false }
raftstore = { path = "../raftstore", default-features = false }
Expand Down
48 changes: 19 additions & 29 deletions components/cdc/src/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use raftstore::store::util::compare_region_epoch;
use raftstore::Error as RaftStoreError;
use resolved_ts::Resolver;
use tikv::storage::txn::TxnEntry;
use tikv_util::time::Instant;
use tikv::storage::Statistics;
use tikv_util::{debug, info, warn};
use txn_types::{Key, Lock, LockType, TimeStamp, WriteBatchFlags, WriteRef, WriteType};

Expand Down Expand Up @@ -402,6 +402,7 @@ impl Delegate {
batch: CmdBatch,
old_value_cb: &OldValueCallback,
old_value_cache: &mut OldValueCache,
statistics: &mut Statistics,
) -> Result<()> {
// Stale CmdBatch, drop it silently.
if batch.cdc_id != self.handle.id {
Expand All @@ -426,6 +427,7 @@ impl Delegate {
request.requests.into(),
old_value_cb,
old_value_cache,
statistics,
is_one_pc,
)?;
} else {
Expand Down Expand Up @@ -462,7 +464,7 @@ impl Delegate {
current_rows_size = 0;
}
current_rows_size += row_size;
row.old_value = old_value.unwrap_or_default();
row.old_value = old_value.finalized().unwrap_or_default();
rows.last_mut().unwrap().push(row);
}
Some(TxnEntry::Commit {
Expand All @@ -489,7 +491,7 @@ impl Delegate {
continue;
}
set_event_row_type(&mut row, EventLogType::Committed);
row.old_value = old_value.unwrap_or_default();
row.old_value = old_value.finalized().unwrap_or_default();
let row_size = row.key.len() + row.value.len();
if current_rows_size + row_size >= CDC_EVENT_MAX_BYTES {
rows.push(Vec::with_capacity(entries_len));
Expand Down Expand Up @@ -531,35 +533,24 @@ impl Delegate {
requests: Vec<Request>,
old_value_cb: &OldValueCallback,
old_value_cache: &mut OldValueCache,
statistics: &mut Statistics,
is_one_pc: bool,
) -> Result<()> {
let txn_extra_op = self.txn_extra_op;
let mut read_old_value = |row: &mut EventRow, read_old_ts| {
let mut read_old_value = |row: &mut EventRow, read_old_ts| -> Result<()> {
if txn_extra_op == TxnExtraOp::ReadOldValue {
let key = Key::from_raw(&row.key).append_ts(row.start_ts.into());
let start = Instant::now();
let (old_value, statistics) = old_value_cb(key, read_old_ts, old_value_cache);
let old_value = old_value_cb(key, read_old_ts, old_value_cache, statistics)?;
row.old_value = old_value.unwrap_or_default();
CDC_OLD_VALUE_DURATION_HISTOGRAM
.with_label_values(&["all"])
.observe(start.saturating_elapsed().as_secs_f64());
if let Some(statistics) = statistics {
for (cf, cf_details) in statistics.details().iter() {
for (tag, count) in cf_details.iter() {
CDC_OLD_VALUE_SCAN_DETAILS
.with_label_values(&[*cf, *tag])
.inc_by(*count as u64);
}
}
}
}
Ok(())
};

let mut rows: HashMap<Vec<u8>, EventRow> = HashMap::default();
for mut req in requests {
match req.get_cmd_type() {
CmdType::Put => {
self.sink_put(req.take_put(), is_one_pc, &mut rows, &mut read_old_value)
self.sink_put(req.take_put(), is_one_pc, &mut rows, &mut read_old_value)?;
}
CmdType::Delete => self.sink_delete(req.take_delete()),
_ => {
Expand Down Expand Up @@ -620,20 +611,19 @@ impl Delegate {
mut put: PutRequest,
is_one_pc: bool,
rows: &mut HashMap<Vec<u8>, EventRow>,
mut read_old_value: impl FnMut(&mut EventRow, /* read_old_ts */ TimeStamp),
) {
mut read_old_value: impl FnMut(&mut EventRow, TimeStamp) -> Result<()>,
) -> Result<()> {
match put.cf.as_str() {
"write" => {
let mut row = EventRow::default();
let skip = decode_write(put.take_key(), put.get_value(), &mut row, true);
if skip {
return;
if decode_write(put.take_key(), put.get_value(), &mut row, true) {
return Ok(());
}

let commit_ts = if is_one_pc {
set_event_row_type(&mut row, EventLogType::Committed);
let commit_ts = TimeStamp::from(row.commit_ts);
read_old_value(&mut row, commit_ts.prev());
read_old_value(&mut row, commit_ts.prev())?;
Some(commit_ts)
} else {
// 2PC
Expand Down Expand Up @@ -668,13 +658,12 @@ impl Delegate {
let mut row = EventRow::default();
let lock = Lock::parse(put.get_value()).unwrap();
let for_update_ts = lock.for_update_ts;
let skip = decode_lock(put.take_key(), lock, &mut row);
if skip {
return;
if decode_lock(put.take_key(), lock, &mut row) {
return Ok(());
}

let read_old_ts = std::cmp::max(for_update_ts, row.start_ts.into());
read_old_value(&mut row, read_old_ts);
read_old_value(&mut row, read_old_ts)?;
let occupied = rows.entry(row.key.clone()).or_default();
if !occupied.value.is_empty() {
assert!(row.value.is_empty());
Expand Down Expand Up @@ -712,6 +701,7 @@ impl Delegate {
panic!("invalid cf {}", other);
}
}
Ok(())
}

fn sink_delete(&mut self, mut delete: DeleteRequest) {
Expand Down
Loading

0 comments on commit b3a6200

Please sign in to comment.