Skip to content

Commit

Permalink
*: ignore tombstone stores (#4223)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Feb 26, 2019
1 parent a8f16c2 commit 9a66560
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/bin/tikv-ctl.rs
Expand Up @@ -2172,7 +2172,7 @@ fn compact_whole_cluster(
bottommost: BottommostLevelCompaction,
) {
let stores = pd_client
.get_all_stores()
.get_all_stores(true) // Exclude tombstone stores.
.unwrap_or_else(|e| perror_and_exit("Get all cluster stores from PD failed", e));

let mut handles = Vec::new();
Expand Down
6 changes: 4 additions & 2 deletions src/import/client.rs
Expand Up @@ -107,7 +107,8 @@ impl Client {

pub fn switch_cluster(&self, req: &SwitchModeRequest) -> Result<()> {
let mut futures = Vec::new();
for store in self.pd.get_all_stores()? {
// Exclude tombstone stores.
for store in self.pd.get_all_stores(true)? {
let ch = match self.resolve(store.get_id()) {
Ok(v) => v,
Err(e) => {
Expand All @@ -134,7 +135,8 @@ impl Client {

pub fn compact_cluster(&self, req: &CompactRequest) -> Result<()> {
let mut futures = Vec::new();
for store in self.pd.get_all_stores()? {
// Exclude tombstone stores.
for store in self.pd.get_all_stores(true)? {
let ch = match self.resolve(store.get_id()) {
Ok(v) => v,
Err(e) => {
Expand Down
12 changes: 9 additions & 3 deletions src/pd/client.rs
Expand Up @@ -210,23 +210,29 @@ impl PdClient for RpcClient {
})?;
check_resp_header(resp.get_header())?;

Ok(resp.take_store())
let store = resp.take_store();
if store.get_state() != metapb::StoreState::Tombstone {
Ok(store)
} else {
Err(Error::StoreTombstone(format!("{:?}", store)))
}
}

fn get_all_stores(&self) -> Result<Vec<metapb::Store>> {
fn get_all_stores(&self, exclude_tombstone: bool) -> Result<Vec<metapb::Store>> {
let _timer = PD_REQUEST_HISTOGRAM_VEC
.with_label_values(&["get_all_stores"])
.start_coarse_timer();

let mut req = pdpb::GetAllStoresRequest::new();
req.set_header(self.header());
req.set_exclude_tombstone_stores(exclude_tombstone);

let mut resp = sync_request(&self.leader_client, LEADER_CHANGE_RETRY, |client| {
client.get_all_stores_opt(&req, Self::call_option())
})?;
check_resp_header(resp.get_header())?;

Ok(resp.take_stores().to_vec())
Ok(resp.take_stores().into_vec())
}

fn get_cluster_config(&self) -> Result<metapb::Cluster> {
Expand Down
4 changes: 4 additions & 0 deletions src/pd/errors.rs
Expand Up @@ -49,6 +49,10 @@ quick_error! {
description("region is not found")
display("region is not found for key {:?}", key)
}
StoreTombstone(msg: String) {
description("store is tombstone")
display("store is tombstone {:?}", msg)
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/pd/mod.rs
Expand Up @@ -112,11 +112,11 @@ pub trait PdClient: Send + Sync {
/// and Peers will be removed.
/// - For auto-balance, PD determines how to move the Region from one store to another.

/// Gets store information.
/// Gets store information if it is not a tombstone store.
fn get_store(&self, store_id: u64) -> Result<metapb::Store>;

/// Gets all stores information.
fn get_all_stores(&self) -> Result<Vec<metapb::Store>> {
fn get_all_stores(&self, _exlcude_tombstone: bool) -> Result<Vec<metapb::Store>> {
unimplemented!();
}

Expand Down
6 changes: 4 additions & 2 deletions src/pd/util.rs
Expand Up @@ -473,16 +473,18 @@ pub fn try_connect_leader(
Err(box_err!("failed to connect to {:?}", members))
}

/// Convert a PD protobuf error to an `Error`.
pub fn check_resp_header(header: &ResponseHeader) -> Result<()> {
if !header.has_error() {
return Ok(());
}
// TODO: translate more error types
let err = header.get_error();
match err.get_field_type() {
ErrorType::ALREADY_BOOTSTRAPPED => Err(Error::ClusterBootstrapped(header.get_cluster_id())),
ErrorType::NOT_BOOTSTRAPPED => Err(Error::ClusterNotBootstrapped(header.get_cluster_id())),
ErrorType::INCOMPATIBLE_VERSION => Err(Error::Incompatible),
_ => Err(box_err!(err.get_message())),
ErrorType::STORE_TOMBSTONE => Err(Error::StoreTombstone(err.get_message().to_owned())),
ErrorType::UNKNOWN => Err(box_err!(err.get_message())),
ErrorType::OK => Ok(()),
}
}
14 changes: 12 additions & 2 deletions tests/integrations/pd/mock/mocker/service.rs
Expand Up @@ -15,7 +15,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;
use tikv::util::collections::HashMap;

use kvproto::metapb::{Peer, Region, Store};
use kvproto::metapb::{Peer, Region, Store, StoreState};
use kvproto::pdpb::*;

use protobuf::RepeatedField;
Expand Down Expand Up @@ -49,6 +49,12 @@ impl Service {
header.set_cluster_id(DEFAULT_CLUSTER_ID);
header
}

/// Add an arbitrary store.
pub fn add_store(&self, store: Store) {
let store_id = store.get_id();
self.stores.lock().unwrap().insert(store_id, store);
}
}

fn make_members_response(eps: Vec<String>) -> GetMembersResponse {
Expand Down Expand Up @@ -144,11 +150,15 @@ impl PdMocker for Service {
}
}

fn get_all_stores(&self, _: &GetAllStoresRequest) -> Option<Result<GetAllStoresResponse>> {
fn get_all_stores(&self, req: &GetAllStoresRequest) -> Option<Result<GetAllStoresResponse>> {
let mut resp = GetAllStoresResponse::new();
resp.set_header(Service::header());
let exclude_tombstone = req.get_exclude_tombstone_stores();
let stores = self.stores.lock().unwrap();
for store in stores.values() {
if exclude_tombstone && store.get_state() == StoreState::Tombstone {
continue;
}
resp.mut_stores().push(store.clone());
}
Some(Ok(resp))
Expand Down
4 changes: 4 additions & 0 deletions tests/integrations/pd/mock/server.rs
Expand Up @@ -40,6 +40,10 @@ impl Server<Service> {
let case = Option::None::<Arc<Service>>;
Self::with_configuration(&mgr, eps, case)
}

pub fn default_handler(&self) -> &Service {
&self.mocker.default_handler
}
}

impl<C: PdMocker + Send + Sync + 'static> Server<C> {
Expand Down
58 changes: 57 additions & 1 deletion tests/integrations/pd/test_rpc_client.rs
Expand Up @@ -93,7 +93,7 @@ fn test_rpc_client() {
.unwrap();
assert_eq!(client.is_cluster_bootstrapped().unwrap(), true);

let tmp_stores = client.get_all_stores().unwrap();
let tmp_stores = client.get_all_stores(false).unwrap();
assert_eq!(tmp_stores.len(), 1);
assert_eq!(tmp_stores[0], store);

Expand Down Expand Up @@ -154,6 +154,62 @@ fn test_rpc_client() {
client.scatter_region(region_info).unwrap();
}

#[test]
fn test_get_tombstone_stores() {
let eps_count = 1;
let server = MockServer::new(eps_count);
let eps = server.bind_addrs();
let client = new_client(eps.clone(), None);

let mut all_stores = vec![];
let store_id = client.alloc_id().unwrap();
let mut store = metapb::Store::new();
store.set_id(store_id);
let region_id = client.alloc_id().unwrap();
let mut region = metapb::Region::new();
region.set_id(region_id);
client
.bootstrap_cluster(store.clone(), region.clone())
.unwrap();

all_stores.push(store.clone());
assert_eq!(client.is_cluster_bootstrapped().unwrap(), true);
let s = client.get_all_stores(false).unwrap();
assert_eq!(s, all_stores);

// Add tombstone store.
let mut store99 = metapb::Store::new();
store99.set_id(99);
store99.set_state(metapb::StoreState::Tombstone);
server.default_handler().add_store(store99.clone());

// do not include tombstone.
let s = client.get_all_stores(true).unwrap();
assert_eq!(s, all_stores);

all_stores.push(store99.clone());
all_stores.sort_by(|a, b| a.get_id().cmp(&b.get_id()));
// include tombstone, there should be 2 stores.
let mut s = client.get_all_stores(false).unwrap();
s.sort_by(|a, b| a.get_id().cmp(&b.get_id()));
assert_eq!(s, all_stores);

// Add another tombstone store.
let mut store199 = store99.clone();
store199.set_id(199);
server.default_handler().add_store(store199.clone());

all_stores.push(store199.clone());
all_stores.sort_by(|a, b| a.get_id().cmp(&b.get_id()));
let mut s = client.get_all_stores(false).unwrap();
s.sort_by(|a, b| a.get_id().cmp(&b.get_id()));
assert_eq!(s, all_stores);

client.get_store(store_id).unwrap();
client.get_store(99).unwrap_err();
client.get_store(199).unwrap_err();
}

#[test]
fn test_reboot() {
let eps_count = 1;
Expand Down

0 comments on commit 9a66560

Please sign in to comment.