server/node: allow restart with a StoreIdent when cluster not bootstrapped (tikv#4334)

Signed-off-by: Neil Shen <overvenus@gmail.com>
overvenus committed Mar 13, 2019
1 parent 8e071de commit d7ba49e
Showing 23 changed files with 291 additions and 263 deletions.
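
A note for readers of the diff below: the thread that ties the test_raftstore changes together is the Simulator::run_node signature. Engine and raft-batch-system creation moves out of the individual simulators and into the caller (Cluster), and the method now returns ServerResult<u64> instead of a tuple, so start-up failures propagate as errors rather than panics. The sketch below only illustrates that shape; every type in it is a stand-in, not the real definition from the tikv crates.

#![allow(dead_code)]
// Illustrative sketch of the trait change; all types here are stand-ins.
struct TiKvConfig;
#[derive(Clone)]
struct Engines;
struct RaftRouter;
struct RaftBatchSystem;
struct TempDir;
type ServerResult<T> = Result<T, String>;

// Before this commit: the simulator created engines itself, returned them,
// and unwrapped any start-up error internally.
trait SimulatorBefore {
    fn run_node(
        &mut self,
        node_id: u64,
        cfg: TiKvConfig,
        engines: Option<Engines>,
    ) -> (u64, Engines, Option<TempDir>);
}

// After this commit: the caller builds engines, router, and batch system and
// passes them in; failures come back as a ServerResult.
trait SimulatorAfter {
    fn run_node(
        &mut self,
        node_id: u64,
        cfg: TiKvConfig,
        engines: Engines,
        router: RaftRouter,
        system: RaftBatchSystem,
    ) -> ServerResult<u64>;
}

fn main() {}
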
67 changes: 39 additions & 28 deletions components/test_raftstore/src/cluster.rs
@@ -28,9 +28,10 @@ use kvproto::raft_serverpb::RaftMessage;

use tikv::config::TiKvConfig;
use tikv::pd::PdClient;
use tikv::raftstore::store::fsm::RaftRouter;
use tikv::raftstore::store::fsm::{create_raft_batch_system, RaftBatchSystem, RaftRouter};
use tikv::raftstore::store::*;
use tikv::raftstore::{Error, Result};
use tikv::server::Result as ServerResult;
use tikv::storage::CF_DEFAULT;
use tikv::util::collections::{HashMap, HashSet};
use tikv::util::{escape, rocksdb_util, HandyRwLock};
@@ -52,8 +53,10 @@ pub trait Simulator {
&mut self,
node_id: u64,
cfg: TiKvConfig,
_: Option<Engines>,
) -> (u64, Engines, Option<TempDir>);
engines: Engines,
router: RaftRouter,
system: RaftBatchSystem,
) -> ServerResult<u64>;
fn stop_node(&mut self, node_id: u64);
fn get_node_ids(&self) -> HashSet<u64>;
fn async_command_on_node(
@@ -98,10 +101,10 @@ pub trait Simulator {
pub struct Cluster<T: Simulator> {
pub cfg: TiKvConfig,
leaders: HashMap<u64, metapb::Peer>,
paths: Vec<TempDir>,
dbs: Vec<Engines>,
count: usize,

paths: Vec<TempDir>,
pub dbs: Vec<Engines>,
pub engines: HashMap<u64, Engines>,

pub sim: Arc<RwLock<T>>,
@@ -153,22 +156,24 @@ impl<T: Simulator> Cluster<T> {
}
}

pub fn start(&mut self) {
if self.engines.is_empty() {
let mut sim = self.sim.wl();
for _ in 0..self.count {
let (node_id, engines, path) = sim.run_node(0, self.cfg.clone(), None);
self.dbs.push(engines.clone());
self.engines.insert(node_id, engines);
self.paths.push(path.unwrap());
}
} else {
// recover from last shutdown.
let mut node_ids: Vec<u64> = self.engines.iter().map(|(&id, _)| id).collect();
for node_id in node_ids.drain(..) {
self.run_node(node_id);
}
pub fn start(&mut self) -> ServerResult<()> {
// Try recover from last shutdown.
let node_ids: Vec<u64> = self.engines.iter().map(|(&id, _)| id).collect();
for node_id in node_ids {
self.run_node(node_id)?;
}

// Try start new nodes.
let mut sim = self.sim.wl();
for _ in 0..self.count - self.engines.len() {
let (router, system) = create_raft_batch_system(&self.cfg.raft_store);
let (engines, path) = create_test_engine(None, router.clone(), &self.cfg);
self.dbs.push(engines.clone());
self.paths.push(path.unwrap());
let node_id = sim.run_node(0, self.cfg.clone(), engines.clone(), router, system)?;
self.engines.insert(node_id, engines);
}
Ok(())
}

pub fn compact_data(&self) {
@@ -183,29 +188,32 @@ impl<T: Simulator> Cluster<T> {
pub fn run(&mut self) {
self.create_engines();
self.bootstrap_region().unwrap();
self.start();
self.start().unwrap();
}

// Bootstrap the store with fixed ID (like 1, 2, .. 5) and
// initialize first region in store 1, then start the cluster.
pub fn run_conf_change(&mut self) -> u64 {
self.create_engines();
let region_id = self.bootstrap_conf_change();
self.start();
self.start().unwrap();
region_id
}

pub fn get_node_ids(&self) -> HashSet<u64> {
self.sim.rl().get_node_ids()
}

pub fn run_node(&mut self, node_id: u64) {
pub fn run_node(&mut self, node_id: u64) -> ServerResult<()> {
debug!("starting node {}", node_id);
let engines = self.engines[&node_id].clone();
let (router, system) = create_raft_batch_system(&self.cfg.raft_store);
// FIXME: rocksdb event listeners may not work, because we change the router.
self.sim
.wl()
.run_node(node_id, self.cfg.clone(), Some(engines));
.run_node(node_id, self.cfg.clone(), engines, router, system)?;
debug!("node {} started", node_id);
Ok(())
}

pub fn stop_node(&mut self, node_id: u64) {
@@ -437,7 +445,7 @@ impl<T: Simulator> Cluster<T> {
}

for engines in self.engines.values() {
write_prepare_bootstrap(engines, &region)?;
prepare_bootstrap_cluster(engines, &region)?;
}

self.bootstrap_cluster(region);
@@ -457,10 +465,13 @@ impl<T: Simulator> Cluster<T> {
}

let node_id = 1;
let region = prepare_bootstrap(&self.engines[&node_id], 1, 1, 1).unwrap();
let rid = region.get_id();
let region_id = 1;
let peer_id = 1;

let region = initial_region(node_id, region_id, peer_id);
prepare_bootstrap_cluster(&self.engines[&node_id], &region).unwrap();
self.bootstrap_cluster(region);
rid
region_id
}

// This is only for fixed id test.
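
To restate the cluster.rs change above in one place: Cluster::start now first restarts every node that already has engines (the recovery path that the restart-with-StoreIdent scenario exercises) and only then creates engines plus a raft batch system for the remaining new nodes. It is also fallible, so run() and run_conf_change() call start().unwrap(), while other tests can presumably call start() directly and inspect the error. Below is a simplified, self-contained sketch of that control flow; Engines, run_node, and run_new_node are stand-ins for the real test_raftstore code.

// Illustrative sketch of the new start() flow; not the real test_raftstore types.
use std::collections::HashMap;

type ServerResult<T> = Result<T, String>;

#[derive(Clone)]
struct Engines;

struct Cluster {
    count: usize,
    engines: HashMap<u64, Engines>,
    next_id: u64,
}

impl Cluster {
    fn start(&mut self) -> ServerResult<()> {
        // First, try to recover nodes whose engines survived the last shutdown.
        let node_ids: Vec<u64> = self.engines.keys().copied().collect();
        for node_id in node_ids {
            self.run_node(node_id)?;
        }
        // Then create engines for, and start, the remaining brand-new nodes.
        let new_nodes = self.count - self.engines.len();
        for _ in 0..new_nodes {
            let engines = Engines; // create_test_engine(...) in the real code
            let node_id = self.run_new_node(engines.clone())?;
            self.engines.insert(node_id, engines);
        }
        Ok(())
    }

    fn run_node(&mut self, _node_id: u64) -> ServerResult<()> {
        Ok(()) // delegates to Simulator::run_node in the real code
    }

    fn run_new_node(&mut self, _engines: Engines) -> ServerResult<u64> {
        self.next_id += 1;
        Ok(self.next_id)
    }
}

fn main() {
    let mut cluster = Cluster { count: 3, engines: HashMap::new(), next_id: 0 };
    cluster.start().unwrap();
    assert_eq!(cluster.engines.len(), 3);
}
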
33 changes: 17 additions & 16 deletions components/test_raftstore/src/node.rs
@@ -25,11 +25,12 @@ use raft::SnapshotStatus;
use tikv::config::TiKvConfig;
use tikv::import::SSTImporter;
use tikv::raftstore::coprocessor::CoprocessorHost;
use tikv::raftstore::store::fsm::{create_raft_batch_system, RaftRouter};
use tikv::raftstore::store::fsm::{RaftBatchSystem, RaftRouter};
use tikv::raftstore::store::*;
use tikv::raftstore::Result;
use tikv::server::transport::{RaftStoreRouter, ServerRaftStoreRouter};
use tikv::server::Node;
use tikv::server::Result as ServerResult;
use tikv::util::collections::{HashMap, HashSet};
use tikv::util::worker::{FutureWorker, Worker};

@@ -166,10 +167,11 @@ impl Simulator for NodeCluster {
&mut self,
node_id: u64,
cfg: TiKvConfig,
engines: Option<Engines>,
) -> (u64, Engines, Option<TempDir>) {
engines: Engines,
router: RaftRouter,
system: RaftBatchSystem,
) -> ServerResult<u64> {
assert!(node_id == 0 || !self.nodes.contains_key(&node_id));
let (router, system) = create_raft_batch_system(&cfg.raft_store);
let pd_worker = FutureWorker::new("test-pd-worker");

// Create localreader.
@@ -184,10 +186,7 @@ impl Simulator for NodeCluster {
Arc::clone(&self.pd_client),
);

// Create engine
let (engines, path) = create_test_engine(engines, router.clone(), &cfg);

let (snap_mgr, tmp) = if node_id == 0
let (snap_mgr, snap_mgr_path) = if node_id == 0
|| !self
.trans
.core
@@ -225,29 +224,31 @@
local_reader,
coprocessor_host,
importer,
)
.unwrap();
)?;
assert!(engines
.kv
.get_msg::<metapb::Region>(keys::PREPARE_BOOTSTRAP_KEY)
.unwrap()
.is_none());
assert!(node_id == 0 || node_id == node.id());

let node_id = node.id();
debug!(
"node_id: {} tmp: {:?}",
node_id,
tmp.as_ref().map(|p| p.path().to_str().unwrap().to_owned())
snap_mgr_path
.as_ref()
.map(|p| p.path().to_str().unwrap().to_owned())
);
if let Some(tmp) = tmp {
if let Some(tmp) = snap_mgr_path {
self.trans
.core
.lock()
.unwrap()
.snap_paths
.insert(node.id(), (snap_mgr, tmp));
.insert(node_id, (snap_mgr, tmp));
}

let node_id = node.id();
let router = ServerRaftStoreRouter::new(router.clone(), local_ch);
self.trans
.core
Expand All @@ -258,7 +259,7 @@ impl Simulator for NodeCluster {
self.nodes.insert(node_id, node);
self.simulate_trans.insert(node_id, simulate_trans);

(node_id, engines, path)
Ok(node_id)
}

fn get_snap_dir(&self, node_id: u64) -> String {
@@ -272,7 +273,7 @@

fn stop_node(&mut self, node_id: u64) {
if let Some(mut node) = self.nodes.remove(&node_id) {
node.stop().unwrap();
node.stop();
}
self.trans
.core
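
One detail in the node.rs hunk that speaks directly to the commit title: right after the node starts, the test asserts that the kv engine no longer holds a region under keys::PREPARE_BOOTSTRAP_KEY, i.e. any pending cluster bootstrap was either completed or rolled back before the node came up. The sketch below is only a conceptual illustration of that check; the key bytes and the in-memory KvEngine are stand-ins, while the real code reads a protobuf Region out of RocksDB.

// Conceptual sketch of the post-start assertion; key and engine are stand-ins.
use std::collections::HashMap;

const PREPARE_BOOTSTRAP_KEY: &[u8] = b"prepare_bootstrap"; // not the real key encoding

struct KvEngine {
    data: HashMap<Vec<u8>, Vec<u8>>,
}

impl KvEngine {
    fn get(&self, key: &[u8]) -> Option<&Vec<u8>> {
        self.data.get(key)
    }
}

// Mirrors the assertion added in NodeCluster::run_node: a freshly (re)started
// node must not still carry a pending prepare-bootstrap marker.
fn assert_no_pending_bootstrap(kv: &KvEngine) {
    assert!(kv.get(PREPARE_BOOTSTRAP_KEY).is_none());
}

fn main() {
    let kv = KvEngine { data: HashMap::new() };
    assert_no_pending_bootstrap(&kv);
}
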
27 changes: 10 additions & 17 deletions components/test_raftstore/src/server.rs
@@ -25,14 +25,15 @@ use tikv::config::TiKvConfig;
use tikv::coprocessor;
use tikv::import::{ImportSSTService, SSTImporter};
use tikv::raftstore::coprocessor::{CoprocessorHost, RegionInfoAccessor};
use tikv::raftstore::store::fsm::{create_raft_batch_system, RaftRouter};
use tikv::raftstore::store::fsm::{RaftBatchSystem, RaftRouter};
use tikv::raftstore::store::{Callback, Engines, SnapManager};
use tikv::raftstore::Result;
use tikv::server::load_statistics::ThreadLoad;
use tikv::server::readpool::ReadPool;
use tikv::server::resolve::{self, Task as ResolveTask};
use tikv::server::transport::RaftStoreRouter;
use tikv::server::transport::ServerRaftStoreRouter;
use tikv::server::Result as ServerResult;
use tikv::server::{
create_raft_storage, Config, Error, Node, PdStoreAddrResolver, RaftClient, Server,
ServerTransport,
@@ -109,10 +110,10 @@ impl Simulator for ServerCluster {
&mut self,
node_id: u64,
mut cfg: TiKvConfig,
engines: Option<Engines>,
) -> (u64, Engines, Option<TempDir>) {
assert!(node_id == 0 || !self.metas.contains_key(&node_id));

engines: Engines,
router: RaftRouter,
system: RaftBatchSystem,
) -> ServerResult<u64> {
let (tmp_str, tmp) = if node_id == 0 || !self.snap_paths.contains_key(&node_id) {
let p = TempDir::new("test_cluster").unwrap();
(p.path().to_str().unwrap().to_owned(), Some(p))
@@ -127,19 +128,13 @@
cfg.server.addr = addr.clone();
}

// Initialize raftstore channels.
let (router, system) = create_raft_batch_system(&cfg.raft_store);

// Create localreader.
let local_reader = Worker::new("test-local-reader");
let local_ch = local_reader.scheduler();

let raft_router = ServerRaftStoreRouter::new(router.clone(), local_ch);
let sim_router = SimulateTransport::new(raft_router);

// Create engine
let (engines, path) = create_test_engine(engines, router.clone(), &cfg);

// Create storage.
let pd_worker = FutureWorker::new("test-future-worker");
let storage_read_pool =
@@ -152,8 +147,7 @@
storage_read_pool,
None,
None,
)
.unwrap();
)?;
self.storages.insert(node_id, store.get_engine());

// Create import service.
@@ -235,8 +229,7 @@
local_reader,
coprocessor_host,
importer,
)
.unwrap();
)?;
assert!(node_id == 0 || node_id == node.id());
let node_id = node.id();
if let Some(tmp) = tmp {
Expand All @@ -258,7 +251,7 @@ impl Simulator for ServerCluster {
);
self.addrs.insert(node_id, format!("{}", addr));

(node_id, engines, path)
Ok(node_id)
}

fn get_snap_dir(&self, node_id: u64) -> String {
@@ -272,7 +265,7 @@
fn stop_node(&mut self, node_id: u64) {
if let Some(mut meta) = self.metas.remove(&node_id) {
meta.server.stop().unwrap();
meta.node.stop().unwrap();
meta.node.stop();
meta.worker.stop().unwrap().join().unwrap();
}
}
3 changes: 1 addition & 2 deletions src/bin/tikv-server.rs
@@ -325,8 +325,7 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec

metrics_flusher.stop();

node.stop()
.unwrap_or_else(|e| fatal!("failed to stop node: {}", e));
node.stop();

region_info_accessor.stop();
