Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Fix unstable integration test test_auto_gc #5212

Merged
merged 3 commits on Aug 8, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 6 additions & 7 deletions src/storage/gc_worker.rs
Expand Up @@ -704,6 +704,9 @@ impl<S: GCSafePointProvider, R: RegionInfoProvider> GCManager<S, R> {
/// Starts working in another thread. This function moves the `GCManager` and returns a handler
/// of it.
fn start(mut self) -> Result<GCManagerHandle> {
set_status_metrics(GCManagerState::Init);
self.initialize();

let (tx, rx) = mpsc::channel();
self.gc_manager_ctx.set_stop_signal_receiver(rx);
let res: Result<_> = ThreadBuilder::new()
Expand All @@ -728,9 +731,6 @@ impl<S: GCSafePointProvider, R: RegionInfoProvider> GCManager<S, R> {
}

fn run_impl(&mut self) -> GCManagerResult<()> {
set_status_metrics(GCManagerState::Init);
self.initialize()?;

loop {
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_GC])
Expand All @@ -755,12 +755,11 @@ impl<S: GCSafePointProvider, R: RegionInfoProvider> GCManager<S, R> {
/// The only task of initializing is simply to get the current safe point as the initial value
/// of `safe_point`. TiKV won't do any GC automatically until the first time `safe_point` is
/// updated to a value greater than the initial one.
fn initialize(&mut self) -> GCManagerResult<()> {
fn initialize(&mut self) {
debug!("gc-manager is initializing");
self.safe_point = 0;
self.try_update_safe_point();
debug!("gc-manager started"; "safe_point" => self.safe_point);
Ok(())
}

/// Waits until the safe_point updates. Returns the new safe point.
Expand Down Expand Up @@ -1294,7 +1293,7 @@ mod tests {
for safe_point in &safe_points {
test_util.add_next_safe_point(*safe_point);
}
test_util.gc_manager.as_mut().unwrap().initialize().unwrap();
test_util.gc_manager.as_mut().unwrap().initialize();

test_util.gc_manager.as_mut().unwrap().gc_a_round().unwrap();
test_util.stop();
Expand Down Expand Up @@ -1370,7 +1369,7 @@ mod tests {
assert_eq!(gc_manager.safe_point, 0);
test_util.add_next_safe_point(0);
test_util.add_next_safe_point(5);
gc_manager.initialize().unwrap();
gc_manager.initialize();
assert_eq!(gc_manager.safe_point, 0);
assert!(gc_manager.try_update_safe_point());
assert_eq!(gc_manager.safe_point, 5);
Expand Down
8 changes: 5 additions & 3 deletions tests/integrations/storage/test_raft_storage.rs
Expand Up @@ -288,9 +288,6 @@ fn test_auto_gc() {

assert_eq!(storages.len(), count);

// Initialize gc workers
pd_client.set_gc_safe_point(1);

// test_data will be written with ts < 50
let test_data: Vec<_> = [
(b"k1", b"v1"),
Expand Down Expand Up @@ -349,4 +346,9 @@ fn test_auto_gc() {
check_data(&mut cluster, &storages, &test_data, 50, false);
check_data(&mut cluster, &storages, &test_data2, 150, true);
check_data(&mut cluster, &storages, &test_data3, 250, true);

// No more signals.
finish_signal_rx
.recv_timeout(Duration::from_millis(300))
.unwrap_err();
}