From d549f91628142e3439efacab667fa5608138414b Mon Sep 17 00:00:00 2001
From: NingLin-P
Date: Fri, 25 Mar 2022 19:46:34 +0800
Subject: [PATCH 1/2] cherry pick #12251 to release-5.3

Signed-off-by: ti-srebot
---
 components/raftstore/src/store/peer.rs   |   8 +-
 components/test_raftstore/src/cluster.rs |  22 ++
 tests/failpoints/cases/test_merge.rs     | 442 +++++++++++++++++++++++
 3 files changed, 471 insertions(+), 1 deletion(-)

diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs
index 0f5db68ef01..e0f1778c9fb 100644
--- a/components/raftstore/src/store/peer.rs
+++ b/components/raftstore/src/store/peer.rs
@@ -960,7 +960,13 @@ where
             &mut kv_wb,
             &region,
             PeerState::Tombstone,
-            self.pending_merge_state.clone(),
+            // Only persist the `merge_state` if the merge is known to have succeeded,
+            // which is determined by the `keep_data` flag
+            if keep_data {
+                self.pending_merge_state.clone()
+            } else {
+                None
+            },
         )?;
         // write kv rocksdb first in case of restart happen between two write
         let mut write_opts = WriteOptions::new();
diff --git a/components/test_raftstore/src/cluster.rs b/components/test_raftstore/src/cluster.rs
index 662c646e4dc..8f435d7c23b 100644
--- a/components/test_raftstore/src/cluster.rs
+++ b/components/test_raftstore/src/cluster.rs
@@ -1120,6 +1120,28 @@ impl<T: Simulator> Cluster<T> {
             .unwrap()
     }
 
