diff --git a/src/lib.rs b/src/lib.rs index 60dc2956..7bb3cff1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -153,6 +153,8 @@ pub use crate::timestamp::TimestampExt; #[doc(inline)] pub use crate::transaction::lowering as transaction_lowering; #[doc(inline)] +pub use crate::transaction::CommitTTLParameters; +#[doc(inline)] pub use crate::transaction::CheckLevel; #[doc(inline)] pub use crate::transaction::Client as TransactionClient; diff --git a/src/request/plan.rs b/src/request/plan.rs index 369a2ff1..1ec4fe5b 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -8,6 +8,7 @@ use async_trait::async_trait; use futures::future::try_join_all; use futures::prelude::*; use log::debug; +use log::error; use log::info; use tokio::sync::Semaphore; use tokio::time::sleep; @@ -579,6 +580,7 @@ where } if self.backoff.is_none() { + error!("no backoff, resolve lock error: {locks:?}"); return Err(Error::ResolveLockError(locks)); } @@ -588,7 +590,10 @@ where result = self.inner.execute().await?; } else { match clone.backoff.next_delay_duration() { - None => return Err(Error::ResolveLockError(live_locks)), + None => { + error!("backoff exhausted, resolve lock error: {live_locks:?}"); + return Err(Error::ResolveLockError(live_locks)) + }, Some(delay_duration) => { sleep(delay_duration).await; result = clone.inner.execute().await?; diff --git a/src/store/client.rs b/src/store/client.rs index 363d4137..7423d385 100644 --- a/src/store/client.rs +++ b/src/store/client.rs @@ -35,7 +35,7 @@ impl KvConnect for TikvConnect { self.security_mgr .connect(address, TikvClient::new) .await - .map(|c| KvRpcClient::new(c, self.timeout)) + .map(|c| KvRpcClient::new(c.max_decoding_message_size(usize::MAX), self.timeout)) } } diff --git a/src/store/mod.rs b/src/store/mod.rs index f21373b4..3781770d 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -84,15 +84,28 @@ pub fn store_stream_for_range( /// The range used for request should be the intersection of `region_range` and `range`. fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key) { + let (req_lower, req_upper, reversed) = if range.0 > range.1 { + (range.1, range.0, true) + } else { + (range.0, range.1, false) + }; + let (lower, upper) = region_range; let up = if upper.is_empty() { - range.1 - } else if range.1.is_empty() { + req_upper + } else if req_upper.is_empty() { upper } else { - min(upper, range.1) + min(upper, req_upper) }; - (max(lower, range.0), up) + + let (res_lower, res_upper) = (max(lower, req_lower), up); + + if reversed { + (res_upper, res_lower) + } else { + (res_lower, res_upper) + } } pub fn store_stream_for_ranges( diff --git a/src/timestamp.rs b/src/timestamp.rs index 5c610e77..39c18b97 100644 --- a/src/timestamp.rs +++ b/src/timestamp.rs @@ -26,12 +26,24 @@ pub trait TimestampExt: Sized { impl TimestampExt for Timestamp { fn version(&self) -> u64 { + if self.physical == i64::MAX && self.logical == i64::MAX { + return u64::MAX; + } + ((self.physical << PHYSICAL_SHIFT_BITS) + self.logical) .try_into() .expect("Overflow converting timestamp to version") } fn from_version(version: u64) -> Self { + if version == u64::MAX { + return Self { + physical: i64::MAX, + logical: i64::MAX, + suffix_bits: 0, + }; + } + let version = version as i64; Self { physical: version >> PHYSICAL_SHIFT_BITS, @@ -42,7 +54,7 @@ impl TimestampExt for Timestamp { } fn try_from_version(version: u64) -> Option { - if version == 0 { + if version == 0 || (version >= i64::MAX as u64 && version != u64::MAX) { None } else { Some(Self::from_version(version)) diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 5bc8f0e4..047ec61d 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -13,6 +13,7 @@ pub(crate) use lock::resolve_locks; pub(crate) use lock::HasLocks; pub use snapshot::Snapshot; pub use transaction::CheckLevel; +pub use transaction::CommitTTLParameters; #[doc(hidden)] pub use transaction::HeartbeatOption; pub use transaction::Mutation; diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index cae46179..7dbf9a30 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -1069,6 +1069,47 @@ pub enum TransactionKind { Pessimistic(Timestamp), } +#[derive(Clone, PartialEq, Debug)] +pub struct CommitTTLParameters { + max_ttl: u64, + min_ttl: u64, + txn_commit_batch_size: u64, + ttl_factor: f64, +} + +impl Default for CommitTTLParameters { + fn default() -> Self { + Self { + max_ttl: MAX_TTL, + min_ttl: DEFAULT_LOCK_TTL, + txn_commit_batch_size: TXN_COMMIT_BATCH_SIZE, + ttl_factor: TTL_FACTOR, + } + } +} + +impl CommitTTLParameters { + pub fn max_ttl(mut self, millis: u64) -> Self { + self.max_ttl = millis; + self + } + + pub fn min_ttl(mut self, millis: u64) -> Self { + self.min_ttl = millis; + self + } + + pub fn txn_commit_batch_size(mut self, size: u64) -> Self { + self.txn_commit_batch_size = size; + self + } + + pub fn ttl_factor(mut self, factor: f64) -> Self { + self.ttl_factor = factor; + self + } +} + /// Options for configuring a transaction. /// /// `TransactionOptions` has a builder-style API. @@ -1086,6 +1127,8 @@ pub struct TransactionOptions { retry_options: RetryOptions, /// What to do if the transaction is dropped without an attempt to commit or rollback check_level: CheckLevel, + /// Variables related to commit TTL calculation + ttl_parameters: CommitTTLParameters, #[doc(hidden)] heartbeat_option: HeartbeatOption, } @@ -1112,6 +1155,7 @@ impl TransactionOptions { read_only: false, retry_options: RetryOptions::default_optimistic(), check_level: CheckLevel::Panic, + ttl_parameters: Default::default(), heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL), } } @@ -1125,6 +1169,7 @@ impl TransactionOptions { read_only: false, retry_options: RetryOptions::default_pessimistic(), check_level: CheckLevel::Panic, + ttl_parameters: Default::default(), heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL), } } @@ -1178,6 +1223,13 @@ impl TransactionOptions { self } + /// Set Commit TTL parameters. + #[must_use] + pub fn ttl_parameters(mut self, options: CommitTTLParameters) -> TransactionOptions { + self.ttl_parameters = options; + self + } + fn push_for_update_ts(&mut self, for_update_ts: Timestamp) { match &mut self.kind { TransactionKind::Optimistic => unreachable!(), @@ -1487,11 +1539,18 @@ impl Committer { } fn calc_txn_lock_ttl(&mut self) -> u64 { - let mut lock_ttl = DEFAULT_LOCK_TTL; - if self.write_size > TXN_COMMIT_BATCH_SIZE { + let CommitTTLParameters { + max_ttl, + min_ttl, + txn_commit_batch_size, + ttl_factor, + } = self.options.ttl_parameters; + + let mut lock_ttl = min_ttl; + if self.write_size > txn_commit_batch_size { let size_mb = self.write_size as f64 / 1024.0 / 1024.0; - lock_ttl = (TTL_FACTOR * size_mb.sqrt()) as u64; - lock_ttl = lock_ttl.clamp(DEFAULT_LOCK_TTL, MAX_TTL); + lock_ttl = (ttl_factor * size_mb.sqrt()) as u64; + lock_ttl = lock_ttl.clamp(min_ttl, max_ttl); } lock_ttl }