Skip to content

Commit

Permalink
prevent PANIC when leader is None
Browse files Browse the repository at this point in the history
happend in tests with tifs:
thread 'tokio-runtime-worker' panicked at /home/uli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tikv-client-0.3.0/src/pd/cluster.rs:270:47:
called `Option::unwrap()` on a `None` value
  • Loading branch information
cre4ture committed May 14, 2024
1 parent c6110dd commit bcff6ae
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 7 deletions.
4 changes: 3 additions & 1 deletion src/common/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ pub enum Error {
RegionNotFoundInResponse { region_id: u64 },
/// No leader is found for the given id.
#[error("Leader of region {} is not found", region_id)]
LeaderNotFound { region_id: u64 },
LeaderOfRegionNotFound { region_id: u64 },
#[error("Leader of cluster {} is not found", cluster_id)]
LeaderOfClusterNotFound { cluster_id: u64 },
/// Scan limit exceeds the maximum
#[error("Limit {} exceeds max scan limit {}", limit, max_limit)]
MaxScanLimitExceeded { limit: u32, max_limit: u32 },
Expand Down
10 changes: 6 additions & 4 deletions src/pd/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,16 @@ impl Connection {
keyspacepb::keyspace_client::KeyspaceClient<Channel>,
pdpb::GetMembersResponse,
)> {
let previous_leader = previous.leader.as_ref().unwrap();
let previous_leader = previous.leader.as_ref();
let members = &previous.members;
let cluster_id = previous.header.as_ref().unwrap().cluster_id;

let mut resp = None;
// Try to connect to other members, then the previous leader.
'outer: for m in members
.iter()
.filter(|m| *m != previous_leader)
.chain(Some(previous_leader))
.filter(|m| !previous_leader.is_some_and(|pl| *m == pl))
.chain(previous_leader)
{
for ep in &m.client_urls {
match self.try_connect(ep.as_str(), cluster_id, timeout).await {
Expand All @@ -306,7 +306,9 @@ impl Connection {

// Then try to connect the PD cluster leader.
if let Some(resp) = resp {
let leader = resp.leader.as_ref().unwrap();
let leader = resp.leader.as_ref().ok_or(Error::LeaderOfClusterNotFound{
cluster_id,
})?;
for ep in &leader.client_urls {
if let Ok((client, keyspace_client, members)) =
self.try_connect(ep.as_str(), cluster_id, timeout).await
Expand Down
2 changes: 1 addition & 1 deletion src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl RegionWithLeader {
self.leader
.as_ref()
.cloned()
.ok_or_else(|| Error::LeaderNotFound {
.ok_or_else(|| Error::LeaderOfRegionNotFound {
region_id: self.id(),
})
.map(|s| s.store_id)
Expand Down
2 changes: 1 addition & 1 deletion src/store/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ macro_rules! impl_request {

fn set_leader(&mut self, leader: &RegionWithLeader) -> Result<()> {
let ctx = self.context.get_or_insert(kvrpcpb::Context::default());
let leader_peer = leader.leader.as_ref().ok_or(Error::LeaderNotFound {
let leader_peer = leader.leader.as_ref().ok_or(Error::LeaderOfRegionNotFound {
region_id: leader.region.id,
})?;
ctx.region_id = leader.region.id;
Expand Down

0 comments on commit bcff6ae

Please sign in to comment.