Skip to content

Commit

Permalink
pessimistic-txn: implement pessimistic rollback (tikv#4848)
Browse files Browse the repository at this point in the history
Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
  • Loading branch information
youjiali1995 committed Jun 13, 2019
1 parent c2213df commit d2234b5
Show file tree
Hide file tree
Showing 13 changed files with 310 additions and 41 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ kvproto = { git = "https://github.com/pingcap/kvproto.git", branch = "release-3.
log_wrappers = { path = "components/log_wrappers" }
engine = { path = "components/engine" }
tikv_util = { path = "components/tikv_util" }
farmhash = "1.1.5"

[dependencies.murmur3]
git = "https://github.com/pingcap/murmur3.git"
Expand Down
1 change: 1 addition & 0 deletions src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ make_static_metric! {
kv_scan,
kv_prewrite,
kv_pessimistic_lock,
kv_pessimistic_rollback,
kv_commit,
kv_cleanup,
kv_batch_get,
Expand Down
71 changes: 65 additions & 6 deletions src/server/service/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,24 @@ impl<T: RaftStoreRouter + 'static, E: Engine> tikvpb_grpc::Tikv for Service<T, E
fn kv_pessimistic_rollback(
&mut self,
ctx: RpcContext<'_>,
_req: PessimisticRollbackRequest,
req: PessimisticRollbackRequest,
sink: UnarySink<PessimisticRollbackResponse>,
) {
let f = sink
.fail(RpcStatus::new(RpcStatusCode::Unimplemented, None))
.map_err(|e| error!("kv_pessimistic_rollback error"; "err" => %e));
ctx.spawn(f);
let timer = GRPC_MSG_HISTOGRAM_VEC
.kv_pessimistic_rollback
.start_coarse_timer();
let future = future_pessimistic_rollback(&self.storage, req)
.and_then(|res| sink.success(res).map_err(Error::from))
.map(|_| timer.observe_duration())
.map_err(move |e| {
debug!("kv rpc failed";
"request" => "kv_pessimistic_rollback",
"err" => ?e
);
GRPC_MSG_FAIL_COUNTER.kv_pessimistic_rollback.inc();
});

ctx.spawn(future);
}

fn kv_commit(
Expand Down Expand Up @@ -1241,7 +1252,17 @@ fn handle_batch_commands_request<E: Engine>(
.map_err(|_| GRPC_MSG_FAIL_COUNTER.kv_pessimistic_lock.inc());
response_batch_commands_request(id, resp, tx, timer);
}
Some(BatchCommandsRequest_Request_oneof_cmd::PessimisticRollback(_)) => unimplemented!(),
Some(BatchCommandsRequest_Request_oneof_cmd::PessimisticRollback(req)) => {
let timer = GRPC_MSG_HISTOGRAM_VEC
.kv_pessimistic_rollback
.start_coarse_timer();
let resp = future_pessimistic_rollback(&storage, req)
.map(oneof!(
BatchCommandsResponse_Response_oneof_cmd::PessimisticRollback
))
.map_err(|_| GRPC_MSG_FAIL_COUNTER.kv_pessimistic_rollback.inc());
response_batch_commands_request(id, resp, tx, timer);
}
}
}

Expand Down Expand Up @@ -1388,6 +1409,31 @@ fn future_acquire_pessimistic_lock<E: Engine>(
})
}

fn future_pessimistic_rollback<E: Engine>(
storage: &Storage<E>,
mut req: PessimisticRollbackRequest,
) -> impl Future<Item = PessimisticRollbackResponse, Error = Error> {
let keys = req.get_keys().iter().map(|x| Key::from_raw(x)).collect();
let (cb, f) = paired_future_callback();
let res = storage.async_pessimistic_rollback(
req.take_context(),
keys,
req.get_start_version(),
req.get_for_update_ts(),
cb,
);

AndThenWith::new(res, f.map_err(Error::from)).map(|v| {
let mut resp = PessimisticRollbackResponse::new();
if let Some(err) = extract_region_error(&v) {
resp.set_region_error(err);
} else {
resp.set_errors(RepeatedField::from_vec(extract_key_errors(v)));
}
resp
})
}

