Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: Add txn_heart_beat API #5407

Merged
merged 8 commits into from Sep 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 6 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/server/metrics.rs
Expand Up @@ -19,6 +19,8 @@ make_static_metric! {
kv_cleanup,
kv_batch_get,
kv_batch_rollback,
kv_txn_heart_beat,
kv_check_txn_status,
kv_scan_lock,
kv_resolve_lock,
kv_gc,
Expand Down
71 changes: 71 additions & 0 deletions src/server/service/kv.rs
Expand Up @@ -275,6 +275,38 @@ impl<T: RaftStoreRouter + 'static, E: Engine, L: LockMgr> Tikv for Service<T, E,
ctx.spawn(future);
}

fn kv_txn_heart_beat(
&mut self,
ctx: RpcContext<'_>,
req: TxnHeartBeatRequest,
sink: UnarySink<TxnHeartBeatResponse>,
) {
let timer = GRPC_MSG_HISTOGRAM_VEC
.kv_txn_heart_beat
.start_coarse_timer();
let future = future_txn_heart_beat(&self.storage, req)
.and_then(|res| sink.success(res).map_err(Error::from))
.map(|_| timer.observe_duration())
.map_err(move |e| {
debug!("kv rpc failed";
"request" => "kv_txn_heart_beat",
"err" => ?e
);
GRPC_MSG_FAIL_COUNTER.kv_txn_heart_beat.inc();
});

ctx.spawn(future);
}

fn kv_check_txn_status(
&mut self,
_ctx: RpcContext<'_>,
_req: CheckTxnStatusRequest,
_sink: UnarySink<CheckTxnStatusResponse>,
) {
unimplemented!();
}

fn kv_scan_lock(
&mut self,
ctx: RpcContext<'_>,
Expand Down Expand Up @@ -1136,6 +1168,16 @@ fn handle_batch_commands_request<E: Engine, L: LockMgr>(
.map_err(|_| GRPC_MSG_FAIL_COUNTER.kv_batch_rollback.inc());
response_batch_commands_request(id, resp, tx, timer);
}
Some(batch_commands_request::request::Cmd::TxnHeartBeat(req)) => {
let timer = GRPC_MSG_HISTOGRAM_VEC
.kv_check_txn_status
.start_coarse_timer();
let resp = future_txn_heart_beat(&storage, req)
.map(oneof!(batch_commands_response::response::Cmd::TxnHeartBeat))
.map_err(|_| GRPC_MSG_FAIL_COUNTER.kv_txn_heart_beat.inc());
response_batch_commands_request(id, resp, tx, timer);
}
Some(batch_commands_request::request::Cmd::CheckTxnStatus(_)) => unimplemented!(),
Some(batch_commands_request::request::Cmd::ScanLock(req)) => {
let timer = GRPC_MSG_HISTOGRAM_VEC.kv_scan_lock.start_coarse_timer();
let resp = future_scan_lock(&storage, req)
Expand Down Expand Up @@ -1541,6 +1583,35 @@ fn future_batch_rollback<E: Engine, L: LockMgr>(
})
}

fn future_txn_heart_beat<E: Engine, L: LockMgr>(
storage: &Storage<E, L>,
mut req: TxnHeartBeatRequest,
) -> impl Future<Item = TxnHeartBeatResponse, Error = Error> {
let primary_key = Key::from_raw(req.get_primary_lock());

let (cb, f) = paired_future_callback();
let res = storage.async_txn_heart_beat(
req.take_context(),
primary_key,
req.get_start_version(),
req.get_advise_lock_ttl(),
cb,
);

AndThenWith::new(res, f.map_err(Error::from)).map(|v| {
let mut resp = TxnHeartBeatResponse::default();
if let Some(err) = extract_region_error(&v) {
resp.set_region_error(err);
} else {
match v {
Ok((ttl, _)) => resp.set_lock_ttl(ttl),
Err(e) => resp.set_error(extract_key_error(&e)),
}
}
resp
})
}

