From 30ac9132a08c55ca072a572a440f305720be4c61 Mon Sep 17 00:00:00 2001 From: atmzhou Date: Mon, 23 Oct 2017 16:40:27 +0800 Subject: [PATCH 01/13] scheduler: batch resolve lock. --- Cargo.lock | 4 +- src/server/service/kv.rs | 19 ++++-- src/storage/mod.rs | 36 ++++------ src/storage/mvcc/txn.rs | 2 + src/storage/txn/scheduler.rs | 116 ++++++++++++++++++-------------- tests/raftstore/test_service.rs | 9 ++- tests/storage/sync_storage.rs | 9 ++- 7 files changed, 110 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 63de5322185..76d7ff27082 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ [root] name = "tikv" -version = "0.9.0" +version = "1.0.0" dependencies = [ "backtrace 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -415,7 +415,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.1" -source = "git+https://github.com/pingcap/kvproto.git#146f02770efa42296666ca2e6974c6cb48fecdcf" +source = "git+https://github.com/pingcap/kvproto.git#7a1f6ad948f19de9d8e829f10f3efffc978a01d2" dependencies = [ "futures 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)", "grpcio 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index 4de5ce032c3..3eaf8261fd6 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -16,6 +16,7 @@ use std::fmt::Debug; use std::io::Write; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::collections::HashMap; use mio::Token; use grpc::{ClientStreamingSink, RequestStream, RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use futures::{future, Future, Stream}; @@ -475,14 +476,21 @@ impl tikvpb_grpc::Tikv for Service { .with_label_values(&[label]) .start_coarse_timer(); - let commit_ts = match req.get_commit_version() { - 0 => None, - x => Some(x), - }; + let mut txn_2_status: HashMap = HashMap::new(); + + let start_ts = req.get_start_version(); + if start_ts > 0 { + let commit_ts = req.get_commit_version(); + txn_2_status.insert(start_ts, commit_ts); + } else { + for temp in req.take_txn_infos().into_iter() { + txn_2_status.insert(temp.txn, temp.status); + } + } let (cb, future) = make_callback(); let res = self.storage - .async_resolve_lock(req.take_context(), req.get_start_version(), commit_ts, cb); + .async_resolve_lock(req.take_context(), txn_2_status, cb); if let Err(e) = res { self.send_fail_status(ctx, sink, Error::from(e), RpcStatusCode::ResourceExhausted); return; @@ -507,6 +515,7 @@ impl tikvpb_grpc::Tikv for Service { }); ctx.spawn(future); + } fn kv_gc(&self, ctx: RpcContext, mut req: GCRequest, sink: UnarySink) { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index ee37859d4d6..01f4e656ad4 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -22,6 +22,7 @@ use std::u64; use kvproto::kvrpcpb::{CommandPri, LockInfo}; use kvproto::errorpb; use self::metrics::*; +use std::collections::HashMap; pub mod engine; pub mod mvcc; @@ -35,6 +36,7 @@ pub use self::engine::{new_local_engine, CFStatistics, Cursor, Engine, Error as FlowStatistics, Modify, ScanMode, Snapshot, Statistics, StatisticsSummary, TEMP_DIR}; pub use self::engine::raftkv::RaftKv; +use self::mvcc::Lock; pub use self::txn::{Msg, Scheduler, SnapshotStore, StoreScanner}; pub use self::types::{make_key, Key, KvPair, MvccInfo, Value}; pub type Callback = Box) + Send>; @@ -131,10 +133,9 @@ pub enum Command { ScanLock { ctx: Context, max_ts: u64 }, ResolveLock { ctx: Context, - start_ts: u64, - commit_ts: Option, + txn2status: HashMap, scan_key: Option, - keys: Vec, + key_locks: Vec<(Key, Lock)>, }, Gc { ctx: Context, @@ -241,18 +242,7 @@ impl Display for Command { Command::ScanLock { ref ctx, max_ts, .. } => write!(f, "kv::scan_lock {} | {:?}", max_ts, ctx), - Command::ResolveLock { - ref ctx, - start_ts, - commit_ts, - .. - } => write!( - f, - "kv::resolve_txn {} -> {:?} | {:?}", - start_ts, - commit_ts, - ctx - ), + Command::ResolveLock { .. } => write!(f, "kv::resolve_lock"), Command::Gc { ref ctx, safe_point, @@ -328,7 +318,7 @@ impl Command { Command::Pause { .. } | Command::MvccByKey { .. } | Command::MvccByStartTs { .. } => true, - Command::ResolveLock { ref keys, .. } | + Command::ResolveLock { ref key_locks, .. } => key_locks.is_empty(), Command::Gc { ref keys, .. } => keys.is_empty(), _ => false, } @@ -379,11 +369,11 @@ impl Command { Command::Prewrite { start_ts, .. } | Command::Cleanup { start_ts, .. } | Command::Rollback { start_ts, .. } | - Command::ResolveLock { start_ts, .. } | Command::MvccByStartTs { start_ts, .. } => start_ts, Command::Commit { lock_ts, .. } => lock_ts, Command::ScanLock { max_ts, .. } => max_ts, Command::Gc { safe_point, .. } => safe_point, + Command::ResolveLock { .. } | Command::RawGet { .. } | Command::RawScan { .. } | Command::DeleteRange { .. } | @@ -449,8 +439,8 @@ impl Command { Command::Gc { ref keys, .. } | Command::BatchGet { ref keys, .. } | Command::Commit { ref keys, .. } | - Command::Rollback { ref keys, .. } | - Command::ResolveLock { ref keys, .. } => keys.len(), + Command::Rollback { ref keys, .. } => keys.len(), + Command::ResolveLock { ref key_locks, .. } => key_locks.len(), Command::Prewrite { ref mutations, .. } => mutations.len(), } } @@ -768,16 +758,14 @@ impl Storage { pub fn async_resolve_lock( &self, ctx: Context, - start_ts: u64, - commit_ts: Option, + txn2status: HashMap, callback: Callback<()>, ) -> Result<()> { let cmd = Command::ResolveLock { ctx: ctx, - start_ts: start_ts, - commit_ts: commit_ts, + txn2status: txn2status, scan_key: None, - keys: vec![], + key_locks: vec![], }; let tag = cmd.tag(); self.send(cmd, StorageCb::Boolean(callback))?; diff --git a/src/storage/mvcc/txn.rs b/src/storage/mvcc/txn.rs index 42b13b5c877..eecab31abd1 100644 --- a/src/storage/mvcc/txn.rs +++ b/src/storage/mvcc/txn.rs @@ -188,9 +188,11 @@ impl<'a> MvccTxn<'a> { pub fn commit(&mut self, key: &Key, commit_ts: u64) -> Result<()> { let (lock_type, short_value) = match self.reader.load_lock(key)? { Some(ref mut lock) if lock.ts == self.start_ts => { + println!("commit branch 1"); (lock.lock_type, lock.short_value.take()) } _ => { + println!("commit branch 2"); return match self.reader.get_txn_commit_info(key, self.start_ts)? { Some((_, WriteType::Rollback)) | None => { MVCC_CONFLICT_COUNTER diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index 11e37cc5468..54b5580d702 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -585,12 +585,9 @@ fn process_read( Err(e) => ProcessResult::Failed { err: e.into() }, } } - // Scan the locks with timestamp `start_ts`, then either commit them if the command has - // commit timestamp populated or rollback otherwise. Command::ResolveLock { ref ctx, - start_ts, - commit_ts, + ref txn2status, ref mut scan_key, .. } => { @@ -605,24 +602,23 @@ fn process_read( let res = reader .scan_lock( scan_key.take(), - |lock| lock.ts == start_ts, + |lock| txn2status.contains_key(&lock.ts), Some(RESOLVE_LOCK_BATCH_SIZE), ) .map_err(Error::from) .and_then(|(v, next_scan_key)| { - let keys: Vec = v.into_iter().map(|x| x.0).collect(); + let key_locks: Vec<_> = v.into_iter().map(|x| x).collect(); KV_COMMAND_KEYREAD_HISTOGRAM_VEC .with_label_values(&[tag]) - .observe(keys.len() as f64); - if keys.is_empty() { + .observe(key_locks.len() as f64); + if key_locks.is_empty() { Ok(None) } else { Ok(Some(Command::ResolveLock { ctx: ctx.clone(), - start_ts: start_ts, - commit_ts: commit_ts, + txn2status: txn2status.clone(), scan_key: next_scan_key, - keys: keys, + key_locks: key_locks, })) } }); @@ -884,52 +880,56 @@ fn process_write_impl( } Command::ResolveLock { ref ctx, - start_ts, - commit_ts, + ref txn2status, ref mut scan_key, - ref keys, + ref key_locks, } => { - if let Some(cts) = commit_ts { - if cts <= start_ts { - return Err(Error::InvalidTxnTso { - start_ts: start_ts, - commit_ts: cts, - }); - } - } let mut scan_key = scan_key.take(); - let mut txn = MvccTxn::new( - snapshot, - statistics, - start_ts, - None, - ctx.get_isolation_level(), - !ctx.get_not_fill_cache(), - ); - let rows = keys.len(); - for k in keys { - match commit_ts { - Some(ts) => txn.commit(k, ts)?, - None => txn.rollback(k)?, + let mut modifies: Vec = vec![]; + let rows = key_locks.len(); + for key_lock in key_locks { + let mut txn = MvccTxn::new( + snapshot, + statistics, + key_lock.1.ts, + None, + ctx.get_isolation_level(), + !ctx.get_not_fill_cache(), + ); + let status = txn2status.get(&(key_lock.1.ts)); + let ts = match status { + Some(ts) => *ts, + None => panic!("txn status not found!"), + }; + if ts > 0 { + if key_lock.1.ts >= ts { + return Err(Error::InvalidTxnTso { + start_ts: key_lock.1.ts, + commit_ts: ts, + }); + } + txn.commit(&key_lock.0, ts)?; + } else { + txn.rollback(&key_lock.0)?; } - if txn.write_size() >= MAX_TXN_WRITE_SIZE { - scan_key = Some(k.to_owned()); + modifies.append(&mut txn.modifies()); + if modifies.len() >= MAX_TXN_WRITE_SIZE { + scan_key = Some(key_lock.0.to_owned()); break; } } if scan_key.is_none() { - (ProcessResult::Res, txn.modifies(), rows) + (ProcessResult::Res, modifies, rows) } else { let pr = ProcessResult::NextCommand { cmd: Command::ResolveLock { ctx: ctx.clone(), - start_ts: start_ts, - commit_ts: commit_ts, + txn2status: txn2status.clone(), scan_key: scan_key.take(), - keys: vec![], + key_locks: vec![], }, }; - (pr, txn.modifies(), rows) + (pr, modifies, rows) } } Command::Gc { @@ -1371,10 +1371,12 @@ impl Scheduler { to_be_write: Vec, rows: usize, ) { + info!("we are going to write"); SCHED_STAGE_COUNTER_VEC .with_label_values(&[self.get_ctx_tag(cid), "write"]) .inc(); if to_be_write.is_empty() { + info!("modifies is empty"); return self.on_write_finished(cid, pr, Ok(())); } let engine_cb = make_engine_cb(cmd.tag(), cid, pr, self.schedch.clone(), rows); @@ -1394,6 +1396,7 @@ impl Scheduler { .with_label_values(&[self.get_ctx_tag(cid), "write_finish"]) .inc(); debug!("write finished for command, cid={}", cid); + info!("write finished for command, cid={}", cid); let mut ctx = self.remove_ctx(cid); let cb = ctx.callback.take().unwrap(); let pr = match result { @@ -1521,9 +1524,13 @@ pub fn gen_command_lock(latches: &Latches, cmd: &Command) -> Lock { let keys: Vec<&Key> = mutations.iter().map(|x| x.key()).collect(); latches.gen_lock(&keys) } - Command::Commit { ref keys, .. } | - Command::Rollback { ref keys, .. } | - Command::ResolveLock { ref keys, .. } => latches.gen_lock(keys), + Command::ResolveLock { ref key_locks, .. } => { + let keys: Vec<&Key> = key_locks.iter().map(|x| &x.0).collect(); + latches.gen_lock(&keys) + } + Command::Commit { ref keys, .. } | Command::Rollback { ref keys, .. } => { + latches.gen_lock(keys) + } Command::Cleanup { ref key, .. } => latches.gen_lock(&[key]), _ => Lock::new(vec![]), } @@ -1532,12 +1539,16 @@ pub fn gen_command_lock(latches: &Latches, cmd: &Command) -> Lock { #[cfg(test)] mod tests { use super::*; + use std::collections::HashMap; use kvproto::kvrpcpb::Context; use storage::txn::latch::*; use storage::{make_key, Command, Mutation, Options}; + use storage::mvcc; #[test] fn test_command_latches() { + let mut temp_map = HashMap::new(); + temp_map.insert(10, 20); let readonly_cmds = vec![ Command::Get { ctx: Context::new(), @@ -1562,10 +1573,9 @@ mod tests { }, Command::ResolveLock { ctx: Context::new(), - start_ts: 10, - commit_ts: Some(20), + txn2status: temp_map.clone(), scan_key: None, - keys: vec![], + key_locks: vec![], }, Command::Gc { ctx: Context::new(), @@ -1609,10 +1619,14 @@ mod tests { }, Command::ResolveLock { ctx: Context::new(), - start_ts: 10, - commit_ts: Some(20), + txn2status: temp_map.clone(), scan_key: None, - keys: vec![make_key(b"k")], + key_locks: vec![ + ( + make_key(b"k"), + mvcc::Lock::new(mvcc::LockType::Put, b"k".to_vec(), 10, 20, None), + ), + ], }, ]; diff --git a/tests/raftstore/test_service.rs b/tests/raftstore/test_service.rs index 171bba00156..62e64190a1c 100644 --- a/tests/raftstore/test_service.rs +++ b/tests/raftstore/test_service.rs @@ -332,6 +332,8 @@ fn test_mvcc_rollback_and_cleanup() { #[test] fn test_mvcc_resolve_lock_gc_and_delete() { + use kvproto::kvrpcpb::*; + use protobuf; let (_cluster, client, ctx) = must_new_cluster_and_kv_client(); let (k, v) = (b"key".to_vec(), b"value".to_vec()); @@ -388,9 +390,12 @@ fn test_mvcc_resolve_lock_gc_and_delete() { ts += 1; let resolve_lock_commit_version = ts; let mut resolve_lock_req = ResolveLockRequest::new(); + let mut temp_txninfo = TxnInfo::new(); + temp_txninfo.txn = prewrite_start_version2; + temp_txninfo.status = resolve_lock_commit_version; + let vec_txninfo = vec![temp_txninfo]; resolve_lock_req.set_context(ctx.clone()); - resolve_lock_req.start_version = prewrite_start_version2; - resolve_lock_req.commit_version = resolve_lock_commit_version; + resolve_lock_req.set_txn_infos(protobuf::RepeatedField::from_vec(vec_txninfo)); let resolve_lock_resp = client.kv_resolve_lock(resolve_lock_req).unwrap(); assert!(!resolve_lock_resp.has_region_error()); assert!(!resolve_lock_resp.has_error()); diff --git a/tests/storage/sync_storage.rs b/tests/storage/sync_storage.rs index 6fed8f9d00c..e53b129caeb 100644 --- a/tests/storage/sync_storage.rs +++ b/tests/storage/sync_storage.rs @@ -13,6 +13,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::collections::HashMap; use tikv::storage::{Engine, Key, KvPair, Mutation, Options, Result, Storage, Value}; use tikv::storage::config::Config; @@ -150,9 +151,15 @@ impl SyncStorage { } pub fn resolve_lock(&self, ctx: Context, start_ts: u64, commit_ts: Option) -> Result<()> { + let mut temp_map = HashMap::new(); + let temp = match commit_ts { + None => 0, + Some(x) => x, + }; + temp_map.insert(start_ts, temp); wait_op!(|cb| { self.store - .async_resolve_lock(ctx, start_ts, commit_ts, cb) + .async_resolve_lock(ctx, temp_map.clone(), cb) .unwrap() }).unwrap() } From 419473ef50634294b44c18b7af9a930f18aef160 Mon Sep 17 00:00:00 2001 From: atmzhou Date: Mon, 23 Oct 2017 16:51:58 +0800 Subject: [PATCH 02/13] clean code --- src/storage/txn/scheduler.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index 54b5580d702..89ba1c6517d 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -887,34 +887,34 @@ fn process_write_impl( let mut scan_key = scan_key.take(); let mut modifies: Vec = vec![]; let rows = key_locks.len(); - for key_lock in key_locks { + for (current_key, current_lock) in key_locks { let mut txn = MvccTxn::new( snapshot, statistics, - key_lock.1.ts, + current_lock.ts, None, ctx.get_isolation_level(), !ctx.get_not_fill_cache(), ); - let status = txn2status.get(&(key_lock.1.ts)); + let status = txn2status.get(¤t_lock.ts)); let ts = match status { Some(ts) => *ts, None => panic!("txn status not found!"), }; if ts > 0 { - if key_lock.1.ts >= ts { + if current_lock.ts >= ts { return Err(Error::InvalidTxnTso { - start_ts: key_lock.1.ts, + start_ts: current_lock.ts, commit_ts: ts, }); } - txn.commit(&key_lock.0, ts)?; + txn.commit(¤t_key, ts)?; } else { - txn.rollback(&key_lock.0)?; + txn.rollback(¤t_key)?; } modifies.append(&mut txn.modifies()); if modifies.len() >= MAX_TXN_WRITE_SIZE { - scan_key = Some(key_lock.0.to_owned()); + scan_key = Some(current_key.to_owned()); break; } } @@ -1371,7 +1371,6 @@ impl Scheduler { to_be_write: Vec, rows: usize, ) { - info!("we are going to write"); SCHED_STAGE_COUNTER_VEC .with_label_values(&[self.get_ctx_tag(cid), "write"]) .inc(); @@ -1396,7 +1395,6 @@ impl Scheduler { .with_label_values(&[self.get_ctx_tag(cid), "write_finish"]) .inc(); debug!("write finished for command, cid={}", cid); - info!("write finished for command, cid={}", cid); let mut ctx = self.remove_ctx(cid); let cb = ctx.callback.take().unwrap(); let pr = match result { From 881d5931d52abb1b3ebb470277a145ea4a864a18 Mon Sep 17 00:00:00 2001 From: atmzhou Date: Mon, 23 Oct 2017 16:53:10 +0800 Subject: [PATCH 03/13] clean code --- src/server/service/kv.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index 3eaf8261fd6..ca030f6a582 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -16,7 +16,6 @@ use std::fmt::Debug; use std::io::Write; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::collections::HashMap; use mio::Token; use grpc::{ClientStreamingSink, RequestStream, RpcContext, RpcStatus, RpcStatusCode, UnarySink}; use futures::{future, Future, Stream}; @@ -28,6 +27,7 @@ use kvproto::kvrpcpb::*; use kvproto::coprocessor::*; use kvproto::errorpb::{Error as RegionError, ServerIsBusy}; +use util::collections::HashMap; use util::worker::Scheduler; use util::buf::PipeBuffer; use storage::{self, Key, Mutation, Options, Storage, Value}; From bd2fec494470895befd36968f8025402819a976d Mon Sep 17 00:00:00 2001 From: atmzhou Date: Mon, 23 Oct 2017 17:40:24 +0800 Subject: [PATCH 04/13] clean code --- src/server/service/kv.rs | 2 +- src/storage/mod.rs | 2 +- src/storage/mvcc/txn.rs | 2 -- src/storage/txn/scheduler.rs | 10 +++++----- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index ca030f6a582..a7d15e1869c 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -12,6 +12,7 @@ // limitations under the License. use std::boxed::FnBox; +use std::collections::HashMap; use std::fmt::Debug; use std::io::Write; use std::sync::Arc; @@ -27,7 +28,6 @@ use kvproto::kvrpcpb::*; use kvproto::coprocessor::*; use kvproto::errorpb::{Error as RegionError, ServerIsBusy}; -use util::collections::HashMap; use util::worker::Scheduler; use util::buf::PipeBuffer; use storage::{self, Key, Mutation, Options, Storage, Value}; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 01f4e656ad4..fd3abb01283 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -12,6 +12,7 @@ // limitations under the License. use std::thread; +use std::collections::HashMap; use std::boxed::FnBox; use std::fmt::{self, Debug, Display, Formatter}; use std::sync::mpsc::{self, Receiver}; @@ -22,7 +23,6 @@ use std::u64; use kvproto::kvrpcpb::{CommandPri, LockInfo}; use kvproto::errorpb; use self::metrics::*; -use std::collections::HashMap; pub mod engine; pub mod mvcc; diff --git a/src/storage/mvcc/txn.rs b/src/storage/mvcc/txn.rs index eecab31abd1..42b13b5c877 100644 --- a/src/storage/mvcc/txn.rs +++ b/src/storage/mvcc/txn.rs @@ -188,11 +188,9 @@ impl<'a> MvccTxn<'a> { pub fn commit(&mut self, key: &Key, commit_ts: u64) -> Result<()> { let (lock_type, short_value) = match self.reader.load_lock(key)? { Some(ref mut lock) if lock.ts == self.start_ts => { - println!("commit branch 1"); (lock.lock_type, lock.short_value.take()) } _ => { - println!("commit branch 2"); return match self.reader.get_txn_commit_info(key, self.start_ts)? { Some((_, WriteType::Rollback)) | None => { MVCC_CONFLICT_COUNTER diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index 89ba1c6517d..646499ed2db 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -31,6 +31,7 @@ //! is ensured by the transaction protocol implemented in the client library, which is transparent //! to the scheduler. +use std::collections::HashMap; use std::fmt::{self, Debug, Formatter}; use std::sync::mpsc::Receiver; use std::time::Duration; @@ -52,7 +53,6 @@ use raftstore::store::engine::IterOption; use util::transport::{Error as TransportError, SyncSendCh}; use util::threadpool::{Context as ThreadContext, ThreadPool, ThreadPoolBuilder}; use util::time::SlowTimer; -use util::collections::HashMap; use super::Result; use super::Error; @@ -887,7 +887,7 @@ fn process_write_impl( let mut scan_key = scan_key.take(); let mut modifies: Vec = vec![]; let rows = key_locks.len(); - for (current_key, current_lock) in key_locks { + for &(ref current_key, ref current_lock) in key_locks { let mut txn = MvccTxn::new( snapshot, statistics, @@ -896,7 +896,7 @@ fn process_write_impl( ctx.get_isolation_level(), !ctx.get_not_fill_cache(), ); - let status = txn2status.get(¤t_lock.ts)); + let status = txn2status.get(¤t_lock.ts); let ts = match status { Some(ts) => *ts, None => panic!("txn status not found!"), @@ -908,9 +908,9 @@ fn process_write_impl( commit_ts: ts, }); } - txn.commit(¤t_key, ts)?; + txn.commit(current_key, ts)?; } else { - txn.rollback(¤t_key)?; + txn.rollback(current_key)?; } modifies.append(&mut txn.modifies()); if modifies.len() >= MAX_TXN_WRITE_SIZE { From c9efcc4264e49ffe863e2cdb0fcf92cc909e54ca Mon Sep 17 00:00:00 2001 From: atmzhou Date: Tue, 24 Oct 2017 13:55:24 +0800 Subject: [PATCH 05/13] clean code --- src/server/service/kv.rs | 10 +++++----- src/storage/mod.rs | 8 ++++---- src/storage/txn/scheduler.rs | 20 ++++++++++---------- tests/storage/sync_storage.rs | 4 ++-- 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index a7d15e1869c..09685fd289b 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -12,7 +12,6 @@ // limitations under the License. use std::boxed::FnBox; -use std::collections::HashMap; use std::fmt::Debug; use std::io::Write; use std::sync::Arc; @@ -29,6 +28,7 @@ use kvproto::coprocessor::*; use kvproto::errorpb::{Error as RegionError, ServerIsBusy}; use util::worker::Scheduler; +use util::collections::HashMap; use util::buf::PipeBuffer; use storage::{self, Key, Mutation, Options, Storage, Value}; use storage::txn::Error as TxnError; @@ -476,21 +476,21 @@ impl tikvpb_grpc::Tikv for Service { .with_label_values(&[label]) .start_coarse_timer(); - let mut txn_2_status: HashMap = HashMap::new(); + let mut txn_status = HashMap::default(); let start_ts = req.get_start_version(); if start_ts > 0 { let commit_ts = req.get_commit_version(); - txn_2_status.insert(start_ts, commit_ts); + txn_status.insert(start_ts, commit_ts); } else { for temp in req.take_txn_infos().into_iter() { - txn_2_status.insert(temp.txn, temp.status); + txn_status.insert(temp.txn, temp.status); } } let (cb, future) = make_callback(); let res = self.storage - .async_resolve_lock(req.take_context(), txn_2_status, cb); + .async_resolve_lock(req.take_context(), txn_status, cb); if let Err(e) = res { self.send_fail_status(ctx, sink, Error::from(e), RpcStatusCode::ResourceExhausted); return; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index fd3abb01283..db7c75bad47 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -12,7 +12,6 @@ // limitations under the License. use std::thread; -use std::collections::HashMap; use std::boxed::FnBox; use std::fmt::{self, Debug, Display, Formatter}; use std::sync::mpsc::{self, Receiver}; @@ -22,6 +21,7 @@ use std::io::Error as IoError; use std::u64; use kvproto::kvrpcpb::{CommandPri, LockInfo}; use kvproto::errorpb; +use util::collections::HashMap; use self::metrics::*; pub mod engine; @@ -133,7 +133,7 @@ pub enum Command { ScanLock { ctx: Context, max_ts: u64 }, ResolveLock { ctx: Context, - txn2status: HashMap, + txn_status: HashMap, scan_key: Option, key_locks: Vec<(Key, Lock)>, }, @@ -758,12 +758,12 @@ impl Storage { pub fn async_resolve_lock( &self, ctx: Context, - txn2status: HashMap, + txn_status: HashMap, callback: Callback<()>, ) -> Result<()> { let cmd = Command::ResolveLock { ctx: ctx, - txn2status: txn2status, + txn_status: txn_status, scan_key: None, key_locks: vec![], }; diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index 646499ed2db..c4bf995bbed 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -587,7 +587,7 @@ fn process_read( } Command::ResolveLock { ref ctx, - ref txn2status, + ref txn_status, ref mut scan_key, .. } => { @@ -602,7 +602,7 @@ fn process_read( let res = reader .scan_lock( scan_key.take(), - |lock| txn2status.contains_key(&lock.ts), + |lock| txn_status.contains_key(&lock.ts), Some(RESOLVE_LOCK_BATCH_SIZE), ) .map_err(Error::from) @@ -616,7 +616,7 @@ fn process_read( } else { Ok(Some(Command::ResolveLock { ctx: ctx.clone(), - txn2status: txn2status.clone(), + txn_status: txn_status.clone(), scan_key: next_scan_key, key_locks: key_locks, })) @@ -880,7 +880,7 @@ fn process_write_impl( } Command::ResolveLock { ref ctx, - ref txn2status, + ref txn_status, ref mut scan_key, ref key_locks, } => { @@ -896,7 +896,7 @@ fn process_write_impl( ctx.get_isolation_level(), !ctx.get_not_fill_cache(), ); - let status = txn2status.get(¤t_lock.ts); + let status = txn_status.get(¤t_lock.ts); let ts = match status { Some(ts) => *ts, None => panic!("txn status not found!"), @@ -924,7 +924,7 @@ fn process_write_impl( let pr = ProcessResult::NextCommand { cmd: Command::ResolveLock { ctx: ctx.clone(), - txn2status: txn2status.clone(), + txn_status: txn_status.clone(), scan_key: scan_key.take(), key_locks: vec![], }, @@ -1537,15 +1537,15 @@ pub fn gen_command_lock(latches: &Latches, cmd: &Command) -> Lock { #[cfg(test)] mod tests { use super::*; - use std::collections::HashMap; use kvproto::kvrpcpb::Context; + use util::collections::HashMap; use storage::txn::latch::*; use storage::{make_key, Command, Mutation, Options}; use storage::mvcc; #[test] fn test_command_latches() { - let mut temp_map = HashMap::new(); + let mut temp_map = HashMap::default(); temp_map.insert(10, 20); let readonly_cmds = vec![ Command::Get { @@ -1571,7 +1571,7 @@ mod tests { }, Command::ResolveLock { ctx: Context::new(), - txn2status: temp_map.clone(), + txn_status: temp_map.clone(), scan_key: None, key_locks: vec![], }, @@ -1617,7 +1617,7 @@ mod tests { }, Command::ResolveLock { ctx: Context::new(), - txn2status: temp_map.clone(), + txn_status: temp_map.clone(), scan_key: None, key_locks: vec![ ( diff --git a/tests/storage/sync_storage.rs b/tests/storage/sync_storage.rs index e53b129caeb..7133e5bc0d3 100644 --- a/tests/storage/sync_storage.rs +++ b/tests/storage/sync_storage.rs @@ -13,8 +13,8 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::collections::HashMap; +use tikv::util::collections::HashMap; use tikv::storage::{Engine, Key, KvPair, Mutation, Options, Result, Storage, Value}; use tikv::storage::config::Config; use kvproto::kvrpcpb::{Context, LockInfo}; @@ -151,7 +151,7 @@ impl SyncStorage { } pub fn resolve_lock(&self, ctx: Context, start_ts: u64, commit_ts: Option) -> Result<()> { - let mut temp_map = HashMap::new(); + let mut temp_map = HashMap::default(); let temp = match commit_ts { None => 0, Some(x) => x, From b53b1a097bd98f378185a43e6a5fb6d3d527a45d Mon Sep 17 00:00:00 2001 From: atmzhou Date: Tue, 24 Oct 2017 18:11:25 +0800 Subject: [PATCH 06/13] add test --- tests/storage/assert_storage.rs | 16 ++++++++++++++++ tests/storage/sync_storage.rs | 17 +++++++++++++++++ tests/storage/test_storage.rs | 27 +++++++++++++++++++++++++++ 3 files changed, 60 insertions(+) diff --git a/tests/storage/assert_storage.rs b/tests/storage/assert_storage.rs index cf0f85c9e6a..77f2b401d90 100644 --- a/tests/storage/assert_storage.rs +++ b/tests/storage/assert_storage.rs @@ -389,6 +389,22 @@ impl AssertionStorage { .unwrap(); } + pub fn resolve_lock_batch_ok( + &self, + start_ts_1: u64, + commit_ts_1: u64, + start_ts_2: u64, + commit_ts_2: u64, + ) { + self.store + .resolve_lock_batch( + self.ctx.clone(), + vec![start_ts_1, start_ts_2], + vec![commit_ts_1, commit_ts_2], + ) + .unwrap(); + } + pub fn resolve_lock_with_illegal_tso(&self, start_ts: u64, commit_ts: Option) { let resp = self.store .resolve_lock(self.ctx.clone(), start_ts, commit_ts); diff --git a/tests/storage/sync_storage.rs b/tests/storage/sync_storage.rs index 7133e5bc0d3..0204a5afc9f 100644 --- a/tests/storage/sync_storage.rs +++ b/tests/storage/sync_storage.rs @@ -164,6 +164,23 @@ impl SyncStorage { }).unwrap() } + pub fn resolve_lock_batch( + &self, + ctx: Context, + start_ts: Vec, + commit_ts: Vec, + ) -> Result<()> { + let mut txn_status = HashMap::default(); + for i in 0..start_ts.len() { + txn_status.insert(start_ts[i], commit_ts[i]); + } + wait_op!(|cb| { + self.store + .async_resolve_lock(ctx, txn_status.clone(), cb) + .unwrap() + }).unwrap() + } + pub fn gc(&self, ctx: Context, safe_point: u64) -> Result<()> { wait_op!(|cb| self.store.async_gc(ctx, safe_point, cb).unwrap()).unwrap() } diff --git a/tests/storage/test_storage.rs b/tests/storage/test_storage.rs index 6b42101cc57..53d19bda5c7 100644 --- a/tests/storage/test_storage.rs +++ b/tests/storage/test_storage.rs @@ -391,6 +391,33 @@ fn test_txn_store_resolve_lock_batch(key_prefix_len: usize, n: usize) { } } +#[test] +fn test_txn_store_resolve_lock_in_a_batch() { + let store = AssertionStorage::default(); + + store.prewrite_ok( + vec![ + Mutation::Put((make_key(b"p1"), b"v5".to_vec())), + Mutation::Put((make_key(b"s1"), b"v5".to_vec())), + ], + b"p1", + 5, + ); + store.prewrite_ok( + vec![ + Mutation::Put((make_key(b"p2"), b"v10".to_vec())), + Mutation::Put((make_key(b"s2"), b"v10".to_vec())), + ], + b"p2", + 10, + ); + store.resolve_lock_batch_ok(5, 0, 10, 20); + store.get_none(b"p1", 20); + store.get_none(b"s1", 30); + store.get_ok(b"p2", 20, b"v10"); + store.get_ok(b"s2", 30, b"v10"); + store.scan_lock_ok(30, vec![]); +} #[test] fn test_txn_store_resolve_lock2() { for &i in &[ From 1ed7a1e7f8546d2d3958c2a849ccbbe264e0f461 Mon Sep 17 00:00:00 2001 From: atmzhou Date: Tue, 24 Oct 2017 18:31:59 +0800 Subject: [PATCH 07/13] clean code --- src/server/service/kv.rs | 4 ++-- src/storage/txn/scheduler.rs | 1 - tests/storage/sync_storage.rs | 16 ++++------------ 3 files changed, 6 insertions(+), 15 deletions(-) diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index 09685fd289b..c70bf363bb3 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -483,8 +483,8 @@ impl tikvpb_grpc::Tikv for Service { let commit_ts = req.get_commit_version(); txn_status.insert(start_ts, commit_ts); } else { - for temp in req.take_txn_infos().into_iter() { - txn_status.insert(temp.txn, temp.status); + for info in req.take_txn_infos().into_iter() { + txn_status.insert(info.txn, info.status); } } diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index c4bf995bbed..f68317d31ed 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -1375,7 +1375,6 @@ impl Scheduler { .with_label_values(&[self.get_ctx_tag(cid), "write"]) .inc(); if to_be_write.is_empty() { - info!("modifies is empty"); return self.on_write_finished(cid, pr, Ok(())); } let engine_cb = make_engine_cb(cmd.tag(), cid, pr, self.schedch.clone(), rows); diff --git a/tests/storage/sync_storage.rs b/tests/storage/sync_storage.rs index 0204a5afc9f..3ca8b151544 100644 --- a/tests/storage/sync_storage.rs +++ b/tests/storage/sync_storage.rs @@ -151,16 +151,10 @@ impl SyncStorage { } pub fn resolve_lock(&self, ctx: Context, start_ts: u64, commit_ts: Option) -> Result<()> { - let mut temp_map = HashMap::default(); - let temp = match commit_ts { - None => 0, - Some(x) => x, - }; - temp_map.insert(start_ts, temp); + let mut txn_status = HashMap::default(); + txn_status.insert(start_ts, commit_ts.unwrap_or(0)); wait_op!(|cb| { - self.store - .async_resolve_lock(ctx, temp_map.clone(), cb) - .unwrap() + self.store.async_resolve_lock(ctx, txn_status, cb).unwrap() }).unwrap() } @@ -175,9 +169,7 @@ impl SyncStorage { txn_status.insert(start_ts[i], commit_ts[i]); } wait_op!(|cb| { - self.store - .async_resolve_lock(ctx, txn_status.clone(), cb) - .unwrap() + self.store.async_resolve_lock(ctx, txn_status, cb).unwrap() }).unwrap() } From c06d81ca9c69f72f169d501b5d3f305962fb66a8 Mon Sep 17 00:00:00 2001 From: disksing Date: Wed, 25 Oct 2017 15:20:15 +0800 Subject: [PATCH 08/13] address comments. --- src/storage/txn/scheduler.rs | 14 ++++++++------ tests/storage/test_storage.rs | 1 + 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index f68317d31ed..6c053e84dd1 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -886,6 +886,7 @@ fn process_write_impl( } => { let mut scan_key = scan_key.take(); let mut modifies: Vec = vec![]; + let mut write_size = 0; let rows = key_locks.len(); for &(ref current_key, ref current_lock) in key_locks { let mut txn = MvccTxn::new( @@ -897,23 +898,24 @@ fn process_write_impl( !ctx.get_not_fill_cache(), ); let status = txn_status.get(¤t_lock.ts); - let ts = match status { + let commit_ts = match status { Some(ts) => *ts, None => panic!("txn status not found!"), }; - if ts > 0 { - if current_lock.ts >= ts { + if commit_ts > 0 { + if current_lock.ts >= commit_ts { return Err(Error::InvalidTxnTso { start_ts: current_lock.ts, - commit_ts: ts, + commit_ts: commit_ts, }); } - txn.commit(current_key, ts)?; + txn.commit(current_key, commit_ts)?; } else { txn.rollback(current_key)?; } + write_size += txn.write_size(); modifies.append(&mut txn.modifies()); - if modifies.len() >= MAX_TXN_WRITE_SIZE { + if write_size >= MAX_TXN_WRITE_SIZE { scan_key = Some(current_key.to_owned()); break; } diff --git a/tests/storage/test_storage.rs b/tests/storage/test_storage.rs index 53d19bda5c7..9eeac98d8be 100644 --- a/tests/storage/test_storage.rs +++ b/tests/storage/test_storage.rs @@ -418,6 +418,7 @@ fn test_txn_store_resolve_lock_in_a_batch() { store.get_ok(b"s2", 30, b"v10"); store.scan_lock_ok(30, vec![]); } + #[test] fn test_txn_store_resolve_lock2() { for &i in &[ From 5ef4d87be9a2d7ccf9eb7fd90f2a302097220ab8 Mon Sep 17 00:00:00 2001 From: disksing Date: Wed, 25 Oct 2017 15:51:20 +0800 Subject: [PATCH 09/13] address comments. --- src/storage/txn/scheduler.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index 6c053e84dd1..a3eb3959ab4 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -587,7 +587,7 @@ fn process_read( } Command::ResolveLock { ref ctx, - ref txn_status, + ref mut txn_status, ref mut scan_key, .. } => { @@ -616,7 +616,7 @@ fn process_read( } else { Ok(Some(Command::ResolveLock { ctx: ctx.clone(), - txn_status: txn_status.clone(), + txn_status: txn_status.drain().collect(), scan_key: next_scan_key, key_locks: key_locks, })) From f03b228c13e57ddad617144b395aba5914986615 Mon Sep 17 00:00:00 2001 From: disksing Date: Thu, 26 Oct 2017 09:54:15 +0800 Subject: [PATCH 10/13] address comments. --- src/storage/txn/scheduler.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index a3eb3959ab4..0d72ba6c246 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -38,6 +38,7 @@ use std::time::Duration; use std::thread; use std::hash::{Hash, Hasher}; use std::u64; +use std::mem; use prometheus::HistogramTimer; use kvproto::kvrpcpb::{CommandPri, Context, LockInfo}; @@ -616,7 +617,7 @@ fn process_read( } else { Ok(Some(Command::ResolveLock { ctx: ctx.clone(), - txn_status: txn_status.drain().collect(), + txn_status: mem::replace(txn_status, Default::default()), scan_key: next_scan_key, key_locks: key_locks, })) From 1cc7613dacc0c62f4ba49ed2caf6fa9f8d8dd21a Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 30 Oct 2017 14:23:48 +0800 Subject: [PATCH 11/13] address comment. --- src/storage/txn/scheduler.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index 90c34c99f58..975edb17156 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -64,7 +64,9 @@ use super::super::metrics::*; // TODO: make it configurable. pub const GC_BATCH_SIZE: usize = 512; -pub const RESOLVE_LOCK_BATCH_SIZE: usize = 512; +// To resolve a key, the write size is about 100~150 bytes, depending on key and value length. +// The write batch will be around 32KB if we scan 256 keys each time. +pub const RESOLVE_LOCK_BATCH_SIZE: usize = 256; /// Process result of a command. pub enum ProcessResult { From 2ac4bac581ab9436e78c6db347b714c1a85dc58d Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 30 Oct 2017 16:33:09 +0800 Subject: [PATCH 12/13] address comment. --- src/server/service/kv.rs | 1 - src/storage/txn/scheduler.rs | 8 ++++---- tests/storage/assert_storage.rs | 3 +-- tests/storage/sync_storage.rs | 12 ++---------- tests/storage/test_storage.rs | 4 ++-- 5 files changed, 9 insertions(+), 19 deletions(-) diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index c70bf363bb3..8bf7477d00e 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -515,7 +515,6 @@ impl tikvpb_grpc::Tikv for Service { }); ctx.spawn(future); - } fn kv_gc(&self, ctx: RpcContext, mut req: GCRequest, sink: UnarySink) { diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index 975edb17156..509e4396aca 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -31,7 +31,6 @@ //! is ensured by the transaction protocol implemented in the client library, which is transparent //! to the scheduler. -use std::collections::HashMap; use std::fmt::{self, Debug, Formatter}; use std::sync::mpsc::Receiver; use std::time::Duration; @@ -54,6 +53,7 @@ use raftstore::store::engine::IterOption; use util::transport::{Error as TransportError, SyncSendCh}; use util::threadpool::{Context as ThreadContext, ThreadPool, ThreadPoolBuilder}; use util::time::SlowTimer; +use util::collections::HashMap; use super::Result; use super::Error; @@ -883,7 +883,7 @@ fn process_write_impl( } Command::ResolveLock { ref ctx, - ref txn_status, + ref mut txn_status, ref mut scan_key, ref key_locks, } => { @@ -903,7 +903,7 @@ fn process_write_impl( let status = txn_status.get(¤t_lock.ts); let commit_ts = match status { Some(ts) => *ts, - None => panic!("txn status not found!"), + None => panic!("txn status not found."), }; if commit_ts > 0 { if current_lock.ts >= commit_ts { @@ -929,7 +929,7 @@ fn process_write_impl( let pr = ProcessResult::NextCommand { cmd: Command::ResolveLock { ctx: ctx.clone(), - txn_status: txn_status.clone(), + txn_status: mem::replace(txn_status, Default::default()), scan_key: scan_key.take(), key_locks: vec![], }, diff --git a/tests/storage/assert_storage.rs b/tests/storage/assert_storage.rs index 77f2b401d90..c6d0adba664 100644 --- a/tests/storage/assert_storage.rs +++ b/tests/storage/assert_storage.rs @@ -399,8 +399,7 @@ impl AssertionStorage { self.store .resolve_lock_batch( self.ctx.clone(), - vec![start_ts_1, start_ts_2], - vec![commit_ts_1, commit_ts_2], + vec![(start_ts_1, commit_ts_1), (start_ts_2, commit_ts_2)], ) .unwrap(); } diff --git a/tests/storage/sync_storage.rs b/tests/storage/sync_storage.rs index 3ca8b151544..c1705c87922 100644 --- a/tests/storage/sync_storage.rs +++ b/tests/storage/sync_storage.rs @@ -158,16 +158,8 @@ impl SyncStorage { }).unwrap() } - pub fn resolve_lock_batch( - &self, - ctx: Context, - start_ts: Vec, - commit_ts: Vec, - ) -> Result<()> { - let mut txn_status = HashMap::default(); - for i in 0..start_ts.len() { - txn_status.insert(start_ts[i], commit_ts[i]); - } + pub fn resolve_lock_batch(&self, ctx: Context, txns: Vec<(u64, u64)>) -> Result<()> { + let txn_status: HashMap = txns.into_iter().collect(); wait_op!(|cb| { self.store.async_resolve_lock(ctx, txn_status, cb).unwrap() }).unwrap() diff --git a/tests/storage/test_storage.rs b/tests/storage/test_storage.rs index 9eeac98d8be..04ea0ef7869 100644 --- a/tests/storage/test_storage.rs +++ b/tests/storage/test_storage.rs @@ -412,9 +412,9 @@ fn test_txn_store_resolve_lock_in_a_batch() { 10, ); store.resolve_lock_batch_ok(5, 0, 10, 20); - store.get_none(b"p1", 20); + store.get_none(b"p1", 30); store.get_none(b"s1", 30); - store.get_ok(b"p2", 20, b"v10"); + store.get_ok(b"p2", 30, b"v10"); store.get_ok(b"s2", 30, b"v10"); store.scan_lock_ok(30, vec![]); } From 68d5eb86b3d6662fd1abaf118cb8463e99b2d3e1 Mon Sep 17 00:00:00 2001 From: disksing Date: Fri, 10 Nov 2017 13:25:26 +0800 Subject: [PATCH 13/13] address comment. --- src/server/service/kv.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index 8bf7477d00e..6a2ef9cd0c6 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -14,6 +14,7 @@ use std::boxed::FnBox; use std::fmt::Debug; use std::io::Write; +use std::iter::{self, FromIterator}; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use mio::Token; @@ -476,17 +477,17 @@ impl tikvpb_grpc::Tikv for Service { .with_label_values(&[label]) .start_coarse_timer(); - let mut txn_status = HashMap::default(); - - let start_ts = req.get_start_version(); - if start_ts > 0 { - let commit_ts = req.get_commit_version(); - txn_status.insert(start_ts, commit_ts); + let txn_status = if req.get_start_version() > 0 { + HashMap::from_iter(iter::once( + (req.get_start_version(), req.get_commit_version()), + )) } else { - for info in req.take_txn_infos().into_iter() { - txn_status.insert(info.txn, info.status); - } - } + HashMap::from_iter( + req.take_txn_infos() + .into_iter() + .map(|info| (info.txn, info.status)), + ) + }; let (cb, future) = make_callback(); let res = self.storage