Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#14574
Browse files Browse the repository at this point in the history
ref tikv#14547

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
hicqu authored and ti-chi-bot committed Apr 21, 2023
1 parent a29f525 commit f5a2202
Show file tree
Hide file tree
Showing 11 changed files with 1,199 additions and 96 deletions.
44 changes: 31 additions & 13 deletions components/raftstore/src/store/fsm/store.rs
Expand Up @@ -632,7 +632,12 @@ struct Store {
stopped: bool,
start_time: Option<Timespec>,
consistency_check_time: HashMap<u64, Instant>,
last_unreachable_report: HashMap<u64, Instant>,
store_reachability: HashMap<u64, StoreReachability>,
}

struct StoreReachability {
last_broadcast: Instant,
received_message_count: u64,
}

pub struct StoreFsm<EK>
Expand All @@ -656,7 +661,7 @@ where
stopped: false,
start_time: None,
consistency_check_time: HashMap::default(),
last_unreachable_report: HashMap::default(),
store_reachability: HashMap::default(),
},
receiver: rx,
});
Expand Down Expand Up @@ -2795,22 +2800,35 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
fn on_store_unreachable(&mut self, store_id: u64) {
let now = Instant::now();
let unreachable_backoff = self.ctx.cfg.unreachable_backoff.0;
if self
.fsm
.store
.last_unreachable_report
.get(&store_id)
.map_or(unreachable_backoff, |t| now.saturating_duration_since(*t))
< unreachable_backoff
{
return;
}
let new_messages = MESSAGE_RECV_BY_STORE
.with_label_values(&[&format!("{}", store_id)])
.get();
match self.fsm.store.store_reachability.entry(store_id) {
HashMapEntry::Vacant(x) => {
x.insert(StoreReachability {
last_broadcast: now,
received_message_count: new_messages,
});
}
HashMapEntry::Occupied(x) => {
let ob = x.into_mut();
if now.saturating_duration_since(ob.last_broadcast) < unreachable_backoff
// If there are no new messages come from `store_id`, it's not
// necessary to do redundant broadcasts.
|| (new_messages <= ob.received_message_count && new_messages > 0)
{
return;
}
ob.last_broadcast = now;
ob.received_message_count = new_messages;
}
};

info!(
"broadcasting unreachable";
"store_id" => self.fsm.store.id,
"unreachable_store_id" => store_id,
);
self.fsm.store.last_unreachable_report.insert(store_id, now);
// It's possible to acquire the lock and only send notification to
// involved regions. However loop over all the regions can take a
// lot of time, which may block other operations.
Expand Down
7 changes: 7 additions & 0 deletions components/raftstore/src/store/metrics.rs
Expand Up @@ -794,4 +794,11 @@ lazy_static! {
"Total snapshot generate limit used",
)
.unwrap();

pub static ref MESSAGE_RECV_BY_STORE: IntCounterVec = register_int_counter_vec!(
"tikv_raftstore_message_recv_by_store",
"Messages received by store",
&["store"]
)
.unwrap();
}

0 comments on commit f5a2202

Please sign in to comment.