resolved_ts: use smaller timeout when do check_leader (tikv#16000) (tikv#9)

* refine resolve check_leader timeout and add failpoint

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* remove fail_point

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* Update components/resolved_ts/src/advance.rs

Co-authored-by: cfzjywxk <cfzjywxk@gmail.com>
Signed-off-by: crazycs <crazycs520@gmail.com>

* make format

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add unit test

Signed-off-by: crazycs520 <crazycs520@gmail.com>

---------

Signed-off-by: crazycs520 <crazycs520@gmail.com>
Signed-off-by: crazycs <crazycs520@gmail.com>
Co-authored-by: cfzjywxk <cfzjywxk@gmail.com>

Signed-off-by: crazycs520 <crazycs520@gmail.com>
Signed-off-by: crazycs <crazycs520@gmail.com>
Co-authored-by: crazycs <crazycs520@gmail.com>
Co-authored-by: cfzjywxk <cfzjywxk@gmail.com>
3 people authored and GitHub Enterprise committed Nov 28, 2023
1 parent 60110b0 commit 26f94f4
Showing 3 changed files with 58 additions and 12 deletions.
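
In brief, the change threads an optional timeout from the resolved-ts advance loop down to the check_leader RPCs and clamps it at the existing default. Below is a minimal standalone sketch of that clamping rule, not the TiKV source; the 5-second default is assumed purely for illustration (the real bound is DEFAULT_CHECK_LEADER_TIMEOUT_DURATION in components/resolved_ts/src/advance.rs):

use std::time::Duration;

// Mirrors the get_min_timeout helper added in advance.rs: take the caller's
// timeout if one was supplied, but never exceed the default.
fn get_min_timeout(timeout: Option<Duration>, default: Duration) -> Duration {
    timeout.unwrap_or(default).min(default)
}

fn main() {
    let default = Duration::from_secs(5); // assumed default, for illustration only
    // No caller preference: the default bound applies (backup-stream, cdc).
    assert_eq!(get_min_timeout(None, default), default);
    // A shorter advance interval tightens the bound (resolved-ts worker).
    assert_eq!(
        get_min_timeout(Some(Duration::from_millis(200)), default),
        Duration::from_millis(200)
    );
    // A longer request is still capped at the default.
    assert_eq!(get_min_timeout(Some(Duration::from_secs(20)), default), default);
}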
2 changes: 1 addition & 1 deletion components/backup-stream/src/subscription_manager.rs
@@ -485,7 +485,7 @@ where
                         "take" => ?now.saturating_elapsed(), "timedout" => %timedout);
                 }
                 let regions = leader_checker
-                    .resolve(self.subs.current_regions(), min_ts)
+                    .resolve(self.subs.current_regions(), min_ts, None)
                     .await;
                 let cps = self.subs.resolve_with(min_ts, regions);
                 let min_region = cps.iter().min_by_key(|rs| rs.checkpoint);
2 changes: 1 addition & 1 deletion components/cdc/src/endpoint.rs
@@ -1069,7 +1069,7 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
         let regions =
             if hibernate_regions_compatible && gate.can_enable(FEATURE_RESOLVED_TS_STORE) {
                 CDC_RESOLVED_TS_ADVANCE_METHOD.set(1);
-                leader_resolver.resolve(regions, min_ts).await
+                leader_resolver.resolve(regions, min_ts, None).await
             } else {
                 CDC_RESOLVED_TS_ADVANCE_METHOD.set(0);
                 leader_resolver
66 changes: 56 additions & 10 deletions components/resolved_ts/src/advance.rs
@@ -127,7 +127,9 @@ impl AdvanceTsWorker {
                 }
             }

-            let regions = leader_resolver.resolve(regions, min_ts).await;
+            let regions = leader_resolver
+                .resolve(regions, min_ts, Some(advance_ts_interval))
+                .await;
             if !regions.is_empty() {
                 if let Err(e) = scheduler.schedule(Task::ResolvedTsAdvanced {
                     regions,
@@ -272,7 +274,12 @@ impl LeadershipResolver {
     // This function broadcasts a special message to all stores, gets the leader id
     // of them to confirm whether current peer has a quorum which accepts its
     // leadership.
-    pub async fn resolve(&mut self, regions: Vec<u64>, min_ts: TimeStamp) -> Vec<u64> {
+    pub async fn resolve(
+        &mut self,
+        regions: Vec<u64>,
+        min_ts: TimeStamp,
+        timeout: Option<Duration>,
+    ) -> Vec<u64> {
         if regions.is_empty() {
             return regions;
         }
@@ -359,6 +366,8 @@ impl LeadershipResolver {
             .map_or(0, |req| req.regions[0].compute_size());
         let store_count = store_req_map.len();
         let mut check_leader_rpcs = Vec::with_capacity(store_req_map.len());
+        let timeout = get_min_timeout(timeout, DEFAULT_CHECK_LEADER_TIMEOUT_DURATION);
+
         for (store_id, req) in store_req_map {
             if req.regions.is_empty() {
                 continue;
@@ -373,9 +382,16 @@
             let rpc = async move {
                 PENDING_CHECK_LEADER_REQ_COUNT.inc();
                 defer!(PENDING_CHECK_LEADER_REQ_COUNT.dec());
-                let client = get_tikv_client(to_store, pd_client, security_mgr, env, tikv_clients)
-                    .await
-                    .map_err(|e| (to_store, e.retryable(), format!("[get tikv client] {}", e)))?;
+                let client = get_tikv_client(
+                    to_store,
+                    pd_client,
+                    security_mgr,
+                    env,
+                    tikv_clients,
+                    timeout,
+                )
+                .await
+                .map_err(|e| (to_store, e.retryable(), format!("[get tikv client] {}", e)))?;

                 // Set min_ts in the request.
                 req.set_ts(min_ts.into_inner());
@@ -406,7 +422,6 @@

                 PENDING_CHECK_LEADER_REQ_SENT_COUNT.inc();
                 defer!(PENDING_CHECK_LEADER_REQ_SENT_COUNT.dec());
-                let timeout = DEFAULT_CHECK_LEADER_TIMEOUT_DURATION;
                 let resp = tokio::time::timeout(timeout, rpc)
                     .map_err(|e| (to_store, true, format!("[timeout] {}", e)))
                     .await?
@@ -466,6 +481,11 @@
     }
 }

+#[inline]
+fn get_min_timeout(timeout: Option<Duration>, default: Duration) -> Duration {
+    timeout.unwrap_or(default).min(default)
+}
+
 fn region_has_quorum(peers: &[Peer], stores: &[u64]) -> bool {
     let mut voters = 0;
     let mut incoming_voters = 0;
@@ -522,14 +542,14 @@ async fn get_tikv_client(
     security_mgr: &SecurityManager,
     env: Arc<Environment>,
     tikv_clients: &Mutex<HashMap<u64, TikvClient>>,
+    timeout: Duration,
 ) -> pd_client::Result<TikvClient> {
     {
         let clients = tikv_clients.lock().await;
         if let Some(client) = clients.get(&store_id).cloned() {
             return Ok(client);
         }
     }
-    let timeout = DEFAULT_CHECK_LEADER_TIMEOUT_DURATION;
     let store = tokio::time::timeout(timeout, pd_client.get_store_async(store_id))
         .await
         .map_err(|e| pd_client::Error::Other(Box::new(e)))
@@ -647,19 +667,45 @@ mod tests {
             .region_read_progress
             .insert(2, Arc::new(progress2));

-        leader_resolver.resolve(vec![1, 2], TimeStamp::new(1)).await;
+        leader_resolver
+            .resolve(vec![1, 2], TimeStamp::new(1), None)
+            .await;
         let req = rx.recv_timeout(Duration::from_secs(1)).unwrap();
         assert_eq!(req.regions.len(), 2);

         // Checking one region only send 1 region in request.
-        leader_resolver.resolve(vec![1], TimeStamp::new(1)).await;
+        leader_resolver
+            .resolve(vec![1], TimeStamp::new(1), None)
+            .await;
         let req = rx.recv_timeout(Duration::from_secs(1)).unwrap();
         assert_eq!(req.regions.len(), 1);

         // Checking zero region does not send request.
-        leader_resolver.resolve(vec![], TimeStamp::new(1)).await;
+        leader_resolver
+            .resolve(vec![], TimeStamp::new(1), None)
+            .await;
         rx.recv_timeout(Duration::from_secs(1)).unwrap_err();

         let _ = server.shutdown().await;
     }
+
+    #[test]
+    fn test_get_min_timeout() {
+        assert_eq!(
+            get_min_timeout(None, Duration::from_secs(5)),
+            Duration::from_secs(5)
+        );
+        assert_eq!(
+            get_min_timeout(None, Duration::from_secs(2)),
+            Duration::from_secs(2)
+        );
+        assert_eq!(
+            get_min_timeout(Some(Duration::from_secs(1)), Duration::from_secs(5)),
+            Duration::from_secs(1)
+        );
+        assert_eq!(
+            get_min_timeout(Some(Duration::from_secs(20)), Duration::from_secs(5)),
+            Duration::from_secs(5)
+        );
+    }
 }
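
For context on how the clamped value is used, here is a self-contained sketch of bounding an async call with tokio::time::timeout, the same mechanism the diff applies to the check_leader RPC (assumes the tokio crate with the rt, macros, and time features; the function and latency below are invented for illustration):

use std::time::Duration;
use tokio::time::{sleep, timeout};

// Invented stand-in for a check_leader-style RPC with 300 ms of latency.
async fn fake_check_leader() -> &'static str {
    sleep(Duration::from_millis(300)).await;
    "leader confirmed"
}

#[tokio::main]
async fn main() {
    // A generous bound lets the call complete normally.
    let ok = timeout(Duration::from_secs(1), fake_check_leader()).await;
    assert_eq!(ok.unwrap(), "leader confirmed");

    // A tight bound (e.g. a short advance interval) surfaces a slow store as an
    // error instead of stalling resolved-ts advancement for the full default timeout.
    let err = timeout(Duration::from_millis(50), fake_check_leader()).await;
    assert!(err.is_err());
}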