fn future_scan_lock<E: Engine, L: LockMgr>(
storage: &Storage<E, L>,
mut req: ScanLockRequest,
Expand Down
1 change: 1 addition & 0 deletions src/storage/metrics.rs
Expand Up @@ -11,6 +11,7 @@ make_static_metric! {
cleanup,
rollback,
pessimistic_rollback,
txn_heart_beat,
scan_lock,
resolve_lock,
resolve_lock_lite,
Expand Down
128 changes: 127 additions & 1 deletion src/storage/mod.rs
Expand Up @@ -114,6 +114,7 @@ pub enum StorageCb {
MvccInfoByKey(Callback<MvccInfo>),
MvccInfoByStartTs(Callback<Option<(Key, MvccInfo)>>),
Locks(Callback<Vec<LockInfo>>),
TxnStatus(Callback<(u64, u64)>),
}

/// Store Transaction scheduler commands.
Expand Down Expand Up @@ -195,6 +196,12 @@ pub enum Command {
start_ts: u64,
for_update_ts: u64,
},
TxnHeartBeat {
ctx: Context,
primary_key: Key,
start_ts: u64,
advise_ttl: u64,
},
/// Scan locks from `start_key`, and find all locks whose timestamp is before `max_ts`.
ScanLock {
ctx: Context,
Expand Down Expand Up @@ -344,6 +351,16 @@ impl Display for Command {
for_update_ts,
ctx
),
Command::TxnHeartBeat {
ref ctx,
ref primary_key,
start_ts,
advise_ttl,
} => write!(
f,
"kv::command::txn_heart_beat {} @ {} ttl {} | {:?}",
primary_key, start_ts, advise_ttl, ctx
),
Command::ScanLock {
ref ctx,
max_ts,
Expand Down Expand Up @@ -449,6 +466,7 @@ impl Command {
Command::Cleanup { .. } => CommandKind::cleanup,
Command::Rollback { .. } => CommandKind::rollback,
Command::PessimisticRollback { .. } => CommandKind::pessimistic_rollback,
Command::TxnHeartBeat { .. } => CommandKind::txn_heart_beat,
Command::ScanLock { .. } => CommandKind::scan_lock,
Command::ResolveLock { .. } => CommandKind::resolve_lock,
Command::ResolveLockLite { .. } => CommandKind::resolve_lock_lite,
Expand All @@ -466,7 +484,8 @@ impl Command {
| Command::Cleanup { start_ts, .. }
| Command::Rollback { start_ts, .. }
| Command::PessimisticRollback { start_ts, .. }
| Command::MvccByStartTs { start_ts, .. } => start_ts,
| Command::MvccByStartTs { start_ts, .. }
| Command::TxnHeartBeat { start_ts, .. } => start_ts,
Command::Commit { lock_ts, .. } => lock_ts,
Command::ScanLock { max_ts, .. } => max_ts,
Command::ResolveLockLite { start_ts, .. } => start_ts,
Expand All @@ -485,6 +504,7 @@ impl Command {
| Command::Cleanup { ref ctx, .. }
| Command::Rollback { ref ctx, .. }
| Command::PessimisticRollback { ref ctx, .. }
| Command::TxnHeartBeat { ref ctx, .. }
| Command::ScanLock { ref ctx, .. }
| Command::ResolveLock { ref ctx, .. }
| Command::ResolveLockLite { ref ctx, .. }
Expand All @@ -503,6 +523,7 @@ impl Command {
| Command::Cleanup { ref mut ctx, .. }
| Command::Rollback { ref mut ctx, .. }
| Command::PessimisticRollback { ref mut ctx, .. }
| Command::TxnHeartBeat { ref mut ctx, .. }
| Command::ScanLock { ref mut ctx, .. }
| Command::ResolveLock { ref mut ctx, .. }
| Command::ResolveLockLite { ref mut ctx, .. }
Expand Down Expand Up @@ -558,6 +579,11 @@ impl Command {
Command::Cleanup { ref key, .. } => {
bytes += key.as_encoded().len();
}
Command::TxnHeartBeat {
ref primary_key, ..
} => {
bytes += primary_key.as_encoded().len();
}
_ => {}
}
bytes
Expand Down Expand Up @@ -1246,6 +1272,28 @@ impl<E: Engine, L: LockMgr> Storage<E, L> {
Ok(())
}

/// Check the specified primary key and enlarge it's TTL if necessary. Returns the new TTL.
///
/// Schedules a [`Command::TxnHeartBeat`]
pub fn async_txn_heart_beat(
&self,
ctx: Context,
primary_key: Key,
start_ts: u64,
advise_ttl: u64,
callback: Callback<(u64, u64)>,
) -> Result<()> {
let cmd = Command::TxnHeartBeat {
ctx,
primary_key,
start_ts,
advise_ttl,
};
self.schedule(cmd, StorageCb::TxnStatus(callback))?;
KV_COMMAND_COUNTER_VEC_STATIC.txn_heart_beat.inc();
Ok(())
}

/// Scan locks from `start_key`, and find all locks whose timestamp is before `max_ts`.
///
/// Schedules a [`Command::ScanLock`].
Expand Down Expand Up @@ -4202,4 +4250,82 @@ mod tests {
.unwrap();
rx.recv().unwrap();
}

#[test]
fn test_txn_heart_beat() {
let storage = TestStorageBuilder::new().build().unwrap();
let (tx, rx) = channel();

let k = Key::from_raw(b"k");
let v = b"v".to_vec();

// No lock.
storage
.async_txn_heart_beat(
Context::default(),
k.clone(),
10,
100,
expect_fail_callback(tx.clone(), 0, |e| match e {
Error::Txn(txn::Error::Mvcc(mvcc::Error::TxnLockNotFound { .. })) => (),
e => panic!("unexpected error chain: {:?}", e),
}),
)
.unwrap();
rx.recv().unwrap();

let mut options = Options::default();
options.lock_ttl = 100;
storage
.async_prewrite(
Context::default(),
vec![Mutation::Put((k.clone(), v))],
k.as_encoded().to_vec(),
10,
options,
expect_ok_callback(tx.clone(), 0),
)
.unwrap();
rx.recv().unwrap();

// `advise_ttl` = 90, which is less than current ttl 100. The lock's ttl will remains 100.
storage
.async_txn_heart_beat(
Context::default(),
k.clone(),
10,
90,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about define
advice_ttl=90; expect_ttl=100
to make the test case more easily to understand?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it may be still difficult to read since you need to find the variable's definition or assignment to know what value it is. I think I'll add comments near these arguments.

expect_value_callback(tx.clone(), 0, (100, 0)),
)
.unwrap();
rx.recv().unwrap();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also get the lock directly and check its TTL?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function to get lock which is scan_lock, doesn't get the TTL. So I didn't check it directly.

// `advise_ttl` = 110, which is greater than current ttl. The lock's ttl will be updated to
// 110.
storage
.async_txn_heart_beat(
Context::default(),
k.clone(),
10,
110,
expect_value_callback(tx.clone(), 0, (110, 0)),
)
.unwrap();
rx.recv().unwrap();

// Lock not match. Nothing happens except throwing an error.
storage
.async_txn_heart_beat(
Context::default(),
k.clone(),
11,
150,
expect_fail_callback(tx.clone(), 0, |e| match e {
Error::Txn(txn::Error::Mvcc(mvcc::Error::TxnLockNotFound { .. })) => (),
e => panic!("unexpected error chain: {:?}", e),
}),
)
.unwrap();
rx.recv().unwrap();
}
}
30 changes: 30 additions & 0 deletions src/storage/mvcc/mod.rs
Expand Up @@ -537,6 +537,36 @@ pub mod tests {
assert!(txn.rollback(Key::from_raw(key)).is_err());
}

pub fn must_txn_heart_beat<E: Engine>(
engine: &E,
primary_key: &[u8],
start_ts: u64,
advise_ttl: u64,
expect_ttl: u64,
) {
let ctx = Context::default();
let snapshot = engine.snapshot(&ctx).unwrap();
let mut txn = MvccTxn::new(snapshot, start_ts, true).unwrap();
let ttl = txn
.txn_heart_beat(Key::from_raw(primary_key), advise_ttl)
.unwrap();
write(engine, &ctx, txn.into_modifies());
assert_eq!(ttl, expect_ttl);
}

pub fn must_txn_heart_beat_err<E: Engine>(
engine: &E,
primary_key: &[u8],
start_ts: u64,
advise_ttl: u64,
) {
let ctx = Context::default();
let snapshot = engine.snapshot(&ctx).unwrap();
let mut txn = MvccTxn::new(snapshot, start_ts, true).unwrap();
txn.txn_heart_beat(Key::from_raw(primary_key), advise_ttl)
.unwrap_err();
}

pub fn must_gc<E: Engine>(engine: &E, key: &[u8], safe_point: u64) {
let ctx = Context::default();
let snapshot = engine.snapshot(&ctx).unwrap();
Expand Down