Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement IngestSST API #2712

Merged
merged 2 commits into from Mar 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion src/bin/tikv-server.rs
Expand Up @@ -223,7 +223,11 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec
);

let importer = Arc::new(SSTImporter::new(import_path).unwrap());
let import_service = ImportSSTService::new(cfg.import.clone(), storage.clone(), importer);
let import_service = ImportSSTService::new(
cfg.import.clone(),
raft_router.clone(),
Arc::clone(&importer),
);

let server_cfg = Arc::new(cfg.server.clone());
// Create server
Expand Down Expand Up @@ -255,6 +259,7 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc<Sec
significant_msg_receiver,
pd_worker,
coprocessor_host,
importer,
).unwrap_or_else(|e| fatal!("failed to start node: {:?}", e));
initial_metric(&cfg.metric, Some(node.id()));

Expand Down
15 changes: 15 additions & 0 deletions src/import/errors.rs
Expand Up @@ -12,13 +12,15 @@
// limitations under the License.

use std::io::Error as IoError;
use std::num::ParseIntError;
use std::path::PathBuf;
use std::result;

use futures::sync::oneshot::Canceled;
use grpc::Error as GrpcError;
use uuid::ParseError;

use raftstore::errors::Error as RaftStoreError;
use util::codec::Error as CodecError;

quick_error! {
Expand Down Expand Up @@ -52,6 +54,16 @@ quick_error! {
from()
display("RocksDB {}", msg)
}
RaftStore(err: RaftStoreError) {
from()
cause(err)
description(err.description())
}
ParseIntError(err: ParseIntError) {
from()
cause(err)
description(err.description())
}
FileExists(path: PathBuf) {
display("File {:?} exists", path)
}
Expand All @@ -61,6 +73,9 @@ quick_error! {
FileCorrupted(path: PathBuf, reason: String) {
display("File {:?} corrupted: {}", path, reason)
}
InvalidSSTPath(path: PathBuf) {
display("Invalid SST path {:?}", path)
}
TokenExists(token: usize) {
display("Token {} exists", token)
}
Expand Down
13 changes: 12 additions & 1 deletion src/import/service.rs
Expand Up @@ -11,14 +11,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use grpc::{RpcStatus, RpcStatusCode};
use grpc::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use futures::Future;

use super::Error;

pub fn make_rpc_error(err: Error) -> RpcStatus {
RpcStatus::new(RpcStatusCode::Unknown, Some(format!("{:?}", err)))
}

pub fn send_rpc_error<M, E>(ctx: RpcContext, sink: UnarySink<M>, error: E)
where
Error: From<E>,
{
let err = make_rpc_error(Error::from(error));
ctx.spawn(sink.fail(err).map_err(|e| {
warn!("send rpc error: {:?}", e);
}));
}

macro_rules! send_rpc_response {
($res:ident, $sink: ident, $label:ident, $timer:ident) => ({
let res = match $res {
Expand Down
115 changes: 115 additions & 0 deletions src/import/sst_importer.rs
Expand Up @@ -21,8 +21,10 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use crc::crc32::{self, Hasher32};
use uuid::Uuid;
use kvproto::importpb::*;
use rocksdb::{IngestExternalFileOptions, DB};

use util::collections::HashMap;
use util::rocksdb::{get_cf_handle, prepare_sst_for_ingestion, validate_sst_for_ingestion};

use super::{Error, Result};

Expand Down Expand Up @@ -120,6 +122,23 @@ impl SSTImporter {
}
}
}

pub fn ingest(&self, meta: &SSTMeta, db: &DB) -> Result<()> {
match self.dir.ingest(meta, db) {
Ok(_) => {
info!("ingest {:?}", meta);
Ok(())
}
Err(e) => {
error!("ingest {:?}: {:?}", meta, e);
Err(e)
}
}
}

pub fn list_ssts(&self) -> Result<Vec<SSTMeta>> {
self.dir.list_ssts()
}
}

// TODO: Add size and rate limit.
Expand Down Expand Up @@ -185,6 +204,35 @@ impl ImportDir {
}
Ok(path)
}

fn ingest(&self, meta: &SSTMeta, db: &DB) -> Result<()> {
let path = self.join(meta)?;
let cf = meta.get_cf_name();
prepare_sst_for_ingestion(&path.save, &path.clone)?;
validate_sst_for_ingestion(db, cf, &path.clone, meta.get_length(), meta.get_crc32())?;

let handle = get_cf_handle(db, cf)?;
let mut opts = IngestExternalFileOptions::new();
opts.move_files(true);
db.ingest_external_file_cf(handle, &opts, &[path.clone.to_str().unwrap()])?;
Ok(())
}

fn list_ssts(&self) -> Result<Vec<SSTMeta>> {
let mut ssts = Vec::new();
for e in fs::read_dir(&self.root_dir)? {
let e = e?;
if !e.file_type()?.is_file() {
continue;
}
let path = e.path();
match path_to_sst_meta(&path) {
Ok(sst) => ssts.push(sst),
Err(e) => error!("{}: {:?}", path.to_str().unwrap(), e),
}
}
Ok(ssts)
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -301,12 +349,42 @@ fn sst_meta_to_path(meta: &SSTMeta) -> Result<PathBuf> {
)))
}