fn future_commit<E: Engine>(
storage: &Storage<E>,
mut req: CommitRequest,
Expand Down Expand Up @@ -1883,6 +1929,19 @@ fn extract_key_error(err: &storage::Error) -> KeyError {
warn!("txn conflicts"; "err" => ?err);
key_error.set_retryable(format!("{:?}", err));
}
storage::Error::Txn(TxnError::Mvcc(MvccError::Deadlock {
lock_ts,
ref lock_key,
deadlock_key_hash,
..
})) => {
warn!("txn deadlocks"; "err" => ?err);
let mut deadlock = Deadlock::new();
deadlock.set_lock_ts(lock_ts);
deadlock.set_lock_key(lock_key.to_owned());
deadlock.set_deadlock_key_hash(deadlock_key_hash);
key_error.set_deadlock(deadlock);
}
_ => {
error!("txn aborts"; "err" => ?err);
key_error.set_abort(format!("{:?}", err));
Expand Down
44 changes: 37 additions & 7 deletions src/storage/lock_manager/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

use super::Lock;
use crate::storage::mvcc::Error as MvccError;
use crate::storage::txn::Error as TxnError;
use crate::storage::txn::{Error as TxnError, ProcessResult};
use crate::storage::Error as StorageError;
use crate::storage::Key;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use farmhash;

pub const PHYSICAL_SHIFT_BITS: usize = 18;

Expand All @@ -21,20 +20,34 @@ pub fn extract_lock_from_result(res: &Result<(), StorageError>) -> Lock {
ts: *ts,
hash: gen_key_hash(&Key::from_raw(&key)),
},
_ => panic!("unsupported mvcc error"),
_ => panic!("unexpected mvcc error"),
}
}

// TiDB uses the same hash algorithm.
pub fn gen_key_hash(key: &Key) -> u64 {
let mut s = DefaultHasher::new();
key.hash(&mut s);
s.finish()
farmhash::fingerprint64(&key.to_raw().unwrap())
}

pub fn gen_key_hashes(keys: &[Key]) -> Vec<u64> {
keys.iter().map(|key| gen_key_hash(key)).collect()
}

