Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikv/resolvelock: resolve lock in a batch #2389

Merged
merged 22 commits into from
Nov 11, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 14 additions & 5 deletions src/server/service/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

use std::boxed::FnBox;
use std::collections::HashMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use util HashMap

use std::fmt::Debug;
use std::io::Write;
use std::sync::Arc;
Expand Down Expand Up @@ -475,14 +476,21 @@ impl<T: RaftStoreRouter + 'static> tikvpb_grpc::Tikv for Service<T> {
.with_label_values(&[label])
.start_coarse_timer();

let commit_ts = match req.get_commit_version() {
0 => None,
x => Some(x),
};
let mut txn_2_status: HashMap<u64, u64> = HashMap::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/txn_2_status/txn_status/


let start_ts = req.get_start_version();
if start_ts > 0 {
let commit_ts = req.get_commit_version();
txn_2_status.insert(start_ts, commit_ts);
} else {
for temp in req.take_txn_infos().into_iter() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

temp -> info.

txn_2_status.insert(temp.txn, temp.status);
}
}

let (cb, future) = make_callback();
let res = self.storage
.async_resolve_lock(req.take_context(), req.get_start_version(), commit_ts, cb);
.async_resolve_lock(req.take_context(), txn_2_status, cb);
if let Err(e) = res {
self.send_fail_status(ctx, sink, Error::from(e), RpcStatusCode::ResourceExhausted);
return;
Expand All @@ -507,6 +515,7 @@ impl<T: RaftStoreRouter + 'static> tikvpb_grpc::Tikv for Service<T> {
});

ctx.spawn(future);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove empty line

}