fn path_to_sst_meta<P: AsRef<Path>>(path: P) -> Result<SSTMeta> {
let path = path.as_ref();
let file_name = match path.file_name().and_then(|n| n.to_str()) {
Some(name) => name,
None => return Err(Error::InvalidSSTPath(path.to_owned())),
};

// A valid file name should be in the format:
// "{uuid}_{region_id}_{region_epoch.conf_ver}_{region_epoch.version}.sst"
if !file_name.ends_with(SST_SUFFIX) {
return Err(Error::InvalidSSTPath(path.to_owned()));
}
let elems: Vec<_> = file_name
.trim_right_matches(SST_SUFFIX)
.split('_')
.collect();
if elems.len() != 4 {
return Err(Error::InvalidSSTPath(path.to_owned()));
}

let mut meta = SSTMeta::new();
let uuid = Uuid::parse_str(elems[0])?;
meta.set_uuid(uuid.as_bytes().to_vec());
meta.set_region_id(elems[1].parse()?);
meta.mut_region_epoch().set_conf_ver(elems[2].parse()?);
meta.mut_region_epoch().set_version(elems[3].parse()?);
Ok(meta)
}

#[cfg(test)]
mod tests {
use super::*;
use import::test_helpers::*;

use tempdir::TempDir;
use util::rocksdb::new_engine;

#[test]
fn test_import_dir() {
Expand Down Expand Up @@ -338,6 +416,40 @@ mod tests {
assert!(!path.save.exists());
assert!(!path.clone.exists());
}

// Test ImportDir::ingest()

let db_path = temp_dir.path().join("db");
let db = new_engine(db_path.to_str().unwrap(), &["default"], None).unwrap();

let cases = vec![(0, 10), (5, 15), (10, 20), (0, 100)];

let mut ingested = Vec::new();

for (i, &range) in cases.iter().enumerate() {
let path = temp_dir.path().join(format!("{}.sst", i));
let (meta, data) = gen_sst_file(&path, range);

let mut f = dir.create(&meta).unwrap();
f.append(&data).unwrap();
f.finish().unwrap();

dir.ingest(&meta, &db).unwrap();
check_db_range(&db, range);

ingested.push(meta);
}

let ssts = dir.list_ssts().unwrap();
assert_eq!(ssts.len(), ingested.len());
for sst in &ssts {
ingested
.iter()
.find(|s| s.get_uuid() == sst.get_uuid())
.unwrap();
dir.delete(sst).unwrap();
}
assert!(dir.list_ssts().unwrap().is_empty());
}

#[test]
Expand Down Expand Up @@ -398,5 +510,8 @@ mod tests {
let path = sst_meta_to_path(&meta).unwrap();
let expected_path = format!("{}_1_2_3.sst", uuid);
assert_eq!(path.to_str().unwrap(), &expected_path);

let new_meta = path_to_sst_meta(path).unwrap();
assert_eq!(meta, new_meta);
}
}