pub fn extract_raw_key_from_process_result(pr: &ProcessResult) -> &[u8] {
match pr {
ProcessResult::MultiRes { results } => {
assert!(results.len() == 1);
match results[0] {
Err(StorageError::Txn(TxnError::Mvcc(MvccError::KeyIsLocked {
ref key, ..
}))) => key,
_ => panic!("unexpected mvcc error"),
}
}
_ => panic!("unexpected progress result"),
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -55,4 +68,21 @@ mod tests {
assert_eq!(lock.ts, ts);
assert_eq!(lock.hash, gen_key_hash(&key));
}

#[test]
fn test_extract_raw_key_from_process_result() {
let raw_key = b"foo".to_vec();
let pr = ProcessResult::MultiRes {
results: vec![Err(StorageError::from(TxnError::from(
MvccError::KeyIsLocked {
key: raw_key.clone(),
primary: vec![],
ts: 0,
ttl: 0,
txn_size: 0,
},
)))],
};
assert_eq!(raw_key, extract_raw_key_from_process_result(&pr));
}
}
8 changes: 5 additions & 3 deletions src/storage/lock_manager/waiter_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use super::deadlock::Scheduler as DetectorScheduler;
use super::metrics::*;
use super::util::extract_raw_key_from_process_result;
use super::Lock;
use crate::storage::mvcc::Error as MvccError;
use crate::storage::txn::Error as TxnError;
Expand Down Expand Up @@ -288,6 +289,7 @@ impl WaiterManager {
.borrow_mut()
.get_ready_waiters(lock_ts, hashes);
ready_waiters.sort_unstable_by_key(|waiter| waiter.start_ts);

for (i, waiter) in ready_waiters.into_iter().enumerate() {
self.detector_scheduler
.clean_up_wait_for(waiter.start_ts, waiter.lock.clone());
Expand Down Expand Up @@ -318,12 +320,12 @@ impl WaiterManager {
.remove_waiter(start_ts, lock)
.and_then(|waiter| {
let pr = ProcessResult::Failed {
err: StorageError::from(MvccError::Deadlock {
err: StorageError::from(TxnError::from(MvccError::Deadlock {
start_ts,
lock_ts: waiter.lock.ts,
key_hash: waiter.lock.hash,
lock_key: extract_raw_key_from_process_result(&waiter.pr).to_vec(),
deadlock_key_hash,
}),
})),
};
execute_callback(waiter.cb, pr);
Some(())
Expand Down
3 changes: 2 additions & 1 deletion src/storage/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use prometheus_static_metric::*;
make_static_metric! {
pub label_enum CommandKind {
prewrite,
pessimistic_lock,
acquire_pessimistic_lock,
commit,
cleanup,
rollback,
pessimistic_rollback,
scan_lock,
resolve_lock,
resolve_lock_lite,
Expand Down
53 changes: 51 additions & 2 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ pub enum Command {
keys: Vec<Key>,
start_ts: u64,
},
PessimisticRollback {
ctx: Context,
keys: Vec<Key>,
start_ts: u64,
for_update_ts: u64,
},
ScanLock {
ctx: Context,
max_ts: u64,
Expand Down Expand Up @@ -217,6 +223,19 @@ impl Display for Command {
start_ts,
ctx
),
Command::PessimisticRollback {
ref ctx,
ref keys,
start_ts,
for_update_ts,
} => write!(
f,
"kv::command::pessimistic_rollback keys({}) @ {} {} | {:?}",
keys.len(),
start_ts,
for_update_ts,
ctx
),
Command::ScanLock {
ref ctx,
max_ts,
Expand Down Expand Up @@ -313,10 +332,11 @@ impl Command {
pub fn tag(&self) -> &'static str {
match *self {
Command::Prewrite { .. } => "prewrite",
Command::AcquirePessimisticLock { .. } => "pessimistic_lock",
Command::AcquirePessimisticLock { .. } => "acquire_pessimistic_lock",
Command::Commit { .. } => "commit",
Command::Cleanup { .. } => "cleanup",
Command::Rollback { .. } => "rollback",
Command::PessimisticRollback { .. } => "pessimistic_rollback",
Command::ScanLock { .. } => "scan_lock",
Command::ResolveLock { .. } => "resolve_lock",
Command::ResolveLockLite { .. } => "resolve_lock_lite",
Expand All @@ -333,6 +353,7 @@ impl Command {
| Command::AcquirePessimisticLock { start_ts, .. }
| Command::Cleanup { start_ts, .. }
| Command::Rollback { start_ts, .. }
| Command::PessimisticRollback { start_ts, .. }
| Command::MvccByStartTs { start_ts, .. } => start_ts,
Command::Commit { lock_ts, .. } => lock_ts,
Command::ScanLock { max_ts, .. } => max_ts,
Expand All @@ -351,6 +372,7 @@ impl Command {
| Command::Commit { ref ctx, .. }
| Command::Cleanup { ref ctx, .. }
| Command::Rollback { ref ctx, .. }
| Command::PessimisticRollback { ref ctx, .. }
| Command::ScanLock { ref ctx, .. }
| Command::ResolveLock { ref ctx, .. }
| Command::ResolveLockLite { ref ctx, .. }
Expand All @@ -368,6 +390,7 @@ impl Command {
| Command::Commit { ref mut ctx, .. }
| Command::Cleanup { ref mut ctx, .. }
| Command::Rollback { ref mut ctx, .. }
| Command::PessimisticRollback { ref mut ctx, .. }
| Command::ScanLock { ref mut ctx, .. }
| Command::ResolveLock { ref mut ctx, .. }
| Command::ResolveLockLite { ref mut ctx, .. }
Expand Down Expand Up @@ -402,6 +425,7 @@ impl Command {
}
Command::Commit { ref keys, .. }
| Command::Rollback { ref keys, .. }
| Command::PessimisticRollback { ref keys, .. }
| Command::Pause { ref keys, .. } => {
for key in keys {
bytes += key.as_encoded().len();
Expand Down Expand Up @@ -962,7 +986,7 @@ impl<E: Engine> Storage<E> {
options,
};
self.schedule(cmd, StorageCb::Booleans(callback))?;
KV_COMMAND_COUNTER_VEC_STATIC.pessimistic_lock.inc();
KV_COMMAND_COUNTER_VEC_STATIC.acquire_pessimistic_lock.inc();
Ok(())
}

Expand Down Expand Up @@ -1041,6 +1065,31 @@ impl<E: Engine> Storage<E> {
Ok(())
}

/// Roll back pessimistic locks identified by `start_ts` and `for_update_ts`
pub fn async_pessimistic_rollback(
&self,
ctx: Context,
keys: Vec<Key>,
start_ts: u64,
for_update_ts: u64,
callback: Callback<Vec<Result<()>>>,
) -> Result<()> {
if !self.pessimistic_txn_enabled {
callback(Err(Error::PessimisticTxnNotEnabled));
return Ok(());
}

let cmd = Command::PessimisticRollback {
ctx,
keys,
start_ts,
for_update_ts,
};
self.schedule(cmd, StorageCb::Booleans(callback))?;
KV_COMMAND_COUNTER_VEC_STATIC.pessimistic_rollback.inc();
Ok(())
}

/// Scan locks from `start_key`, and find all locks whose timestamp is before `max_ts`.
pub fn async_scan_locks(
&self,
Expand Down
Loading

0 comments on commit d2234b5

Please sign in to comment.