fn kv_gc(&self, ctx: RpcContext, mut req: GCRequest, sink: UnarySink<GCResponse>) {
Expand Down
36 changes: 12 additions & 24 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

use std::thread;
use std::collections::HashMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

use std::boxed::FnBox;
use std::fmt::{self, Debug, Display, Formatter};
use std::sync::mpsc::{self, Receiver};
Expand All @@ -35,6 +36,7 @@ pub use self::engine::{new_local_engine, CFStatistics, Cursor, Engine, Error as
FlowStatistics, Modify, ScanMode, Snapshot, Statistics, StatisticsSummary,
TEMP_DIR};
pub use self::engine::raftkv::RaftKv;
use self::mvcc::Lock;
pub use self::txn::{Msg, Scheduler, SnapshotStore, StoreScanner};
pub use self::types::{make_key, Key, KvPair, MvccInfo, Value};
pub type Callback<T> = Box<FnBox(Result<T>) + Send>;
Expand Down Expand Up @@ -131,10 +133,9 @@ pub enum Command {
ScanLock { ctx: Context, max_ts: u64 },
ResolveLock {
ctx: Context,
start_ts: u64,
commit_ts: Option<u64>,
txn2status: HashMap<u64, u64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/txn2status/txn_status/

scan_key: Option<Key>,
keys: Vec<Key>,
key_locks: Vec<(Key, Lock)>,
},
Gc {
ctx: Context,
Expand Down Expand Up @@ -241,18 +242,7 @@ impl Display for Command {
Command::ScanLock {
ref ctx, max_ts, ..
} => write!(f, "kv::scan_lock {} | {:?}", max_ts, ctx),
Command::ResolveLock {
ref ctx,
start_ts,
commit_ts,
..
} => write!(
f,
"kv::resolve_txn {} -> {:?} | {:?}",
start_ts,
commit_ts,
ctx
),
Command::ResolveLock { .. } => write!(f, "kv::resolve_lock"),
Command::Gc {
ref ctx,
safe_point,
Expand Down Expand Up @@ -328,7 +318,7 @@ impl Command {
Command::Pause { .. } |
Command::MvccByKey { .. } |
Command::MvccByStartTs { .. } => true,
Command::ResolveLock { ref keys, .. } |
Command::ResolveLock { ref key_locks, .. } => key_locks.is_empty(),
Command::Gc { ref keys, .. } => keys.is_empty(),
_ => false,
}
Expand Down Expand Up @@ -379,11 +369,11 @@ impl Command {
Command::Prewrite { start_ts, .. } |
Command::Cleanup { start_ts, .. } |
Command::Rollback { start_ts, .. } |
Command::ResolveLock { start_ts, .. } |
Command::MvccByStartTs { start_ts, .. } => start_ts,
Command::Commit { lock_ts, .. } => lock_ts,
Command::ScanLock { max_ts, .. } => max_ts,
Command::Gc { safe_point, .. } => safe_point,
Command::ResolveLock { .. } |
Command::RawGet { .. } |
Command::RawScan { .. } |
Command::DeleteRange { .. } |
Expand Down Expand Up @@ -449,8 +439,8 @@ impl Command {
Command::Gc { ref keys, .. } |
Command::BatchGet { ref keys, .. } |
Command::Commit { ref keys, .. } |
Command::Rollback { ref keys, .. } |
Command::ResolveLock { ref keys, .. } => keys.len(),
Command::Rollback { ref keys, .. } => keys.len(),
Command::ResolveLock { ref key_locks, .. } => key_locks.len(),
Command::Prewrite { ref mutations, .. } => mutations.len(),
}
}
Expand Down Expand Up @@ -768,16 +758,14 @@ impl Storage {
pub fn async_resolve_lock(
&self,
ctx: Context,
start_ts: u64,
commit_ts: Option<u64>,
txn2status: HashMap<u64, u64>,
callback: Callback<()>,
) -> Result<()> {
let cmd = Command::ResolveLock {
ctx: ctx,
start_ts: start_ts,
commit_ts: commit_ts,
txn2status: txn2status,
scan_key: None,
keys: vec![],
key_locks: vec![],
};
let tag = cmd.tag();
self.send(cmd, StorageCb::Boolean(callback))?;
Expand Down
116 changes: 64 additions & 52 deletions src/storage/txn/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
//! is ensured by the transaction protocol implemented in the client library, which is transparent
//! to the scheduler.

use std::collections::HashMap;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use util::collections::HashMap instead?

use std::fmt::{self, Debug, Formatter};
use std::sync::mpsc::Receiver;
use std::time::Duration;
Expand All @@ -52,7 +53,6 @@ use raftstore::store::engine::IterOption;
use util::transport::{Error as TransportError, SyncSendCh};
use util::threadpool::{Context as ThreadContext, ThreadPool, ThreadPoolBuilder};
use util::time::SlowTimer;
use util::collections::HashMap;

use super::Result;
use super::Error;
Expand Down Expand Up @@ -585,12 +585,9 @@ fn process_read(
Err(e) => ProcessResult::Failed { err: e.into() },
}
}
// Scan the locks with timestamp `start_ts`, then either commit them if the command has
// commit timestamp populated or rollback otherwise.
Command::ResolveLock {
ref ctx,
start_ts,
commit_ts,
ref txn2status,
ref mut scan_key,
..
} => {
Expand All @@ -605,24 +602,23 @@ fn process_read(
let res = reader
.scan_lock(
scan_key.take(),
|lock| lock.ts == start_ts,
|lock| txn2status.contains_key(&lock.ts),
Some(RESOLVE_LOCK_BATCH_SIZE),
)
.map_err(Error::from)
.and_then(|(v, next_scan_key)| {
let keys: Vec<Key> = v.into_iter().map(|x| x.0).collect();
let key_locks: Vec<_> = v.into_iter().map(|x| x).collect();
KV_COMMAND_KEYREAD_HISTOGRAM_VEC
.with_label_values(&[tag])
.observe(keys.len() as f64);
if keys.is_empty() {
.observe(key_locks.len() as f64);
if key_locks.is_empty() {
Ok(None)
} else {
Ok(Some(Command::ResolveLock {
ctx: ctx.clone(),
start_ts: start_ts,
commit_ts: commit_ts,
txn2status: txn2status.clone(),
scan_key: next_scan_key,
keys: keys,
key_locks: key_locks,
}))
}
});
Expand Down Expand Up @@ -884,52 +880,56 @@ fn process_write_impl(
}
Command::ResolveLock {
ref ctx,
start_ts,
commit_ts,
ref txn2status,
ref mut scan_key,
ref keys,
ref key_locks,
} => {
if let Some(cts) = commit_ts {
if cts <= start_ts {
return Err(Error::InvalidTxnTso {
start_ts: start_ts,
commit_ts: cts,
});
}
}
let mut scan_key = scan_key.take();
let mut txn = MvccTxn::new(
snapshot,
statistics,
start_ts,
None,
ctx.get_isolation_level(),
!ctx.get_not_fill_cache(),
);
let rows = keys.len();
for k in keys {
match commit_ts {
Some(ts) => txn.commit(k, ts)?,
None => txn.rollback(k)?,
let mut modifies: Vec<Modify> = vec![];
let rows = key_locks.len();
for &(ref current_key, ref current_lock) in key_locks {
let mut txn = MvccTxn::new(
snapshot,
statistics,
current_lock.ts,
None,
ctx.get_isolation_level(),
!ctx.get_not_fill_cache(),
);
let status = txn2status.get(&current_lock.ts);
let ts = match status {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/ts/commit_ts

Some(ts) => *ts,
None => panic!("txn status not found!"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.

};
if ts > 0 {
if current_lock.ts >= ts {
return Err(Error::InvalidTxnTso {
start_ts: current_lock.ts,
commit_ts: ts,
});
}
txn.commit(current_key, ts)?;
} else {
txn.rollback(current_key)?;
}
if txn.write_size() >= MAX_TXN_WRITE_SIZE {
scan_key = Some(k.to_owned());
modifies.append(&mut txn.modifies());
if modifies.len() >= MAX_TXN_WRITE_SIZE {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not length but bytes.

scan_key = Some(current_key.to_owned());
break;
}
}
if scan_key.is_none() {
(ProcessResult::Res, txn.modifies(), rows)
(ProcessResult::Res, modifies, rows)
} else {
let pr = ProcessResult::NextCommand {
cmd: Command::ResolveLock {
ctx: ctx.clone(),
start_ts: start_ts,
commit_ts: commit_ts,
txn2status: txn2status.clone(),
scan_key: scan_key.take(),
keys: vec![],
key_locks: vec![],
},
};
(pr, txn.modifies(), rows)
(pr, modifies, rows)
}
}
Command::Gc {
Expand Down Expand Up @@ -1375,6 +1375,7 @@ impl Scheduler {
.with_label_values(&[self.get_ctx_tag(cid), "write"])
.inc();
if to_be_write.is_empty() {
info!("modifies is empty");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add a log here?

return self.on_write_finished(cid, pr, Ok(()));
}
let engine_cb = make_engine_cb(cmd.tag(), cid, pr, self.schedch.clone(), rows);
Expand Down Expand Up @@ -1521,9 +1522,13 @@ pub fn gen_command_lock(latches: &Latches, cmd: &Command) -> Lock {
let keys: Vec<&Key> = mutations.iter().map(|x| x.key()).collect();
latches.gen_lock(&keys)
}
Command::Commit { ref keys, .. } |
Command::Rollback { ref keys, .. } |
Command::ResolveLock { ref keys, .. } => latches.gen_lock(keys),
Command::ResolveLock { ref key_locks, .. } => {
let keys: Vec<&Key> = key_locks.iter().map(|x| &x.0).collect();
latches.gen_lock(&keys)
}
Command::Commit { ref keys, .. } | Command::Rollback { ref keys, .. } => {
latches.gen_lock(keys)
}
Command::Cleanup { ref key, .. } => latches.gen_lock(&[key]),
_ => Lock::new(vec![]),
}
Expand All @@ -1532,12 +1537,16 @@ pub fn gen_command_lock(latches: &Latches, cmd: &Command) -> Lock {
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use kvproto::kvrpcpb::Context;
use storage::txn::latch::*;
use storage::{make_key, Command, Mutation, Options};
use storage::mvcc;

#[test]
fn test_command_latches() {
let mut temp_map = HashMap::new();
temp_map.insert(10, 20);
let readonly_cmds = vec![
Command::Get {
ctx: Context::new(),
Expand All @@ -1562,10 +1571,9 @@ mod tests {
},
Command::ResolveLock {
ctx: Context::new(),
start_ts: 10,
commit_ts: Some(20),
txn2status: temp_map.clone(),
scan_key: None,
keys: vec![],
key_locks: vec![],
},
Command::Gc {
ctx: Context::new(),
Expand Down Expand Up @@ -1609,10 +1617,14 @@ mod tests {
},
Command::ResolveLock {
ctx: Context::new(),
start_ts: 10,
commit_ts: Some(20),
txn2status: temp_map.clone(),
scan_key: None,
keys: vec![make_key(b"k")],
key_locks: vec![
(
make_key(b"k"),
mvcc::Lock::new(mvcc::LockType::Put, b"k".to_vec(), 10, 20, None),
),
],
},
];

Expand Down
9 changes: 7 additions & 2 deletions tests/raftstore/test_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ fn test_mvcc_rollback_and_cleanup() {

#[test]
fn test_mvcc_resolve_lock_gc_and_delete() {
use kvproto::kvrpcpb::*;
use protobuf;
let (_cluster, client, ctx) = must_new_cluster_and_kv_client();
let (k, v) = (b"key".to_vec(), b"value".to_vec());

Expand Down Expand Up @@ -388,9 +390,12 @@ fn test_mvcc_resolve_lock_gc_and_delete() {
ts += 1;
let resolve_lock_commit_version = ts;
let mut resolve_lock_req = ResolveLockRequest::new();
let mut temp_txninfo = TxnInfo::new();
temp_txninfo.txn = prewrite_start_version2;
temp_txninfo.status = resolve_lock_commit_version;
let vec_txninfo = vec![temp_txninfo];
resolve_lock_req.set_context(ctx.clone());
resolve_lock_req.start_version = prewrite_start_version2;
resolve_lock_req.commit_version = resolve_lock_commit_version;
resolve_lock_req.set_txn_infos(protobuf::RepeatedField::from_vec(vec_txninfo));
let resolve_lock_resp = client.kv_resolve_lock(resolve_lock_req).unwrap();
assert!(!resolve_lock_resp.has_region_error());
assert!(!resolve_lock_resp.has_error());
Expand Down
Loading