Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikv/resolvelock: resolve lock in a batch #2389

Merged
merged 22 commits into from Nov 11, 2017
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 13 additions & 5 deletions src/server/service/kv.rs
Expand Up @@ -28,6 +28,7 @@ use kvproto::coprocessor::*;
use kvproto::errorpb::{Error as RegionError, ServerIsBusy};

use util::worker::Scheduler;
use util::collections::HashMap;
use util::buf::PipeBuffer;
use storage::{self, Key, Mutation, Options, Storage, Value};
use storage::txn::Error as TxnError;
Expand Down Expand Up @@ -475,14 +476,21 @@ impl<T: RaftStoreRouter + 'static> tikvpb_grpc::Tikv for Service<T> {
.with_label_values(&[label])
.start_coarse_timer();

let commit_ts = match req.get_commit_version() {
0 => None,
x => Some(x),
};
let mut txn_status = HashMap::default();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider constructing the HashMap with an initial capacity (e.g. `HashMap::with_capacity(...)`) to avoid reallocation.


let start_ts = req.get_start_version();
if start_ts > 0 {
let commit_ts = req.get_commit_version();
txn_status.insert(start_ts, commit_ts);
} else {
for info in req.take_txn_infos().into_iter() {
txn_status.insert(info.txn, info.status);
}
}

