diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 28c0db02eee2..0a6c4e07282e 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -632,7 +632,12 @@ struct Store { stopped: bool, start_time: Option, consistency_check_time: HashMap, - last_unreachable_report: HashMap, + store_reachability: HashMap, +} + +struct StoreReachability { + last_broadcast: Instant, + received_message_count: u64, } pub struct StoreFsm @@ -656,7 +661,7 @@ where stopped: false, start_time: None, consistency_check_time: HashMap::default(), - last_unreachable_report: HashMap::default(), + store_reachability: HashMap::default(), }, receiver: rx, }); @@ -2795,22 +2800,35 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER fn on_store_unreachable(&mut self, store_id: u64) { let now = Instant::now(); let unreachable_backoff = self.ctx.cfg.unreachable_backoff.0; - if self - .fsm - .store - .last_unreachable_report - .get(&store_id) - .map_or(unreachable_backoff, |t| now.saturating_duration_since(*t)) - < unreachable_backoff - { - return; - } + let new_messages = MESSAGE_RECV_BY_STORE + .with_label_values(&[&format!("{}", store_id)]) + .get(); + match self.fsm.store.store_reachability.entry(store_id) { + HashMapEntry::Vacant(x) => { + x.insert(StoreReachability { + last_broadcast: now, + received_message_count: new_messages, + }); + } + HashMapEntry::Occupied(x) => { + let ob = x.into_mut(); + if now.saturating_duration_since(ob.last_broadcast) < unreachable_backoff + // If there are no new messages come from `store_id`, it's not + // necessary to do redundant broadcasts. + || (new_messages <= ob.received_message_count && new_messages > 0) + { + return; + } + ob.last_broadcast = now; + ob.received_message_count = new_messages; + } + }; + info!( "broadcasting unreachable"; "store_id" => self.fsm.store.id, "unreachable_store_id" => store_id, ); - self.fsm.store.last_unreachable_report.insert(store_id, now); // It's possible to acquire the lock and only send notification to // involved regions. However loop over all the regions can take a // lot of time, which may block other operations. diff --git a/components/raftstore/src/store/metrics.rs b/components/raftstore/src/store/metrics.rs index b0f44c30c0f0..57f84cf662a3 100644 --- a/components/raftstore/src/store/metrics.rs +++ b/components/raftstore/src/store/metrics.rs @@ -794,4 +794,11 @@ lazy_static! { "Total snapshot generate limit used", ) .unwrap(); + + pub static ref MESSAGE_RECV_BY_STORE: IntCounterVec = register_int_counter_vec!( + "tikv_raftstore_message_recv_by_store", + "Messages received by store", + &["store"] + ) + .unwrap(); } diff --git a/components/test_raftstore-v2/src/server.rs b/components/test_raftstore-v2/src/server.rs new file mode 100644 index 000000000000..85941088e2e5 --- /dev/null +++ b/components/test_raftstore-v2/src/server.rs @@ -0,0 +1,1014 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. + +use std::{ + path::Path, + sync::{Arc, Mutex, RwLock}, + thread, + time::Duration, +}; + +use api_version::{dispatch_api_version, KvFormat}; +use causal_ts::CausalTsProviderImpl; +use collections::{HashMap, HashSet}; +use concurrency_manager::ConcurrencyManager; +use encryption_export::DataKeyManager; +use engine_rocks::RocksEngine; +use engine_test::raft::RaftTestEngine; +use engine_traits::{KvEngine, RaftEngine, TabletRegistry}; +use futures::{executor::block_on, Future}; +use grpcio::{ChannelBuilder, EnvBuilder, Environment, Error as GrpcError, Service}; +use grpcio_health::HealthService; +use kvproto::{ + deadlock_grpc::create_deadlock, + debugpb_grpc::DebugClient, + diagnosticspb_grpc::create_diagnostics, + import_sstpb_grpc::create_import_sst, + kvrpcpb::{ApiVersion, Context}, + metapb, + raft_cmdpb::RaftCmdResponse, + raft_serverpb::RaftMessage, + tikvpb_grpc::TikvClient, +}; +use pd_client::PdClient; +use raftstore::{ + coprocessor::CoprocessorHost, + errors::Error as RaftError, + store::{ + region_meta, AutoSplitController, CheckLeaderRunner, FlowStatsReporter, ReadStats, + RegionSnapshot, TabletSnapManager, WriteStats, + }, + RegionInfoAccessor, +}; +use raftstore_v2::{router::RaftRouter, StateStorage, StoreMeta, StoreRouter}; +use resource_control::ResourceGroupManager; +use resource_metering::{CollectorRegHandle, ResourceTagFactory}; +use security::SecurityManager; +use slog_global::debug; +use tempfile::TempDir; +use test_pd_client::TestPdClient; +use test_raftstore::{filter_send, AddressMap, Config, Filter}; +use tikv::{ + coprocessor, coprocessor_v2, + import::{ImportSstService, SstImporter}, + read_pool::ReadPool, + server::{ + gc_worker::GcWorker, load_statistics::ThreadLoadPool, lock_manager::LockManager, + raftkv::ReplicaReadLockChecker, resolve, service::DiagnosticsService, ConnectionBuilder, + Error, Extension, NodeV2, PdStoreAddrResolver, RaftClient, RaftKv2, Result as ServerResult, + Server, ServerTransport, + }, + storage::{ + self, + kv::{FakeExtension, LocalTablets, RaftExtension, SnapContext}, + txn::flow_controller::{EngineFlowController, FlowController}, + Engine, Storage, + }, +}; +use tikv_util::{ + box_err, + config::VersionTrack, + quota_limiter::QuotaLimiter, + sys::thread::ThreadBuildWrapper, + thd_name, + worker::{Builder as WorkerBuilder, LazyWorker}, + Either, HandyRwLock, +}; +use tokio::runtime::Builder as TokioBuilder; +use txn_types::TxnExtraScheduler; + +use crate::{Cluster, RaftStoreRouter, SimulateTransport, Simulator, SnapshotRouter}; + +#[derive(Clone)] +struct DummyReporter; + +impl FlowStatsReporter for DummyReporter { + fn report_read_stats(&self, _read_stats: ReadStats) {} + fn report_write_stats(&self, _write_stats: WriteStats) {} +} + +type SimulateRaftExtension = as Engine>::RaftExtension; +type SimulateStoreTransport = SimulateTransport>; +type SimulateServerTransport = + SimulateTransport, PdStoreAddrResolver>>; + +pub type SimulateEngine = RaftKv2; + +// TestRaftKvv2 behaves the same way with RaftKv2, except that it has filters +// that can mock various network conditions. +#[derive(Clone)] +pub struct TestRaftKv2 { + raftkv: SimulateEngine, + filters: Arc>>>, +} + +impl TestRaftKv2 { + pub fn new( + raftkv: SimulateEngine, + filters: Arc>>>, + ) -> TestRaftKv2 { + TestRaftKv2 { raftkv, filters } + } + + pub fn set_txn_extra_scheduler(&mut self, txn_extra_scheduler: Arc) { + self.raftkv.set_txn_extra_scheduler(txn_extra_scheduler); + } +} + +impl Engine for TestRaftKv2 { + type Snap = RegionSnapshot; + type Local = EK; + + fn kv_engine(&self) -> Option { + self.raftkv.kv_engine() + } + + type RaftExtension = TestExtension; + fn raft_extension(&self) -> Self::RaftExtension { + TestExtension::new(self.raftkv.raft_extension(), self.filters.clone()) + } + + fn modify_on_kv_engine( + &self, + region_modifies: HashMap>, + ) -> storage::kv::Result<()> { + self.raftkv.modify_on_kv_engine(region_modifies) + } + + type SnapshotRes = as Engine>::SnapshotRes; + fn async_snapshot(&mut self, ctx: SnapContext<'_>) -> Self::SnapshotRes { + self.raftkv.async_snapshot(ctx) + } + + type WriteRes = as Engine>::WriteRes; + fn async_write( + &self, + ctx: &Context, + batch: storage::kv::WriteData, + subscribed: u8, + on_applied: Option, + ) -> Self::WriteRes { + self.raftkv.async_write(ctx, batch, subscribed, on_applied) + } + + #[inline] + fn precheck_write_with_ctx(&self, ctx: &Context) -> storage::kv::Result<()> { + self.raftkv.precheck_write_with_ctx(ctx) + } + + #[inline] + fn schedule_txn_extra(&self, txn_extra: txn_types::TxnExtra) { + self.raftkv.schedule_txn_extra(txn_extra) + } +} + +#[derive(Clone)] +pub struct TestExtension { + extension: Extension, + filters: Arc>>>, +} + +impl TestExtension { + pub fn new( + extension: Extension, + filters: Arc>>>, + ) -> Self { + TestExtension { extension, filters } + } +} + +impl RaftExtension for TestExtension { + fn feed(&self, msg: RaftMessage, key_message: bool) { + let send = |msg| -> raftstore::Result<()> { + self.extension.feed(msg, key_message); + Ok(()) + }; + + let _ = filter_send(&self.filters, msg, send); + } + + #[inline] + fn report_reject_message(&self, region_id: u64, from_peer_id: u64) { + self.extension + .report_reject_message(region_id, from_peer_id) + } + + #[inline] + fn report_peer_unreachable(&self, region_id: u64, to_peer_id: u64) { + self.extension + .report_peer_unreachable(region_id, to_peer_id) + } + + #[inline] + fn report_store_unreachable(&self, store_id: u64) { + self.extension.report_store_unreachable(store_id) + } + + #[inline] + fn report_snapshot_status( + &self, + region_id: u64, + to_peer_id: u64, + status: raft::SnapshotStatus, + ) { + self.extension + .report_snapshot_status(region_id, to_peer_id, status) + } + + #[inline] + fn report_resolved(&self, store_id: u64, group_id: u64) { + self.extension.report_resolved(store_id, group_id) + } + + #[inline] + fn split( + &self, + region_id: u64, + region_epoch: metapb::RegionEpoch, + split_keys: Vec>, + source: String, + ) -> futures::future::BoxFuture<'static, storage::kv::Result>> { + self.extension + .split(region_id, region_epoch, split_keys, source) + } + + fn query_region( + &self, + region_id: u64, + ) -> futures::future::BoxFuture<'static, storage::kv::Result> { + self.extension.query_region(region_id) + } +} + +pub struct ServerMeta { + node: NodeV2, + server: Server>, + sim_router: SimulateStoreTransport, + sim_trans: SimulateServerTransport, + raw_router: StoreRouter, + gc_worker: GcWorker>, + rsmeter_cleanup: Box, +} + +type PendingServices = Vec Service>>; + +pub struct ServerCluster { + metas: HashMap>, + addrs: AddressMap, + pub storages: HashMap>, + pub region_info_accessors: HashMap, + snap_paths: HashMap, + snap_mgrs: HashMap, + pd_client: Arc, + raft_clients: HashMap>, + conn_builder: ConnectionBuilder, + concurrency_managers: HashMap, + env: Arc, + pub pending_services: HashMap, + pub health_services: HashMap, + pub security_mgr: Arc, + pub txn_extra_schedulers: HashMap>, + pub causal_ts_providers: HashMap>, +} + +impl ServerCluster { + pub fn new(pd_client: Arc) -> Self { + let env = Arc::new( + EnvBuilder::new() + .cq_count(2) + .name_prefix(thd_name!("server-cluster")) + .build(), + ); + let security_mgr = Arc::new(SecurityManager::new(&Default::default()).unwrap()); + let map = AddressMap::default(); + // We don't actually need to handle snapshot message, just create a dead worker + // to make it compile. + let worker = LazyWorker::new("snap-worker"); + let conn_builder = ConnectionBuilder::new( + env.clone(), + Arc::default(), + security_mgr.clone(), + map.clone(), + FakeExtension {}, + worker.scheduler(), + Arc::new(ThreadLoadPool::with_threshold(usize::MAX)), + ); + ServerCluster { + metas: HashMap::default(), + addrs: map, + pd_client, + security_mgr, + storages: HashMap::default(), + region_info_accessors: HashMap::default(), + snap_mgrs: HashMap::default(), + snap_paths: HashMap::default(), + pending_services: HashMap::default(), + health_services: HashMap::default(), + raft_clients: HashMap::default(), + conn_builder, + concurrency_managers: HashMap::default(), + env, + txn_extra_schedulers: HashMap::default(), + causal_ts_providers: HashMap::default(), + } + } + + pub fn get_addr(&self, node_id: u64) -> String { + self.addrs.get(node_id).unwrap() + } + + pub fn run_node_impl( + &mut self, + node_id: u64, + mut cfg: Config, + store_meta: Arc>>, + key_manager: Option>, + raft_engine: RaftTestEngine, + tablet_registry: TabletRegistry, + resource_manager: &Option>, + ) -> ServerResult { + let (snap_mgr, snap_mgs_path) = if !self.snap_mgrs.contains_key(&node_id) { + let tmp = test_util::temp_dir("test_cluster", cfg.prefer_mem); + let snap_path = tmp.path().to_str().unwrap().to_owned(); + ( + TabletSnapManager::new(snap_path, key_manager.clone())?, + Some(tmp), + ) + } else { + (self.snap_mgrs[&node_id].clone(), None) + }; + + let bg_worker = WorkerBuilder::new("background").thread_count(2).create(); + + if cfg.server.addr == "127.0.0.1:0" { + // Now we cache the store address, so here we should re-use last + // listening address for the same store. + if let Some(addr) = self.addrs.get(node_id) { + cfg.server.addr = addr; + } else { + cfg.server.addr = format!("127.0.0.1:{}", test_util::alloc_port()); + } + } + + // Create node. + let mut raft_store = cfg.raft_store.clone(); + raft_store + .validate( + cfg.coprocessor.region_split_size(), + cfg.coprocessor.enable_region_bucket(), + cfg.coprocessor.region_bucket_size, + ) + .unwrap(); + + let mut node = NodeV2::new(&cfg.server, self.pd_client.clone(), None); + node.try_bootstrap_store(&raft_store, &raft_engine).unwrap(); + assert_eq!(node.id(), node_id); + + tablet_registry + .tablet_factory() + .set_state_storage(Arc::new(StateStorage::new( + raft_engine.clone(), + node.router().clone(), + ))); + + let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone())); + + let raft_router = + RaftRouter::new_with_store_meta(node.router().clone(), store_meta.clone()); + + // Create coprocessor. + let mut coprocessor_host = + CoprocessorHost::new(raft_router.store_router().clone(), cfg.coprocessor.clone()); + + let region_info_accessor = RegionInfoAccessor::new(&mut coprocessor_host); + + let sim_router = SimulateTransport::new(raft_router.clone()); + let mut raft_kv_v2 = TestRaftKv2::new( + RaftKv2::new(raft_router.clone(), region_info_accessor.region_leaders()), + sim_router.filters().clone(), + ); + + // Create storage. + let pd_worker = LazyWorker::new("test-pd-worker"); + let pd_sender = raftstore_v2::PdReporter::new( + pd_worker.scheduler(), + slog_global::borrow_global().new(slog::o!()), + ); + let storage_read_pool = ReadPool::from(storage::build_read_pool( + &tikv::config::StorageReadPoolConfig::default_for_test(), + pd_sender, + raft_kv_v2.clone(), + )); + + if let Some(scheduler) = self.txn_extra_schedulers.remove(&node_id) { + raft_kv_v2.set_txn_extra_scheduler(scheduler); + } + + let latest_ts = + block_on(self.pd_client.get_tso()).expect("failed to get timestamp from PD"); + let concurrency_manager = ConcurrencyManager::new(latest_ts); + + let (tx, _rx) = std::sync::mpsc::channel(); + let mut gc_worker = GcWorker::new( + raft_kv_v2.clone(), + tx, + cfg.gc.clone(), + Default::default(), + Arc::new(region_info_accessor.clone()), + ); + gc_worker.start(node_id).unwrap(); + + // todo: resolved ts + + if ApiVersion::V2 == F::TAG { + let casual_ts_provider: Arc = Arc::new( + block_on(causal_ts::BatchTsoProvider::new_opt( + self.pd_client.clone(), + cfg.causal_ts.renew_interval.0, + cfg.causal_ts.alloc_ahead_buffer.0, + cfg.causal_ts.renew_batch_min_size, + cfg.causal_ts.renew_batch_max_size, + )) + .unwrap() + .into(), + ); + self.causal_ts_providers.insert(node_id, casual_ts_provider); + } + + // Start resource metering. + let (res_tag_factory, collector_reg_handle, rsmeter_cleanup) = + self.init_resource_metering(&cfg.resource_metering); + + let check_leader_runner = CheckLeaderRunner::new(store_meta, coprocessor_host.clone()); + let check_leader_scheduler = bg_worker.start("check-leader", check_leader_runner); + + let mut lock_mgr = LockManager::new(&cfg.pessimistic_txn); + let quota_limiter = Arc::new(QuotaLimiter::new( + cfg.quota.foreground_cpu_time, + cfg.quota.foreground_write_bandwidth, + cfg.quota.foreground_read_bandwidth, + cfg.quota.background_cpu_time, + cfg.quota.background_write_bandwidth, + cfg.quota.background_read_bandwidth, + cfg.quota.max_delay_duration, + cfg.quota.enable_auto_tune, + )); + + let casual_ts_provider = self.get_causal_ts_provider(node_id); + let store = Storage::<_, _, F>::from_engine( + raft_kv_v2.clone(), + &cfg.storage, + storage_read_pool.handle(), + lock_mgr.clone(), + concurrency_manager.clone(), + lock_mgr.get_storage_dynamic_configs(), + Arc::new(FlowController::Singleton(EngineFlowController::empty())), + DummyReporter, + res_tag_factory.clone(), + quota_limiter.clone(), + self.pd_client.feature_gate().clone(), + casual_ts_provider.clone(), + resource_manager + .as_ref() + .map(|m| m.derive_controller("scheduler-worker-pool".to_owned(), true)), + )?; + self.storages.insert(node_id, raft_kv_v2.clone()); + + ReplicaReadLockChecker::new(concurrency_manager.clone()).register(&mut coprocessor_host); + + // Create import service. + let importer = { + let dir = Path::new(raft_engine.get_engine_path()).join("../import-sst"); + Arc::new( + SstImporter::new( + &cfg.import, + dir, + key_manager.clone(), + cfg.storage.api_version(), + ) + .unwrap(), + ) + }; + let import_service = ImportSstService::new( + cfg.import.clone(), + cfg.raft_store.raft_entry_max_size, + raft_kv_v2, + LocalTablets::Registry(tablet_registry.clone()), + Arc::clone(&importer), + ); + + // Create deadlock service. + let deadlock_service = lock_mgr.deadlock_service(); + + // Create pd client, snapshot manager, server. + let (resolver, state) = resolve::new_resolver( + Arc::clone(&self.pd_client), + &bg_worker, + store.get_engine().raft_extension(), + ); + let security_mgr = Arc::new(SecurityManager::new(&cfg.security).unwrap()); + let cop_read_pool = ReadPool::from(coprocessor::readpool_impl::build_read_pool_for_test( + &tikv::config::CoprReadPoolConfig::default_for_test(), + store.get_engine(), + )); + let copr = coprocessor::Endpoint::new( + &server_cfg.value().clone(), + cop_read_pool.handle(), + concurrency_manager.clone(), + res_tag_factory, + quota_limiter, + ); + let copr_v2 = coprocessor_v2::Endpoint::new(&cfg.coprocessor_v2); + let mut server = None; + + // Create Debug service. + let debug_thread_pool = Arc::new( + TokioBuilder::new_multi_thread() + .thread_name(thd_name!("debugger")) + .worker_threads(1) + .after_start_wrapper(|| {}) + .before_stop_wrapper(|| {}) + .build() + .unwrap(), + ); + let debug_thread_handle = debug_thread_pool.handle().clone(); + let diag_service = DiagnosticsService::new( + debug_thread_handle, + cfg.log.file.filename.clone(), + cfg.slow_log_file.clone(), + ); + + let health_service = HealthService::default(); + + for _ in 0..100 { + let mut svr = Server::new( + node_id, + &server_cfg, + &security_mgr, + store.clone(), + copr.clone(), + copr_v2.clone(), + resolver.clone(), + Either::Right(snap_mgr.clone()), + gc_worker.clone(), + check_leader_scheduler.clone(), + self.env.clone(), + None, + debug_thread_pool.clone(), + health_service.clone(), + resource_manager.clone(), + ) + .unwrap(); + svr.register_service(create_diagnostics(diag_service.clone())); + svr.register_service(create_deadlock(deadlock_service.clone())); + svr.register_service(create_import_sst(import_service.clone())); + if let Some(svcs) = self.pending_services.get(&node_id) { + for fact in svcs { + svr.register_service(fact()); + } + } + match svr.build_and_bind() { + Ok(_) => { + server = Some(svr); + break; + } + Err(Error::Grpc(GrpcError::BindFail(ref addr, ref port))) => { + // Servers may meet the error, when we restart them. + debug!("fail to create a server: bind fail {:?}", (addr, port)); + thread::sleep(Duration::from_millis(100)); + continue; + } + Err(ref e) => panic!("fail to create a server: {:?}", e), + } + } + let mut server = server.unwrap(); + let addr = server.listening_addr(); + assert_eq!(addr.clone().to_string(), node.store().address); + cfg.server.addr = format!("{}", addr); + let trans = server.transport(); + let simulate_trans = SimulateTransport::new(trans); + let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone())); + + // Register the role change observer of the lock manager. + lock_mgr.register_detector_role_change_observer(&mut coprocessor_host); + + let pessimistic_txn_cfg = cfg.tikv.pessimistic_txn; + node.start( + raft_engine, + tablet_registry.clone(), + &raft_router, + simulate_trans.clone(), + snap_mgr.clone(), + concurrency_manager.clone(), + casual_ts_provider, + coprocessor_host, + AutoSplitController::default(), + collector_reg_handle, + bg_worker, + pd_worker, + Arc::new(VersionTrack::new(raft_store)), + &state, + importer, + key_manager, + )?; + assert!(node_id == 0 || node_id == node.id()); + let node_id = node.id(); + self.snap_mgrs.insert(node_id, snap_mgr); + if let Some(tmp) = snap_mgs_path { + self.snap_paths.insert(node_id, tmp); + } + self.region_info_accessors + .insert(node_id, region_info_accessor); + // todo: importer + self.health_services.insert(node_id, health_service); + + lock_mgr + .start( + node.id(), + Arc::clone(&self.pd_client), + resolver, + Arc::clone(&security_mgr), + &pessimistic_txn_cfg, + ) + .unwrap(); + + server + .start(server_cfg, security_mgr, tablet_registry) + .unwrap(); + + self.metas.insert( + node_id, + ServerMeta { + raw_router: raft_router.store_router().clone(), + node, + server, + sim_router, + gc_worker, + sim_trans: simulate_trans, + rsmeter_cleanup, + }, + ); + self.addrs.insert(node_id, format!("{}", addr)); + self.concurrency_managers + .insert(node_id, concurrency_manager); + + let client = RaftClient::new(node_id, self.conn_builder.clone()); + self.raft_clients.insert(node_id, client); + Ok(node_id) + } + + pub fn get_gc_worker(&self, node_id: u64) -> &GcWorker> { + &self.metas.get(&node_id).unwrap().gc_worker + } + + pub fn get_causal_ts_provider(&self, node_id: u64) -> Option> { + self.causal_ts_providers.get(&node_id).cloned() + } + + fn init_resource_metering( + &self, + cfg: &resource_metering::Config, + ) -> (ResourceTagFactory, CollectorRegHandle, Box) { + let (_, collector_reg_handle, resource_tag_factory, recorder_worker) = + resource_metering::init_recorder(cfg.precision.as_millis()); + let (_, data_sink_reg_handle, reporter_worker) = + resource_metering::init_reporter(cfg.clone(), collector_reg_handle.clone()); + let (_, single_target_worker) = resource_metering::init_single_target( + cfg.receiver_address.clone(), + Arc::new(Environment::new(2)), + data_sink_reg_handle, + ); + + ( + resource_tag_factory, + collector_reg_handle, + Box::new(move || { + single_target_worker.stop_worker(); + reporter_worker.stop_worker(); + recorder_worker.stop_worker(); + }), + ) + } + + pub fn get_concurrency_manager(&self, node_id: u64) -> ConcurrencyManager { + self.concurrency_managers.get(&node_id).unwrap().clone() + } +} + +impl Simulator for ServerCluster { + fn get_node_ids(&self) -> HashSet { + self.metas.keys().cloned().collect() + } + + fn add_send_filter(&mut self, node_id: u64, filter: Box) { + self.metas + .get_mut(&node_id) + .unwrap() + .sim_trans + .add_filter(filter); + } + + fn clear_send_filters(&mut self, node_id: u64) { + self.metas + .get_mut(&node_id) + .unwrap() + .sim_trans + .clear_filters(); + } + + fn add_recv_filter(&mut self, node_id: u64, filter: Box) { + self.metas + .get_mut(&node_id) + .unwrap() + .sim_router + .add_filter(filter); + } + + fn clear_recv_filters(&mut self, node_id: u64) { + self.metas + .get_mut(&node_id) + .unwrap() + .sim_router + .clear_filters(); + } + + fn run_node( + &mut self, + node_id: u64, + cfg: Config, + store_meta: Arc>>, + key_manager: Option>, + raft_engine: RaftTestEngine, + tablet_registry: TabletRegistry, + resource_manager: &Option>, + ) -> ServerResult { + dispatch_api_version!( + cfg.storage.api_version(), + self.run_node_impl::( + node_id, + cfg, + store_meta, + key_manager, + raft_engine, + tablet_registry, + resource_manager + ) + ) + } + + fn stop_node(&mut self, node_id: u64) { + if let Some(mut meta) = self.metas.remove(&node_id) { + meta.server.stop().unwrap(); + meta.node.stop(); + // // resolved ts worker started, let's stop it + // if let Some(worker) = meta.rts_worker { + // worker.stop_worker(); + // } + (meta.rsmeter_cleanup)(); + } + self.storages.remove(&node_id); + let _ = self.raft_clients.remove(&node_id); + } + + fn async_snapshot( + &mut self, + request: kvproto::raft_cmdpb::RaftCmdRequest, + ) -> impl Future, RaftCmdResponse>> + Send + { + let node_id = request.get_header().get_peer().get_store_id(); + let mut router = match self.metas.get(&node_id) { + None => { + let mut resp = RaftCmdResponse::default(); + let e: RaftError = box_err!("missing sender for store {}", node_id); + resp.mut_header().set_error(e.into()); + // return async move {Err(resp)}; + unreachable!() + } + Some(meta) => meta.sim_router.clone(), + }; + + router.snapshot(request) + } + + fn async_peer_msg_on_node( + &self, + node_id: u64, + region_id: u64, + msg: raftstore_v2::router::PeerMsg, + ) -> raftstore::Result<()> { + let router = match self.metas.get(&node_id) { + None => return Err(box_err!("missing sender for store {}", node_id)), + Some(meta) => meta.sim_router.clone(), + }; + + router.send_peer_msg(region_id, msg) + } + + fn send_raft_msg(&mut self, msg: RaftMessage) -> raftstore::Result<()> { + let from_store = msg.get_from_peer().store_id; + assert_ne!(from_store, 0); + if let Some(client) = self.raft_clients.get_mut(&from_store) { + client.send(msg).unwrap(); + client.flush(); + } + Ok(()) + } + + fn get_router(&self, node_id: u64) -> Option> { + self.metas.get(&node_id).map(|m| m.raw_router.clone()) + } + + fn get_snap_dir(&self, node_id: u64) -> String { + self.snap_mgrs[&node_id] + .root_path() + .to_str() + .unwrap() + .to_owned() + } + + fn get_snap_mgr(&self, node_id: u64) -> &TabletSnapManager { + self.snap_mgrs.get(&node_id).unwrap() + } +} + +impl Cluster, EK> { + pub fn must_get_snapshot_of_region(&mut self, region_id: u64) -> RegionSnapshot { + let mut try_snapshot = || -> Option> { + let leader = self.leader_of_region(region_id)?; + let store_id = leader.store_id; + let epoch = self.get_region_epoch(region_id); + let mut ctx = Context::default(); + ctx.set_region_id(region_id); + ctx.set_peer(leader); + ctx.set_region_epoch(epoch); + + let mut storage = self.sim.rl().storages.get(&store_id).unwrap().clone(); + let snap_ctx = SnapContext { + pb_ctx: &ctx, + ..Default::default() + }; + storage.snapshot(snap_ctx).ok() + }; + for _ in 0..10 { + if let Some(snapshot) = try_snapshot() { + return snapshot; + } + thread::sleep(Duration::from_millis(200)); + } + panic!("failed to get snapshot of region {}", region_id); + } + + pub fn get_addr(&self, node_id: u64) -> String { + self.sim.rl().get_addr(node_id) + } + + pub fn get_security_mgr(&self) -> Arc { + self.sim.rl().security_mgr.clone() + } +} + +pub fn new_server_cluster( + id: u64, + count: usize, +) -> Cluster, RocksEngine> { + let pd_client = Arc::new(TestPdClient::new(id, false)); + let sim = Arc::new(RwLock::new(ServerCluster::new(Arc::clone(&pd_client)))); + Cluster::new( + id, + count, + sim, + pd_client, + ApiVersion::V1, + Box::new(crate::create_test_engine), + ) +} + +pub fn new_incompatible_server_cluster( + id: u64, + count: usize, +) -> Cluster, RocksEngine> { + let pd_client = Arc::new(TestPdClient::new(id, true)); + let sim = Arc::new(RwLock::new(ServerCluster::new(Arc::clone(&pd_client)))); + Cluster::new( + id, + count, + sim, + pd_client, + ApiVersion::V1, + Box::new(crate::create_test_engine), + ) +} + +pub fn new_server_cluster_with_api_ver( + id: u64, + count: usize, + api_ver: ApiVersion, +) -> Cluster, RocksEngine> { + let pd_client = Arc::new(TestPdClient::new(id, false)); + let sim = Arc::new(RwLock::new(ServerCluster::new(Arc::clone(&pd_client)))); + Cluster::new( + id, + count, + sim, + pd_client, + api_ver, + Box::new(crate::create_test_engine), + ) +} + +pub fn must_new_cluster_and_kv_client() -> ( + Cluster, RocksEngine>, + TikvClient, + Context, +) { + must_new_cluster_and_kv_client_mul(1) +} + +pub fn must_new_cluster_and_kv_client_mul( + count: usize, +) -> ( + Cluster, RocksEngine>, + TikvClient, + Context, +) { + let (cluster, leader, ctx) = must_new_cluster_mul(count); + + let env = Arc::new(Environment::new(1)); + let channel = + ChannelBuilder::new(env).connect(&cluster.sim.rl().get_addr(leader.get_store_id())); + let client = TikvClient::new(channel); + + (cluster, client, ctx) +} +pub fn must_new_cluster_mul( + count: usize, +) -> ( + Cluster, RocksEngine>, + metapb::Peer, + Context, +) { + must_new_and_configure_cluster_mul(count, |_| ()) +} + +fn must_new_and_configure_cluster_mul( + count: usize, + mut configure: impl FnMut(&mut Cluster, RocksEngine>), +) -> ( + Cluster, RocksEngine>, + metapb::Peer, + Context, +) { + let mut cluster = new_server_cluster(0, count); + configure(&mut cluster); + cluster.run(); + let region_id = 1; + let leader = cluster.leader_of_region(region_id).unwrap(); + let epoch = cluster.get_region_epoch(region_id); + let mut ctx = Context::default(); + ctx.set_region_id(region_id); + ctx.set_peer(leader.clone()); + ctx.set_region_epoch(epoch); + + (cluster, leader, ctx) +} + +pub fn must_new_and_configure_cluster_and_kv_client( + configure: impl FnMut(&mut Cluster, RocksEngine>), +) -> ( + Cluster, RocksEngine>, + TikvClient, + Context, +) { + let (cluster, leader, ctx) = must_new_and_configure_cluster(configure); + + let env = Arc::new(Environment::new(1)); + let channel = + ChannelBuilder::new(env).connect(&cluster.sim.rl().get_addr(leader.get_store_id())); + let client = TikvClient::new(channel); + + (cluster, client, ctx) +} + +pub fn must_new_and_configure_cluster( + configure: impl FnMut(&mut Cluster, RocksEngine>), +) -> ( + Cluster, RocksEngine>, + metapb::Peer, + Context, +) { + must_new_and_configure_cluster_mul(1, configure) +} + +pub fn must_new_cluster_and_debug_client() -> ( + Cluster, RocksEngine>, + DebugClient, + u64, +) { + let (cluster, leader, _) = must_new_cluster_mul(1); + + let env = Arc::new(Environment::new(1)); + let channel = + ChannelBuilder::new(env).connect(&cluster.sim.rl().get_addr(leader.get_store_id())); + let client = DebugClient::new(channel); + + (cluster, client, leader.get_store_id()) +} diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index ea9868afdbd7..9dfb96a9ea7f 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -153,7 +153,8 @@ pub struct ServerCluster { snap_paths: HashMap, snap_mgrs: HashMap, pd_client: Arc, - raft_client: RaftClient, + raft_clients: HashMap>, + conn_builder: ConnectionBuilder, concurrency_managers: HashMap, env: Arc, pub causal_ts_providers: HashMap>, @@ -181,7 +182,6 @@ impl ServerCluster { worker.scheduler(), Arc::new(ThreadLoadPool::with_threshold(usize::MAX)), ); - let raft_client = RaftClient::new(conn_builder); ServerCluster { metas: HashMap::default(), addrs: map, @@ -195,7 +195,8 @@ impl ServerCluster { pending_services: HashMap::default(), coprocessor_hooks: HashMap::default(), health_services: HashMap::default(), - raft_client, + raft_clients: HashMap::default(), + conn_builder, concurrency_managers: HashMap::default(), env, txn_extra_schedulers: HashMap::default(), @@ -634,6 +635,8 @@ impl ServerCluster { self.concurrency_managers .insert(node_id, concurrency_manager); + let client = RaftClient::new(node_id, self.conn_builder.clone()); + self.raft_clients.insert(node_id, client); Ok(node_id) } } @@ -685,6 +688,7 @@ impl Simulator for ServerCluster { } (meta.rsmeter_cleanup)(); } + let _ = self.raft_clients.remove(&node_id); } fn get_node_ids(&self) -> HashSet { @@ -726,8 +730,12 @@ impl Simulator for ServerCluster { } fn send_raft_msg(&mut self, raft_msg: raft_serverpb::RaftMessage) -> Result<()> { - self.raft_client.send(raft_msg).unwrap(); - self.raft_client.flush(); + let from_store = raft_msg.get_from_peer().store_id; + assert_ne!(from_store, 0); + if let Some(client) = self.raft_clients.get_mut(&from_store) { + client.send(raft_msg).unwrap(); + client.flush(); + } Ok(()) } diff --git a/src/server/mod.rs b/src/server/mod.rs index d926ca40b2a3..33713773f4fa 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -34,7 +34,7 @@ pub use self::{ metrics::{CONFIG_ROCKSDB_GAUGE, CPU_CORES_QUOTA_GAUGE, MEM_TRACE_SUM_GAUGE}, node::{create_raft_storage, Node}, proxy::{build_forward_option, get_target_address, Proxy}, - raft_client::{ConnectionBuilder, RaftClient}, + raft_client::{ConnectionBuilder, MetadataSourceStoreId, RaftClient}, raftkv::RaftKv, resolve::{PdStoreAddrResolver, StoreAddrResolver}, server::{Server, GRPC_THREAD_PREFIX}, diff --git a/src/server/raft_client.rs b/src/server/raft_client.rs index fa12600bb98d..f2a58a81aaa1 100644 --- a/src/server/raft_client.rs +++ b/src/server/raft_client.rs @@ -25,8 +25,8 @@ use futures::{ }; use futures_timer::Delay; use grpcio::{ - Channel, ChannelBuilder, ClientCStreamReceiver, ClientCStreamSender, Environment, - RpcStatusCode, WriteFlags, + CallOption, Channel, ChannelBuilder, ClientCStreamReceiver, ClientCStreamSender, Environment, + MetadataBuilder, RpcStatusCode, WriteFlags, }; use kvproto::{ raft_serverpb::{Done, RaftMessage, RaftSnapshotData}, @@ -50,6 +50,21 @@ use crate::server::{ StoreAddrResolver, }; +pub struct MetadataSourceStoreId {} + +impl MetadataSourceStoreId { + pub const KEY: &str = "source_store_id"; + + pub fn parse(value: &[u8]) -> u64 { + let value = std::str::from_utf8(value).unwrap(); + value.parse::().unwrap() + } + + pub fn format(id: u64) -> String { + format!("{}", id) + } +} + static CONN_ID: AtomicI32 = AtomicI32::new(0); const _ON_RESOLVE_FP: &str = "transport_snapshot_on_resolve"; @@ -612,6 +627,7 @@ impl ConnectionBuilder { /// StreamBackEnd watches lifetime of a connection and handles reconnecting, /// spawn new RPC. struct StreamBackEnd { + self_store_id: u64, store_id: u64, queue: Arc, builder: ConnectionBuilder, @@ -693,7 +709,8 @@ where } fn batch_call(&self, client: &TikvClient, addr: String) -> oneshot::Receiver { - let (batch_sink, batch_stream) = client.batch_raft().unwrap(); + let (batch_sink, batch_stream) = client.batch_raft_opt(self.get_call_option()).unwrap(); + let (tx, rx) = oneshot::channel(); let mut call = RaftCall { sender: AsyncRaftSender { @@ -717,7 +734,8 @@ where } fn call(&self, client: &TikvClient, addr: String) -> oneshot::Receiver { - let (sink, stream) = client.raft().unwrap(); + let (sink, stream) = client.raft_opt(self.get_call_option()).unwrap(); + let (tx, rx) = oneshot::channel(); let mut call = RaftCall { sender: AsyncRaftSender { @@ -738,6 +756,15 @@ where }); rx } + + fn get_call_option(&self) -> CallOption { + let mut metadata = MetadataBuilder::with_capacity(1); + let value = MetadataSourceStoreId::format(self.self_store_id); + metadata + .add_str(MetadataSourceStoreId::KEY, &value) + .unwrap(); + CallOption::default().headers(metadata.build()) + } } async fn maybe_backoff(backoff: Duration, last_wake_time: &mut Option) { @@ -778,7 +805,6 @@ async fn start( R: RaftExtension + Unpin + Send + 'static, { let mut last_wake_time = None; - let mut first_time = true; let backoff_duration = back_end.builder.cfg.value().raft_client_max_backoff.0; let mut addr_channel = None; loop { @@ -824,15 +850,10 @@ async fn start( // shutdown. back_end.clear_pending_message("unreachable"); - // broadcast is time consuming operation which would blocks raftstore, so report - // unreachable only once until being connected again. - if first_time { - first_time = false; - back_end - .builder - .router - .report_store_unreachable(back_end.store_id); - } + back_end + .builder + .router + .report_store_unreachable(back_end.store_id); continue; } else { debug!("connection established"; "store_id" => back_end.store_id, "addr" => %addr); @@ -864,7 +885,6 @@ async fn start( .router .report_store_unreachable(back_end.store_id); addr_channel = None; - first_time = false; } } } @@ -922,6 +942,7 @@ struct CachedQueue { /// raft_client.flush(); /// ``` pub struct RaftClient { + self_store_id: u64, pool: Arc>, cache: LruCache<(u64, usize), CachedQueue>, need_flush: Vec<(u64, usize)>, @@ -936,13 +957,14 @@ where S: StoreAddrResolver + Send + 'static, R: RaftExtension + Unpin + Send + 'static, { - pub fn new(builder: ConnectionBuilder) -> Self { + pub fn new(self_store_id: u64, builder: ConnectionBuilder) -> Self { let future_pool = Arc::new( yatp::Builder::new(thd_name!("raft-stream")) .max_thread_count(1) .build_future_pool(), ); RaftClient { + self_store_id, pool: Arc::default(), cache: LruCache::with_capacity_and_sample(0, 7), need_flush: vec![], @@ -978,6 +1000,7 @@ where queue.set_conn_state(ConnState::Paused); } let back_end = StreamBackEnd { + self_store_id: self.self_store_id, store_id, queue: queue.clone(), builder: self.builder.clone(), @@ -1139,6 +1162,7 @@ where { fn clone(&self) -> Self { RaftClient { + self_store_id: self.self_store_id, pool: self.pool.clone(), cache: LruCache::with_capacity_and_sample(0, 7), need_flush: vec![], diff --git a/src/server/server.rs b/src/server/server.rs index 1921483e37b6..b23c722db4db 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -172,7 +172,7 @@ where lazy_worker.scheduler(), grpc_thread_load.clone(), ); - let raft_client = RaftClient::new(conn_builder); + let raft_client = RaftClient::new(store_id, conn_builder); let trans = ServerTransport::new(raft_client); health_service.set_serving_status("", ServingStatus::NotServing); diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index db50dfe459e8..7b33ce51630a 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -21,7 +21,7 @@ use raft::eraftpb::MessageType; use raftstore::{ store::{ memory::{MEMTRACE_APPLYS, MEMTRACE_RAFT_ENTRIES, MEMTRACE_RAFT_MESSAGES}, - metrics::RAFT_ENTRIES_CACHES_GAUGE, + metrics::{MESSAGE_RECV_BY_STORE, RAFT_ENTRIES_CACHES_GAUGE}, CheckLeaderTask, }, Error as RaftStoreError, Result as RaftStoreResult, @@ -44,7 +44,7 @@ use crate::{ coprocessor_v2, forward_duplex, forward_unary, log_net_error, server::{ gc_worker::GcWorker, load_statistics::ThreadLoadPool, metrics::*, snap::Task as SnapTask, - Error, Proxy, Result as ServerResult, + Error, MetadataSourceStoreId, Proxy, Result as ServerResult, }, storage::{ self, @@ -155,9 +155,23 @@ impl Service { ch.report_reject_message(id, peer_id); return Ok(()); } + + fail_point!("receive_raft_message_from_outside"); ch.feed(msg, false); Ok(()) } + + fn get_store_id_from_metadata(ctx: &RpcContext<'_>) -> Option { + let metadata = ctx.request_headers(); + for i in 0..metadata.len() { + let (key, value) = metadata.get(i).unwrap(); + if key == MetadataSourceStoreId::KEY { + let store_id = MetadataSourceStoreId::parse(value); + return Some(store_id); + } + } + None + } } macro_rules! handle_request { @@ -589,6 +603,14 @@ impl Tikv for Service { stream: RequestStream, sink: ClientStreamingSink, ) { + let source_store_id = Self::get_store_id_from_metadata(&ctx); + let message_received = + source_store_id.map(|x| MESSAGE_RECV_BY_STORE.with_label_values(&[&format!("{}", x)])); + info!( + "raft RPC is called, new gRPC stream established"; + "source_store_id" => ?source_store_id, + ); + let store_id = self.store_id; let ch = self.storage.get_engine().raft_extension().clone(); let reject_messages_on_memory_ratio = self.reject_messages_on_memory_ratio; @@ -605,6 +627,9 @@ impl Tikv for Service { // `StoreNotMatch` to let tikv to resolve a correct address from PD return Err(Error::from(err)); } + if let Some(ref counter) = message_received { + counter.inc(); + } } Ok::<(), Error>(()) }; @@ -631,7 +656,14 @@ impl Tikv for Service { stream: RequestStream, sink: ClientStreamingSink, ) { - info!("batch_raft RPC is called, new gRPC stream established"); + let source_store_id = Self::get_store_id_from_metadata(&ctx); + let message_received = + source_store_id.map(|x| MESSAGE_RECV_BY_STORE.with_label_values(&[&format!("{}", x)])); + info!( + "batch_raft RPC is called, new gRPC stream established"; + "source_store_id" => ?source_store_id, + ); + let store_id = self.store_id; let ch = self.storage.get_engine().raft_extension().clone(); let reject_messages_on_memory_ratio = self.reject_messages_on_memory_ratio; @@ -652,6 +684,9 @@ impl Tikv for Service { return Err(Error::from(err)); } } + if let Some(ref counter) = message_received { + counter.inc_by(len as u64); + } } Ok::<(), Error>(()) }; diff --git a/tests/failpoints/cases/test_hibernate.rs b/tests/failpoints/cases/test_hibernate.rs index 6bbed4ac6418..4dc404e58b83 100644 --- a/tests/failpoints/cases/test_hibernate.rs +++ b/tests/failpoints/cases/test_hibernate.rs @@ -81,3 +81,54 @@ fn test_break_leadership_on_restart() { // incorrectly. rx.recv_timeout(Duration::from_secs(2)).unwrap_err(); } + +// This case creates a cluster with 3 TiKV instances, and then wait all peers +// hibernate. +// +// After that, propose a command and stop the leader node immediately. +// With failpoint `receive_raft_message_from_outside`, we can make the proposal +// reach 2 followers *after* `StoreUnreachable` is broadcasted. +// +// 2 followers may become GroupState::Chaos after `StoreUnreachable` is +// received, and become `GroupState::Ordered` after the proposal is received. +// But they should keep wakeful for a while. +#[test] +fn test_store_disconnect_with_hibernate() { + let mut cluster = new_server_cluster(0, 3); + let base_tick_ms = 50; + cluster.cfg.raft_store.raft_base_tick_interval = ReadableDuration::millis(base_tick_ms); + cluster.cfg.raft_store.raft_heartbeat_ticks = 2; + cluster.cfg.raft_store.raft_election_timeout_ticks = 10; + cluster.cfg.raft_store.unreachable_backoff = ReadableDuration::millis(500); + cluster.cfg.server.raft_client_max_backoff = ReadableDuration::millis(200); + // So the random election timeout will always be 10, which makes the case more + // stable. + cluster.cfg.raft_store.raft_min_election_timeout_ticks = 10; + cluster.cfg.raft_store.raft_max_election_timeout_ticks = 11; + configure_for_hibernate(&mut cluster); + cluster.pd_client.disable_default_operator(); + let r = cluster.run_conf_change(); + cluster.pd_client.must_add_peer(r, new_peer(2, 2)); + cluster.pd_client.must_add_peer(r, new_peer(3, 3)); + + cluster.must_put(b"k1", b"v1"); + must_get_equal(&cluster.get_engine(2), b"k1", b"v1"); + must_get_equal(&cluster.get_engine(3), b"k1", b"v1"); + + // Wait until all peers of region 1 hibernate. + thread::sleep(Duration::from_millis(base_tick_ms * 30)); + + // Stop the region leader. + fail::cfg("receive_raft_message_from_outside", "pause").unwrap(); + let _ = cluster.async_put(b"k2", b"v2").unwrap(); + cluster.stop_node(1); + + // Wait for a while so that the failpoint can be triggered on followers. + thread::sleep(Duration::from_millis(100)); + fail::remove("receive_raft_message_from_outside"); + + // Wait for a while. Peers of region 1 shouldn't hibernate. + thread::sleep(Duration::from_millis(base_tick_ms * 30)); + must_get_equal(&cluster.get_engine(2), b"k2", b"v2"); + must_get_equal(&cluster.get_engine(3), b"k2", b"v2"); +} diff --git a/tests/integrations/raftstore/test_tombstone.rs b/tests/integrations/raftstore/test_tombstone.rs index 3d7fc235cadc..972a75212b48 100644 --- a/tests/integrations/raftstore/test_tombstone.rs +++ b/tests/integrations/raftstore/test_tombstone.rs @@ -80,7 +80,7 @@ fn test_tombstone(cluster: &mut Cluster) { raft_msg.set_region_id(r1); // Use an invalid from peer to ignore gc peer message. - raft_msg.set_from_peer(new_peer(0, 0)); + raft_msg.set_from_peer(new_peer(100, 100)); raft_msg.set_to_peer(new_peer(2, 2)); raft_msg.mut_region_epoch().set_conf_ver(0); raft_msg.mut_region_epoch().set_version(0); diff --git a/tests/integrations/server/raft_client.rs b/tests/integrations/server/raft_client.rs index fa7a86f12c4f..aad9ab7ceb10 100644 --- a/tests/integrations/server/raft_client.rs +++ b/tests/integrations/server/raft_client.rs @@ -19,7 +19,7 @@ use kvproto::{ tikvpb::BatchRaftMessage, }; use raft::eraftpb::Entry; -use raftstore::{errors::DiscardReason, store::StoreMsg}; +use raftstore::errors::DiscardReason; use tikv::server::{ self, load_statistics::ThreadLoadPool, raftkv::RaftRouterWrap, resolve, resolve::Callback, Config, ConnectionBuilder, RaftClient, StoreAddrResolver, TestRaftStoreRouter, @@ -28,7 +28,6 @@ use tikv_kv::{FakeExtension, RaftExtension}; use tikv_util::{ config::{ReadableDuration, VersionTrack}, worker::{Builder as WorkerBuilder, LazyWorker}, - Either, }; use super::*; @@ -73,7 +72,7 @@ where worker.scheduler(), loads, ); - RaftClient::new(builder) + RaftClient::new(0, builder) } fn get_raft_client_by_port(port: u16) -> RaftClient { @@ -206,59 +205,6 @@ fn test_raft_client_reconnect() { drop(mock_server); } -#[test] -// Test raft_client reports store unreachable only once until being connected -// again -fn test_raft_client_report_unreachable() { - let msg_count = Arc::new(AtomicUsize::new(0)); - let batch_msg_count = Arc::new(AtomicUsize::new(0)); - let service = MockKvForRaft::new(Arc::clone(&msg_count), Arc::clone(&batch_msg_count), true); - let (mut mock_server, port) = create_mock_server(service, 60100, 60200).unwrap(); - - let (tx, rx) = mpsc::channel(); - let (significant_msg_sender, _significant_msg_receiver) = mpsc::channel(); - let router = TestRaftStoreRouter::new(tx, significant_msg_sender); - let wrap = RaftRouterWrap::new(router); - let mut raft_client = get_raft_client(wrap, StaticResolver::new(port)); - - // server is disconnected - mock_server.shutdown(); - drop(mock_server); - - raft_client.send(RaftMessage::default()).unwrap(); - let msg = rx.recv_timeout(Duration::from_millis(200)).unwrap(); - if let Either::Right(StoreMsg::StoreUnreachable { store_id }) = msg { - assert_eq!(store_id, 0); - } else { - panic!("expect StoreUnreachable"); - } - // no more unreachable message is sent until it's connected again. - rx.recv_timeout(Duration::from_millis(200)).unwrap_err(); - - // restart the mock server. - let service = MockKvForRaft::new(Arc::clone(&msg_count), batch_msg_count, true); - let mut mock_server = create_mock_server_on(service, port); - - // make sure the connection is connected, otherwise the following sent messages - // may be dropped - std::thread::sleep(Duration::from_millis(200)); - (0..50).for_each(|_| raft_client.send(RaftMessage::default()).unwrap()); - raft_client.flush(); - check_msg_count(500, &msg_count, 50); - - // server is disconnected - mock_server.take().unwrap().shutdown(); - - let msg = rx.recv_timeout(Duration::from_millis(200)).unwrap(); - if let Either::Right(StoreMsg::StoreUnreachable { store_id }) = msg { - assert_eq!(store_id, 0); - } else { - panic!("expect StoreUnreachable"); - } - // no more unreachable message is sent until it's connected again. - rx.recv_timeout(Duration::from_millis(200)).unwrap_err(); -} - #[test] fn test_batch_size_limit() { let msg_count = Arc::new(AtomicUsize::new(0));