From 2645bd0c02b2d1cea0ea4c2e0aef94d0d85d1473 Mon Sep 17 00:00:00 2001 From: Huachao Huang Date: Tue, 9 Jan 2018 20:02:14 +0800 Subject: [PATCH 1/2] Implement ImportSST::Ingest API Client must guarantee that the SST file has been uploaded to the servers where the region's peers locate in, before issuing an ingest request to the region's leader. The SST metadata provided in the ingest request will first be replicated through Raft. Then when the log entry is applied, the coresponding uploaded SST file will be ingested to the local engine. To make sure that the region's range and peers are not changed on during this process, the region's epoch will be checked before ingestion. --- Cargo.toml | 3 +- src/bin/tikv-server.rs | 7 +- src/import/errors.rs | 15 ++ src/import/service.rs | 13 +- src/import/sst_importer.rs | 115 ++++++++++ src/import/sst_service.rs | 84 ++++++-- src/raftstore/store/peer.rs | 5 +- src/raftstore/store/store.rs | 33 ++- src/raftstore/store/worker/apply.rs | 252 +++++++++++++++++++--- src/raftstore/store/worker/cleanup_sst.rs | 60 ++++++ src/raftstore/store/worker/mod.rs | 2 + src/server/node.rs | 5 + src/server/server.rs | 2 +- tests/import/sst_service.rs | 61 +++++- tests/raftstore/node.rs | 8 + tests/raftstore/server.rs | 10 +- tests/raftstore/util.rs | 4 +- tests/raftstore_cases/test_bootstrap.rs | 7 + 18 files changed, 618 insertions(+), 68 deletions(-) create mode 100644 src/raftstore/store/worker/cleanup_sst.rs diff --git a/Cargo.toml b/Cargo.toml index 0a8614d51b8..2036c33d683 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,8 @@ signal = "0.4" git = "https://github.com/pingcap/rust-rocksdb.git" [dependencies.kvproto] -git = "https://github.com/pingcap/kvproto.git" +git = "https://github.com/huachaohuang/kvproto.git" +branch = "ingest-sst" [dependencies.tipb] git = "https://github.com/pingcap/tipb.git" diff --git a/src/bin/tikv-server.rs b/src/bin/tikv-server.rs index 1119e70786e..4694896e55f 100644 --- a/src/bin/tikv-server.rs +++ b/src/bin/tikv-server.rs @@ -223,7 +223,11 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc RpcStatus { RpcStatus::new(RpcStatusCode::Unknown, Some(format!("{:?}", err))) } +pub fn send_rpc_error(ctx: RpcContext, sink: UnarySink, error: E) +where + Error: From, +{ + let err = make_rpc_error(Error::from(error)); + ctx.spawn(sink.fail(err).map_err(|e| { + warn!("send rpc error: {:?}", e); + })); +} + macro_rules! send_rpc_response { ($res:ident, $sink: ident, $label:ident, $timer:ident) => ({ let res = match $res { diff --git a/src/import/sst_importer.rs b/src/import/sst_importer.rs index 5db0ec0bedd..f61b65a2f85 100644 --- a/src/import/sst_importer.rs +++ b/src/import/sst_importer.rs @@ -21,8 +21,10 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use crc::crc32::{self, Hasher32}; use uuid::Uuid; use kvproto::importpb::*; +use rocksdb::{IngestExternalFileOptions, DB}; use util::collections::HashMap; +use util::rocksdb::{get_cf_handle, prepare_sst_for_ingestion, validate_sst_for_ingestion}; use super::{Error, Result}; @@ -120,6 +122,23 @@ impl SSTImporter { } } } + + pub fn ingest(&self, meta: &SSTMeta, db: &DB) -> Result<()> { + match self.dir.ingest(meta, db) { + Ok(_) => { + info!("ingest {:?}", meta); + Ok(()) + } + Err(e) => { + error!("ingest {:?}: {:?}", meta, e); + Err(e) + } + } + } + + pub fn list_ssts(&self) -> Result> { + self.dir.list_ssts() + } } // TODO: Add size and rate limit. 
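Aside, sketching the upload-then-ingest flow described in the commit message from the client's side. This is illustrative only: it reuses the ImportSstClient API and the send_upload_sst helper exercised in tests/import/sst_service.rs later in this patch, while the wrapper function upload_and_ingest and its parameters are assumptions made for the sketch, not code from the change.

    use grpc::Result;
    use kvproto::importpb::*;
    use kvproto::importpb_grpc::ImportSstClient;
    use kvproto::kvrpcpb::Context;

    // Hypothetical client-side flow: upload the SST file to every store that
    // hosts a peer of the target region, then ask the region leader to ingest it.
    fn upload_and_ingest(
        peers: &[ImportSstClient], // one client per store holding a region peer
        leader: &ImportSstClient,  // client connected to the leader's store
        ctx: Context,              // region id, region epoch and leader peer
        meta: SSTMeta,
        data: Vec<u8>,
    ) -> Result<IngestResponse> {
        // 1. Upload the SST (meta first, then data chunks) to all peers.
        for peer in peers {
            let mut upload = UploadRequest::new();
            upload.set_meta(meta.clone());
            upload.set_data(data.clone());
            // Test helper from tests/import/sst_service.rs; assumed to drive
            // the client-streaming Upload RPC.
            send_upload_sst(peer, &upload)?;
        }
        // 2. Ingest on the leader. The SST meta is proposed through Raft and the
        //    uploaded file is ingested when the log entry is applied, after the
        //    region epoch has been checked.
        let mut ingest = IngestRequest::new();
        ingest.set_context(ctx);
        ingest.set_sst(meta);
        leader.ingest(&ingest)
    }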
@@ -185,6 +204,35 @@ impl ImportDir { } Ok(path) } + + fn ingest(&self, meta: &SSTMeta, db: &DB) -> Result<()> { + let path = self.join(meta)?; + let cf = meta.get_cf_name(); + prepare_sst_for_ingestion(&path.save, &path.clone)?; + validate_sst_for_ingestion(db, cf, &path.clone, meta.get_length(), meta.get_crc32())?; + + let handle = get_cf_handle(db, cf)?; + let mut opts = IngestExternalFileOptions::new(); + opts.move_files(true); + db.ingest_external_file_cf(handle, &opts, &[path.clone.to_str().unwrap()])?; + Ok(()) + } + + fn list_ssts(&self) -> Result> { + let mut ssts = Vec::new(); + for e in fs::read_dir(&self.root_dir)? { + let e = e?; + if !e.file_type()?.is_file() { + continue; + } + let path = e.path(); + match path_to_sst_meta(&path) { + Ok(sst) => ssts.push(sst), + Err(e) => error!("{}: {:?}", path.to_str().unwrap(), e), + } + } + Ok(ssts) + } } #[derive(Clone)] @@ -301,12 +349,42 @@ fn sst_meta_to_path(meta: &SSTMeta) -> Result { ))) } +fn path_to_sst_meta>(path: P) -> Result { + let path = path.as_ref(); + let file_name = match path.file_name().and_then(|n| n.to_str()) { + Some(name) => name, + None => return Err(Error::InvalidSSTPath(path.to_owned())), + }; + + // A valid file name should be in the format: + // "{uuid}_{region_id}_{region_epoch.conf_ver}_{region_epoch.version}.sst" + if !file_name.ends_with(SST_SUFFIX) { + return Err(Error::InvalidSSTPath(path.to_owned())); + } + let elems: Vec<_> = file_name + .trim_right_matches(SST_SUFFIX) + .split('_') + .collect(); + if elems.len() != 4 { + return Err(Error::InvalidSSTPath(path.to_owned())); + } + + let mut meta = SSTMeta::new(); + let uuid = Uuid::parse_str(elems[0])?; + meta.set_uuid(uuid.as_bytes().to_vec()); + meta.set_region_id(elems[1].parse()?); + meta.mut_region_epoch().set_conf_ver(elems[2].parse()?); + meta.mut_region_epoch().set_version(elems[3].parse()?); + Ok(meta) +} + #[cfg(test)] mod tests { use super::*; use import::test_helpers::*; use tempdir::TempDir; + use util::rocksdb::new_engine; #[test] fn test_import_dir() { @@ -338,6 +416,40 @@ mod tests { assert!(!path.save.exists()); assert!(!path.clone.exists()); } + + // Test ImportDir::ingest() + + let db_path = temp_dir.path().join("db"); + let db = new_engine(db_path.to_str().unwrap(), &["default"], None).unwrap(); + + let cases = vec![(0, 10), (5, 15), (10, 20), (0, 100)]; + + let mut ingested = Vec::new(); + + for (i, &range) in cases.iter().enumerate() { + let path = temp_dir.path().join(format!("{}.sst", i)); + let (meta, data) = gen_sst_file(&path, range); + + let mut f = dir.create(&meta).unwrap(); + f.append(&data).unwrap(); + f.finish().unwrap(); + + dir.ingest(&meta, &db).unwrap(); + check_db_range(&db, range); + + ingested.push(meta); + } + + let ssts = dir.list_ssts().unwrap(); + assert_eq!(ssts.len(), ingested.len()); + for sst in &ssts { + ingested + .iter() + .find(|s| s.get_uuid() == sst.get_uuid()) + .unwrap(); + dir.delete(sst).unwrap(); + } + assert!(dir.list_ssts().unwrap().is_empty()); } #[test] @@ -398,5 +510,8 @@ mod tests { let path = sst_meta_to_path(&meta).unwrap(); let expected_path = format!("{}_1_2_3.sst", uuid); assert_eq!(path.to_str().unwrap(), &expected_path); + + let new_meta = path_to_sst_meta(path).unwrap(); + assert_eq!(meta, new_meta); } } diff --git a/src/import/sst_service.rs b/src/import/sst_service.rs index 57affee1a50..f126ee0dc37 100644 --- a/src/import/sst_service.rs +++ b/src/import/sst_service.rs @@ -14,43 +14,50 @@ use std::sync::Arc; use grpc::{ClientStreamingSink, RequestStream, RpcContext, UnarySink}; 
-use futures::{Future, Stream}; +use futures::{future, Future, Stream}; use futures::sync::mpsc; use futures_cpupool::{Builder, CpuPool}; use kvproto::importpb::*; use kvproto::importpb_grpc::*; +use kvproto::raft_cmdpb::*; -use storage::Storage; +use util::future::paired_future_callback; use util::time::Instant; +use raftstore::store::Callback; +use server::transport::RaftStoreRouter; use super::service::*; use super::metrics::*; use super::{Config, Error, SSTImporter}; #[derive(Clone)] -pub struct ImportSSTService { +pub struct ImportSSTService { cfg: Config, + router: Router, threads: CpuPool, - storage: Storage, importer: Arc, } -impl ImportSSTService { - pub fn new(cfg: Config, storage: Storage, importer: Arc) -> ImportSSTService { +impl ImportSSTService { + pub fn new( + cfg: Config, + router: Router, + importer: Arc, + ) -> ImportSSTService { let threads = Builder::new() .name_prefix("sst-importer") .pool_size(cfg.num_threads) .create(); ImportSSTService { cfg: cfg, + router: router, threads: threads, - storage: storage, importer: importer, } } } -impl ImportSst for ImportSSTService { +impl ImportSst for ImportSSTService { fn upload( &self, ctx: RpcContext, @@ -61,18 +68,15 @@ impl ImportSst for ImportSSTService { let timer = Instant::now_coarse(); let token = self.importer.token(); - let thread1 = self.threads.clone(); - let thread2 = self.threads.clone(); let import1 = Arc::clone(&self.importer); let import2 = Arc::clone(&self.importer); let bounded_stream = mpsc::spawn(stream, &self.threads, self.cfg.stream_channel_window); ctx.spawn( - bounded_stream - .map_err(Error::from) - .for_each(move |chunk| { - let import1 = Arc::clone(&import1); - thread1.spawn_fn(move || { + self.threads.spawn( + bounded_stream + .map_err(Error::from) + .for_each(move |chunk| { let start = Instant::now_coarse(); if chunk.has_meta() { import1.create(token, chunk.get_meta())?; @@ -85,9 +89,7 @@ impl ImportSst for ImportSSTService { IMPORT_UPLOAD_CHUNK_DURATION.observe(start.elapsed_secs()); Ok(()) }) - }) - .then(move |res| { - thread2.spawn_fn(move || match res { + .then(move |res| match res { Ok(_) => import2.finish(token), Err(e) => { if let Some(f) = import2.remove(token) { @@ -96,13 +98,49 @@ impl ImportSst for ImportSSTService { Err(e) } }) - }) - .map(|_| UploadResponse::new()) - .then(move |res| send_rpc_response!(res, sink, label, timer)), + .map(|_| UploadResponse::new()) + .then(move |res| send_rpc_response!(res, sink, label, timer)), + ), ) } - fn ingest(&self, _: RpcContext, _: IngestRequest, _: UnarySink) { - unimplemented!(); + fn ingest(&self, ctx: RpcContext, mut req: IngestRequest, sink: UnarySink) { + let label = "ingest"; + let timer = Instant::now_coarse(); + + // Make ingest command. 
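+ // The SST meta is wrapped in a RaftCmdRequest (CmdType::IngestSST) whose
+ // header carries the region id, epoch and peer from the request context,
+ // so the ingest goes through the normal Raft propose/apply path and the
+ // region epoch is checked again when the entry is applied.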
+ let mut ingest = Request::new(); + ingest.set_cmd_type(CmdType::IngestSST); + ingest.mut_ingest_sst().set_sst(req.take_sst()); + let mut context = req.take_context(); + let mut header = RaftRequestHeader::new(); + header.set_peer(context.take_peer()); + header.set_region_id(context.get_region_id()); + header.set_region_epoch(context.take_region_epoch()); + let mut cmd = RaftCmdRequest::new(); + cmd.set_header(header); + cmd.mut_requests().push(ingest); + + let (cb, future) = paired_future_callback(); + if let Err(e) = self.router.send_command(cmd, Callback::Write(cb)) { + return send_rpc_error(ctx, sink, e); + } + + ctx.spawn( + future + .map_err(Error::from) + .then(|res| match res { + Ok(mut res) => { + let mut resp = IngestResponse::new(); + let mut header = res.response.take_header(); + if header.has_error() { + resp.set_error(header.take_error()); + } + future::ok(resp) + } + Err(e) => future::err(e), + }) + .then(move |res| send_rpc_response!(res, sink, label, timer)), + ) } } diff --git a/src/raftstore/store/peer.rs b/src/raftstore/store/peer.rs index 54907e37963..e0d20a97c84 100644 --- a/src/raftstore/store/peer.rs +++ b/src/raftstore/store/peer.rs @@ -1091,7 +1091,9 @@ impl Peer { for r in req.get_requests() { match r.get_cmd_type() { CmdType::Get | CmdType::Snap => is_read = true, - CmdType::Delete | CmdType::Put | CmdType::DeleteRange => is_write = true, + CmdType::Delete | CmdType::Put | CmdType::DeleteRange | CmdType::IngestSST => { + is_write = true + } CmdType::Prewrite | CmdType::Invalid => { return Err(box_err!( "invalid cmd type {:?}, message maybe currupted", @@ -1638,6 +1640,7 @@ impl Peer { | CmdType::Put | CmdType::Delete | CmdType::DeleteRange + | CmdType::IngestSST | CmdType::Invalid => unreachable!(), }; resp.set_cmd_type(cmd_type); diff --git a/src/raftstore/store/store.rs b/src/raftstore/store/store.rs index 890dee84897..378a337b15a 100644 --- a/src/raftstore/store/store.rs +++ b/src/raftstore/store/store.rs @@ -31,6 +31,7 @@ use kvproto::raft_serverpb::{PeerState, RaftMessage, RaftSnapshotData, RaftTrunc RegionLocalState}; use raft::eraftpb::{ConfChangeType, MessageType}; use kvproto::pdpb::StoreStats; +use kvproto::importpb::SSTMeta; use util::{escape, rocksdb}; use util::time::{duration_to_sec, SlowTimer}; use pd::{PdClient, PdRunner, PdTask}; @@ -49,9 +50,11 @@ use util::sys as util_sys; use storage::{CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE}; use raftstore::coprocessor::CoprocessorHost; use raftstore::coprocessor::split_observer::SplitObserver; -use super::worker::{ApplyRunner, ApplyTask, ApplyTaskRes, CompactRunner, CompactTask, - ConsistencyCheckRunner, ConsistencyCheckTask, RaftlogGcRunner, RaftlogGcTask, - RegionRunner, RegionTask, SplitCheckRunner, SplitCheckTask}; +use import::SSTImporter; +use super::worker::{ApplyRunner, ApplyTask, ApplyTaskRes, CleanupSSTRunner, CleanupSSTTask, + CompactRunner, CompactTask, ConsistencyCheckRunner, ConsistencyCheckTask, + RaftlogGcRunner, RaftlogGcTask, RegionRunner, RegionTask, SplitCheckRunner, + SplitCheckTask}; use super::worker::apply::{ChangePeer, ExecResult}; use super::{util, Msg, SignificantMsg, SnapKey, SnapManager, SnapshotDeleter, Tick}; use super::keys::{self, data_end_key, data_key, enc_end_key, enc_start_key}; @@ -148,6 +151,7 @@ pub struct Store { compact_worker: Worker, pd_worker: FutureWorker, consistency_check_worker: Worker, + cleanup_sst_worker: Worker, pub apply_worker: Worker, apply_res_receiver: Option>, @@ -156,6 +160,8 @@ pub struct Store { pub coprocessor_host: Arc, + pub importer: Arc, + 
snap_mgr: SnapManager, raft_metrics: RaftMetrics, @@ -197,6 +203,7 @@ impl Store { mgr: SnapManager, pd_worker: FutureWorker, mut coprocessor_host: CoprocessorHost, + importer: Arc, ) -> Result> { // TODO: we can get cluster meta regularly too later. cfg.validate()?; @@ -224,6 +231,7 @@ impl Store { compact_worker: Worker::new("compact worker"), pd_worker: pd_worker, consistency_check_worker: Worker::new("consistency check worker"), + cleanup_sst_worker: Worker::new("cleanup sst worker"), apply_worker: Worker::new("apply worker"), apply_res_receiver: None, region_ranges: BTreeMap::new(), @@ -231,6 +239,7 @@ impl Store { trans: trans, pd_client: pd_client, coprocessor_host: Arc::new(coprocessor_host), + importer: importer, snap_mgr: mgr, raft_metrics: RaftMetrics::default(), entry_cache_metries: Rc::new(RefCell::new(CacheQueryStats::default())), @@ -534,6 +543,9 @@ impl Store { .start(consistency_check_runner) ); + let cleanup_sst_runner = CleanupSSTRunner::new(Arc::clone(&self.importer)); + box_try!(self.cleanup_sst_worker.start(cleanup_sst_runner)); + let (tx, rx) = mpsc::channel(); let apply_runner = ApplyRunner::new(self, tx, self.cfg.sync_log, self.cfg.use_delete_range); self.apply_res_receiver = Some(rx); @@ -563,6 +575,7 @@ impl Store { handles.push(self.compact_worker.stop()); handles.push(self.pd_worker.stop()); handles.push(self.consistency_check_worker.stop()); + handles.push(self.cleanup_sst_worker.stop()); handles.push(self.apply_worker.stop()); for h in handles { @@ -1534,6 +1547,7 @@ impl Store { ExecResult::DeleteRange { .. } => { // TODO: clean user properties? } + ExecResult::IngestSST { ssts } => self.on_ingest_sst_result(ssts), } } } @@ -2394,6 +2408,19 @@ impl Store { ); } } + + fn on_ingest_sst_result(&mut self, ssts: Vec) { + for sst in ssts { + let region_id = sst.get_region_id(); + if let Some(region) = self.region_peers.get_mut(®ion_id) { + region.size_diff_hint += sst.get_length(); + } + let task = CleanupSSTTask::DeleteSST { sst }; + if let Err(e) = self.cleanup_sst_worker.schedule(task) { + error!("[region {}] schedule delete sst: {:?}", region_id, e); + } + } + } } fn new_admin_request(region_id: u64, peer: metapb::Peer) -> RaftCmdRequest { diff --git a/src/raftstore/store/worker/apply.rs b/src/raftstore/store/worker/apply.rs index 1dc0960bd1a..754105bcdf3 100644 --- a/src/raftstore/store/worker/apply.rs +++ b/src/raftstore/store/worker/apply.rs @@ -23,18 +23,20 @@ use std::collections::VecDeque; use rocksdb::{Writable, WriteBatch, DB}; use rocksdb::rocksdb_options::WriteOptions; use protobuf::RepeatedField; +use uuid::Uuid; use kvproto::metapb::{Peer as PeerMeta, Region}; use raft::eraftpb::{ConfChange, ConfChangeType, Entry, EntryType}; use kvproto::raft_serverpb::{PeerState, RaftApplyState, RaftTruncatedState}; use kvproto::raft_cmdpb::{AdminCmdType, AdminRequest, AdminResponse, ChangePeerRequest, CmdType, RaftCmdRequest, RaftCmdResponse, Request, Response}; +use kvproto::importpb::SSTMeta; use util::worker::Runnable; use util::{escape, rocksdb, MustConsumeVec}; use util::time::{duration_to_sec, Instant, SlowTimer}; use util::collections::HashMap; -use storage::{ALL_CFS, CF_DEFAULT, CF_LOCK, CF_RAFT}; +use storage::{ALL_CFS, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE}; use raftstore::{Error, Result}; use raftstore::coprocessor::CoprocessorHost; use raftstore::store::{cmd_resp, keys, util, Store}; @@ -44,6 +46,7 @@ use raftstore::store::peer_storage::{self, compact_raft_log, write_initial_apply write_peer_state}; use raftstore::store::peer::{check_epoch, Peer}; 
use raftstore::store::metrics::*; +use import::SSTImporter; use super::metrics::*; @@ -170,6 +173,9 @@ pub enum ExecResult { DeleteRange { ranges: Vec, }, + IngestSST { + ssts: Vec, + }, } struct ApplyCallback { @@ -205,6 +211,7 @@ struct Stash { struct ApplyContextCore<'a> { host: &'a CoprocessorHost, + importer: &'a SSTImporter, wb: Option, cbs: MustConsumeVec, apply_res: Vec, @@ -222,9 +229,10 @@ struct ApplyContextCore<'a> { } impl<'a> ApplyContextCore<'a> { - pub fn new(host: &CoprocessorHost) -> ApplyContextCore { + pub fn new(host: &'a CoprocessorHost, importer: &'a SSTImporter) -> ApplyContextCore<'a> { ApplyContextCore { host: host, + importer: importer, wb: None, cbs: MustConsumeVec::new("callback of apply context"), apply_res: vec![], @@ -436,12 +444,15 @@ fn should_write_to_engine(cmd: &RaftCmdRequest, wb_keys: usize) -> bool { return true; } - // When encounter DeleteRange command, we must write current write batch to engine first, - // because current write batch may contains keys are covered by DeleteRange. + // Some commands may modify keys covered by the current write batch, so we + // must write the current write batch to the engine first. for req in cmd.get_requests() { if req.has_delete_range() { return true; } + if req.has_ingest_sst() { + return true; + } } false @@ -727,7 +738,8 @@ impl ApplyDelegate { ExecResult::ComputeHash { .. } | ExecResult::VerifyHash { .. } | ExecResult::CompactLog { .. } - | ExecResult::DeleteRange { .. } => {} + | ExecResult::DeleteRange { .. } + | ExecResult::IngestSST { .. } => {} ExecResult::SplitRegion { ref left, ref right, @@ -1152,6 +1164,7 @@ impl ApplyDelegate { let mut responses = Vec::with_capacity(requests.len()); let mut ranges = vec![]; + let mut ssts = vec![]; for req in requests { let cmd_type = req.get_cmd_type(); let mut resp = match cmd_type { @@ -1160,6 +1173,7 @@ impl ApplyDelegate { CmdType::DeleteRange => { self.handle_delete_range(req, &mut ranges, ctx.use_delete_range) } + CmdType::IngestSST => self.handle_ingest_sst(ctx, req, &mut ssts), // Readonly commands are handled in raftstore directly. // Don't panic here in case there are old entries need to be applied. // It's also safe to skip them here, because a restart must have happened, @@ -1189,10 +1203,13 @@ impl ApplyDelegate { resp.mut_header().set_uuid(uuid); resp.set_responses(RepeatedField::from_vec(responses)); - let exec_res = if ranges.is_empty() { - None - } else { + assert!(ranges.is_empty() || ssts.is_empty()); + let exec_res = if !ranges.is_empty() { Some(ExecResult::DeleteRange { ranges: ranges }) + } else if !ssts.is_empty() { + Some(ExecResult::IngestSST { ssts: ssts }) + } else { + None }; Ok((resp, exec_res)) @@ -1337,6 +1354,31 @@ impl ApplyDelegate { Ok(resp) } + + fn handle_ingest_sst( + &mut self, + ctx: &ApplyContext, + req: &Request, + ssts: &mut Vec, + ) -> Result { + let sst = req.get_ingest_sst().get_sst(); + + if let Err(e) = check_sst_for_ingestion(sst, &self.region) { + error!("ingest {:?} to region {:?}: {:?}", sst, self.region, e); + // This file is not valid, we can delete it here. + let _ = ctx.importer.delete(sst); + return Err(e); + } + + ctx.importer.ingest(sst, &self.engine).unwrap_or_else(|e| { + // If this failed, it means that the file is corrupted or something + // is wrong with the engine, but we can do nothing about that. 
+ panic!("{} ingest {:?}: {:?}", self.tag, sst, e); + }); + + ssts.push(sst.clone()); + Ok(Response::new()) + } } pub fn get_change_peer_cmd(msg: &RaftCmdRequest) -> Option<&ChangePeerRequest> { @@ -1358,6 +1400,38 @@ fn check_data_key(key: &[u8], region: &Region) -> Result<()> { Ok(()) } +fn check_sst_for_ingestion(sst: &SSTMeta, region: &Region) -> Result<()> { + let uuid = sst.get_uuid(); + if let Err(e) = Uuid::from_bytes(uuid) { + return Err(box_err!("invalid uuid {:?}: {:?}", uuid, e)); + } + + let cf_name = sst.get_cf_name(); + if cf_name != CF_DEFAULT && cf_name != CF_WRITE { + return Err(box_err!("invalid cf name {}", cf_name)); + } + + let region_id = sst.get_region_id(); + if region_id != region.get_id() { + return Err(Error::RegionNotFound(region_id)); + } + + let epoch = sst.get_region_epoch(); + let region_epoch = region.get_region_epoch(); + if epoch.get_conf_ver() != region_epoch.get_conf_ver() + || epoch.get_version() != region_epoch.get_version() + { + let error = format!("{:?} != {:?}", epoch, region_epoch); + return Err(Error::StaleEpoch(error, vec![region.clone()])); + } + + let range = sst.get_range(); + util::check_key_in_region(range.get_start(), region)?; + util::check_key_in_region(range.get_end(), region)?; + + Ok(()) +} + pub fn do_get(tag: &str, region: &Region, snap: &Snapshot, req: &Request) -> Result { // TODO: the get_get looks wried, maybe we should figure out a better name later. let key = req.get_get().get_key(); @@ -1580,6 +1654,7 @@ pub enum TaskRes { pub struct Runner { db: Arc, host: Arc, + importer: Arc, delegates: HashMap>, notifier: Sender, sync_log: bool, @@ -1602,6 +1677,7 @@ impl Runner { Runner { db: store.kv_engine(), host: Arc::clone(&store.coprocessor_host), + importer: Arc::clone(&store.importer), delegates: delegates, notifier: notifier, sync_log: sync_log, @@ -1613,7 +1689,7 @@ impl Runner { fn handle_applies(&mut self, applys: Vec) { let t = SlowTimer::new(); - let mut core = ApplyContextCore::new(self.host.as_ref()) + let mut core = ApplyContextCore::new(self.host.as_ref(), self.importer.as_ref()) .apply_res_capacity(applys.len()) .use_delete_range(self.use_delete_range) .enable_sync_log(self.sync_log); @@ -1763,14 +1839,13 @@ mod tests { use rocksdb::{Writable, WriteBatch, DB}; use protobuf::Message; use kvproto::metapb::RegionEpoch; - use kvproto::raft_cmdpb::CmdType; - + use kvproto::raft_cmdpb::*; use raftstore::coprocessor::*; use raftstore::store::msg::WriteResponse; - use storage::{ALL_CFS, CF_WRITE}; - use util::collections::HashMap; use super::*; + use import::test_helpers::*; + use util::collections::HashMap; pub fn create_tmp_engine(path: &str) -> (TempDir, Arc) { let path = TempDir::new(path).unwrap(); @@ -1779,10 +1854,22 @@ mod tests { (path, db) } - fn new_runner(db: Arc, host: Arc, tx: Sender) -> Runner { + pub fn create_tmp_importer(path: &str) -> (TempDir, Arc) { + let dir = TempDir::new(path).unwrap(); + let importer = Arc::new(SSTImporter::new(dir.path()).unwrap()); + (dir, importer) + } + + fn new_runner( + db: Arc, + host: Arc, + importer: Arc, + tx: Sender, + ) -> Runner { Runner { db: db, host: host, + importer: importer, delegates: HashMap::default(), notifier: tx, sync_log: false, @@ -1808,6 +1895,15 @@ mod tests { let wb = WriteBatch::new(); assert_eq!(should_write_to_engine(&req, wb.count()), true); + // IngestSST command + let mut req = Request::new(); + req.set_cmd_type(CmdType::IngestSST); + req.set_ingest_sst(IngestSSTRequest::new()); + let mut cmd = RaftCmdRequest::new(); + cmd.mut_requests().push(req); 
+ let wb = WriteBatch::new(); + assert_eq!(should_write_to_engine(&cmd, wb.count()), true); + // Write batch keys reach WRITE_BATCH_MAX_KEYS let req = RaftCmdRequest::new(); let wb = WriteBatch::new(); @@ -1832,7 +1928,13 @@ mod tests { let (tx, rx) = mpsc::channel(); let (_tmp, db) = create_tmp_engine("apply-basic"); let host = Arc::new(CoprocessorHost::default()); - let mut runner = new_runner(Arc::clone(&db), host, tx); + let (_dir, importer) = create_tmp_importer("apply-basic"); + let mut runner = new_runner( + Arc::clone(&db), + Arc::clone(&host), + Arc::clone(&importer), + tx, + ); let mut reg = Registration::default(); reg.id = 1; @@ -2068,6 +2170,14 @@ mod tests { self } + fn ingest_sst(mut self, meta: &SSTMeta) -> EntryBuilder { + let mut cmd = Request::new(); + cmd.set_cmd_type(CmdType::IngestSST); + cmd.mut_ingest_sst().set_sst(meta.clone()); + self.req.mut_requests().push(cmd); + self + } + fn build(mut self) -> Entry { self.entry.set_data(self.req.write_to_bytes().unwrap()); self.entry @@ -2097,6 +2207,7 @@ mod tests { #[test] fn test_handle_raft_committed_entries() { let (_path, db) = create_tmp_engine("test-delegate"); + let (import_dir, importer) = create_tmp_importer("test-delegate"); let mut reg = Registration::default(); reg.region.set_end_key(b"k5".to_vec()); reg.region.mut_region_epoch().set_version(3); @@ -2115,7 +2226,7 @@ mod tests { let obs = ApplyObserver::default(); host.registry .register_query_observer(1, Box::new(obs.clone())); - let mut core = ApplyContextCore::new(&host).use_delete_range(true); + let mut core = ApplyContextCore::new(&host, &importer).use_delete_range(true); let mut apply_ctx = ApplyContext::new(&mut core, &mut delegates); delegate.handle_raft_committed_entries(&mut apply_ctx, vec![put_entry]); apply_ctx.write_to_db(&db); @@ -2245,9 +2356,54 @@ mod tests { assert!(db.get(&dk_k2).unwrap().is_none()); assert!(db.get(&dk_k3).unwrap().is_none()); + // UploadSST + let sst_path = import_dir.path().join("test.sst"); + let sst_epoch = delegate.region.get_region_epoch().clone(); + let sst_range = (0, 100); + let (mut meta1, data1) = gen_sst_file(&sst_path, sst_range); + meta1.set_region_epoch(sst_epoch); + importer.create(1, &meta1).unwrap(); + importer.append(1, &data1).unwrap(); + importer.finish(1).unwrap(); + let (mut meta2, data2) = gen_sst_file(&sst_path, sst_range); + meta2.mut_region_epoch().set_version(1234); + importer.create(2, &meta2).unwrap(); + importer.append(2, &data2).unwrap(); + importer.finish(2).unwrap(); + + // IngestSST + let put_ok = EntryBuilder::new(9, 3) + .capture_resp(&mut delegate, tx.clone()) + .put(&[sst_range.0], &[sst_range.1]) + .epoch(0, 3) + .build(); + // Add a put above to test flush before ingestion. 
+ let ingest_ok = EntryBuilder::new(10, 3) + .capture_resp(&mut delegate, tx.clone()) + .ingest_sst(&meta1) + .epoch(0, 3) + .build(); + let ingest_stale_epoch = EntryBuilder::new(11, 3) + .capture_resp(&mut delegate, tx.clone()) + .ingest_sst(&meta2) + .epoch(0, 3) + .build(); + let entries = vec![put_ok, ingest_ok, ingest_stale_epoch]; + delegate.handle_raft_committed_entries(&mut apply_ctx, entries); + apply_ctx.write_to_db(&db); + let resp = rx.try_recv().unwrap(); + assert!(!resp.get_header().has_error(), "{:?}", resp); + let resp = rx.try_recv().unwrap(); + assert!(!resp.get_header().has_error(), "{:?}", resp); + check_db_range(&db, sst_range); + let resp = rx.try_recv().unwrap(); + assert!(resp.get_header().has_error()); + assert_eq!(delegate.applied_index_term, 3); + assert_eq!(delegate.apply_state.get_applied_index(), 11); + let mut entries = vec![]; for i in 0..WRITE_BATCH_MAX_KEYS { - let put_entry = EntryBuilder::new(i as u64 + 9, 2) + let put_entry = EntryBuilder::new(i as u64 + 12, 3) .put(b"k", b"v") .epoch(1, 3) .capture_resp(&mut delegate, tx.clone()) @@ -2259,24 +2415,60 @@ mod tests { for _ in 0..WRITE_BATCH_MAX_KEYS { rx.try_recv().unwrap(); } - assert_eq!( - delegate.apply_state.get_applied_index(), - WRITE_BATCH_MAX_KEYS as u64 + 8 - ); + let index = WRITE_BATCH_MAX_KEYS + 11; + assert_eq!(delegate.apply_state.get_applied_index(), index as u64); + assert_eq!(obs.pre_query_count.load(Ordering::SeqCst), index); + assert_eq!(obs.post_query_count.load(Ordering::SeqCst), index); + } - assert_eq!( - obs.pre_query_count.load(Ordering::SeqCst), - 8 + WRITE_BATCH_MAX_KEYS - ); - assert_eq!( - obs.post_query_count.load(Ordering::SeqCst), - 8 + WRITE_BATCH_MAX_KEYS - ); + #[test] + fn test_check_sst_for_ingestion() { + let mut sst = SSTMeta::new(); + let mut region = Region::new(); + + // Check uuid and cf name + assert!(check_sst_for_ingestion(&sst, ®ion).is_err()); + sst.set_uuid(Uuid::new_v4().as_bytes().to_vec()); + sst.set_cf_name(CF_DEFAULT.to_owned()); + check_sst_for_ingestion(&sst, ®ion).unwrap(); + sst.set_cf_name("test".to_owned()); + assert!(check_sst_for_ingestion(&sst, ®ion).is_err()); + sst.set_cf_name(CF_WRITE.to_owned()); + check_sst_for_ingestion(&sst, ®ion).unwrap(); + + // Check region id + region.set_id(1); + sst.set_region_id(2); + assert!(check_sst_for_ingestion(&sst, ®ion).is_err()); + sst.set_region_id(1); + check_sst_for_ingestion(&sst, ®ion).unwrap(); + + // Check region epoch + region.mut_region_epoch().set_conf_ver(1); + assert!(check_sst_for_ingestion(&sst, ®ion).is_err()); + sst.mut_region_epoch().set_conf_ver(1); + check_sst_for_ingestion(&sst, ®ion).unwrap(); + region.mut_region_epoch().set_version(1); + assert!(check_sst_for_ingestion(&sst, ®ion).is_err()); + sst.mut_region_epoch().set_version(1); + check_sst_for_ingestion(&sst, ®ion).unwrap(); + + // Check region range + region.set_start_key(vec![2]); + region.set_end_key(vec![8]); + sst.mut_range().set_start(vec![1]); + sst.mut_range().set_end(vec![8]); + assert!(check_sst_for_ingestion(&sst, ®ion).is_err()); + sst.mut_range().set_start(vec![2]); + assert!(check_sst_for_ingestion(&sst, ®ion).is_err()); + sst.mut_range().set_end(vec![7]); + check_sst_for_ingestion(&sst, ®ion).unwrap(); } #[test] fn test_stash() { let (_path, db) = create_tmp_engine("test-delegate"); + let (_import_dir, importer) = create_tmp_importer("test-delegate"); let mut reg = Registration::default(); reg.region.set_end_key(b"k5".to_vec()); reg.region.mut_region_epoch().set_version(3); @@ -2289,7 +2481,7 @@ mod tests { 
delegate2.apply_state.set_applied_index(1); let host = CoprocessorHost::default(); - let mut core = ApplyContextCore::new(&host); + let mut core = ApplyContextCore::new(&host, &importer); let (tx, rx) = mpsc::channel(); core.prepare_for(&delegate1); assert_eq!(core.last_applied_index, 3); diff --git a/src/raftstore/store/worker/cleanup_sst.rs b/src/raftstore/store/worker/cleanup_sst.rs new file mode 100644 index 00000000000..1835af3a789 --- /dev/null +++ b/src/raftstore/store/worker/cleanup_sst.rs @@ -0,0 +1,60 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; +use std::sync::Arc; + +use uuid::Uuid; +use kvproto::importpb::SSTMeta; + +use import::SSTImporter; +use util::worker::Runnable; + +pub enum Task { + DeleteSST { sst: SSTMeta }, +} + +impl fmt::Display for Task { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Task::DeleteSST { ref sst } => match Uuid::from_bytes(sst.get_uuid()) { + Ok(uuid) => write!(f, "Delete SST {}", uuid), + Err(e) => write!(f, "Delete SST {:?}", e), + }, + } + } +} + +pub struct Runner { + importer: Arc, +} + +impl Runner { + pub fn new(importer: Arc) -> Runner { + Runner { importer: importer } + } + + fn handle_delete_sst(&self, sst: SSTMeta) { + let _ = self.importer.delete(&sst); + } +} + +impl Runnable for Runner { + fn run(&mut self, task: Task) { + match task { + Task::DeleteSST { sst } => { + self.handle_delete_sst(sst); + } + } + } +} diff --git a/src/raftstore/store/worker/mod.rs b/src/raftstore/store/worker/mod.rs index fb6d73a161a..b6cfeffe570 100644 --- a/src/raftstore/store/worker/mod.rs +++ b/src/raftstore/store/worker/mod.rs @@ -50,6 +50,7 @@ mod compact; mod raftlog_gc; mod metrics; mod consistency_check; +mod cleanup_sst; pub mod apply; pub use self::region::{Runner as RegionRunner, Task as RegionTask}; @@ -57,5 +58,6 @@ pub use self::split_check::{Runner as SplitCheckRunner, Task as SplitCheckTask}; pub use self::compact::{Runner as CompactRunner, Task as CompactTask}; pub use self::raftlog_gc::{Runner as RaftlogGcRunner, Task as RaftlogGcTask}; pub use self::consistency_check::{Runner as ConsistencyCheckRunner, Task as ConsistencyCheckTask}; +pub use self::cleanup_sst::{Runner as CleanupSSTRunner, Task as CleanupSSTTask}; pub use self::apply::{Apply, ApplyMetrics, ApplyRes, Proposal, RegionProposal, Registration, Runner as ApplyRunner, Task as ApplyTask, TaskRes as ApplyTaskRes}; diff --git a/src/server/node.rs b/src/server/node.rs index f03430ca025..14bdc96b09a 100644 --- a/src/server/node.rs +++ b/src/server/node.rs @@ -29,6 +29,7 @@ use raftstore::coprocessor::dispatcher::CoprocessorHost; use raftstore::store::{self, keys, Config as StoreConfig, Engines, Msg, Peekable, SignificantMsg, SnapManager, Store, StoreChannel, Transport}; use super::Result; +use import::SSTImporter; use server::Config as ServerConfig; use storage::{self, Config as StorageConfig, RaftKv, Storage}; use super::transport::RaftStoreRouter; @@ -133,6 +134,7 @@ where significant_msg_receiver: Receiver, pd_worker: FutureWorker, coprocessor_host: CoprocessorHost, 
+ importer: Arc, ) -> Result<()> where T: Transport + 'static, @@ -172,6 +174,7 @@ where significant_msg_receiver, pd_worker, coprocessor_host, + importer, )?; Ok(()) } @@ -323,6 +326,7 @@ where significant_msg_receiver: Receiver, pd_worker: FutureWorker, coprocessor_host: CoprocessorHost, + importer: Arc, ) -> Result<()> where T: Transport + 'static, @@ -355,6 +359,7 @@ where snap_mgr, pd_worker, coprocessor_host, + importer, ) { Err(e) => panic!("construct store {} err {:?}", store_id, e), Ok(s) => s, diff --git a/src/server/server.rs b/src/server/server.rs index 1594abbb376..c2571787f81 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -68,7 +68,7 @@ impl Server { snap_mgr: SnapManager, pd_scheduler: FutureScheduler, debug_engines: Option, - import_service: Option, + import_service: Option>, ) -> Result> { let env = Arc::new( EnvBuilder::new() diff --git a/tests/import/sst_service.rs b/tests/import/sst_service.rs index d7de107ac36..c13ffb69b5c 100644 --- a/tests/import/sst_service.rs +++ b/tests/import/sst_service.rs @@ -15,8 +15,10 @@ use std::sync::Arc; use uuid::Uuid; use futures::{stream, Future, Stream}; +use tempdir::TempDir; use kvproto::kvrpcpb::*; +use kvproto::tikvpb_grpc::*; use kvproto::importpb::*; use kvproto::importpb_grpc::*; use grpc::{ChannelBuilder, Environment, Result, WriteFlags}; @@ -43,7 +45,8 @@ fn new_cluster() -> (Cluster, Context) { (cluster, ctx) } -fn new_cluster_and_import_client() -> (Cluster, ImportSstClient) { +fn new_cluster_and_tikv_import_client( +) -> (Cluster, Context, TikvClient, ImportSstClient) { let (cluster, ctx) = new_cluster(); let ch = { @@ -51,14 +54,15 @@ fn new_cluster_and_import_client() -> (Cluster, ImportSstClient) let node = ctx.get_peer().get_store_id(); ChannelBuilder::new(env).connect(cluster.sim.rl().get_addr(node)) }; - let import = ImportSstClient::new(ch); + let tikv = TikvClient::new(ch.clone()); + let import = ImportSstClient::new(ch.clone()); - (cluster, import) + (cluster, ctx, tikv, import) } #[test] fn test_upload_sst() { - let (_cluster, import) = new_cluster_and_import_client(); + let (_cluster, _, _, import) = new_cluster_and_tikv_import_client(); let data = vec![1; 1024]; let crc32 = calc_data_crc32(&data); @@ -85,6 +89,55 @@ fn test_upload_sst() { assert!(send_upload_sst(&import, &upload).is_err()); } +#[test] +fn test_ingest_sst() { + let (_cluster, ctx, tikv, import) = new_cluster_and_tikv_import_client(); + + let temp_dir = TempDir::new("test_ingest_sst").unwrap(); + + let sst_path = temp_dir.path().join("test.sst"); + let sst_range = (0, 100); + let (mut meta, data) = gen_sst_file(sst_path, sst_range); + + // No region id and epoch. + let mut upload = UploadRequest::new(); + upload.set_meta(meta.clone()); + upload.set_data(data.clone()); + send_upload_sst(&import, &upload).unwrap(); + + let mut ingest = IngestRequest::new(); + ingest.set_context(ctx.clone()); + ingest.set_sst(meta.clone()); + let resp = import.ingest(&ingest).unwrap(); + assert!(resp.has_error()); + + // Set region id and epoch. + meta.set_region_id(ctx.get_region_id()); + meta.set_region_epoch(ctx.get_region_epoch().clone()); + upload.set_meta(meta.clone()); + send_upload_sst(&import, &upload).unwrap(); + // Cann't upload the same file again. 
+ assert!(send_upload_sst(&import, &upload).is_err()); + + ingest.set_sst(meta.clone()); + let resp = import.ingest(&ingest).unwrap(); + assert!(!resp.has_error()); + + // Check ingested kvs + for i in sst_range.0..sst_range.1 { + let mut m = RawGetRequest::new(); + m.set_context(ctx.clone()); + m.set_key(vec![i]); + let resp = tikv.raw_get(&m).unwrap(); + assert!(resp.get_error().is_empty()); + assert!(!resp.has_region_error()); + assert_eq!(resp.get_value(), &[i]); + } + + // Upload the same file again to check if the ingested file has been deleted. + send_upload_sst(&import, &upload).unwrap(); +} + fn new_sst_meta(crc32: u32, length: u64) -> SSTMeta { let mut m = SSTMeta::new(); m.set_uuid(Uuid::new_v4().as_bytes().to_vec()); diff --git a/tests/raftstore/node.rs b/tests/raftstore/node.rs index 31415d27156..aef5430307a 100644 --- a/tests/raftstore/node.rs +++ b/tests/raftstore/node.rs @@ -14,6 +14,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::{mpsc, Arc, RwLock}; use std::ops::Deref; +use std::path::Path; use tempdir::TempDir; @@ -32,6 +33,7 @@ use tikv::util::worker::FutureWorker; use tikv::util::transport::SendCh; use tikv::server::transport::{RaftStoreRouter, ServerRaftStoreRouter}; use raft::SnapshotStatus; +use tikv::import::SSTImporter; use super::pd::TestPdClient; use super::transport_simulate::*; use super::util::create_test_engine; @@ -187,6 +189,11 @@ impl Simulator for NodeCluster { // Create coprocessor. let coprocessor_host = CoprocessorHost::new(cfg.coprocessor, node.get_sendch()); + let importer = { + let dir = Path::new(engines.kv_engine.path()).join("import-sst"); + Arc::new(SSTImporter::new(dir).unwrap()) + }; + node.start( event_loop, engines.clone(), @@ -195,6 +202,7 @@ impl Simulator for NodeCluster { snap_status_receiver, pd_worker, coprocessor_host, + importer, ).unwrap(); assert!( Arc::clone(&engines.kv_engine) diff --git a/tests/raftstore/server.rs b/tests/raftstore/server.rs index 8c7a62c33a5..ab3038471d5 100644 --- a/tests/raftstore/server.rs +++ b/tests/raftstore/server.rs @@ -13,6 +13,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::{mpsc, Arc, RwLock}; +use std::path::Path; use grpc::EnvBuilder; use tempdir::TempDir; @@ -130,10 +131,14 @@ impl Simulator for ServerCluster { // Create import service. let importer = { - let dir = TempDir::new("test-import-sst").unwrap().into_path(); + let dir = Path::new(engines.kv_engine.path()).join("import-sst"); Arc::new(SSTImporter::new(dir).unwrap()) }; - let import_service = ImportSSTService::new(cfg.import.clone(), store.clone(), importer); + let import_service = ImportSSTService::new( + cfg.import.clone(), + sim_router.clone(), + Arc::clone(&importer), + ); // Create pd client, snapshot manager, server. 
let (worker, resolver) = resolve::new_resolver(Arc::clone(&self.pd_client)).unwrap(); @@ -178,6 +183,7 @@ impl Simulator for ServerCluster { snap_status_receiver, pd_worker, coprocessor_host, + importer, ).unwrap(); assert!(node_id == 0 || node_id == node.id()); let node_id = node.id(); diff --git a/tests/raftstore/util.rs b/tests/raftstore/util.rs index 9d47a8d4428..b9f70c5a4ff 100644 --- a/tests/raftstore/util.rs +++ b/tests/raftstore/util.rs @@ -327,7 +327,9 @@ pub fn make_cb(cmd: &RaftCmdRequest) -> (Callback, mpsc::Receiver is_read = true, - CmdType::Put | CmdType::Delete | CmdType::DeleteRange => is_write = true, + CmdType::Put | CmdType::Delete | CmdType::DeleteRange | CmdType::IngestSST => { + is_write = true + } CmdType::Invalid | CmdType::Prewrite => panic!("Invalid RaftCmdRequest: {:?}", cmd), } } diff --git a/tests/raftstore_cases/test_bootstrap.rs b/tests/raftstore_cases/test_bootstrap.rs index 1385fff5033..7e14b7a67ac 100644 --- a/tests/raftstore_cases/test_bootstrap.rs +++ b/tests/raftstore_cases/test_bootstrap.rs @@ -13,6 +13,7 @@ use std::sync::{mpsc, Arc}; use std::path::Path; +use tikv::import::SSTImporter; use tikv::raftstore::store::{bootstrap_store, create_event_loop, keys, Engines, Peekable, SnapManager}; use tikv::server::Node; @@ -95,6 +96,11 @@ fn test_node_bootstrap_with_prepared_data() { // Create coprocessor. let coprocessor_host = CoprocessorHost::new(cfg.coprocessor, node.get_sendch()); + let importer = { + let dir = tmp_path.path().join("import-sst"); + Arc::new(SSTImporter::new(dir).unwrap()) + }; + // try to restart this node, will clear the prepare data node.start( event_loop, @@ -104,6 +110,7 @@ fn test_node_bootstrap_with_prepared_data() { snapshot_status_receiver, pd_worker, coprocessor_host, + importer, ).unwrap(); assert!( Arc::clone(&engine) From f91b6866eb46acd2770f812858bf7b5c42bd08d9 Mon Sep 17 00:00:00 2001 From: Huachao Huang Date: Thu, 15 Mar 2018 01:37:40 +0800 Subject: [PATCH 2/2] Update kvproto --- Cargo.lock | 2 +- Cargo.toml | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d343f4af094..e67663cd516 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -422,7 +422,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.1" -source = "git+https://github.com/pingcap/kvproto.git#7839cb7338370fbe4b496b888872533943d744d1" +source = "git+https://github.com/pingcap/kvproto.git#5d41f201048b3fecbf953f267d64a739398e0b13" dependencies = [ "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "grpcio 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 2036c33d683..0a8614d51b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,8 +79,7 @@ signal = "0.4" git = "https://github.com/pingcap/rust-rocksdb.git" [dependencies.kvproto] -git = "https://github.com/huachaohuang/kvproto.git" -branch = "ingest-sst" +git = "https://github.com/pingcap/kvproto.git" [dependencies.tipb] git = "https://github.com/pingcap/tipb.git"