From eaf20d44addeba64c023b802107e7c1baffdb1ef Mon Sep 17 00:00:00 2001 From: jmhrpr Date: Mon, 21 Aug 2023 18:48:16 +0100 Subject: [PATCH 01/10] add support for max timestamp --- src/timestamp.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/timestamp.rs b/src/timestamp.rs index 5c610e77..39c18b97 100644 --- a/src/timestamp.rs +++ b/src/timestamp.rs @@ -26,12 +26,24 @@ pub trait TimestampExt: Sized { impl TimestampExt for Timestamp { fn version(&self) -> u64 { + if self.physical == i64::MAX && self.logical == i64::MAX { + return u64::MAX; + } + ((self.physical << PHYSICAL_SHIFT_BITS) + self.logical) .try_into() .expect("Overflow converting timestamp to version") } fn from_version(version: u64) -> Self { + if version == u64::MAX { + return Self { + physical: i64::MAX, + logical: i64::MAX, + suffix_bits: 0, + }; + } + let version = version as i64; Self { physical: version >> PHYSICAL_SHIFT_BITS, @@ -42,7 +54,7 @@ impl TimestampExt for Timestamp { } fn try_from_version(version: u64) -> Option { - if version == 0 { + if version == 0 || (version >= i64::MAX as u64 && version != u64::MAX) { None } else { Some(Self::from_version(version)) From 495c9a1e15e07607fea1f8a90afb22c6cc802ed1 Mon Sep 17 00:00:00 2001 From: jmhrpr Date: Tue, 21 Nov 2023 15:58:45 +0000 Subject: [PATCH 02/10] fix: correct range intersection --- src/store/mod.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/store/mod.rs b/src/store/mod.rs index a244a1bc..91440d25 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -100,15 +100,21 @@ pub fn store_stream_for_range( /// The range used for request should be the intersection of `region_range` and `range`. fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key) { + let (req_lower, req_upper) = if range.0 > range.1 { + (range.1, range.0) + } else { + (range.0, range.1) + }; + let (lower, upper) = region_range; let up = if upper.is_empty() { - range.1 - } else if range.1.is_empty() { + req_upper + } else if req_upper.is_empty() { upper } else { - min(upper, range.1) + min(upper, req_upper) }; - (max(lower, range.0), up) + (max(lower, req_lower), up) } pub fn store_stream_for_ranges( From 2f3b1af65e2bf366746858485614bf7114f7378c Mon Sep 17 00:00:00 2001 From: jmhrpr Date: Tue, 21 Nov 2023 18:33:26 +0000 Subject: [PATCH 03/10] swap range intersection ends when reverse scan --- src/store/mod.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/store/mod.rs b/src/store/mod.rs index 91440d25..ee513cfb 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -100,10 +100,10 @@ pub fn store_stream_for_range( /// The range used for request should be the intersection of `region_range` and `range`. fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key) { - let (req_lower, req_upper) = if range.0 > range.1 { - (range.1, range.0) + let (req_lower, req_upper, reversed) = if range.0 > range.1 { + (range.1, range.0, true) } else { - (range.0, range.1) + (range.0, range.1, false) }; let (lower, upper) = region_range; @@ -114,7 +114,14 @@ fn range_intersection(region_range: (Key, Key), range: (Key, Key)) -> (Key, Key) } else { min(upper, req_upper) }; - (max(lower, req_lower), up) + + let (res_lower, res_upper) = (max(lower, req_lower), up); + + if reversed { + (res_upper, res_lower) + } else { + (res_lower, res_upper) + } } pub fn store_stream_for_ranges( From 435cf0a36dbb1026c32f757f94bf86f7e713fbb1 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sat, 25 Nov 2023 23:29:35 +0800 Subject: [PATCH 04/10] reproduce issue Signed-off-by: Ping Yu --- Makefile | 4 ++-- tests/integration_tests.rs | 36 +++++++++++++++++++++++------------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/Makefile b/Makefile index 8014352d..aef0ad45 100644 --- a/Makefile +++ b/Makefile @@ -2,8 +2,8 @@ export RUSTFLAGS=-Dwarnings .PHONY: default check unit-test integration-tests test doc docker-pd docker-kv docker all -PD_ADDRS ?= "127.0.0.1:2379" -MULTI_REGION ?= 1 +export PD_ADDRS ?= 127.0.0.1:2379 +export MULTI_REGION ?= 1 ALL_FEATURES := integration-tests diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 71d48283..24556836 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1028,27 +1028,37 @@ async fn txn_scan_reverse() -> Result<()> { init().await?; let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; - let k1 = b"a1".to_vec(); - let k2 = b"a2".to_vec(); - let v1 = b"b1".to_vec(); - let v2 = b"b2".to_vec(); - - let reverse_resp = vec![ - (Key::from(k2.clone()), v2.clone()), - (Key::from(k1.clone()), v1.clone()), - ]; + // Keys in `keys` should locate in different regions. See `init()` for boundary of regions. + let keys: Vec = vec![ + 0x00000000_u32.to_be_bytes().to_vec(), + 0x40000000_u32.to_be_bytes().to_vec(), + b"a1".to_vec(), // 0x6149 + b"a2".to_vec(), // 0x614A + 0x80000000_u32.to_be_bytes().to_vec(), + 0xC0000000_u32.to_be_bytes().to_vec(), + ] + .into_iter() + .map(Into::into) + .collect(); + let values: Vec> = (0..keys.len()) + .map(|i| format!("v{}", i).into_bytes()) + .collect(); + let bound_range: BoundRange = + (keys.first().unwrap().clone()..=keys.last().unwrap().clone()).into(); // Pessimistic option is not stable in this case. Use optimistic options instead. let option = TransactionOptions::new_optimistic().drop_check(tikv_client::CheckLevel::Warn); let mut t = client.begin_with_options(option.clone()).await?; - t.put(k1.clone(), v1).await?; - t.put(k2.clone(), v2).await?; + let mut reverse_resp = Vec::with_capacity(keys.len()); + for (k, v) in keys.into_iter().zip(values.into_iter()).rev() { + t.put(k.clone(), v.clone()).await?; + reverse_resp.push((k, v)); + } t.commit().await?; let mut t2 = client.begin_with_options(option).await?; - let bound_range: BoundRange = (k1..=k2).into(); let resp = t2 - .scan_reverse(bound_range, 2) + .scan_reverse(bound_range, 100) .await? .map(|kv| (kv.0, kv.1)) .collect::)>>(); From 69862f0c37c26be8a67feeb9c7f73cb89ca96406 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Sun, 26 Nov 2023 17:12:32 +0800 Subject: [PATCH 05/10] fix reverse range Signed-off-by: Ping Yu --- src/raw/requests.rs | 4 ++ src/request/mod.rs | 1 + src/request/shard.rs | 41 +++++++++++++++++++- src/transaction/requests.rs | 3 ++ tests/integration_tests.rs | 75 ++++++++++++++++++++++++++++++++++++- 5 files changed, 120 insertions(+), 4 deletions(-) diff --git a/src/raw/requests.rs b/src/raw/requests.rs index 23bfce73..0be733cf 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -16,6 +16,7 @@ use crate::proto::kvrpcpb; use crate::proto::kvrpcpb::ApiVersion; use crate::proto::metapb; use crate::proto::tikvpb::tikv_client::TikvClient; +use crate::range_request; use crate::request::plan::ResponseWithShard; use crate::request::Collect; use crate::request::CollectSingle; @@ -23,6 +24,7 @@ use crate::request::DefaultProcessor; use crate::request::KvRequest; use crate::request::Merge; use crate::request::Process; +use crate::request::RangeRequest; use crate::request::Shardable; use crate::request::SingleKey; use crate::shardable_key; @@ -227,6 +229,7 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest { type Response = kvrpcpb::RawDeleteRangeResponse; } +range_request!(kvrpcpb::RawDeleteRangeRequest); shardable_range!(kvrpcpb::RawDeleteRangeRequest); pub fn new_raw_scan_request( @@ -250,6 +253,7 @@ impl KvRequest for kvrpcpb::RawScanRequest { type Response = kvrpcpb::RawScanResponse; } +range_request!(kvrpcpb::RawScanRequest); // TODO: support reverse raw scan. shardable_range!(kvrpcpb::RawScanRequest); impl Merge for Collect { diff --git a/src/request/mod.rs b/src/request/mod.rs index aecaf26d..8c3a45cb 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -23,6 +23,7 @@ pub use self::plan_builder::SingleKey; pub use self::shard::Batchable; pub use self::shard::HasNextBatch; pub use self::shard::NextBatch; +pub use self::shard::RangeRequest; pub use self::shard::Shardable; use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; diff --git a/src/request/shard.rs b/src/request/shard.rs index 7c78743d..ec234239 100644 --- a/src/request/shard.rs +++ b/src/request/shard.rs @@ -12,6 +12,7 @@ use crate::request::KvRequest; use crate::request::Plan; use crate::request::ResolveLock; use crate::store::RegionStore; +use crate::store::Request; use crate::Result; macro_rules! impl_inner_shardable { @@ -204,6 +205,32 @@ macro_rules! shardable_keys { }; } +pub trait RangeRequest: Request { + fn is_reverse(&self) -> bool { + false + } +} + +#[doc(hidden)] +#[macro_export] +macro_rules! range_request { + ($type_: ty) => { + impl RangeRequest for $type_ {} + }; +} + +#[doc(hidden)] +#[macro_export] +macro_rules! reversible_range_request { + ($type_: ty) => { + impl RangeRequest for $type_ { + fn is_reverse(&self) -> bool { + self.reverse + } + } + }; +} + #[doc(hidden)] #[macro_export] macro_rules! shardable_range { @@ -215,8 +242,13 @@ macro_rules! shardable_range { &self, pd_client: &Arc, ) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::store::RegionStore)>> { - let start_key = self.start_key.clone().into(); - let end_key = self.end_key.clone().into(); + let mut start_key = self.start_key.clone().into(); + let mut end_key = self.end_key.clone().into(); + // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key. + // Therefore, before fetching the regions from PD, it is necessary to swap the values of start_key and end_key. + if self.is_reverse() { + std::mem::swap(&mut start_key, &mut end_key); + } $crate::store::store_stream_for_range((start_key, end_key), pd_client.clone()) } @@ -227,8 +259,13 @@ macro_rules! shardable_range { ) -> $crate::Result<()> { self.set_context(store.region_with_leader.context()?); + // In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key. + // As a result, after obtaining start_key and end_key from PD, we need to swap their values when assigning them to the request. self.start_key = shard.0.into(); self.end_key = shard.1.into(); + if self.is_reverse() { + std::mem::swap(&mut self.start_key, &mut self.end_key); + } Ok(()) } } diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 6d3c0999..4f3e1b93 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -28,10 +28,12 @@ use crate::request::KvRequest; use crate::request::Merge; use crate::request::NextBatch; use crate::request::Process; +use crate::request::RangeRequest; use crate::request::ResponseWithShard; use crate::request::Shardable; use crate::request::SingleKey; use crate::request::{Batchable, StoreRequest}; +use crate::reversible_range_request; use crate::shardable_key; use crate::shardable_keys; use crate::shardable_range; @@ -170,6 +172,7 @@ impl KvRequest for kvrpcpb::ScanRequest { type Response = kvrpcpb::ScanResponse; } +reversible_range_request!(kvrpcpb::ScanRequest); shardable_range!(kvrpcpb::ScanRequest); impl Merge for Collect { diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 24556836..82442c4b 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -1028,12 +1028,83 @@ async fn txn_scan_reverse() -> Result<()> { init().await?; let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + let k1 = b"k1".to_vec(); + let k2 = b"k2".to_vec(); + let k3 = b"k3".to_vec(); + + let v1 = b"v1".to_vec(); + let v2 = b"v2".to_vec(); + let v3 = b"v3".to_vec(); + + // Pessimistic option is not stable in this case. Use optimistic options instead. + let option = TransactionOptions::new_optimistic().drop_check(tikv_client::CheckLevel::Warn); + let mut t = client.begin_with_options(option.clone()).await?; + t.put(k1.clone(), v1.clone()).await?; + t.put(k2.clone(), v2.clone()).await?; + t.put(k3.clone(), v3.clone()).await?; + t.commit().await?; + + let mut t2 = client.begin_with_options(option).await?; + { + // For [k1, k3]: + let bound_range: BoundRange = (k1.clone()..=k3.clone()).into(); + let resp = t2 + .scan_reverse(bound_range, 3) + .await? + .map(|kv| (kv.0, kv.1)) + .collect::)>>(); + assert_eq!( + resp, + vec![ + (Key::from(k3.clone()), v3.clone()), + (Key::from(k2.clone()), v2.clone()), + (Key::from(k1.clone()), v1.clone()), + ] + ); + } + { + // For [k1, k3): + let bound_range: BoundRange = (k1.clone()..k3.clone()).into(); + let resp = t2 + .scan_reverse(bound_range, 3) + .await? + .map(|kv| (kv.0, kv.1)) + .collect::)>>(); + assert_eq!( + resp, + vec![ + (Key::from(k2.clone()), v2.clone()), + (Key::from(k1.clone()), v1), + ] + ); + } + { + // For (k1, k3): + let mut start_key = k1.clone(); + start_key.push(0); + let bound_range: BoundRange = (start_key..k3).into(); + let resp = t2 + .scan_reverse(bound_range, 3) + .await? + .map(|kv| (kv.0, kv.1)) + .collect::)>>(); + assert_eq!(resp, vec![(Key::from(k2), v2),]); + } + t2.commit().await?; + + Ok(()) +} + +#[tokio::test] +#[serial] +async fn txn_scan_reverse_multi_regions() -> Result<()> { + init().await?; + let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?; + // Keys in `keys` should locate in different regions. See `init()` for boundary of regions. let keys: Vec = vec![ 0x00000000_u32.to_be_bytes().to_vec(), 0x40000000_u32.to_be_bytes().to_vec(), - b"a1".to_vec(), // 0x6149 - b"a2".to_vec(), // 0x614A 0x80000000_u32.to_be_bytes().to_vec(), 0xC0000000_u32.to_be_bytes().to_vec(), ] From ef4d6aa68ac4c8ecd74865149809d978c7d5cb94 Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Tue, 30 Jul 2024 14:51:50 +0100 Subject: [PATCH 06/10] set max decoding message size to unlimited --- src/store/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/store/client.rs b/src/store/client.rs index 363d4137..7423d385 100644 --- a/src/store/client.rs +++ b/src/store/client.rs @@ -35,7 +35,7 @@ impl KvConnect for TikvConnect { self.security_mgr .connect(address, TikvClient::new) .await - .map(|c| KvRpcClient::new(c, self.timeout)) + .map(|c| KvRpcClient::new(c.max_decoding_message_size(usize::MAX), self.timeout)) } } From 7e9552de45f7fe2e95922ff666fc13416a5fc06d Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Sat, 3 Aug 2024 13:31:12 +0100 Subject: [PATCH 07/10] config commit ttl calculation --- src/transaction/transaction.rs | 67 ++++++++++++++++++++++++++++++++-- 1 file changed, 63 insertions(+), 4 deletions(-) diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index e984f153..764a1bf1 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -1046,6 +1046,47 @@ pub enum TransactionKind { Pessimistic(Timestamp), } +#[derive(Clone, PartialEq, Debug)] +pub struct CommitTTLParameters { + max_ttl: u64, + min_ttl: u64, + txn_commit_batch_size: u64, + ttl_factor: f64, +} + +impl Default for CommitTTLParameters { + fn default() -> Self { + Self { + max_ttl: MAX_TTL, + min_ttl: DEFAULT_LOCK_TTL, + txn_commit_batch_size: TXN_COMMIT_BATCH_SIZE, + ttl_factor: TTL_FACTOR, + } + } +} + +impl CommitTTLParameters { + pub fn max_ttl(mut self, millis: u64) -> Self { + self.max_ttl = millis; + self + } + + pub fn min_ttl(mut self, millis: u64) -> Self { + self.min_ttl = millis; + self + } + + pub fn txn_commit_batch_size(mut self, size: u64) -> Self { + self.txn_commit_batch_size = size; + self + } + + pub fn ttl_factor(mut self, factor: f64) -> Self { + self.ttl_factor = factor; + self + } +} + /// Options for configuring a transaction. /// /// `TransactionOptions` has a builder-style API. @@ -1063,6 +1104,8 @@ pub struct TransactionOptions { retry_options: RetryOptions, /// What to do if the transaction is dropped without an attempt to commit or rollback check_level: CheckLevel, + /// Variables related to commit TTL calculation + ttl_parameters: CommitTTLParameters, #[doc(hidden)] heartbeat_option: HeartbeatOption, } @@ -1089,6 +1132,7 @@ impl TransactionOptions { read_only: false, retry_options: RetryOptions::default_optimistic(), check_level: CheckLevel::Panic, + ttl_parameters: Default::default(), heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL), } } @@ -1102,6 +1146,7 @@ impl TransactionOptions { read_only: false, retry_options: RetryOptions::default_pessimistic(), check_level: CheckLevel::Panic, + ttl_parameters: Default::default(), heartbeat_option: HeartbeatOption::FixedTime(DEFAULT_HEARTBEAT_INTERVAL), } } @@ -1155,6 +1200,13 @@ impl TransactionOptions { self } + /// Set Commit TTL parameters. + #[must_use] + pub fn ttl_parameters(mut self, options: CommitTTLParameters) -> TransactionOptions { + self.ttl_parameters = options; + self + } + fn push_for_update_ts(&mut self, for_update_ts: Timestamp) { match &mut self.kind { TransactionKind::Optimistic => unreachable!(), @@ -1447,11 +1499,18 @@ impl Committer { } fn calc_txn_lock_ttl(&mut self) -> u64 { - let mut lock_ttl = DEFAULT_LOCK_TTL; - if self.write_size > TXN_COMMIT_BATCH_SIZE { + let CommitTTLParameters { + max_ttl, + min_ttl, + txn_commit_batch_size, + ttl_factor, + } = self.options.ttl_parameters; + + let mut lock_ttl = min_ttl; + if self.write_size > txn_commit_batch_size { let size_mb = self.write_size as f64 / 1024.0 / 1024.0; - lock_ttl = (TTL_FACTOR * size_mb.sqrt()) as u64; - lock_ttl = lock_ttl.clamp(DEFAULT_LOCK_TTL, MAX_TTL); + lock_ttl = (ttl_factor * size_mb.sqrt()) as u64; + lock_ttl = lock_ttl.clamp(min_ttl, max_ttl); } lock_ttl } From 92898d7ddc3a6059329c480362a2f34b113cd007 Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Sat, 3 Aug 2024 13:40:05 +0100 Subject: [PATCH 08/10] expose ttl params --- src/transaction/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 81a290fa..4b1f3149 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -13,6 +13,7 @@ pub(crate) use lock::resolve_locks; pub(crate) use lock::HasLocks; pub use snapshot::Snapshot; pub use transaction::CheckLevel; +pub use transaction::CommitTTLParameters; #[doc(hidden)] pub use transaction::HeartbeatOption; pub use transaction::Transaction; From 1fa846b9817855cc5c0b8bcfb76aff574a63345b Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Sat, 3 Aug 2024 15:54:03 +0100 Subject: [PATCH 09/10] expose ttl params --- src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index a2acf57b..bb454dde 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -156,6 +156,8 @@ pub use crate::timestamp::TimestampExt; #[doc(inline)] pub use crate::transaction::lowering as transaction_lowering; #[doc(inline)] +pub use crate::transaction::CommitTTLParameters; +#[doc(inline)] pub use crate::transaction::CheckLevel; #[doc(inline)] pub use crate::transaction::Client as TransactionClient; From 58a645eb8ccddec3336b961dd3a32fba4601285c Mon Sep 17 00:00:00 2001 From: jmhrpr <25673452+jmhrpr@users.noreply.github.com> Date: Tue, 19 Nov 2024 00:44:10 +0000 Subject: [PATCH 10/10] logging in resolvelockerror --- src/request/plan.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/request/plan.rs b/src/request/plan.rs index 369a2ff1..1ec4fe5b 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -8,6 +8,7 @@ use async_trait::async_trait; use futures::future::try_join_all; use futures::prelude::*; use log::debug; +use log::error; use log::info; use tokio::sync::Semaphore; use tokio::time::sleep; @@ -579,6 +580,7 @@ where } if self.backoff.is_none() { + error!("no backoff, resolve lock error: {locks:?}"); return Err(Error::ResolveLockError(locks)); } @@ -588,7 +590,10 @@ where result = self.inner.execute().await?; } else { match clone.backoff.next_delay_duration() { - None => return Err(Error::ResolveLockError(live_locks)), + None => { + error!("backoff exhausted, resolve lock error: {live_locks:?}"); + return Err(Error::ResolveLockError(live_locks)) + }, Some(delay_duration) => { sleep(delay_duration).await; result = clone.inner.execute().await?;