server/node: allow restart with a StoreIdent when cluster not bootstrapped #4334

Merged
merged 13 commits on Mar 13, 2019
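
This PR reworks the test_raftstore Simulator trait so that the caller builds the Engines and the raft batch system up front and passes them into run_node, which now returns a ServerResult instead of panicking. That lets a node be restarted against a store that already carries a StoreIdent even though the cluster was never bootstrapped. A minimal sketch of the reworked trait follows (only the affected methods are shown; the real trait in components/test_raftstore has more):

```rust
// Minimal sketch of the reworked Simulator trait; imports mirror the diff below.
use tikv::config::TiKvConfig;
use tikv::raftstore::store::Engines;
use tikv::raftstore::store::fsm::{RaftBatchSystem, RaftRouter};
use tikv::server::Result as ServerResult;
use tikv::util::collections::HashSet;

pub trait Simulator {
    // The caller creates the engines and the raft batch system and hands them in;
    // failures propagate as a ServerResult instead of panicking inside the simulator.
    fn run_node(
        &mut self,
        node_id: u64,
        cfg: TiKvConfig,
        engines: Engines,
        router: RaftRouter,
        system: RaftBatchSystem,
    ) -> ServerResult<u64>;

    fn stop_node(&mut self, node_id: u64);
    fn get_node_ids(&self) -> HashSet<u64>;
}
```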
67 changes: 39 additions & 28 deletions components/test_raftstore/src/cluster.rs
@@ -28,9 +28,10 @@ use kvproto::raft_serverpb::RaftMessage;

use tikv::config::TiKvConfig;
use tikv::pd::PdClient;
use tikv::raftstore::store::fsm::RaftRouter;
use tikv::raftstore::store::fsm::{create_raft_batch_system, RaftBatchSystem, RaftRouter};
use tikv::raftstore::store::*;
use tikv::raftstore::{Error, Result};
use tikv::server::Result as ServerResult;
use tikv::storage::CF_DEFAULT;
use tikv::util::collections::{HashMap, HashSet};
use tikv::util::{escape, rocksdb_util, HandyRwLock};
@@ -52,8 +53,10 @@ pub trait Simulator {
&mut self,
node_id: u64,
cfg: TiKvConfig,
_: Option<Engines>,
) -> (u64, Engines, Option<TempDir>);
engines: Engines,
router: RaftRouter,
system: RaftBatchSystem,
) -> ServerResult<u64>;
fn stop_node(&mut self, node_id: u64);
fn get_node_ids(&self) -> HashSet<u64>;
fn async_command_on_node(
@@ -98,10 +101,10 @@ pub trait Simulator {
pub struct Cluster<T: Simulator> {
pub cfg: TiKvConfig,
leaders: HashMap<u64, metapb::Peer>,
paths: Vec<TempDir>,
dbs: Vec<Engines>,
count: usize,

paths: Vec<TempDir>,
pub dbs: Vec<Engines>,
pub engines: HashMap<u64, Engines>,

pub sim: Arc<RwLock<T>>,
@@ -153,22 +156,24 @@ impl<T: Simulator> Cluster<T> {
}
}

pub fn start(&mut self) {
if self.engines.is_empty() {
let mut sim = self.sim.wl();
for _ in 0..self.count {
let (node_id, engines, path) = sim.run_node(0, self.cfg.clone(), None);
self.dbs.push(engines.clone());
self.engines.insert(node_id, engines);
self.paths.push(path.unwrap());
}
} else {
// recover from last shutdown.
let mut node_ids: Vec<u64> = self.engines.iter().map(|(&id, _)| id).collect();
for node_id in node_ids.drain(..) {
self.run_node(node_id);
}
pub fn start(&mut self) -> ServerResult<()> {
// Try recover from last shutdown.
let node_ids: Vec<u64> = self.engines.iter().map(|(&id, _)| id).collect();
for node_id in node_ids {
self.run_node(node_id)?;
}

// Try start new nodes.
let mut sim = self.sim.wl();
for _ in 0..self.count - self.engines.len() {
let (router, system) = create_raft_batch_system(&self.cfg.raft_store);
let (engines, path) = create_test_engine(None, router.clone(), &self.cfg);
self.dbs.push(engines.clone());
self.paths.push(path.unwrap());
let node_id = sim.run_node(0, self.cfg.clone(), engines.clone(), router, system)?;
self.engines.insert(node_id, engines);
}
Ok(())
}

pub fn compact_data(&self) {
@@ -183,29 +188,32 @@ impl<T: Simulator> Cluster<T> {
pub fn run(&mut self) {
self.create_engines();
self.bootstrap_region().unwrap();
self.start();
self.start().unwrap();
}

// Bootstrap the store with fixed ID (like 1, 2, .. 5) and
// initialize first region in store 1, then start the cluster.
pub fn run_conf_change(&mut self) -> u64 {
self.create_engines();
let region_id = self.bootstrap_conf_change();
self.start();
self.start().unwrap();
region_id
}

pub fn get_node_ids(&self) -> HashSet<u64> {
self.sim.rl().get_node_ids()
}

pub fn run_node(&mut self, node_id: u64) {
pub fn run_node(&mut self, node_id: u64) -> ServerResult<()> {
debug!("starting node {}", node_id);
let engines = self.engines[&node_id].clone();
let (router, system) = create_raft_batch_system(&self.cfg.raft_store);
// FIXME: rocksdb event listeners may not work, because we change the router.
self.sim
.wl()
.run_node(node_id, self.cfg.clone(), Some(engines));
.run_node(node_id, self.cfg.clone(), engines, router, system)?;
debug!("node {} started", node_id);
Ok(())
}

pub fn stop_node(&mut self, node_id: u64) {
@@ -437,7 +445,7 @@ impl<T: Simulator> Cluster<T> {
}

for engines in self.engines.values() {
write_prepare_bootstrap(engines, &region)?;
prepare_bootstrap_cluster(engines, &region)?;
}

self.bootstrap_cluster(region);
@@ -457,10 +465,13 @@ impl<T: Simulator> Cluster<T> {
}

let node_id = 1;
let region = prepare_bootstrap(&self.engines[&node_id], 1, 1, 1).unwrap();
let rid = region.get_id();
let region_id = 1;
let peer_id = 1;

let region = initial_region(node_id, region_id, peer_id);
prepare_bootstrap_cluster(&self.engines[&node_id], &region).unwrap();
self.bootstrap_cluster(region);
rid
region_id
}

// This is only for fixed id test.
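With the restructured start/run_node flow above, restarting a stopped node is now a fallible call on the cluster. A rough usage sketch follows; the test name is illustrative rather than part of this PR, and the new_node_cluster helper is assumed from the existing test_raftstore conventions:

```rust
// Rough usage sketch of the reworked Cluster API (illustrative test,
// new_node_cluster assumed from the existing test_raftstore crate).
#[test]
fn test_restart_node() {
    let mut cluster = new_node_cluster(0, 3);
    cluster.run(); // create_engines + bootstrap_region + start().unwrap()

    // Stop node 1, then bring it back: run_node now returns ServerResult<()>,
    // so a failed restart surfaces as an error instead of a panic deep in the simulator.
    cluster.stop_node(1);
    cluster.run_node(1).unwrap();
}
```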
33 changes: 17 additions & 16 deletions components/test_raftstore/src/node.rs
@@ -25,11 +25,12 @@ use raft::SnapshotStatus;
use tikv::config::TiKvConfig;
use tikv::import::SSTImporter;
use tikv::raftstore::coprocessor::CoprocessorHost;
use tikv::raftstore::store::fsm::{create_raft_batch_system, RaftRouter};
use tikv::raftstore::store::fsm::{RaftBatchSystem, RaftRouter};
use tikv::raftstore::store::*;
use tikv::raftstore::Result;
use tikv::server::transport::{RaftStoreRouter, ServerRaftStoreRouter};
use tikv::server::Node;
use tikv::server::Result as ServerResult;
use tikv::util::collections::{HashMap, HashSet};
use tikv::util::worker::{FutureWorker, Worker};

@@ -166,10 +167,11 @@ impl Simulator for NodeCluster {
&mut self,
node_id: u64,
cfg: TiKvConfig,
engines: Option<Engines>,
) -> (u64, Engines, Option<TempDir>) {
engines: Engines,
router: RaftRouter,
system: RaftBatchSystem,
) -> ServerResult<u64> {
assert!(node_id == 0 || !self.nodes.contains_key(&node_id));
let (router, system) = create_raft_batch_system(&cfg.raft_store);
let pd_worker = FutureWorker::new("test-pd-worker");

// Create localreader.
@@ -184,10 +186,7 @@
Arc::clone(&self.pd_client),
);

// Create engine
let (engines, path) = create_test_engine(engines, router.clone(), &cfg);

let (snap_mgr, tmp) = if node_id == 0
let (snap_mgr, snap_mgr_path) = if node_id == 0
|| !self
.trans
.core
@@ -225,29 +224,31 @@
local_reader,
coprocessor_host,
importer,
)
.unwrap();
)?;
assert!(engines
.kv
.get_msg::<metapb::Region>(keys::PREPARE_BOOTSTRAP_KEY)
.unwrap()
.is_none());
assert!(node_id == 0 || node_id == node.id());

let node_id = node.id();
debug!(
"node_id: {} tmp: {:?}",
node_id,
tmp.as_ref().map(|p| p.path().to_str().unwrap().to_owned())
snap_mgr_path
.as_ref()
.map(|p| p.path().to_str().unwrap().to_owned())
);
if let Some(tmp) = tmp {
if let Some(tmp) = snap_mgr_path {
self.trans
.core
.lock()
.unwrap()
.snap_paths
.insert(node.id(), (snap_mgr, tmp));
.insert(node_id, (snap_mgr, tmp));
}

let node_id = node.id();
let router = ServerRaftStoreRouter::new(router.clone(), local_ch);
self.trans
.core
Expand All @@ -258,7 +259,7 @@ impl Simulator for NodeCluster {
self.nodes.insert(node_id, node);
self.simulate_trans.insert(node_id, simulate_trans);

(node_id, engines, path)
Ok(node_id)
}

fn get_snap_dir(&self, node_id: u64) -> String {
@@ -272,7 +273,7 @@

fn stop_node(&mut self, node_id: u64) {
if let Some(mut node) = self.nodes.remove(&node_id) {
node.stop().unwrap();
node.stop();
}
self.trans
.core
27 changes: 10 additions & 17 deletions components/test_raftstore/src/server.rs
@@ -25,14 +25,15 @@ use tikv::config::TiKvConfig;
use tikv::coprocessor;
use tikv::import::{ImportSSTService, SSTImporter};
use tikv::raftstore::coprocessor::{CoprocessorHost, RegionInfoAccessor};
use tikv::raftstore::store::fsm::{create_raft_batch_system, RaftRouter};
use tikv::raftstore::store::fsm::{RaftBatchSystem, RaftRouter};
use tikv::raftstore::store::{Callback, Engines, SnapManager};
use tikv::raftstore::Result;
use tikv::server::load_statistics::ThreadLoad;
use tikv::server::readpool::ReadPool;
use tikv::server::resolve::{self, Task as ResolveTask};
use tikv::server::transport::RaftStoreRouter;
use tikv::server::transport::ServerRaftStoreRouter;
use tikv::server::Result as ServerResult;
use tikv::server::{
create_raft_storage, Config, Error, Node, PdStoreAddrResolver, RaftClient, Server,
ServerTransport,
@@ -109,10 +110,10 @@
&mut self,
node_id: u64,
mut cfg: TiKvConfig,
engines: Option<Engines>,
) -> (u64, Engines, Option<TempDir>) {
assert!(node_id == 0 || !self.metas.contains_key(&node_id));

engines: Engines,
router: RaftRouter,
system: RaftBatchSystem,
) -> ServerResult<u64> {
let (tmp_str, tmp) = if node_id == 0 || !self.snap_paths.contains_key(&node_id) {
let p = TempDir::new("test_cluster").unwrap();
(p.path().to_str().unwrap().to_owned(), Some(p))
@@ -127,19 +128,13 @@
cfg.server.addr = addr.clone();
}

// Initialize raftstore channels.
let (router, system) = create_raft_batch_system(&cfg.raft_store);

// Create localreader.
let local_reader = Worker::new("test-local-reader");
let local_ch = local_reader.scheduler();

let raft_router = ServerRaftStoreRouter::new(router.clone(), local_ch);
let sim_router = SimulateTransport::new(raft_router);

// Create engine
let (engines, path) = create_test_engine(engines, router.clone(), &cfg);

// Create storage.
let pd_worker = FutureWorker::new("test-future-worker");
let storage_read_pool =
@@ -152,8 +147,7 @@
storage_read_pool,
None,
None,
)
.unwrap();
)?;
self.storages.insert(node_id, store.get_engine());

// Create import service.
@@ -235,8 +229,7 @@
local_reader,
coprocessor_host,
importer,
)
.unwrap();
)?;
assert!(node_id == 0 || node_id == node.id());
let node_id = node.id();
if let Some(tmp) = tmp {
@@ -258,7 +251,7 @@
);
self.addrs.insert(node_id, format!("{}", addr));

(node_id, engines, path)
Ok(node_id)
}

fn get_snap_dir(&self, node_id: u64) -> String {
@@ -272,7 +265,7 @@
fn stop_node(&mut self, node_id: u64) {
if let Some(mut meta) = self.metas.remove(&node_id) {
meta.server.stop().unwrap();
meta.node.stop().unwrap();
meta.node.stop();
meta.worker.stop().unwrap().join().unwrap();
}
}
3 changes: 1 addition & 2 deletions src/bin/tikv-server.rs
@@ -325,8 +325,7 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec

metrics_flusher.stop();

node.stop()
.unwrap_or_else(|e| fatal!("failed to stop node: {}", e));
node.stop();

region_info_accessor.stop();

Expand Down