let (cb, future) = make_callback();
let res = self.storage
.async_resolve_lock(req.take_context(), req.get_start_version(), commit_ts, cb);
.async_resolve_lock(req.take_context(), txn_status, cb);
if let Err(e) = res {
self.send_fail_status(ctx, sink, Error::from(e), RpcStatusCode::ResourceExhausted);
return;
Expand Down
43 changes: 17 additions & 26 deletions src/storage/mod.rs
Expand Up @@ -21,6 +21,7 @@ use std::io::Error as IoError;
use std::u64;
use kvproto::kvrpcpb::{CommandPri, LockInfo};
use kvproto::errorpb;
use util::collections::HashMap;
use self::metrics::*;

pub mod engine;
Expand All @@ -35,6 +36,7 @@ pub use self::engine::{new_local_engine, CFStatistics, Cursor, Engine, Error as
FlowStatistics, Modify, ScanMode, Snapshot, Statistics, StatisticsSummary,
TEMP_DIR};
pub use self::engine::raftkv::RaftKv;
use self::mvcc::Lock;
pub use self::txn::{Msg, Scheduler, SnapshotStore, StoreScanner};
pub use self::types::{make_key, Key, KvPair, MvccInfo, Value};
pub type Callback<T> = Box<FnBox(Result<T>) + Send>;
Expand Down Expand Up @@ -131,10 +133,9 @@ pub enum Command {
ScanLock { ctx: Context, max_ts: u64 },
ResolveLock {
ctx: Context,
start_ts: u64,
commit_ts: Option<u64>,
txn_status: HashMap<u64, u64>,
scan_key: Option<Key>,
keys: Vec<Key>,
key_locks: Vec<(Key, Lock)>,
},
Gc {
ctx: Context,
Expand Down Expand Up @@ -241,18 +242,7 @@ impl Display for Command {
Command::ScanLock {
ref ctx, max_ts, ..
} => write!(f, "kv::scan_lock {} | {:?}", max_ts, ctx),
Command::ResolveLock {
ref ctx,
start_ts,
commit_ts,
..
} => write!(
f,
"kv::resolve_txn {} -> {:?} | {:?}",
start_ts,
commit_ts,
ctx
),
Command::ResolveLock { .. } => write!(f, "kv::resolve_lock"),
Command::Gc {
ref ctx,
safe_point,
Expand Down Expand Up @@ -328,7 +318,7 @@ impl Command {
Command::Pause { .. } |
Command::MvccByKey { .. } |
Command::MvccByStartTs { .. } => true,
Command::ResolveLock { ref keys, .. } |
Command::ResolveLock { ref key_locks, .. } => key_locks.is_empty(),
Command::Gc { ref keys, .. } => keys.is_empty(),
_ => false,
}
Expand Down Expand Up @@ -379,11 +369,11 @@ impl Command {
Command::Prewrite { start_ts, .. } |
Command::Cleanup { start_ts, .. } |
Command::Rollback { start_ts, .. } |
Command::ResolveLock { start_ts, .. } |
Command::MvccByStartTs { start_ts, .. } => start_ts,
Command::Commit { lock_ts, .. } => lock_ts,
Command::ScanLock { max_ts, .. } => max_ts,
Command::Gc { safe_point, .. } => safe_point,
Command::ResolveLock { .. } |
Command::RawGet { .. } |
Command::RawScan { .. } |
Command::DeleteRange { .. } |
Expand Down Expand Up @@ -448,10 +438,13 @@ impl Command {
}
}
},
Command::Commit { ref keys, .. } |
Command::Rollback { ref keys, .. } |
Command::ResolveLock { ref keys, .. } => for key in keys {
bytes += key.encoded().len();
Command::Commit { ref keys, .. } | Command::Rollback { ref keys, .. } => {
for key in keys {
bytes += key.encoded().len();
}
}
Command::ResolveLock { ref key_locks, .. } => for lock in key_locks {
bytes += lock.0.encoded().len();
},
Command::Cleanup { ref key, .. } => {
bytes += key.encoded().len();
Expand Down Expand Up @@ -774,16 +767,14 @@ impl Storage {
pub fn async_resolve_lock(
&self,
ctx: Context,
start_ts: u64,
commit_ts: Option<u64>,
txn_status: HashMap<u64, u64>,
callback: Callback<()>,
) -> Result<()> {
let cmd = Command::ResolveLock {
ctx: ctx,
start_ts: start_ts,
commit_ts: commit_ts,
txn_status: txn_status,
scan_key: None,
keys: vec![],
key_locks: vec![],
};
let tag = cmd.tag();
self.send(cmd, StorageCb::Boolean(callback))?;
Expand Down
120 changes: 68 additions & 52 deletions src/storage/txn/scheduler.rs
Expand Up @@ -37,6 +37,7 @@ use std::time::Duration;
use std::thread;
use std::hash::{Hash, Hasher};
use std::u64;
use std::mem;

use prometheus::HistogramTimer;
use kvproto::kvrpcpb::{CommandPri, Context, LockInfo};
Expand All @@ -63,7 +64,9 @@ use super::super::metrics::*;
// TODO: make it configurable.
pub const GC_BATCH_SIZE: usize = 512;

pub const RESOLVE_LOCK_BATCH_SIZE: usize = 512;
// To resolve a key, the write size is about 100~150 bytes, depending on key and value length.
// The write batch will be around 32KB if we scan 256 keys each time.
pub const RESOLVE_LOCK_BATCH_SIZE: usize = 256;

/// Process result of a command.
pub enum ProcessResult {
Expand Down Expand Up @@ -585,12 +588,9 @@ fn process_read(
Err(e) => ProcessResult::Failed { err: e.into() },
}
}
// Scan the locks with timestamp `start_ts`, then either commit them if the command has
// commit timestamp populated or rollback otherwise.
Command::ResolveLock {
ref ctx,
start_ts,
commit_ts,
ref mut txn_status,
ref mut scan_key,
..
} => {
Expand All @@ -605,24 +605,23 @@ fn process_read(
let res = reader
.scan_lock(
scan_key.take(),
|lock| lock.ts == start_ts,
|lock| txn_status.contains_key(&lock.ts),
Some(RESOLVE_LOCK_BATCH_SIZE),
)
.map_err(Error::from)
.and_then(|(v, next_scan_key)| {
let keys: Vec<Key> = v.into_iter().map(|x| x.0).collect();
let key_locks: Vec<_> = v.into_iter().map(|x| x).collect();
KV_COMMAND_KEYREAD_HISTOGRAM_VEC
.with_label_values(&[tag])
.observe(keys.len() as f64);
if keys.is_empty() {
.observe(key_locks.len() as f64);
if key_locks.is_empty() {
Ok(None)
} else {
Ok(Some(Command::ResolveLock {
ctx: ctx.clone(),
start_ts: start_ts,
commit_ts: commit_ts,
txn_status: mem::replace(txn_status, Default::default()),
scan_key: next_scan_key,
keys: keys,
key_locks: key_locks,
}))
}
});
Expand Down Expand Up @@ -884,52 +883,58 @@ fn process_write_impl(
}
Command::ResolveLock {
ref ctx,
start_ts,
commit_ts,
ref mut txn_status,
ref mut scan_key,
ref keys,
ref key_locks,
} => {
if let Some(cts) = commit_ts {
if cts <= start_ts {
return Err(Error::InvalidTxnTso {
start_ts: start_ts,
commit_ts: cts,
});
}
}
let mut scan_key = scan_key.take();
let mut txn = MvccTxn::new(
snapshot,
statistics,
start_ts,
None,
ctx.get_isolation_level(),
!ctx.get_not_fill_cache(),
);
let rows = keys.len();
for k in keys {
match commit_ts {
Some(ts) => txn.commit(k, ts)?,
None => txn.rollback(k)?,
let mut modifies: Vec<Modify> = vec![];
let mut write_size = 0;
let rows = key_locks.len();
for &(ref current_key, ref current_lock) in key_locks {
let mut txn = MvccTxn::new(
snapshot,
statistics,
current_lock.ts,
None,
ctx.get_isolation_level(),
!ctx.get_not_fill_cache(),
);
let status = txn_status.get(&current_lock.ts);
let commit_ts = match status {
Some(ts) => *ts,
None => panic!("txn status not found."),
};
if commit_ts > 0 {
if current_lock.ts >= commit_ts {
return Err(Error::InvalidTxnTso {
start_ts: current_lock.ts,
commit_ts: commit_ts,
});
}
txn.commit(current_key, commit_ts)?;
} else {
txn.rollback(current_key)?;
}
if txn.write_size() >= MAX_TXN_WRITE_SIZE {
scan_key = Some(k.to_owned());
write_size += txn.write_size();
modifies.append(&mut txn.modifies());
if write_size >= MAX_TXN_WRITE_SIZE {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please adjust MAX_TXN_WRITE_SIZE and RESOLVE_LOCK_BATCH_SIZE so that scan_lock never scans duplicate keys.

scan_key = Some(current_key.to_owned());
break;
}
}
if scan_key.is_none() {
(ProcessResult::Res, txn.modifies(), rows)
(ProcessResult::Res, modifies, rows)
} else {
let pr = ProcessResult::NextCommand {
cmd: Command::ResolveLock {
ctx: ctx.clone(),
start_ts: start_ts,
commit_ts: commit_ts,
txn_status: mem::replace(txn_status, Default::default()),
scan_key: scan_key.take(),
keys: vec![],
key_locks: vec![],
},
};
(pr, txn.modifies(), rows)
(pr, modifies, rows)
}
}
Command::Gc {
Expand Down Expand Up @@ -1535,9 +1540,13 @@ pub fn gen_command_lock(latches: &Latches, cmd: &Command) -> Lock {
let keys: Vec<&Key> = mutations.iter().map(|x| x.key()).collect();
latches.gen_lock(&keys)
}
Command::Commit { ref keys, .. } |
Command::Rollback { ref keys, .. } |
Command::ResolveLock { ref keys, .. } => latches.gen_lock(keys),
Command::ResolveLock { ref key_locks, .. } => {
let keys: Vec<&Key> = key_locks.iter().map(|x| &x.0).collect();
latches.gen_lock(&keys)
}
Command::Commit { ref keys, .. } | Command::Rollback { ref keys, .. } => {
latches.gen_lock(keys)
}
Command::Cleanup { ref key, .. } => latches.gen_lock(&[key]),
_ => Lock::new(vec![]),
}
Expand All @@ -1547,11 +1556,15 @@ pub fn gen_command_lock(latches: &Latches, cmd: &Command) -> Lock {
mod tests {
use super::*;
use kvproto::kvrpcpb::Context;
use util::collections::HashMap;
use storage::txn::latch::*;
use storage::{make_key, Command, Mutation, Options};
use storage::mvcc;

#[test]
fn test_command_latches() {
let mut temp_map = HashMap::default();
temp_map.insert(10, 20);
let readonly_cmds = vec![
Command::Get {
ctx: Context::new(),
Expand All @@ -1576,10 +1589,9 @@ mod tests {
},
Command::ResolveLock {
ctx: Context::new(),
start_ts: 10,
commit_ts: Some(20),
txn_status: temp_map.clone(),
scan_key: None,
keys: vec![],
key_locks: vec![],
},
Command::Gc {
ctx: Context::new(),
Expand Down Expand Up @@ -1623,10 +1635,14 @@ mod tests {
},
Command::ResolveLock {
ctx: Context::new(),
start_ts: 10,
commit_ts: Some(20),
txn_status: temp_map.clone(),
scan_key: None,
keys: vec![make_key(b"k")],
key_locks: vec![
(
make_key(b"k"),
mvcc::Lock::new(mvcc::LockType::Put, b"k".to_vec(), 10, 20, None),
),
],
},
];

Expand Down
9 changes: 7 additions & 2 deletions tests/raftstore/test_service.rs
Expand Up @@ -332,6 +332,8 @@ fn test_mvcc_rollback_and_cleanup() {

#[test]
fn test_mvcc_resolve_lock_gc_and_delete() {
use kvproto::kvrpcpb::*;
use protobuf;
let (_cluster, client, ctx) = must_new_cluster_and_kv_client();
let (k, v) = (b"key".to_vec(), b"value".to_vec());

Expand Down Expand Up @@ -388,9 +390,12 @@ fn test_mvcc_resolve_lock_gc_and_delete() {
ts += 1;
let resolve_lock_commit_version = ts;
let mut resolve_lock_req = ResolveLockRequest::new();
let mut temp_txninfo = TxnInfo::new();
temp_txninfo.txn = prewrite_start_version2;
temp_txninfo.status = resolve_lock_commit_version;
let vec_txninfo = vec![temp_txninfo];
resolve_lock_req.set_context(ctx.clone());
resolve_lock_req.start_version = prewrite_start_version2;
resolve_lock_req.commit_version = resolve_lock_commit_version;
resolve_lock_req.set_txn_infos(protobuf::RepeatedField::from_vec(vec_txninfo));
let resolve_lock_resp = client.kv_resolve_lock(resolve_lock_req).unwrap();
assert!(!resolve_lock_resp.has_region_error());
assert!(!resolve_lock_resp.has_error());
Expand Down