Skip to content
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ pub use crate::timestamp::TimestampExt;
#[doc(inline)]
pub use crate::transaction::lowering as transaction_lowering;
#[doc(inline)]
pub use crate::transaction::CommitTTLParameters;
#[doc(inline)]
pub use crate::transaction::CheckLevel;
#[doc(inline)]
pub use crate::transaction::Client as TransactionClient;
Expand Down
7 changes: 6 additions & 1 deletion src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use async_trait::async_trait;
use futures::future::try_join_all;
use futures::prelude::*;
use log::debug;
use log::error;
use log::info;
use tokio::sync::Semaphore;
use tokio::time::sleep;
Expand Down Expand Up @@ -579,6 +580,7 @@ where
}

if self.backoff.is_none() {
error!("no backoff, resolve lock error: {locks:?}");
return Err(Error::ResolveLockError(locks));
}

Expand All @@ -588,7 +590,10 @@ where
result = self.inner.execute().await?;
} else {
match clone.backoff.next_delay_duration() {
None => return Err(Error::ResolveLockError(live_locks)),
None => {
error!("backoff exhausted, resolve lock error: {live_locks:?}");
return Err(Error::ResolveLockError(live_locks))
},
Some(delay_duration) => {
sleep(delay_duration).await;
result = clone.inner.execute().await?;
Expand Down
2 changes: 1 addition & 1 deletion src/store/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl KvConnect for TikvConnect {
self.security_mgr
.connect(address, TikvClient::new)
.await
.map(|c| KvRpcClient::new(c, self.timeout))
.map(|c| KvRpcClient::new(c.max_decoding_message_size(usize::MAX), self.timeout))
}
}

Expand Down
21 changes: 17 additions & 4 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,28 @@ pub fn store_stream_for_range<PdC: PdClient>(

/// The range used for request should be the intersection of `region_range` and `range`.
fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key) {
let (req_lower, req_upper, reversed) = if range.0 > range.1 {
(range.1, range.0, true)
} else {
(range.0, range.1, false)
};

let (lower, upper) = region_range;
let up = if upper.is_empty() {
range.1
} else if range.1.is_empty() {
req_upper
} else if req_upper.is_empty() {
upper
} else {
min(upper, range.1)
min(upper, req_upper)
};
(max(lower, range.0), up)

let (res_lower, res_upper) = (max(lower, req_lower), up);

if reversed {
(res_upper, res_lower)
} else {
(res_lower, res_upper)
}
}

pub fn store_stream_for_ranges<PdC: PdClient>(
Expand Down
14 changes: 13 additions & 1 deletion src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,24 @@ pub trait TimestampExt: Sized {

impl TimestampExt for Timestamp {
fn version(&self) -> u64 {
if self.physical == i64::MAX && self.logical == i64::MAX {
return u64::MAX;
}

((self.physical << PHYSICAL_SHIFT_BITS) + self.logical)
.try_into()
.expect("Overflow converting timestamp to version")
}

fn from_version(version: u64) -> Self {
if version == u64::MAX {
return Self {
physical: i64::MAX,
logical: i64::MAX,
suffix_bits: 0,
};
}

let version = version as i64;
Self {
physical: version >> PHYSICAL_SHIFT_BITS,
Expand All @@ -42,7 +54,7 @@ impl TimestampExt for Timestamp {
}

fn try_from_version(version: u64) -> Option<Self> {
if version == 0 {
if version == 0 || (version >= i64::MAX as u64 && version != u64::MAX) {
None
} else {
Some(Self::from_version(version))
Expand Down
1 change: 1 addition & 0 deletions src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub(crate) use lock::resolve_locks;
pub(crate) use lock::HasLocks;
pub use snapshot::Snapshot;
pub use transaction::CheckLevel;
pub use transaction::CommitTTLParameters;
#[doc(hidden)]
pub use transaction::HeartbeatOption;
pub use transaction::Mutation;
Expand Down
67 changes: 63 additions & 4 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,47 @@ pub enum TransactionKind {
Pessimistic(Timestamp),
}

#[derive(Clone, PartialEq, Debug)]
pub struct CommitTTLParameters {
max_ttl: u64,
min_ttl: u64,
txn_commit_batch_size: u64,
ttl_factor: f64,
}

impl Default for CommitTTLParameters {
fn default() -> Self {
Self {
max_ttl: MAX_TTL,
min_ttl: DEFAULT_LOCK_TTL,
txn_commit_batch_size: TXN_COMMIT_BATCH_SIZE,
ttl_factor: TTL_FACTOR,
}
}
}

impl CommitTTLParameters {
pub fn max_ttl(mut self, millis: u64) -> Self {
self.max_ttl = millis;
self
}

pub fn min_ttl(mut self, millis: u64) -> Self {
self.min_ttl = millis;
self
}

pub fn txn_commit_batch_size(mut self, size: u64) -> Self {
self.txn_commit_batch_size = size;
self
}

pub fn ttl_factor(mut self, factor: f64) -> Self {
self.ttl_factor = factor;
self
}
}

/// Options for configuring a transaction.
///
/// `TransactionOptions` has a builder-style API.
Expand All @@ -1086,6 +1127,8 @@ pub struct TransactionOptions {
retry_options: RetryOptions,
/// What to do if the transaction is dropped without an attempt to commit or rollback
check_level: CheckLevel,
/// Variables related to commit TTL calculation
ttl_parameters: CommitTTLParameters,
#[doc(hidden)]
heartbeat_option: HeartbeatOption,
}
Expand All @@ -1112,6 +1155,7 @@ impl TransactionOptions {
read_only: false,
retry_options: RetryOptions::default_optimistic(),
check_level: CheckLevel::Panic,
ttl_parameters: Default::default(),
heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL),
}
}
Expand All @@ -1125,6 +1169,7 @@ impl TransactionOptions {
read_only: false,
retry_options: RetryOptions::default_pessimistic(),
check_level: CheckLevel::Panic,
ttl_parameters: Default::default(),
heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL),
}
}
Expand Down Expand Up @@ -1178,6 +1223,13 @@ impl TransactionOptions {
self
}

/// Set Commit TTL parameters.
#[must_use]
pub fn ttl_parameters(mut self, options: CommitTTLParameters) -> TransactionOptions {
self.ttl_parameters = options;
self
}

fn push_for_update_ts(&mut self, for_update_ts: Timestamp) {
match &mut self.kind {
TransactionKind::Optimistic => unreachable!(),
Expand Down Expand Up @@ -1487,11 +1539,18 @@ impl<PdC: PdClient> Committer<PdC> {
}

fn calc_txn_lock_ttl(&mut self) -> u64 {
let mut lock_ttl = DEFAULT_LOCK_TTL;
if self.write_size > TXN_COMMIT_BATCH_SIZE {
let CommitTTLParameters {
max_ttl,
min_ttl,
txn_commit_batch_size,
ttl_factor,
} = self.options.ttl_parameters;

let mut lock_ttl = min_ttl;
if self.write_size > txn_commit_batch_size {
let size_mb = self.write_size as f64 / 1024.0 / 1024.0;
lock_ttl = (TTL_FACTOR * size_mb.sqrt()) as u64;
lock_ttl = lock_ttl.clamp(DEFAULT_LOCK_TTL, MAX_TTL);
lock_ttl = (ttl_factor * size_mb.sqrt()) as u64;
lock_ttl = lock_ttl.clamp(min_ttl, max_ttl);
}
lock_ttl
}
Expand Down