+    pub fn must_peer_state(&self, region_id: u64, store_id: u64, peer_state: PeerState) {
+        for _ in 0..100 {
+            let state = self
+                .get_engine(store_id)
+                .c()
+                .get_msg_cf::<RegionLocalState>(
+                    engine_traits::CF_RAFT,
+                    &keys::region_state_key(region_id),
+                )
+                .unwrap()
+                .unwrap();
+            if state.get_state() == peer_state {
+                return;
+            }
+            sleep_ms(10);
+        }
+        panic!(
+            "[region {}] peer state still not reach {:?}",
+            region_id, peer_state
+        );
+    }
+
     pub fn wait_last_index(
         &mut self,
         region_id: u64,
diff --git a/tests/failpoints/cases/test_merge.rs b/tests/failpoints/cases/test_merge.rs
index 0bc098592d6..143eddca40e 100644
--- a/tests/failpoints/cases/test_merge.rs
+++ b/tests/failpoints/cases/test_merge.rs
@@ -1302,3 +1302,445 @@ fn test_merge_election_and_restart() {
     // If logs from different term are process correctly, store 2 should have latest updates.
must_get_equal(&cluster.get_engine(2), b"k14", b"v14"); } +<<<<<<< HEAD +======= + +/// Testing that the source peer's read delegate should not be removed by the target peer +/// and only removed when the peer is destroyed +#[test] +fn test_source_peer_read_delegate_after_apply() { + let mut cluster = new_node_cluster(0, 3); + configure_for_merge(&mut cluster); + + let pd_client = Arc::clone(&cluster.pd_client); + pd_client.disable_default_operator(); + + cluster.run(); + + cluster.must_split(&cluster.get_region(b""), b"k2"); + let target = cluster.get_region(b"k1"); + let source = cluster.get_region(b"k3"); + + cluster.must_transfer_leader(target.get_id(), find_peer(&target, 1).unwrap().to_owned()); + + let on_destroy_peer_fp = "destroy_peer"; + fail::cfg(on_destroy_peer_fp, "pause").unwrap(); + + // Merge finish means the leader of the target region have call `on_ready_commit_merge` + pd_client.must_merge(source.get_id(), target.get_id()); + + // The source peer's `ReadDelegate` should not be removed yet and mark as `pending_remove` + assert!( + cluster.store_metas[&1] + .lock() + .unwrap() + .readers + .get(&source.get_id()) + .unwrap() + .pending_remove + ); + + fail::remove(on_destroy_peer_fp); + // Wait for source peer is destroyed + sleep_ms(100); + + assert!( + cluster.store_metas[&1] + .lock() + .unwrap() + .readers + .get(&source.get_id()) + .is_none() + ); +} + +#[test] +fn test_merge_with_concurrent_pessimistic_locking() { + let mut cluster = new_server_cluster(0, 2); + configure_for_merge(&mut cluster); + cluster.cfg.pessimistic_txn.pipelined = true; + cluster.cfg.pessimistic_txn.in_memory = true; + cluster.run(); + + cluster.must_transfer_leader(1, new_peer(1, 1)); + + cluster.must_put(b"k1", b"v1"); + cluster.must_put(b"k3", b"v3"); + + let region = cluster.get_region(b"k1"); + cluster.must_split(®ion, b"k2"); + let left = cluster.get_region(b"k1"); + let right = cluster.get_region(b"k3"); + + // Transfer the leader of the right region to store 2. The leaders of source and target + // regions don't need to be on the same store. + cluster.must_transfer_leader(right.id, new_peer(2, 2)); + + let snapshot = cluster.must_get_snapshot_of_region(left.id); + let txn_ext = snapshot.txn_ext.unwrap(); + assert!( + txn_ext + .pessimistic_locks + .write() + .insert(vec![( + Key::from_raw(b"k0"), + PessimisticLock { + primary: b"k0".to_vec().into_boxed_slice(), + start_ts: 10.into(), + ttl: 3000, + for_update_ts: 20.into(), + min_commit_ts: 30.into(), + }, + )]) + .is_ok() + ); + + let addr = cluster.sim.rl().get_addr(1); + let env = Arc::new(Environment::new(1)); + let channel = ChannelBuilder::new(env).connect(&addr); + let client = TikvClient::new(channel); + + fail::cfg("before_propose_locks_on_region_merge", "pause").unwrap(); + + // 1. Locking before proposing pessimistic locks in the source region can succeed. 
+ let client2 = client.clone(); + let mut mutation = Mutation::default(); + mutation.set_op(Op::PessimisticLock); + mutation.key = b"k1".to_vec(); + let mut req = PessimisticLockRequest::default(); + req.set_context(cluster.get_ctx(b"k1")); + req.set_mutations(vec![mutation].into()); + req.set_start_version(10); + req.set_for_update_ts(10); + req.set_primary_lock(b"k1".to_vec()); + fail::cfg("txn_before_process_write", "pause").unwrap(); + let res = thread::spawn(move || client2.kv_pessimistic_lock(&req).unwrap()); + thread::sleep(Duration::from_millis(150)); + cluster.merge_region(left.id, right.id, Callback::None); + thread::sleep(Duration::from_millis(150)); + fail::remove("txn_before_process_write"); + let resp = res.join().unwrap(); + assert!(!resp.has_region_error()); + fail::remove("before_propose_locks_on_region_merge"); + + // 2. After locks are proposed, later pessimistic lock request should fail. + let mut mutation = Mutation::default(); + mutation.set_op(Op::PessimisticLock); + mutation.key = b"k11".to_vec(); + let mut req = PessimisticLockRequest::default(); + req.set_context(cluster.get_ctx(b"k11")); + req.set_mutations(vec![mutation].into()); + req.set_start_version(10); + req.set_for_update_ts(10); + req.set_primary_lock(b"k11".to_vec()); + fail::cfg("txn_before_process_write", "pause").unwrap(); + let res = thread::spawn(move || client.kv_pessimistic_lock(&req).unwrap()); + thread::sleep(Duration::from_millis(200)); + fail::remove("txn_before_process_write"); + let resp = res.join().unwrap(); + assert!(resp.has_region_error()); +} + +#[test] +fn test_merge_pessimistic_locks_with_concurrent_prewrite() { + let mut cluster = new_server_cluster(0, 2); + configure_for_merge(&mut cluster); + cluster.cfg.pessimistic_txn.pipelined = true; + cluster.cfg.pessimistic_txn.in_memory = true; + let pd_client = Arc::clone(&cluster.pd_client); + pd_client.disable_default_operator(); + + cluster.run(); + + cluster.must_transfer_leader(1, new_peer(1, 1)); + + cluster.must_put(b"k1", b"v1"); + cluster.must_put(b"k3", b"v3"); + + let region = cluster.get_region(b"k1"); + cluster.must_split(®ion, b"k2"); + let left = cluster.get_region(b"k1"); + let right = cluster.get_region(b"k3"); + + cluster.must_transfer_leader(right.id, new_peer(2, 2)); + + let addr = cluster.sim.rl().get_addr(1); + let env = Arc::new(Environment::new(1)); + let channel = ChannelBuilder::new(env).connect(&addr); + let client = TikvClient::new(channel); + + let snapshot = cluster.must_get_snapshot_of_region(left.id); + let txn_ext = snapshot.txn_ext.unwrap(); + let lock = PessimisticLock { + primary: b"k0".to_vec().into_boxed_slice(), + start_ts: 10.into(), + ttl: 3000, + for_update_ts: 20.into(), + min_commit_ts: 30.into(), + }; + assert!( + txn_ext + .pessimistic_locks + .write() + .insert(vec![ + (Key::from_raw(b"k0"), lock.clone()), + (Key::from_raw(b"k1"), lock), + ]) + .is_ok() + ); + + let mut mutation = Mutation::default(); + mutation.set_op(Op::Put); + mutation.set_key(b"k0".to_vec()); + mutation.set_value(b"v".to_vec()); + let mut req = PrewriteRequest::default(); + req.set_context(cluster.get_ctx(b"k0")); + req.set_mutations(vec![mutation].into()); + req.set_is_pessimistic_lock(vec![true]); + req.set_start_version(10); + req.set_for_update_ts(40); + req.set_primary_lock(b"k0".to_vec()); + + // First, pause apply and prewrite. 
+ fail::cfg("on_handle_apply", "pause").unwrap(); + let req2 = req.clone(); + let client2 = client.clone(); + let resp = thread::spawn(move || client2.kv_prewrite(&req2).unwrap()); + thread::sleep(Duration::from_millis(150)); + + // Then, start merging. PrepareMerge should wait until prewrite is done. + cluster.merge_region(left.id, right.id, Callback::None); + thread::sleep(Duration::from_millis(150)); + assert!(txn_ext.pessimistic_locks.read().is_writable()); + + // But a later prewrite request should fail because we have already banned all later proposals. + req.mut_mutations()[0].set_key(b"k1".to_vec()); + let resp2 = thread::spawn(move || client.kv_prewrite(&req).unwrap()); + + fail::remove("on_handle_apply"); + let resp = resp.join().unwrap(); + assert!(!resp.has_region_error(), "{:?}", resp); + + let resp2 = resp2.join().unwrap(); + assert!(resp2.has_region_error()); +} + +#[test] +fn test_retry_pending_prepare_merge_fail() { + let mut cluster = new_server_cluster(0, 2); + configure_for_merge(&mut cluster); + cluster.cfg.pessimistic_txn.pipelined = true; + cluster.cfg.pessimistic_txn.in_memory = true; + let pd_client = Arc::clone(&cluster.pd_client); + pd_client.disable_default_operator(); + + cluster.run(); + + cluster.must_transfer_leader(1, new_peer(1, 1)); + + cluster.must_put(b"k1", b"v1"); + cluster.must_put(b"k3", b"v3"); + + let region = cluster.get_region(b"k1"); + cluster.must_split(®ion, b"k2"); + let left = cluster.get_region(b"k1"); + let right = cluster.get_region(b"k3"); + + cluster.must_transfer_leader(right.id, new_peer(2, 2)); + + // Insert lock l1 into the left region + let snapshot = cluster.must_get_snapshot_of_region(left.id); + let txn_ext = snapshot.txn_ext.unwrap(); + let l1 = PessimisticLock { + primary: b"k1".to_vec().into_boxed_slice(), + start_ts: 10.into(), + ttl: 3000, + for_update_ts: 20.into(), + min_commit_ts: 30.into(), + }; + assert!( + txn_ext + .pessimistic_locks + .write() + .insert(vec![(Key::from_raw(b"k1"), l1)]) + .is_ok() + ); + + // Pause apply and write some data to the left region + fail::cfg("on_handle_apply", "pause").unwrap(); + let (propose_tx, propose_rx) = mpsc::sync_channel(10); + fail::cfg_callback("after_propose", move || propose_tx.send(()).unwrap()).unwrap(); + + let rx = cluster.async_put(b"k1", b"v11").unwrap(); + propose_rx.recv_timeout(Duration::from_secs(2)).unwrap(); + assert!(rx.recv_timeout(Duration::from_millis(200)).is_err()); + + // Then, start merging. PrepareMerge should become pending because applied_index is smaller + // than proposed_index. + cluster.merge_region(left.id, right.id, Callback::None); + propose_rx.recv_timeout(Duration::from_secs(2)).unwrap(); + thread::sleep(Duration::from_millis(200)); + assert!(txn_ext.pessimistic_locks.read().is_writable()); + + // Set disk full error to let PrepareMerge fail. (Set both peer to full to avoid transferring leader) + fail::cfg("disk_already_full_peer_1", "return").unwrap(); + fail::cfg("disk_already_full_peer_2", "return").unwrap(); + fail::remove("on_handle_apply"); + let res = rx.recv_timeout(Duration::from_secs(1)).unwrap(); + assert!(!res.get_header().has_error(), "{:?}", res); + + propose_rx.recv_timeout(Duration::from_secs(2)).unwrap(); + fail::remove("disk_already_full_peer_1"); + fail::remove("disk_already_full_peer_2"); + + // Merge should not succeed because the disk is full. 
+ thread::sleep(Duration::from_millis(300)); + cluster.reset_leader_of_region(left.id); + assert_eq!(cluster.get_region(b"k1"), left); + + cluster.must_put(b"k1", b"v12"); +} + +#[test] +fn test_merge_pessimistic_locks_propose_fail() { + let mut cluster = new_server_cluster(0, 2); + configure_for_merge(&mut cluster); + cluster.cfg.pessimistic_txn.pipelined = true; + cluster.cfg.pessimistic_txn.in_memory = true; + let pd_client = Arc::clone(&cluster.pd_client); + pd_client.disable_default_operator(); + + cluster.run(); + + cluster.must_transfer_leader(1, new_peer(1, 1)); + + cluster.must_put(b"k1", b"v1"); + cluster.must_put(b"k3", b"v3"); + + let region = cluster.get_region(b"k1"); + cluster.must_split(®ion, b"k2"); + let left = cluster.get_region(b"k1"); + let right = cluster.get_region(b"k3"); + + // Sending a TransferLeaeder message to make left region fail to propose. + + let snapshot = cluster.must_get_snapshot_of_region(left.id); + let txn_ext = snapshot.ext().get_txn_ext().unwrap().clone(); + let lock = PessimisticLock { + primary: b"k1".to_vec().into_boxed_slice(), + start_ts: 10.into(), + ttl: 3000, + for_update_ts: 20.into(), + min_commit_ts: 30.into(), + }; + assert!( + txn_ext + .pessimistic_locks + .write() + .insert(vec![(Key::from_raw(b"k1"), lock)]) + .is_ok() + ); + + fail::cfg("raft_propose", "pause").unwrap(); + + cluster.merge_region(left.id, right.id, Callback::None); + thread::sleep(Duration::from_millis(200)); + assert_eq!( + txn_ext.pessimistic_locks.read().status, + LocksStatus::MergingRegion + ); + + // With the fail point set, we will fail to propose the locks or the PrepareMerge request. + fail::cfg("raft_propose", "return()").unwrap(); + + // But after that, the pessimistic locks status should remain unchanged. + for _ in 0..5 { + thread::sleep(Duration::from_millis(200)); + if txn_ext.pessimistic_locks.read().status == LocksStatus::Normal { + return; + } + } + panic!( + "pessimistic locks status should return to Normal, but got {:?}", + txn_ext.pessimistic_locks.read().status + ); +} + +// Testing that when the source peer is destroyed while merging, it should not persist the `merge_state` +// thus won't generate gc message to destroy other peers +#[test] +fn test_destroy_source_peer_while_merging() { + let mut cluster = new_node_cluster(0, 5); + configure_for_merge(&mut cluster); + let pd_client = Arc::clone(&cluster.pd_client); + pd_client.disable_default_operator(); + + cluster.run(); + + cluster.must_put(b"k1", b"v1"); + cluster.must_put(b"k3", b"v3"); + for i in 1..=5 { + must_get_equal(&cluster.get_engine(i), b"k1", b"v1"); + must_get_equal(&cluster.get_engine(i), b"k3", b"v3"); + } + + cluster.must_split(&pd_client.get_region(b"k1").unwrap(), b"k2"); + let left = pd_client.get_region(b"k1").unwrap(); + let right = pd_client.get_region(b"k3").unwrap(); + cluster.must_transfer_leader(right.get_id(), new_peer(1, 1)); + + let schedule_merge_fp = "on_schedule_merge"; + fail::cfg(schedule_merge_fp, "return()").unwrap(); + + // Start merge and wait until peer 5 apply prepare merge + cluster.must_try_merge(right.get_id(), left.get_id()); + cluster.must_peer_state(right.get_id(), 5, PeerState::Merging); + + // filter heartbeat and append message for peer 5 + cluster.add_send_filter(CloneFilterFactory( + RegionPacketFilter::new(right.get_id(), 5) + .direction(Direction::Recv) + .msg_type(MessageType::MsgHeartbeat) + .msg_type(MessageType::MsgAppend), + )); + + // remove peer from target region to trigger merge rollback. 
+    pd_client.must_remove_peer(left.get_id(), find_peer(&left, 2).unwrap().clone());
+    must_get_none(&cluster.get_engine(2), b"k1");
+
+    // Merge must have been rolled back if we can put more data to the source region
+    fail::remove(schedule_merge_fp);
+    cluster.must_put(b"k4", b"v4");
+    for i in 1..=4 {
+        must_get_equal(&cluster.get_engine(i), b"k4", b"v4");
+    }
+
+    // remove peer 5 from the peer list so it will destroy itself by a tombstone message
+    // and should not persist the `merge_state`
+    pd_client.must_remove_peer(right.get_id(), new_peer(5, 5));
+    must_get_none(&cluster.get_engine(5), b"k3");
+
+    // so that other peers will send messages to store 5
+    pd_client.must_add_peer(right.get_id(), new_peer(5, 6));
+    // but it is still in tombstone state due to the message filter
+    let state = cluster.region_local_state(right.get_id(), 5);
+    assert_eq!(state.get_state(), PeerState::Tombstone);
+
+    // let the peer on store 4 have a larger peer id
+    pd_client.must_remove_peer(right.get_id(), new_peer(4, 4));
+    pd_client.must_add_peer(right.get_id(), new_peer(4, 7));
+    must_get_equal(&cluster.get_engine(4), b"k4", b"v4");
+
+    // if store 5 had persisted the merge state, peer 2 and peer 3 would be destroyed because
+    // store 5 would respond to their request vote messages with a gc message, and peer 7 would
+    // cause store 5 to panic because peer 7 has a larger peer id than the peer in the merge state
+    cluster.clear_send_filters();
+    cluster.add_send_filter(IsolationFilterFactory::new(1));
+
+    cluster.must_put(b"k5", b"v5");
+    assert!(!state.has_merge_state(), "{:?}", state);
+    for i in 2..=5 {
+        must_get_equal(&cluster.get_engine(i), b"k5", b"v5");
+    }
+}
+>>>>>>> 3364702e1... raftstore: only persist merge target if the merge is known to be succeeded (#12251)

From f1983596303ab44c4b5c44965fa8a66c39aeba22 Mon Sep 17 00:00:00 2001
From: linning
Date: Tue, 29 Mar 2022 15:28:16 +0800
Subject: [PATCH 2/2] resolve conflict

Signed-off-by: linning
---
 tests/failpoints/cases/test_merge.rs | 364 ---------------------------
 1 file changed, 364 deletions(-)

diff --git a/tests/failpoints/cases/test_merge.rs b/tests/failpoints/cases/test_merge.rs
index 143eddca40e..cd08d553f50 100644
--- a/tests/failpoints/cases/test_merge.rs
+++ b/tests/failpoints/cases/test_merge.rs
@@ -1302,369 +1302,6 @@ fn test_merge_election_and_restart() {
     // If logs from different term are process correctly, store 2 should have latest updates.
must_get_equal(&cluster.get_engine(2), b"k14", b"v14"); } -<<<<<<< HEAD -======= - -/// Testing that the source peer's read delegate should not be removed by the target peer -/// and only removed when the peer is destroyed -#[test] -fn test_source_peer_read_delegate_after_apply() { - let mut cluster = new_node_cluster(0, 3); - configure_for_merge(&mut cluster); - - let pd_client = Arc::clone(&cluster.pd_client); - pd_client.disable_default_operator(); - - cluster.run(); - - cluster.must_split(&cluster.get_region(b""), b"k2"); - let target = cluster.get_region(b"k1"); - let source = cluster.get_region(b"k3"); - - cluster.must_transfer_leader(target.get_id(), find_peer(&target, 1).unwrap().to_owned()); - - let on_destroy_peer_fp = "destroy_peer"; - fail::cfg(on_destroy_peer_fp, "pause").unwrap(); - - // Merge finish means the leader of the target region have call `on_ready_commit_merge` - pd_client.must_merge(source.get_id(), target.get_id()); - - // The source peer's `ReadDelegate` should not be removed yet and mark as `pending_remove` - assert!( - cluster.store_metas[&1] - .lock() - .unwrap() - .readers - .get(&source.get_id()) - .unwrap() - .pending_remove - ); - - fail::remove(on_destroy_peer_fp); - // Wait for source peer is destroyed - sleep_ms(100); - - assert!( - cluster.store_metas[&1] - .lock() - .unwrap() - .readers - .get(&source.get_id()) - .is_none() - ); -} - -#[test] -fn test_merge_with_concurrent_pessimistic_locking() { - let mut cluster = new_server_cluster(0, 2); - configure_for_merge(&mut cluster); - cluster.cfg.pessimistic_txn.pipelined = true; - cluster.cfg.pessimistic_txn.in_memory = true; - cluster.run(); - - cluster.must_transfer_leader(1, new_peer(1, 1)); - - cluster.must_put(b"k1", b"v1"); - cluster.must_put(b"k3", b"v3"); - - let region = cluster.get_region(b"k1"); - cluster.must_split(®ion, b"k2"); - let left = cluster.get_region(b"k1"); - let right = cluster.get_region(b"k3"); - - // Transfer the leader of the right region to store 2. The leaders of source and target - // regions don't need to be on the same store. - cluster.must_transfer_leader(right.id, new_peer(2, 2)); - - let snapshot = cluster.must_get_snapshot_of_region(left.id); - let txn_ext = snapshot.txn_ext.unwrap(); - assert!( - txn_ext - .pessimistic_locks - .write() - .insert(vec![( - Key::from_raw(b"k0"), - PessimisticLock { - primary: b"k0".to_vec().into_boxed_slice(), - start_ts: 10.into(), - ttl: 3000, - for_update_ts: 20.into(), - min_commit_ts: 30.into(), - }, - )]) - .is_ok() - ); - - let addr = cluster.sim.rl().get_addr(1); - let env = Arc::new(Environment::new(1)); - let channel = ChannelBuilder::new(env).connect(&addr); - let client = TikvClient::new(channel); - - fail::cfg("before_propose_locks_on_region_merge", "pause").unwrap(); - - // 1. Locking before proposing pessimistic locks in the source region can succeed. 
- let client2 = client.clone(); - let mut mutation = Mutation::default(); - mutation.set_op(Op::PessimisticLock); - mutation.key = b"k1".to_vec(); - let mut req = PessimisticLockRequest::default(); - req.set_context(cluster.get_ctx(b"k1")); - req.set_mutations(vec![mutation].into()); - req.set_start_version(10); - req.set_for_update_ts(10); - req.set_primary_lock(b"k1".to_vec()); - fail::cfg("txn_before_process_write", "pause").unwrap(); - let res = thread::spawn(move || client2.kv_pessimistic_lock(&req).unwrap()); - thread::sleep(Duration::from_millis(150)); - cluster.merge_region(left.id, right.id, Callback::None); - thread::sleep(Duration::from_millis(150)); - fail::remove("txn_before_process_write"); - let resp = res.join().unwrap(); - assert!(!resp.has_region_error()); - fail::remove("before_propose_locks_on_region_merge"); - - // 2. After locks are proposed, later pessimistic lock request should fail. - let mut mutation = Mutation::default(); - mutation.set_op(Op::PessimisticLock); - mutation.key = b"k11".to_vec(); - let mut req = PessimisticLockRequest::default(); - req.set_context(cluster.get_ctx(b"k11")); - req.set_mutations(vec![mutation].into()); - req.set_start_version(10); - req.set_for_update_ts(10); - req.set_primary_lock(b"k11".to_vec()); - fail::cfg("txn_before_process_write", "pause").unwrap(); - let res = thread::spawn(move || client.kv_pessimistic_lock(&req).unwrap()); - thread::sleep(Duration::from_millis(200)); - fail::remove("txn_before_process_write"); - let resp = res.join().unwrap(); - assert!(resp.has_region_error()); -} - -#[test] -fn test_merge_pessimistic_locks_with_concurrent_prewrite() { - let mut cluster = new_server_cluster(0, 2); - configure_for_merge(&mut cluster); - cluster.cfg.pessimistic_txn.pipelined = true; - cluster.cfg.pessimistic_txn.in_memory = true; - let pd_client = Arc::clone(&cluster.pd_client); - pd_client.disable_default_operator(); - - cluster.run(); - - cluster.must_transfer_leader(1, new_peer(1, 1)); - - cluster.must_put(b"k1", b"v1"); - cluster.must_put(b"k3", b"v3"); - - let region = cluster.get_region(b"k1"); - cluster.must_split(®ion, b"k2"); - let left = cluster.get_region(b"k1"); - let right = cluster.get_region(b"k3"); - - cluster.must_transfer_leader(right.id, new_peer(2, 2)); - - let addr = cluster.sim.rl().get_addr(1); - let env = Arc::new(Environment::new(1)); - let channel = ChannelBuilder::new(env).connect(&addr); - let client = TikvClient::new(channel); - - let snapshot = cluster.must_get_snapshot_of_region(left.id); - let txn_ext = snapshot.txn_ext.unwrap(); - let lock = PessimisticLock { - primary: b"k0".to_vec().into_boxed_slice(), - start_ts: 10.into(), - ttl: 3000, - for_update_ts: 20.into(), - min_commit_ts: 30.into(), - }; - assert!( - txn_ext - .pessimistic_locks - .write() - .insert(vec![ - (Key::from_raw(b"k0"), lock.clone()), - (Key::from_raw(b"k1"), lock), - ]) - .is_ok() - ); - - let mut mutation = Mutation::default(); - mutation.set_op(Op::Put); - mutation.set_key(b"k0".to_vec()); - mutation.set_value(b"v".to_vec()); - let mut req = PrewriteRequest::default(); - req.set_context(cluster.get_ctx(b"k0")); - req.set_mutations(vec![mutation].into()); - req.set_is_pessimistic_lock(vec![true]); - req.set_start_version(10); - req.set_for_update_ts(40); - req.set_primary_lock(b"k0".to_vec()); - - // First, pause apply and prewrite. 
- fail::cfg("on_handle_apply", "pause").unwrap(); - let req2 = req.clone(); - let client2 = client.clone(); - let resp = thread::spawn(move || client2.kv_prewrite(&req2).unwrap()); - thread::sleep(Duration::from_millis(150)); - - // Then, start merging. PrepareMerge should wait until prewrite is done. - cluster.merge_region(left.id, right.id, Callback::None); - thread::sleep(Duration::from_millis(150)); - assert!(txn_ext.pessimistic_locks.read().is_writable()); - - // But a later prewrite request should fail because we have already banned all later proposals. - req.mut_mutations()[0].set_key(b"k1".to_vec()); - let resp2 = thread::spawn(move || client.kv_prewrite(&req).unwrap()); - - fail::remove("on_handle_apply"); - let resp = resp.join().unwrap(); - assert!(!resp.has_region_error(), "{:?}", resp); - - let resp2 = resp2.join().unwrap(); - assert!(resp2.has_region_error()); -} - -#[test] -fn test_retry_pending_prepare_merge_fail() { - let mut cluster = new_server_cluster(0, 2); - configure_for_merge(&mut cluster); - cluster.cfg.pessimistic_txn.pipelined = true; - cluster.cfg.pessimistic_txn.in_memory = true; - let pd_client = Arc::clone(&cluster.pd_client); - pd_client.disable_default_operator(); - - cluster.run(); - - cluster.must_transfer_leader(1, new_peer(1, 1)); - - cluster.must_put(b"k1", b"v1"); - cluster.must_put(b"k3", b"v3"); - - let region = cluster.get_region(b"k1"); - cluster.must_split(®ion, b"k2"); - let left = cluster.get_region(b"k1"); - let right = cluster.get_region(b"k3"); - - cluster.must_transfer_leader(right.id, new_peer(2, 2)); - - // Insert lock l1 into the left region - let snapshot = cluster.must_get_snapshot_of_region(left.id); - let txn_ext = snapshot.txn_ext.unwrap(); - let l1 = PessimisticLock { - primary: b"k1".to_vec().into_boxed_slice(), - start_ts: 10.into(), - ttl: 3000, - for_update_ts: 20.into(), - min_commit_ts: 30.into(), - }; - assert!( - txn_ext - .pessimistic_locks - .write() - .insert(vec![(Key::from_raw(b"k1"), l1)]) - .is_ok() - ); - - // Pause apply and write some data to the left region - fail::cfg("on_handle_apply", "pause").unwrap(); - let (propose_tx, propose_rx) = mpsc::sync_channel(10); - fail::cfg_callback("after_propose", move || propose_tx.send(()).unwrap()).unwrap(); - - let rx = cluster.async_put(b"k1", b"v11").unwrap(); - propose_rx.recv_timeout(Duration::from_secs(2)).unwrap(); - assert!(rx.recv_timeout(Duration::from_millis(200)).is_err()); - - // Then, start merging. PrepareMerge should become pending because applied_index is smaller - // than proposed_index. - cluster.merge_region(left.id, right.id, Callback::None); - propose_rx.recv_timeout(Duration::from_secs(2)).unwrap(); - thread::sleep(Duration::from_millis(200)); - assert!(txn_ext.pessimistic_locks.read().is_writable()); - - // Set disk full error to let PrepareMerge fail. (Set both peer to full to avoid transferring leader) - fail::cfg("disk_already_full_peer_1", "return").unwrap(); - fail::cfg("disk_already_full_peer_2", "return").unwrap(); - fail::remove("on_handle_apply"); - let res = rx.recv_timeout(Duration::from_secs(1)).unwrap(); - assert!(!res.get_header().has_error(), "{:?}", res); - - propose_rx.recv_timeout(Duration::from_secs(2)).unwrap(); - fail::remove("disk_already_full_peer_1"); - fail::remove("disk_already_full_peer_2"); - - // Merge should not succeed because the disk is full. 
- thread::sleep(Duration::from_millis(300)); - cluster.reset_leader_of_region(left.id); - assert_eq!(cluster.get_region(b"k1"), left); - - cluster.must_put(b"k1", b"v12"); -} - -#[test] -fn test_merge_pessimistic_locks_propose_fail() { - let mut cluster = new_server_cluster(0, 2); - configure_for_merge(&mut cluster); - cluster.cfg.pessimistic_txn.pipelined = true; - cluster.cfg.pessimistic_txn.in_memory = true; - let pd_client = Arc::clone(&cluster.pd_client); - pd_client.disable_default_operator(); - - cluster.run(); - - cluster.must_transfer_leader(1, new_peer(1, 1)); - - cluster.must_put(b"k1", b"v1"); - cluster.must_put(b"k3", b"v3"); - - let region = cluster.get_region(b"k1"); - cluster.must_split(®ion, b"k2"); - let left = cluster.get_region(b"k1"); - let right = cluster.get_region(b"k3"); - - // Sending a TransferLeaeder message to make left region fail to propose. - - let snapshot = cluster.must_get_snapshot_of_region(left.id); - let txn_ext = snapshot.ext().get_txn_ext().unwrap().clone(); - let lock = PessimisticLock { - primary: b"k1".to_vec().into_boxed_slice(), - start_ts: 10.into(), - ttl: 3000, - for_update_ts: 20.into(), - min_commit_ts: 30.into(), - }; - assert!( - txn_ext - .pessimistic_locks - .write() - .insert(vec![(Key::from_raw(b"k1"), lock)]) - .is_ok() - ); - - fail::cfg("raft_propose", "pause").unwrap(); - - cluster.merge_region(left.id, right.id, Callback::None); - thread::sleep(Duration::from_millis(200)); - assert_eq!( - txn_ext.pessimistic_locks.read().status, - LocksStatus::MergingRegion - ); - - // With the fail point set, we will fail to propose the locks or the PrepareMerge request. - fail::cfg("raft_propose", "return()").unwrap(); - - // But after that, the pessimistic locks status should remain unchanged. - for _ in 0..5 { - thread::sleep(Duration::from_millis(200)); - if txn_ext.pessimistic_locks.read().status == LocksStatus::Normal { - return; - } - } - panic!( - "pessimistic locks status should return to Normal, but got {:?}", - txn_ext.pessimistic_locks.read().status - ); -} // Testing that when the source peer is destroyed while merging, it should not persist the `merge_state` // thus won't generate gc message to destroy other peers @@ -1743,4 +1380,3 @@ fn test_destroy_source_peer_while_merging() { must_get_equal(&cluster.get_engine(i), b"k5", b"v5"); } } ->>>>>>> 3364702e1... raftstore: only persist merge target if the merge is known to be succeeded (#12251)