Skip to content

Commit

Permalink
Extract sst_importer (tikv#5438)
Browse files Browse the repository at this point in the history
Signed-off-by: Brian Anderson <andersrb@gmail.com>
  • Loading branch information
brson authored and sre-bot committed Sep 11, 2019
1 parent f6aab53 commit e52cb76
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 50 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Expand Up @@ -120,6 +120,7 @@ engine = { path = "components/engine" }
tidb_query = { path = "components/tidb_query" }
pd_client = { path = "components/pd_client" }
keys = { path = "components/keys" }
sst_importer = { path = "components/sst_importer" }

[dependencies.murmur3]
git = "https://github.com/pingcap/murmur3.git"
Expand Down Expand Up @@ -189,6 +190,7 @@ members = [
"components/external_storage",
"components/backup",
"components/keys",
"components/sst_importer",
"cmd",
]
default-members = ["cmd"]
Expand Down
29 changes: 29 additions & 0 deletions components/sst_importer/Cargo.toml
@@ -0,0 +1,29 @@
[package]
name = "sst_importer"
version = "0.1.0"
edition = "2018"
publish = false

[dependencies]
crc = "1.8"
futures = "0.1"
grpcio = { version = "0.5.0-alpha.3", features = [ "openssl-vendored" ] }
lazy_static = "1.3"
quick-error = "1.2.2"
serde = "1.0"
serde_derive = "1.0"
uuid = { version = "0.6", features = [ "serde", "v4" ] }
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "91904ade" }
kvproto = { git = "https://github.com/pingcap/kvproto.git" }

engine = { path = "../engine" }
keys = { path = "../keys" }
tikv_alloc = { path = "../tikv_alloc", default-features = false }

[dependencies.prometheus]
version = "0.4.2"
default-features = false

[dev-dependencies]
tempfile = "3.0"
File renamed without changes.
Expand Up @@ -9,8 +9,6 @@ use futures::sync::oneshot::Canceled;
use grpcio::Error as GrpcError;
use uuid::ParseError;

use crate::raftstore::errors::Error as RaftStoreError;

