Skip to content

Commit

Permalink
*: ignore tombstone stores
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Feb 15, 2019
1 parent 42eb6a0 commit b682c27
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/bin/tikv-ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2132,7 +2132,7 @@ fn compact_whole_cluster(
bottommost: BottommostLevelCompaction,
) {
let stores = pd_client
.get_all_stores()
.get_all_stores(false) // do not include tombstone stores.
.unwrap_or_else(|e| perror_and_exit("Get all cluster stores from PD failed", e));

let mut handles = Vec::new();
Expand Down
6 changes: 4 additions & 2 deletions src/import/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ impl Client {

pub fn switch_cluster(&self, req: &SwitchModeRequest) -> Result<()> {
let mut futures = Vec::new();
for store in self.pd.get_all_stores()? {
// do not include tombstone stores.
for store in self.pd.get_all_stores(false)? {
let ch = match self.resolve(store.get_id()) {
Ok(v) => v,
Err(e) => {
Expand All @@ -134,7 +135,8 @@ impl Client {

pub fn compact_cluster(&self, req: &CompactRequest) -> Result<()> {
let mut futures = Vec::new();
for store in self.pd.get_all_stores()? {
// do not include tombstone stores.
for store in self.pd.get_all_stores(false)? {
let ch = match self.resolve(store.get_id()) {
Ok(v) => v,
Err(e) => {
Expand Down
12 changes: 10 additions & 2 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl PdClient for RpcClient {
Ok(resp.take_store())
}

fn get_all_stores(&self) -> Result<Vec<metapb::Store>> {
fn get_all_stores(&self, include_tombstone: bool) -> Result<Vec<metapb::Store>> {
let _timer = PD_REQUEST_HISTOGRAM_VEC
.with_label_values(&["get_all_stores"])
.start_coarse_timer();
Expand All @@ -226,7 +226,15 @@ impl PdClient for RpcClient {
})?;
check_resp_header(resp.get_header())?;

Ok(resp.take_stores().to_vec())
let mut stores = resp.take_stores().to_vec();
if !include_tombstone {
for i in (0..stores.len()).rev() {
if stores[i].get_state() == metapb::StoreState::Tombstone {
stores.swap_remove(i);
}
}
}
Ok(stores)
}

fn get_cluster_config(&self) -> Result<metapb::Cluster> {
Expand Down
4 changes: 2 additions & 2 deletions src/pd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ pub trait PdClient: Send + Sync {
/// and Peers will be removed.
/// - For auto-balance, PD determines how to move the Region from one store to another.

/// Gets store information.
/// Gets store information if it is not tombstone store.
fn get_store(&self, store_id: u64) -> Result<metapb::Store>;

/// Gets all stores information.
fn get_all_stores(&self) -> Result<Vec<metapb::Store>> {
fn get_all_stores(&self, _inlcude_tombstone: bool) -> Result<Vec<metapb::Store>> {
unimplemented!();
}

Expand Down
6 changes: 6 additions & 0 deletions tests/integrations/pd/mock/mocker/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ impl Service {
header.set_cluster_id(DEFAULT_CLUSTER_ID);
header
}

/// Add an arbitrary store.
pub fn add_store(&self, store: Store) {
let store_id = store.get_id();
self.stores.lock().unwrap().insert(store_id, store);
}
}

fn make_members_response(eps: Vec<String>) -> GetMembersResponse {
Expand Down
4 changes: 4 additions & 0 deletions tests/integrations/pd/mock/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ impl Server<Service> {
let case = Option::None::<Arc<Service>>;
Self::with_configuration(&mgr, eps, case)
}

pub fn default_handler(&self) -> &Service {
&self.mocker.default_handler
}
}

impl<C: PdMocker + Send + Sync + 'static> Server<C> {
Expand Down
54 changes: 53 additions & 1 deletion tests/integrations/pd/test_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn test_rpc_client() {
.unwrap();
assert_eq!(client.is_cluster_bootstrapped().unwrap(), true);

let tmp_stores = client.get_all_stores().unwrap();
let tmp_stores = client.get_all_stores(false).unwrap();
assert_eq!(tmp_stores.len(), 1);
assert_eq!(tmp_stores[0], store);

Expand Down Expand Up @@ -154,6 +154,58 @@ fn test_rpc_client() {
client.scatter_region(region_info).unwrap();
}

#[test]
fn test_get_all_stores() {
let eps_count = 1;
let server = MockServer::new(eps_count);
let eps = server.bind_addrs();
let client = new_client(eps.clone(), None);

let mut all_stores = vec![];
let store_id = client.alloc_id().unwrap();
let mut store = metapb::Store::new();
store.set_id(store_id);
let region_id = client.alloc_id().unwrap();
let mut region = metapb::Region::new();
region.set_id(region_id);
client
.bootstrap_cluster(store.clone(), region.clone())
.unwrap();

all_stores.push(store.clone());
assert_eq!(client.is_cluster_bootstrapped().unwrap(), true);
let s = client.get_all_stores(true).unwrap();
assert_eq!(s, all_stores);

// Add tombstone store.
let mut store99 = metapb::Store::new();
store99.set_id(99);
store99.set_state(metapb::StoreState::Tombstone);
server.default_handler().add_store(store99.clone());

// do not include tombstone.
let s = client.get_all_stores(false).unwrap();
assert_eq!(s, all_stores);

all_stores.push(store99.clone());
all_stores.sort_by(|a, b| a.get_id().cmp(&b.get_id()));
// include tombstone, there should be 2 stores.
let mut s = client.get_all_stores(true).unwrap();
s.sort_by(|a, b| a.get_id().cmp(&b.get_id()));
assert_eq!(s, all_stores);

// Add another tombstone store.
let mut store199 = store99.clone();
store199.set_id(199);
server.default_handler().add_store(store199.clone());

all_stores.push(store199.clone());
all_stores.sort_by(|a, b| a.get_id().cmp(&b.get_id()));
let mut s = client.get_all_stores(true).unwrap();
s.sort_by(|a, b| a.get_id().cmp(&b.get_id()));
assert_eq!(s, all_stores);
}

#[test]
fn test_reboot() {
let eps_count = 1;
Expand Down

0 comments on commit b682c27

Please sign in to comment.