log-backup: fix early return #13288

Merged · 6 commits · Aug 16, 2022
Changes from 4 commits
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion components/backup-stream/Cargo.toml
@@ -31,7 +31,7 @@ engine_traits = { path = "../engine_traits", default-features = false }
error_code = { path = "../error_code" }
# We cannot update etcd-client to the latest version because of a cyclic requirement.
# We also need to wait until https://github.com/etcdv3/etcd-client/pull/43/files is merged.
etcd-client = { git = "https://github.com/yujuncen/etcd-client", rev = "e0321a1990ee561cf042973666c0db61c8d82364", features = ["pub-response-field", "tls"] }
etcd-client = { git = "https://github.com/pingcap/etcd-client", rev = "e0321a1990ee561cf042973666c0db61c8d82364", features = ["pub-response-field", "tls"] }
external_storage = { path = "../external_storage", default-features = false }
external_storage_export = { path = "../external_storage/export", default-features = false }
fail = "0.5"
9 changes: 8 additions & 1 deletion components/backup-stream/src/event_loader.rs
@@ -391,9 +391,16 @@ where
// We only need to record the disk read throughput here.
let (stat, disk_read) =
utils::with_record_read_throughput(|| event_loader.fill_entries());
// We must check the size of the entry batch to know whether we made progress.
// Checking the emitted events instead may exit too early when the remaining
// records either:
// - have their values inlined into the `write` CF (hence nothing is written
//   to the default CF), or
// - are prewrites only (hence the batch contains only `Prewrite` records).
// In both cases NO record generates an ApplyEvent (only lock changes), and we
// would wrongly exit after the first run of the loop.
let no_progress = event_loader.entry_batch.is_empty();
let stat = stat?;
self.with_resolver(region, |r| event_loader.emit_entries_to(&mut events, r))?;
if events.is_empty() {
if no_progress {
metrics::INITIAL_SCAN_DURATION.observe(start.saturating_elapsed_secs());
return Ok(stats.stat);
}
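For reference, here is a minimal, self-contained sketch (using made-up Entry and scan items, not the actual TiKV API) of why the exit check must look at the scanned batch rather than the emitted events:

// Sketch only: `Entry::Prewrite` stands in for records that change locks
// but emit no ApplyEvent; `Entry::Write` stands in for committed records.
#[derive(Clone, Copy)]
enum Entry {
    Prewrite,   // lock change only: emits no event
    Write(u64), // committed record: emits one event
}

// Drains the scanner and returns how many events were emitted. `check_events`
// selects between the buggy exit condition and the fixed one.
fn scan(mut batches: Vec<Vec<Entry>>, check_events: bool) -> usize {
    let mut emitted = 0;
    loop {
        // Like fill_entries(): an empty batch means the scan is exhausted.
        let batch = batches.pop().unwrap_or_default();
        let no_progress = batch.is_empty();
        let events = batch.iter().filter(|e| matches!(e, Entry::Write(_))).count();
        emitted += events;
        // Buggy: exits on the first prewrite-only batch.
        // Fixed: exits only when the scanner yields nothing at all.
        if (check_events && events == 0) || (!check_events && no_progress) {
            return emitted;
        }
    }
}

fn main() {
    // The scanner yields a prewrite-only batch first, then a batch with a write.
    let data = vec![vec![Entry::Write(42)], vec![Entry::Prewrite; 3]];
    assert_eq!(scan(data.clone(), true), 0); // early return: the write is lost
    assert_eq!(scan(data, false), 1); // fixed: the later write is still seen
}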
60 changes: 57 additions & 3 deletions components/backup-stream/tests/mod.rs
@@ -158,7 +158,6 @@ impl SuiteBuilder
for id in 1..=(n as u64) {
suite.start_endpoint(id, use_v3);
}
// TODO: The current mock metastore (slash_etc) doesn't support multi-version.
// We must wait until the endpoints are ready and watching the metastore, or
// some modifications may be lost. Either make Endpoint::with_client wait until
// the watch has started, or make slash_etc support multi-version; then we can
// get rid of this
@@ -318,6 +317,19 @@ impl Suite
inserted
}

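/// Commits the given keys at `commit_ts` for a transaction that was started
/// at `start_ts`, routing each key to the region that currently owns it.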
fn commit_keys(&mut self, keys: Vec<Vec<u8>>, start_ts: TimeStamp, commit_ts: TimeStamp) {
let mut region_keys = HashMap::<u64, Vec<Vec<u8>>>::new();
for k in keys {
let enc_key = Key::from_raw(&k).into_encoded();
let region = self.cluster.get_region_id(&enc_key);
region_keys.entry(region).or_default().push(k);
}

for (region, keys) in region_keys {
self.must_kv_commit(region, keys, start_ts, commit_ts);
}
}

fn just_commit_a_key(&mut self, key: Vec<u8>, start_ts: TimeStamp, commit_ts: TimeStamp) {
let enc_key = Key::from_raw(&key).into_encoded();
let region = self.cluster.get_region_id(&enc_key);
@@ -604,10 +616,13 @@ mod test {
errors::Error, metadata::MetadataClient, router::TaskSelector, GetCheckpointResult,
RegionCheckpointOperation, RegionSet, Task,
};
use pd_client::PdClient;
use tikv_util::{box_err, defer, info, HandyRwLock};
use txn_types::TimeStamp;
use txn_types::{Key, TimeStamp};

use crate::{make_record_key, make_split_key_at_record, run_async_test, SuiteBuilder};
use crate::{
make_record_key, make_split_key_at_record, mutation, run_async_test, SuiteBuilder,
};

#[test]
fn basic() {
@@ -650,6 +665,45 @@
suite.cluster.shutdown();
}

Review comment (Member): What do the "weird transactions" look like? Could you add some comments?

/// This test checks whether we can handle some weird transactions and their
/// race with initial scanning.
#[test]
fn with_split_txn() {
let mut suite = super::SuiteBuilder::new_named("split_txn").use_v3().build();
run_async_test(async {
let start_ts = suite.cluster.pd_client.get_tso().await.unwrap();
let keys = (1..1960).map(|i| make_record_key(1, i)).collect::<Vec<_>>();
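// The "weird" transaction: prewrite nearly 2,000 keys at once, so region 1
// holds a large batch of lock-only records while initial scanning runs.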
suite.must_kv_prewrite(
1,
keys.clone()
.into_iter()
.map(|k| mutation(k, b"hello, world".to_vec()))
.collect(),
make_record_key(1, 29),
start_ts,
);
let commit_ts = suite.cluster.pd_client.get_tso().await.unwrap();
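// Commit the tail of the keys before the task exists and the rest right
// after registering it, so the commits race with the initial scan.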
suite.commit_keys(keys[1913..].to_vec(), start_ts, commit_ts);
suite.must_register_task(1, "test_split_txn");
suite.commit_keys(keys[..1913].to_vec(), start_ts, commit_ts);
suite.force_flush_files("test_split_txn");
suite.wait_for_flush();
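// Keys in the `write` CF are the encoded user keys with the commit_ts
// appended, so encode the expected keys the same way before checking the
// flushed files.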
let keys_encoded = keys
.iter()
.map(|v| {
Key::from_raw(v.as_slice())
.append_ts(commit_ts)
.into_encoded()
})
.collect::<Vec<_>>();
suite.check_for_write_records(
suite.flushed_files.path(),
keys_encoded.iter().map(Vec::as_slice),
);
});
suite.cluster.shutdown();
}

#[test]
/// This case tests whether the backup can continue when the leader fails.
fn leader_down() {