quick_error! {
#[derive(Debug)]
pub enum Error {
Expand Down Expand Up @@ -42,11 +40,6 @@ quick_error! {
description("Engine error")
display("Engine {:?}", err)
}
RaftStore(err: RaftStoreError) {
from()
cause(err)
description(err.description())
}
ParseIntError(err: ParseIntError) {
from()
cause(err)
Expand Down
Expand Up @@ -5,6 +5,8 @@ use kvproto::import_sstpb::*;

use super::Result;

type RocksDBMetricsFn = fn(cf: &str, name: &str, v: f64);

pub struct ImportModeSwitcher {
mode: SwitchMode,
backup_db_options: ImportModeDBOptions,
Expand All @@ -20,21 +22,21 @@ impl ImportModeSwitcher {
}
}

pub fn enter_normal_mode(&mut self, db: &DB) -> Result<()> {
pub fn enter_normal_mode(&mut self, db: &DB, mf: RocksDBMetricsFn) -> Result<()> {
if self.mode == SwitchMode::Normal {
return Ok(());
}

self.backup_db_options.set_options(db)?;
for (cf_name, cf_opts) in &self.backup_cf_options {
cf_opts.set_options(db, cf_name)?;
cf_opts.set_options(db, cf_name, mf)?;
}

self.mode = SwitchMode::Normal;
Ok(())
}

pub fn enter_import_mode(&mut self, db: &DB) -> Result<()> {
pub fn enter_import_mode(&mut self, db: &DB, mf: RocksDBMetricsFn) -> Result<()> {
if self.mode == SwitchMode::Import {
return Ok(());
}
Expand All @@ -48,7 +50,7 @@ impl ImportModeSwitcher {
for cf_name in db.cf_names() {
let cf_opts = ImportModeCFOptions::new_options(db, cf_name);
self.backup_cf_options.push((cf_name.to_owned(), cf_opts));
import_cf_options.set_options(db, cf_name)?;
import_cf_options.set_options(db, cf_name, mf)?;
}

self.mode = SwitchMode::Import;
Expand Down Expand Up @@ -117,14 +119,11 @@ impl ImportModeCFOptions {
}
}

fn set_options(&self, db: &DB, cf_name: &str) -> Result<()> {
use crate::server::CONFIG_ROCKSDB_GAUGE;
fn set_options(&self, db: &DB, cf_name: &str, mf: RocksDBMetricsFn) -> Result<()> {
let cf = db.cf_handle(cf_name).unwrap();
let cf_opts = db.get_options_cf(cf);
cf_opts.set_block_cache_capacity(self.block_cache_size)?;
CONFIG_ROCKSDB_GAUGE
.with_label_values(&[cf_name, "block_cache_size"])
.set(self.block_cache_size as f64);
mf(cf_name, "block_cache_size", self.block_cache_size as f64);
let opts = [
(
"level0_stop_writes_trigger".to_owned(),
Expand All @@ -148,9 +147,7 @@ impl ImportModeCFOptions {
db.set_options_cf(cf, tmp_opts.as_slice())?;
for (key, value) in &opts {
if let Ok(v) = value.parse::<f64>() {
CONFIG_ROCKSDB_GAUGE
.with_label_values(&[cf_name, key])
.set(v);
mf(cf_name, key, v);
}
}
Ok(())
Expand Down Expand Up @@ -214,15 +211,17 @@ mod tests {
let import_cf_options = ImportModeCFOptions::new();
let normal_cf_options = ImportModeCFOptions::new_options(&db, "default");

fn mf(_cf: &str, _name: &str, _v: f64) {}

let mut switcher = ImportModeSwitcher::new();
check_import_options(&db, &normal_db_options, &normal_cf_options);
switcher.enter_import_mode(&db).unwrap();
switcher.enter_import_mode(&db, mf).unwrap();
check_import_options(&db, &import_db_options, &import_cf_options);
switcher.enter_import_mode(&db).unwrap();
switcher.enter_import_mode(&db, mf).unwrap();
check_import_options(&db, &import_db_options, &import_cf_options);
switcher.enter_normal_mode(&db).unwrap();
switcher.enter_normal_mode(&db, mf).unwrap();
check_import_options(&db, &normal_db_options, &normal_cf_options);
switcher.enter_normal_mode(&db).unwrap();
switcher.enter_normal_mode(&db, mf).unwrap();
check_import_options(&db, &normal_db_options, &normal_cf_options);
}
}
30 changes: 30 additions & 0 deletions components/sst_importer/src/lib.rs
@@ -0,0 +1,30 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

//! Importing RocksDB SST files into TiKV

#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate quick_error;
#[macro_use]
extern crate serde_derive;
#[macro_use(slog_error, slog_warn, slog_info)]
extern crate slog;
#[macro_use]
extern crate slog_global;
#[allow(unused_extern_crates)]
extern crate tikv_alloc;

mod config;
mod errors;
pub mod metrics;
#[macro_use]
pub mod service;
pub mod import_mode;
pub mod sst_importer;

pub mod test_helpers;

pub use self::config::Config;
pub use self::errors::{Error, Result};
pub use self::sst_importer::SSTImporter;
File renamed without changes.
14 changes: 6 additions & 8 deletions src/import/service.rs → components/sst_importer/src/service.rs
Expand Up @@ -2,23 +2,21 @@

use futures::Future;
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use std::fmt::Debug;

use super::Error;

pub fn make_rpc_error(err: Error) -> RpcStatus {
pub fn make_rpc_error<E: Debug>(err: E) -> RpcStatus {
// FIXME: Just spewing debug error formatting here seems pretty unfriendly
RpcStatus::new(RpcStatusCode::UNKNOWN, Some(format!("{:?}", err)))
}

pub fn send_rpc_error<M, E>(ctx: RpcContext<'_>, sink: UnarySink<M>, error: E)
where
Error: From<E>,
{
let err = make_rpc_error(Error::from(error));
pub fn send_rpc_error<M, E: Debug>(ctx: RpcContext<'_>, sink: UnarySink<M>, error: E) {
let err = make_rpc_error(error);
ctx.spawn(sink.fail(err).map_err(|e| {
warn!("send rpc failed"; "err" => %e);
}));
}

#[macro_export]
macro_rules! send_rpc_response {
($res:ident, $sink:ident, $label:ident, $timer:ident) => {{
let res = match $res {
Expand Down
Expand Up @@ -315,7 +315,7 @@ fn path_to_sst_meta<P: AsRef<Path>>(path: P) -> Result<SstMeta> {
#[cfg(test)]
mod tests {
use super::*;
use crate::import::test_helpers::*;
use crate::test_helpers::*;

use engine::rocks::util::new_engine;
use tempfile::Builder;
Expand Down
Expand Up @@ -7,7 +7,6 @@ use crc::crc32::{self, Hasher32};
use kvproto::import_sstpb::*;
use uuid::Uuid;

use crate::raftstore::store::keys;
use engine::rocks::{SstWriterBuilder, DB};

pub fn calc_data_crc32(data: &[u8]) -> u32 {
Expand Down
15 changes: 4 additions & 11 deletions src/import/mod.rs
Expand Up @@ -12,18 +12,11 @@
//! thread to notify it of the ingesting operation. This service is running
//! inside TiKV because it needs to interact with raftstore.

mod config;
mod errors;
mod metrics;
#[macro_use]
mod service;
mod import_mode;
mod sst_importer;
mod sst_service;

pub mod test_helpers;
pub use sst_importer::test_helpers;

pub use self::config::Config;
pub use self::errors::{Error, Result};
pub use self::sst_importer::SSTImporter;
pub use self::sst_service::ImportSSTService;
pub use sst_importer::Config;
pub use sst_importer::SSTImporter;
pub use sst_importer::{Error, Result};
18 changes: 12 additions & 6 deletions src/import/sst_service.rs
Expand Up @@ -13,13 +13,15 @@ use kvproto::raft_cmdpb::*;

use crate::raftstore::store::Callback;
use crate::server::transport::RaftStoreRouter;
use crate::server::CONFIG_ROCKSDB_GAUGE;
use sst_importer::send_rpc_response;
use tikv_util::future::paired_future_callback;
use tikv_util::time::Instant;

use super::import_mode::*;
use super::metrics::*;
use super::service::*;
use super::{Config, Error, SSTImporter};
use sst_importer::import_mode::*;
use sst_importer::metrics::*;
use sst_importer::service::*;
use sst_importer::{Config, Error, SSTImporter};

/// ImportSSTService provides tikv-server with the ability to ingest SST files.
///
Expand Down Expand Up @@ -69,9 +71,13 @@ impl<Router: RaftStoreRouter> ImportSst for ImportSSTService<Router> {

let res = {
let mut switcher = self.switcher.lock().unwrap();
fn mf(cf: &str, name: &str, v: f64) {
CONFIG_ROCKSDB_GAUGE.with_label_values(&[cf, name]).set(v);
}

match req.get_mode() {
SwitchMode::Normal => switcher.enter_normal_mode(&self.engine),
SwitchMode::Import => switcher.enter_import_mode(&self.engine),
SwitchMode::Normal => switcher.enter_normal_mode(&self.engine, mf),
SwitchMode::Import => switcher.enter_import_mode(&self.engine, mf),
}
};
match res {
Expand Down

0 comments on commit e52cb76

Please sign in to comment.