raft: peers shouldn't hibernate incorrectly when one node fails (#14574)
ref #14547

raft: peers shouldn't hibernate incorrectly when one node fails

Signed-off-by: qupeng <qupeng@pingcap.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
hicqu and ti-chi-bot committed Apr 21, 2023
1 parent 666edee commit 20b75dc
Showing 11 changed files with 198 additions and 101 deletions.
44 changes: 31 additions & 13 deletions components/raftstore/src/store/fsm/store.rs
@@ -678,7 +678,12 @@ struct Store {
stopped: bool,
start_time: Option<Timespec>,
consistency_check_time: HashMap<u64, Instant>,
last_unreachable_report: HashMap<u64, Instant>,
store_reachability: HashMap<u64, StoreReachability>,
}

struct StoreReachability {
last_broadcast: Instant,
received_message_count: u64,
}

pub struct StoreFsm<EK>
@@ -702,7 +707,7 @@ where
stopped: false,
start_time: None,
consistency_check_time: HashMap::default(),
last_unreachable_report: HashMap::default(),
store_reachability: HashMap::default(),
},
receiver: rx,
});
@@ -2876,22 +2881,35 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
fn on_store_unreachable(&mut self, store_id: u64) {
let now = Instant::now();
let unreachable_backoff = self.ctx.cfg.unreachable_backoff.0;
if self
.fsm
.store
.last_unreachable_report
.get(&store_id)
.map_or(unreachable_backoff, |t| now.saturating_duration_since(*t))
< unreachable_backoff
{
return;
}
let new_messages = MESSAGE_RECV_BY_STORE
.with_label_values(&[&format!("{}", store_id)])
.get();
match self.fsm.store.store_reachability.entry(store_id) {
HashMapEntry::Vacant(x) => {
x.insert(StoreReachability {
last_broadcast: now,
received_message_count: new_messages,
});
}
HashMapEntry::Occupied(x) => {
let ob = x.into_mut();
if now.saturating_duration_since(ob.last_broadcast) < unreachable_backoff
// If no new messages have come from `store_id`, it's not
// necessary to do redundant broadcasts.
|| (new_messages <= ob.received_message_count && new_messages > 0)
{
return;
}
ob.last_broadcast = now;
ob.received_message_count = new_messages;
}
};

info!(
"broadcasting unreachable";
"store_id" => self.fsm.store.id,
"unreachable_store_id" => store_id,
);
self.fsm.store.last_unreachable_report.insert(store_id, now);
// It's possible to acquire the lock and only send notifications to
// involved regions. However, looping over all the regions can take a
// lot of time, which may block other operations.
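The gating condition added above can be read as: re-broadcast a store as unreachable only if the backoff window has elapsed and either new messages have arrived from that store since the last broadcast or nothing has ever been counted from it. Keeping this decision in the store FSM means a flapping connection in the raft client can neither suppress nor flood unreachable reports on its own. A standalone sketch of that predicate (the helper name `should_broadcast` is hypothetical, not part of this change):

use std::time::{Duration, Instant};

// Broadcast again only when the backoff has elapsed and either the peer store
// has sent new messages since the last broadcast or no message from it has
// ever been counted (received == 0), mirroring the condition in
// `on_store_unreachable`.
fn should_broadcast(
    now: Instant,
    last_broadcast: Instant,
    backoff: Duration,
    received_now: u64,
    received_at_last_broadcast: u64,
) -> bool {
    if now.saturating_duration_since(last_broadcast) < backoff {
        return false;
    }
    received_now > received_at_last_broadcast || received_now == 0
}

fn main() {
    let t0 = Instant::now();
    let later = t0 + Duration::from_secs(11);
    let backoff = Duration::from_secs(10);
    assert!(!should_broadcast(t0, t0, backoff, 5, 5)); // backoff not yet elapsed
    assert!(should_broadcast(later, t0, backoff, 6, 5)); // new messages arrived
    assert!(!should_broadcast(later, t0, backoff, 5, 5)); // nothing new, skip
}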
7 changes: 7 additions & 0 deletions components/raftstore/src/store/metrics.rs
@@ -868,4 +868,11 @@ lazy_static! {
"Total snapshot generate limit used",
)
.unwrap();

pub static ref MESSAGE_RECV_BY_STORE: IntCounterVec = register_int_counter_vec!(
"tikv_raftstore_message_recv_by_store",
"Messages received by store",
&["store"]
)
.unwrap();
}
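The new `MESSAGE_RECV_BY_STORE` counter is labeled by the sending store's id; `on_store_unreachable` above snapshots it with `.get()` and skips the broadcast when the value has not grown since the last one. The increment happens on the raft message receive path, which is among the changed files not reproduced in this excerpt; the sketch below only illustrates the intended usage, and `on_raft_message_received` is a hypothetical stand-in for that wiring:

use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec, IntCounterVec};

lazy_static! {
    // Same shape as the metric registered above: one counter per sending store.
    static ref MESSAGE_RECV_BY_STORE: IntCounterVec = register_int_counter_vec!(
        "tikv_raftstore_message_recv_by_store",
        "Messages received by store",
        &["store"]
    )
    .unwrap();
}

// Hypothetical receive-side hook: bump the counter for the store id carried
// by the incoming raft stream.
fn on_raft_message_received(source_store_id: u64) {
    MESSAGE_RECV_BY_STORE
        .with_label_values(&[&source_store_id.to_string()])
        .inc();
}

fn main() {
    on_raft_message_received(2);
    on_raft_message_received(2);
    // `on_store_unreachable` reads the same counter with `.get()` and compares
    // it against the value recorded at the previous broadcast.
    assert_eq!(MESSAGE_RECV_BY_STORE.with_label_values(&["2"]).get(), 2);
}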
18 changes: 13 additions & 5 deletions components/test_raftstore-v2/src/server.rs
@@ -259,7 +259,8 @@ pub struct ServerCluster<EK: KvEngine> {
snap_paths: HashMap<u64, TempDir>,
snap_mgrs: HashMap<u64, TabletSnapManager>,
pd_client: Arc<TestPdClient>,
raft_client: RaftClient<AddressMap, FakeExtension>,
raft_clients: HashMap<u64, RaftClient<AddressMap, FakeExtension>>,
conn_builder: ConnectionBuilder<AddressMap, FakeExtension>,
concurrency_managers: HashMap<u64, ConcurrencyManager>,
env: Arc<Environment>,
pub pending_services: HashMap<u64, PendingServices>,
@@ -291,7 +292,6 @@ impl<EK: KvEngine> ServerCluster<EK> {
worker.scheduler(),
Arc::new(ThreadLoadPool::with_threshold(usize::MAX)),
);
let raft_client = RaftClient::new(conn_builder);
ServerCluster {
metas: HashMap::default(),
addrs: map,
@@ -303,7 +303,8 @@ impl<EK: KvEngine> ServerCluster<EK> {
snap_paths: HashMap::default(),
pending_services: HashMap::default(),
health_services: HashMap::default(),
raft_client,
raft_clients: HashMap::default(),
conn_builder,
concurrency_managers: HashMap::default(),
env,
txn_extra_schedulers: HashMap::default(),
@@ -650,6 +651,8 @@ impl<EK: KvEngine> ServerCluster<EK> {
self.concurrency_managers
.insert(node_id, concurrency_manager);

let client = RaftClient::new(node_id, self.conn_builder.clone());
self.raft_clients.insert(node_id, client);
Ok(node_id)
}

@@ -763,6 +766,7 @@ impl<EK: KvEngine> Simulator<EK> for ServerCluster<EK> {
(meta.rsmeter_cleanup)();
}
self.storages.remove(&node_id);
let _ = self.raft_clients.remove(&node_id);
}

fn async_snapshot(
@@ -800,8 +804,12 @@
}

fn send_raft_msg(&mut self, msg: RaftMessage) -> raftstore::Result<()> {
self.raft_client.send(msg).unwrap();
self.raft_client.flush();
let from_store = msg.get_from_peer().store_id;
assert_ne!(from_store, 0);
if let Some(client) = self.raft_clients.get_mut(&from_store) {
client.send(msg).unwrap();
client.flush();
}
Ok(())
}

18 changes: 13 additions & 5 deletions components/test_raftstore/src/server.rs
@@ -154,7 +154,8 @@ pub struct ServerCluster {
snap_paths: HashMap<u64, TempDir>,
snap_mgrs: HashMap<u64, SnapManager>,
pd_client: Arc<TestPdClient>,
raft_client: RaftClient<AddressMap, FakeExtension>,
raft_clients: HashMap<u64, RaftClient<AddressMap, FakeExtension>>,
conn_builder: ConnectionBuilder<AddressMap, FakeExtension>,
concurrency_managers: HashMap<u64, ConcurrencyManager>,
env: Arc<Environment>,
pub causal_ts_providers: HashMap<u64, Arc<CausalTsProviderImpl>>,
@@ -182,7 +183,6 @@ impl ServerCluster {
worker.scheduler(),
Arc::new(ThreadLoadPool::with_threshold(usize::MAX)),
);
let raft_client = RaftClient::new(conn_builder);
ServerCluster {
metas: HashMap::default(),
addrs: map,
@@ -196,7 +196,8 @@ impl ServerCluster {
pending_services: HashMap::default(),
coprocessor_hooks: HashMap::default(),
health_services: HashMap::default(),
raft_client,
raft_clients: HashMap::default(),
conn_builder,
concurrency_managers: HashMap::default(),
env,
txn_extra_schedulers: HashMap::default(),
@@ -645,6 +646,8 @@ impl ServerCluster {
self.concurrency_managers
.insert(node_id, concurrency_manager);

let client = RaftClient::new(node_id, self.conn_builder.clone());
self.raft_clients.insert(node_id, client);
Ok(node_id)
}
}
@@ -698,6 +701,7 @@ impl Simulator for ServerCluster {
}
(meta.rsmeter_cleanup)();
}
let _ = self.raft_clients.remove(&node_id);
}

fn get_node_ids(&self) -> HashSet<u64> {
@@ -739,8 +743,12 @@
}

fn send_raft_msg(&mut self, raft_msg: raft_serverpb::RaftMessage) -> Result<()> {
self.raft_client.send(raft_msg).unwrap();
self.raft_client.flush();
let from_store = raft_msg.get_from_peer().store_id;
assert_ne!(from_store, 0);
if let Some(client) = self.raft_clients.get_mut(&from_store) {
client.send(raft_msg).unwrap();
client.flush();
}
Ok(())
}

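Both test harnesses switch from one shared `RaftClient` to one client per simulated node, created with that node's id, so a message is sent through the client owned by the store in `from_peer` and its stream carries the matching `source_store_id` header. A minimal sketch of the routing pattern with a stand-in client type (all names below are hypothetical):

use std::collections::HashMap;

// Stand-in for the per-node RaftClient; only the routing is shown.
#[derive(Default)]
struct FakeRaftClient {
    sent: Vec<String>,
}

impl FakeRaftClient {
    fn send(&mut self, msg: &str) {
        self.sent.push(msg.to_owned());
    }
}

// Route a message through the client that belongs to the *sending* store, so
// its stream metadata identifies the right source store.
fn send_raft_msg(clients: &mut HashMap<u64, FakeRaftClient>, from_store: u64, msg: &str) {
    assert_ne!(from_store, 0);
    if let Some(client) = clients.get_mut(&from_store) {
        client.send(msg);
    }
}

fn main() {
    let mut clients = HashMap::new();
    clients.insert(1, FakeRaftClient::default());
    clients.insert(2, FakeRaftClient::default());
    send_raft_msg(&mut clients, 1, "append entries");
    assert_eq!(clients[&1].sent.len(), 1);
    assert!(clients[&2].sent.is_empty());
}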
2 changes: 1 addition & 1 deletion src/server/mod.rs
@@ -34,7 +34,7 @@ pub use self::{
metrics::{CONFIG_ROCKSDB_GAUGE, CPU_CORES_QUOTA_GAUGE, MEM_TRACE_SUM_GAUGE},
node::Node,
proxy::{build_forward_option, get_target_address, Proxy},
raft_client::{ConnectionBuilder, RaftClient},
raft_client::{ConnectionBuilder, MetadataSourceStoreId, RaftClient},
raftkv::RaftKv,
raftkv2::{Extension, NodeV2, RaftKv2},
resolve::{PdStoreAddrResolver, StoreAddrResolver},
56 changes: 40 additions & 16 deletions src/server/raft_client.rs
@@ -25,8 +25,8 @@ use futures::{
};
use futures_timer::Delay;
use grpcio::{
Channel, ChannelBuilder, ClientCStreamReceiver, ClientCStreamSender, Environment,
RpcStatusCode, WriteFlags,
CallOption, Channel, ChannelBuilder, ClientCStreamReceiver, ClientCStreamSender, Environment,
MetadataBuilder, RpcStatusCode, WriteFlags,
};
use kvproto::{
raft_serverpb::{Done, RaftMessage, RaftSnapshotData},
@@ -50,6 +50,21 @@ use crate::server::{
StoreAddrResolver,
};

pub struct MetadataSourceStoreId {}

impl MetadataSourceStoreId {
pub const KEY: &str = "source_store_id";

pub fn parse(value: &[u8]) -> u64 {
let value = std::str::from_utf8(value).unwrap();
value.parse::<u64>().unwrap()
}

pub fn format(id: u64) -> String {
format!("{}", id)
}
}

static CONN_ID: AtomicI32 = AtomicI32::new(0);

const _ON_RESOLVE_FP: &str = "transport_snapshot_on_resolve";
@@ -616,6 +631,7 @@ impl<S, R> ConnectionBuilder<S, R> {
/// StreamBackEnd watches lifetime of a connection and handles reconnecting,
/// spawn new RPC.
struct StreamBackEnd<S, R> {
self_store_id: u64,
store_id: u64,
queue: Arc<Queue>,
builder: ConnectionBuilder<S, R>,
@@ -697,7 +713,8 @@ where
}

fn batch_call(&self, client: &TikvClient, addr: String) -> oneshot::Receiver<RaftCallRes> {
let (batch_sink, batch_stream) = client.batch_raft().unwrap();
let (batch_sink, batch_stream) = client.batch_raft_opt(self.get_call_option()).unwrap();

let (tx, rx) = oneshot::channel();
let mut call = RaftCall {
sender: AsyncRaftSender {
@@ -721,7 +738,8 @@ where
}

fn call(&self, client: &TikvClient, addr: String) -> oneshot::Receiver<RaftCallRes> {
let (sink, stream) = client.raft().unwrap();
let (sink, stream) = client.raft_opt(self.get_call_option()).unwrap();

let (tx, rx) = oneshot::channel();
let mut call = RaftCall {
sender: AsyncRaftSender {
Expand All @@ -742,6 +760,15 @@ where
});
rx
}

fn get_call_option(&self) -> CallOption {
let mut metadata = MetadataBuilder::with_capacity(1);
let value = MetadataSourceStoreId::format(self.self_store_id);
metadata
.add_str(MetadataSourceStoreId::KEY, &value)
.unwrap();
CallOption::default().headers(metadata.build())
}
}

async fn maybe_backoff(backoff: Duration, last_wake_time: &mut Option<Instant>) {
@@ -782,7 +809,6 @@ async fn start<S, R>(
R: RaftExtension + Unpin + Send + 'static,
{
let mut last_wake_time = None;
let mut first_time = true;
let backoff_duration = back_end.builder.cfg.value().raft_client_max_backoff.0;
let mut addr_channel = None;
loop {
@@ -828,15 +854,10 @@
// shutdown.
back_end.clear_pending_message("unreachable");

// broadcast is time consuming operation which would blocks raftstore, so report
// unreachable only once until being connected again.
if first_time {
first_time = false;
back_end
.builder
.router
.report_store_unreachable(back_end.store_id);
}
back_end
.builder
.router
.report_store_unreachable(back_end.store_id);
continue;
} else {
debug!("connection established"; "store_id" => back_end.store_id, "addr" => %addr);
@@ -868,7 +889,6 @@
.router
.report_store_unreachable(back_end.store_id);
addr_channel = None;
first_time = false;
}
}
}
@@ -926,6 +946,7 @@ struct CachedQueue {
/// raft_client.flush();
/// ```
pub struct RaftClient<S, R> {
self_store_id: u64,
pool: Arc<Mutex<ConnectionPool>>,
cache: LruCache<(u64, usize), CachedQueue>,
need_flush: Vec<(u64, usize)>,
@@ -940,13 +961,14 @@ where
S: StoreAddrResolver + Send + 'static,
R: RaftExtension + Unpin + Send + 'static,
{
pub fn new(builder: ConnectionBuilder<S, R>) -> Self {
pub fn new(self_store_id: u64, builder: ConnectionBuilder<S, R>) -> Self {
let future_pool = Arc::new(
yatp::Builder::new(thd_name!("raft-stream"))
.max_thread_count(1)
.build_future_pool(),
);
RaftClient {
self_store_id,
pool: Arc::default(),
cache: LruCache::with_capacity_and_sample(0, 7),
need_flush: vec![],
@@ -982,6 +1004,7 @@ where
queue.set_conn_state(ConnState::Paused);
}
let back_end = StreamBackEnd {
self_store_id: self.self_store_id,
store_id,
queue: queue.clone(),
builder: self.builder.clone(),
@@ -1143,6 +1166,7 @@ where
{
fn clone(&self) -> Self {
RaftClient {
self_store_id: self.self_store_id,
pool: self.pool.clone(),
cache: LruCache::with_capacity_and_sample(0, 7),
need_flush: vec![],
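In `raft_client.rs` the client now remembers its own store id and attaches it to every `raft` / `batch_raft` stream as the `source_store_id` metadata key (see `get_call_option`); with the `first_time` flag gone, unreachable is reported on every failed reconnect, and the backoff is enforced by the store FSM instead. The receiving side is expected to parse the header back into a store id; below is a minimal round-trip check of that contract, with `MetadataSourceStoreId` copied from the diff so the snippet stands alone (the receive-side header lookup itself is not shown in this excerpt):

// Copied from the diff above so the snippet is self-contained.
pub struct MetadataSourceStoreId {}

impl MetadataSourceStoreId {
    pub const KEY: &str = "source_store_id";

    pub fn parse(value: &[u8]) -> u64 {
        let value = std::str::from_utf8(value).unwrap();
        value.parse::<u64>().unwrap()
    }

    pub fn format(id: u64) -> String {
        format!("{}", id)
    }
}

fn main() {
    // The sender formats its own store id into the header value
    // (see `get_call_option`); the receiver parses it back out.
    let value = MetadataSourceStoreId::format(7);
    assert_eq!(MetadataSourceStoreId::KEY, "source_store_id");
    assert_eq!(MetadataSourceStoreId::parse(value.as_bytes()), 7);
}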
2 changes: 1 addition & 1 deletion src/server/server.rs
@@ -176,7 +176,7 @@ where
lazy_worker.scheduler(),
grpc_thread_load.clone(),
);
let raft_client = RaftClient::new(conn_builder);
let raft_client = RaftClient::new(store_id, conn_builder);

let trans = ServerTransport::new(raft_client);
health_service.set_serving_status("", ServingStatus::NotServing);
