Skip to content

Commit

Permalink
cmd, backup: enable backup feature (tikv#5476)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus authored and sre-bot committed Sep 18, 2019
1 parent 8524c80 commit 4e787a9
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 50 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/Cargo.toml
Expand Up @@ -52,6 +52,7 @@ kvproto = { git = "https://github.com/pingcap/kvproto.git" }
tikv_util = { path = "../components/tikv_util" }
engine = { path = "../components/engine" }
pd_client = { path = "../components/pd_client" }
backup = { path = "../components/backup" }

[target.'cfg(unix)'.dependencies]
signal = "0.6"
52 changes: 48 additions & 4 deletions cmd/src/server.rs
Expand Up @@ -7,6 +7,7 @@ use engine::rocks::util::metrics_flusher::{MetricsFlusher, DEFAULT_FLUSHER_INTER
use engine::rocks::util::security::encrypted_env_from_cipher_file;
use engine::Engines;
use fs2::FileExt;
use kvproto::backup::create_backup;
use kvproto::deadlock::create_deadlock;
use kvproto::debugpb::create_debug;
use kvproto::import_sstpb::create_import_sst;
Expand Down Expand Up @@ -225,6 +226,11 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec
// Create Debug service.
let debug_service = DebugService::new(engines.clone(), raft_router.clone());

// Create Backup service.
let mut backup_worker = tikv_util::worker::Worker::new("backup-endpoint");
let backup_scheduler = backup_worker.scheduler();
let backup_service = backup::Service::new(backup_scheduler);

// Create server
let mut server = Server::new(
&server_cfg,
Expand All @@ -238,10 +244,31 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec
.unwrap_or_else(|e| fatal!("failed to create server: {}", e));

// Register services.
server.register_service(create_import_sst(import_service));
server.register_service(create_debug(debug_service));
if server
.register_service(create_import_sst(import_service))
.is_some()
{
fatal!("failed to register import service");
}
if server
.register_service(create_debug(debug_service))
.is_some()
{
fatal!("failed to register debug service");
}
if let Some(lm) = lock_mgr.as_ref() {
server.register_service(create_deadlock(lm.deadlock_service()));
if server
.register_service(create_deadlock(lm.deadlock_service()))
.is_some()
{
fatal!("failed to register deadlock service");
}
}
if server
.register_service(create_backup(backup_service))
.is_some()
{
fatal!("failed to register backup service");
}

let trans = server.transport();
Expand Down Expand Up @@ -273,6 +300,17 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec
.unwrap_or_else(|e| fatal!("failed to start node: {}", e));
initial_metric(&cfg.metric, Some(node.id()));

// Start backup endpoint.
let backup_endpoint = backup::Endpoint::new(
node.id(),
engine.clone(),
region_info_accessor.clone(),
engines.kv.clone(),
);
backup_worker
.start(backup_endpoint)
.unwrap_or_else(|e| fatal!("failed to start backup endpoint: {}", e));

// Start auto gc
let auto_gc_cfg = AutoGCConfig::new(
Arc::clone(&pd_client),
Expand Down Expand Up @@ -335,7 +373,13 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec

signal_handler::handle_signal(Some(engines));

// Stop.
// Stop backup worker.
if let Some(j) = backup_worker.stop() {
j.join()
.unwrap_or_else(|e| fatal!("failed to stop backup: {:?}", e))
}

// Stop server.
server
.stop()
.unwrap_or_else(|e| fatal!("failed to stop server: {}", e));
Expand Down
6 changes: 3 additions & 3 deletions cmd/src/setup.rs
Expand Up @@ -15,11 +15,11 @@ pub static LOG_INITIALIZED: AtomicBool = AtomicBool::new(false);

#[macro_export]
macro_rules! fatal {
($lvl:expr, $($arg:tt)+) => ({
($lvl:expr $(, $arg:expr)*) => ({
if $crate::setup::LOG_INITIALIZED.load(::std::sync::atomic::Ordering::SeqCst) {
crit!($lvl, $($arg)+);
crit!($lvl $(, $arg)*);
} else {
eprintln!($lvl, $($arg)+);
eprintln!($lvl $(, $arg)*);
}
slog_global::clear_global();
::std::process::exit(1)
Expand Down
36 changes: 11 additions & 25 deletions components/backup/src/endpoint.rs
@@ -1,10 +1,5 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

// TODO: remove it after all code been merged.
#![allow(unused_variables)]
#![allow(dead_code)]

use std::cmp;
use std::fmt;
use std::sync::atomic::*;
use std::sync::*;
Expand All @@ -13,24 +8,17 @@ use std::time::*;
use engine::rocks::util::io_limiter::IOLimiter;
use engine::DB;
use external_storage::*;
use futures::lazy;
use futures::sync::mpsc::*;
use futures::{lazy, Future};
use kvproto::backup::*;
use kvproto::kvrpcpb::{Context, IsolationLevel};
use kvproto::metapb::*;
use raft::StateRole;
use tikv::raftstore::coprocessor::RegionInfoAccessor;
use tikv::raftstore::store::util::find_peer;
use tikv::server::transport::ServerRaftStoreRouter;
use tikv::storage::kv::{
Engine, Error as EngineError, RegionInfoProvider, ScanMode, StatisticsSummary,
};
use tikv::storage::txn::{
EntryBatch, Error as TxnError, Msg, Scanner, SnapshotStore, Store, TxnEntryScanner,
TxnEntryStore,
};
use tikv::storage::kv::{Engine, RegionInfoProvider};
use tikv::storage::txn::{EntryBatch, SnapshotStore, TxnEntryScanner, TxnEntryStore};
use tikv::storage::{Key, Statistics};
use tikv_util::worker::{Runnable, RunnableWithTimer};
use tikv_util::worker::Runnable;
use tokio_threadpool::{Builder as ThreadPoolBuilder, ThreadPool};

use crate::metrics::*;
Expand Down Expand Up @@ -141,12 +129,14 @@ pub struct Endpoint<E: Engine, R: RegionInfoProvider> {

impl<E: Engine, R: RegionInfoProvider> Endpoint<E, R> {
pub fn new(store_id: u64, engine: E, region_info: R, db: Arc<DB>) -> Endpoint<E, R> {
let workers = ThreadPoolBuilder::new().name_prefix("backworker").build();
let workers = ThreadPoolBuilder::new()
.name_prefix("backworker")
.pool_size(8) // TODO: make it configure.
.build();
Endpoint {
store_id,
engine,
region_info,
// TODO: support more config.
workers,
db,
}
Expand Down Expand Up @@ -326,7 +316,7 @@ impl<E: Engine, R: RegionInfoProvider> Endpoint<E, R> {
response.set_end_key(end_key.clone());
match res {
Ok((mut files, stat)) => {
info!("backup region finish";
debug!("backup region finish";
"region" => ?brange.region,
"start_key" => hex::encode_upper(&start_key),
"end_key" => hex::encode_upper(&end_key),
Expand Down Expand Up @@ -439,18 +429,14 @@ pub mod tests {
use external_storage::LocalStorage;
use futures::{Future, Stream};
use kvproto::metapb;
use std::collections::BTreeMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use tempfile::TempDir;
use tikv::raftstore::coprocessor::RegionCollector;
use tikv::raftstore::coprocessor::{RegionInfo, SeekRegionCallback};
use tikv::raftstore::coprocessor::SeekRegionCallback;
use tikv::raftstore::store::util::new_peer;
use tikv::storage::kv::Result as EngineResult;
use tikv::storage::mvcc::tests::*;
use tikv::storage::SHORT_VALUE_MAX_LEN;
use tikv::storage::{
Mutation, Options, RocksEngine, Storage, TestEngineBuilder, TestStorageBuilder,
};
use tikv::storage::{RocksEngine, TestEngineBuilder};

#[derive(Clone)]
pub struct MockRegionInfoProvider {
Expand Down
18 changes: 0 additions & 18 deletions components/backup/src/lib.rs
@@ -1,28 +1,10 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

#![recursion_limit = "200"]
// TODO: remove it after all code been merged.
#![allow(unused_imports)]

#[macro_use(
kv,
slog_kv,
slog_trace,
slog_debug,
slog_info,
slog_warn,
slog_error,
slog_record,
slog_b,
slog_log,
slog_record_static
)]
extern crate slog;
#[macro_use]
extern crate slog_global;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate failure;
#[allow(unused_extern_crates)]
extern crate tikv_alloc;
Expand Down

0 comments on commit 4e787a9

Please sign in to comment.