diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index db2f1e1a0..42c75156d 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -136,11 +136,11 @@ jobs: -e ACTIONS_CACHE_URL=${ACTIONS_CACHE_URL} \ -e ACTIONS_RUNTIME_TOKEN=${ACTIONS_RUNTIME_TOKEN} \ ghcr.io/xline-kv/build-env:latest \ - cargo build --release --bin xline --bin benchmark --bin validation_lock_client + cargo build --release --bin xline --bin benchmark sudo apt-get install -y --force-yes expect cd scripts - cp ../target/release/{xline,benchmark,validation_lock_client} . + cp ../target/release/{xline,benchmark} . ldd ./xline ldd ./benchmark cp ../fixtures/{private,public}.pem . diff --git a/.github/workflows/validation.yml b/.github/workflows/validation.yml index fe5e87137..4e6f4b6e6 100644 --- a/.github/workflows/validation.yml +++ b/.github/workflows/validation.yml @@ -35,6 +35,6 @@ jobs: cp ../fixtures/{private,public}.pem . docker build . -t ghcr.io/xline-kv/xline:latest docker pull gcr.io/etcd-development/etcd:v3.5.5 - binaries: 'xline,benchmark,validation_lock_client' + binaries: 'xline,benchmark' script_name: 'validation_test.sh' uploadLogs: true diff --git a/Cargo.lock b/Cargo.lock index a8ae70629..60208036e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -137,6 +137,56 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-dropper" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d901072ae4dcdca2201b98beb02d31fb4b6b2472fbd0e870b12ec15b8b35b2d2" +dependencies = [ + "async-dropper-derive", + "async-dropper-simple", + "async-trait", + "futures", + "tokio", +] + +[[package]] +name = "async-dropper-derive" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a35cf17a37761f1c88b8e770b5956820fe84c12854165b6f930c604ea186e47e" +dependencies = [ + "async-trait", + "proc-macro2", + "quote", + "syn 2.0.63", + "tokio", +] + +[[package]] +name = "async-dropper-simple" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7c4748dfe8cd3d625ec68fc424fa80c134319881185866f9e173af9e5d8add8" +dependencies = [ + "async-scoped", + "async-trait", + "futures", + "rustc_version", + "tokio", +] + +[[package]] +name = "async-scoped" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4042078ea593edffc452eef14e99fdb2b120caa4ad9618bcdeabc4a023b98740" +dependencies = [ + "futures", + "pin-project", + "tokio", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -2487,6 +2537,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.38.34" @@ -2559,6 +2618,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + [[package]] name = "serde" version = "1.0.203" @@ -3836,6 +3901,8 @@ name = "xline-client" version = "0.1.0" dependencies = [ "anyhow", + "async-dropper", + "async-trait", "clippy-utilities", "curp", "futures", @@ -3857,8 +3924,6 @@ dependencies = [ name = "xline-test-utils" version = "0.1.0" dependencies = [ - "anyhow", - "clap", "futures", "madsim-tokio", "madsim-tonic", diff --git a/crates/xline-client/Cargo.toml b/crates/xline-client/Cargo.toml index aedcb2030..72c591457 100644 --- a/crates/xline-client/Cargo.toml +++ b/crates/xline-client/Cargo.toml @@ -12,6 +12,8 @@ keywords = ["Client", "Xline", "RPC"] [dependencies] anyhow = "1.0.83" +async-dropper = { version = "0.3.1", features = ["tokio", "simple"] } +async-trait = "0.1.80" clippy-utilities = "0.2.0" curp = { path = "../curp" } futures = "0.3.25" diff --git a/crates/xline-client/examples/lock.rs b/crates/xline-client/examples/lock.rs index aaeef861c..a0bb84f79 100644 --- a/crates/xline-client/examples/lock.rs +++ b/crates/xline-client/examples/lock.rs @@ -1,6 +1,7 @@ use anyhow::Result; use xline_client::{ - types::lock::{LockRequest, UnlockRequest}, + clients::Xutex, + types::kv::{Compare, CompareResult, PutRequest, TxnOp}, Client, ClientOptions, }; @@ -9,19 +10,24 @@ async fn main() -> Result<()> { // the name and address of all curp members let curp_members = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"]; - let client = Client::connect(curp_members, ClientOptions::default()) - .await? - .lock_client(); + let client = Client::connect(curp_members, ClientOptions::default()).await?; - // acquire a lock - let resp = client.lock(LockRequest::new("lock-test")).await?; + let lock_client = client.lock_client(); + let kv_client = client.kv_client(); - let key = resp.key; + let mut xutex = Xutex::new(lock_client, "lock-test", None, None).await?; + // when the `xutex_guard` drop, the lock will be unlocked. + let xutex_guard = xutex.lock_unsafe().await?; - println!("lock key: {:?}", String::from_utf8_lossy(&key)); + let txn_req = xutex_guard + .txn_check_locked_key() + .when([Compare::value("key2", CompareResult::Equal, "value2")]) + .and_then([TxnOp::put( + PutRequest::new("key2", "value3").with_prev_kv(true), + )]) + .or_else(&[]); - // release the lock - client.unlock(UnlockRequest::new(key)).await?; + let _resp = kv_client.txn(txn_req).await?; Ok(()) } diff --git a/crates/xline-client/src/clients/lock.rs b/crates/xline-client/src/clients/lock.rs index c20395e96..9a7b4d624 100644 --- a/crates/xline-client/src/clients/lock.rs +++ b/crates/xline-client/src/clients/lock.rs @@ -1,77 +1,240 @@ -use std::{ - fmt::Debug, - pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, -}; +use std::{fmt::Debug, sync::Arc, time::Duration}; +use async_dropper::{AsyncDrop, AsyncDropper}; +use async_trait::async_trait; use clippy_utilities::OverflowArithmetic; -use futures::{Future, FutureExt}; +use tokio::{task::JoinHandle, time::sleep}; use tonic::transport::Channel; use xlineapi::{ command::{Command, CommandResponse, KeyRange, SyncResponse}, - Compare, CompareResult, CompareTarget, DeleteRangeRequest, DeleteRangeResponse, EventType, - LockResponse, PutRequest, RangeRequest, RangeResponse, Request, RequestOp, RequestWrapper, - Response, ResponseHeader, SortOrder, SortTarget, TargetUnion, TxnRequest, TxnResponse, - UnlockResponse, + Compare, CompareResult, CompareTarget, DeleteRangeRequest, EventType, PutRequest, RangeRequest, + RangeResponse, Request, RequestOp, RequestWrapper, Response, ResponseHeader, SortOrder, + SortTarget, TargetUnion, TxnRequest, TxnResponse, }; use crate::{ - clients::{lease::LeaseClient, watch::WatchClient}, + clients::{lease::LeaseClient, watch::WatchClient, DEFAULT_SESSION_TTL}, error::{Result, XlineClientError}, lease_gen::LeaseIdGenerator, types::{ - lease::LeaseGrantRequest, - lock::{LockRequest, UnlockRequest}, + kv::TxnRequest as KvTxnRequest, + lease::{LeaseGrantRequest, LeaseKeepAliveRequest}, watch::WatchRequest, }, CurpClient, }; -/// Client for Lock operations. -#[derive(Clone)] -pub struct LockClient { - /// The client running the CURP protocol, communicate with all servers. - curp_client: Arc, - /// The lease client - lease_client: LeaseClient, - /// The watch client - watch_client: WatchClient, - /// Auth token - token: Option, +/// Session represents a lease kept alive for the lifetime of a client. +#[derive(Debug)] +pub struct Session { + /// The lock client that used to create the session + client: LockClient, + /// lease id + lease_id: i64, + /// `keep_alive` task will auto-renew the lease + keep_alive: Option>>, } -impl Debug for LockClient { +impl Drop for Session { #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("LockClient") - .field("lease_client", &self.lease_client) - .field("watch_client", &self.watch_client) - .field("token", &self.token) - .finish() + fn drop(&mut self) { + if let Some(keep_alive) = self.keep_alive.take() { + keep_alive.abort(); + } } } -// These methods primarily originate from xline lock server, -// see also: `xline/src/server/lock_server.rs` -impl LockClient { - /// Creates a new `LockClient` +/// Xutex(Xline Mutex) implements the sync lock with xline +#[derive(Debug)] +pub struct Xutex { + /// Lock session + session: Session, + /// Lock + prefix: String, + /// Lock key + key: String, + /// The revision of lock key + rev: i64, + /// Request header + header: Option, +} + +/// An RAII implementation of a “scoped lock” of an `Xutex` +#[derive(Default, Debug)] +pub struct XutexGuard { + /// The lock client that used to unlock `key` when `XutexGuard` is dropped + client: Option, + /// The key that the lock held + key: String, +} + +impl XutexGuard { + /// Create a new `XutexGuard` + fn new(client: LockClient, key: String) -> AsyncDropper { + AsyncDropper::new(Self { + client: Some(client), + key, + }) + } + + /// Get the key of the Xutex #[inline] - pub fn new( - curp_client: Arc, - channel: Channel, - token: Option, - id_gen: Arc, - ) -> Self { - Self { - curp_client: Arc::clone(&curp_client), - lease_client: LeaseClient::new(curp_client, channel.clone(), token.clone(), id_gen), - watch_client: WatchClient::new(channel, token.clone()), - token, + #[must_use] + pub fn key(&self) -> &str { + self.key.as_str() + } + + /// Return a `TxnRequest` which will perform the success ops when the locked key is exist. + /// This method is syntactic sugar + #[inline] + #[must_use] + pub fn txn_check_locked_key(&self) -> KvTxnRequest { + let mut txn_request = KvTxnRequest::new(); + #[allow(clippy::as_conversions)] + let cmp = Compare { + result: CompareResult::Greater as i32, + target: CompareTarget::Create as i32, + key: self.key().into(), + range_end: Vec::new(), + target_union: Some(TargetUnion::CreateRevision(0)), + }; + txn_request.inner.compare.push(cmp); + txn_request + } +} + +#[async_trait] +impl AsyncDrop for XutexGuard { + #[inline] + async fn async_drop(&mut self) { + if let Some(ref client) = self.client { + let _ignore = client.delete_key(self.key.as_bytes()).await; } } +} + +impl Xutex { + /// Create an Xutex + /// + /// # Errors + /// + /// Return errors when the lease client failed to grant a lease + #[inline] + pub async fn new( + client: LockClient, + prefix: &str, + ttl: Option, + lease_id: Option, + ) -> Result { + let ttl = ttl.unwrap_or(DEFAULT_SESSION_TTL); + let lease_id = if let Some(id) = lease_id { + id + } else { + let lease_response = client + .lease_client + .grant(LeaseGrantRequest::new(ttl)) + .await?; + lease_response.id + }; + let mut lease_client = client.lease_client.clone(); + let keep_alive = Some(tokio::spawn(async move { + /// The renew interval factor of which value equals 60% of one second. + const RENEW_INTERVAL_FACTOR: u64 = 600; + let (mut keeper, mut stream) = lease_client + .keep_alive(LeaseKeepAliveRequest::new(lease_id)) + .await?; + loop { + keeper.keep_alive()?; + if let Some(resp) = stream.message().await? { + if resp.ttl < 0 { + return Err(XlineClientError::InvalidArgs(String::from( + "lease keepalive response has negative ttl", + ))); + } + sleep(Duration::from_millis( + resp.ttl.unsigned_abs().overflow_mul(RENEW_INTERVAL_FACTOR), + )) + .await; + } + } + })); + + let session = Session { + client, + lease_id, + keep_alive, + }; + + Ok(Self { + session, + prefix: format!("{prefix}/"), + key: String::new(), + rev: -1, + header: None, + }) + } + + /// try to acquire lock + async fn try_acquire(&mut self) -> Result { + let lease_id = self.session.lease_id; + let prefix = self.prefix.as_str(); + self.key = format!("{prefix}{lease_id:x}"); + #[allow(clippy::as_conversions)] // this cast is always safe + let cmp = Compare { + result: CompareResult::Equal as i32, + target: CompareTarget::Create as i32, + key: self.key.as_bytes().to_vec(), + range_end: vec![], + target_union: Some(TargetUnion::CreateRevision(0)), + }; + let put = RequestOp { + request: Some(Request::RequestPut(PutRequest { + key: self.key.as_bytes().to_vec(), + value: vec![], + lease: lease_id, + ..Default::default() + })), + }; + let get = RequestOp { + request: Some(Request::RequestRange(RangeRequest { + key: self.key.as_bytes().to_vec(), + ..Default::default() + })), + }; + let range_end = KeyRange::get_prefix(prefix.as_bytes()); + #[allow(clippy::as_conversions)] // this cast is always safe + let get_owner = RequestOp { + request: Some(Request::RequestRange(RangeRequest { + key: prefix.as_bytes().to_vec(), + range_end, + sort_order: SortOrder::Ascend as i32, + sort_target: SortTarget::Create as i32, + limit: 1, + ..Default::default() + })), + }; + let acquire_txn = TxnRequest { + compare: vec![cmp], + success: vec![put, get_owner.clone()], + failure: vec![get, get_owner], + }; + let (cmd_res, sync_res) = self.session.client.propose(acquire_txn, false).await?; + let resp = Into::::into(cmd_res.into_inner()); + self.rev = if resp.succeeded { + sync_res + .unwrap_or_else(|| unreachable!("sync_res always has value when use slow path")) + .revision() + } else { + #[allow(clippy::indexing_slicing)] + // it's safe to do since the txn response must have two responses. + if let Some(Response::ResponseRange(ref res)) = resp.responses[0].response { + res.kvs[0].create_revision + } else { + unreachable!("The first response in txn responses should be a RangeResponse when txn failed: {:?}", resp); + } + }; + Ok(resp) + } /// Acquires a distributed shared lock on a given named lock. /// On success, it will return a unique key that exists so long as the @@ -80,14 +243,25 @@ impl LockClient { /// lock ownership. The lock is held until Unlock is called on the key or the /// lease associate with the owner expires. /// + /// NOTES. Due to the inherent insecurity of distributed locks, it is difficult to balance efficiency + /// and correctness. You cannot have your cake and eat it too. That’s why this method is named `lock_unsafe`. + /// The term 'unsafe' in this context has a different meaning compared to 'unsafe' in Rust. On the grounds of + /// safety, we recommend users use transactions(`txn_check_locked_key`) to operate Xline while holding the lock. + /// FYI: [How to do distributed locking](https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html) + /// /// # Errors /// /// This function will return an error if the inner CURP client encountered a propose failure /// + /// # Panics + /// + /// Panic if the given `LockRequest.inner.lease` less than or equal to 0 + /// /// # Examples /// /// ```no_run - /// use xline_client::{ + /// use xline_client:: + /// clients::{Session, Xutex}, /// types::lock::{LockRequest, UnlockRequest}, /// Client, ClientOptions, /// }; @@ -102,147 +276,125 @@ impl LockClient { /// .await? /// .lock_client(); /// - /// // acquire a lock - /// let resp = client - /// .lock(LockRequest::new("lock-test")) - /// .await?; - /// - /// let key = resp.key; - /// - /// println!("lock key: {:?}", String::from_utf8_lossy(&key)); + /// // acquire a lock session + /// let session = Session::new(client.lock_client()).build().await?; + /// let mut xutex = Xutex::new(session, "lock-test"); + /// let lock = xutex.lock().await?; /// + /// let txn_req = xutex_guard + /// .txn_check_locked_key() + /// .when([Compare::value("key2", CompareResult::Equal, "value2")]) + /// .and_then([TxnOp::put( + /// PutRequest::new("key2", "value3").with_prev_kv(true), + /// )]) + /// .or_else(&[]); + + /// let _resp = kv_client.txn(txn_req).await?; + /// println!("lock key: {:?}", String::from_utf8_lossy(xutex.key().as_bytes(); + /// // the lock will be released when the lock session is dropped. /// Ok(()) /// } /// ``` #[inline] - pub async fn lock(&self, request: LockRequest) -> Result { - let mut lease_id = request.inner.lease; - if lease_id == 0 { - let resp = self - .lease_client - .grant(LeaseGrantRequest::new(request.ttl)) - .await?; - lease_id = resp.id; + pub async fn lock_unsafe(&mut self) -> Result> { + if self + .session + .keep_alive + .as_ref() + .is_some_and(JoinHandle::is_finished) + { + return Err(XlineClientError::LeaseError(String::from( + "Lock renew task exists unexpectedly", + ))); } - let prefix = format!( - "{}/", - String::from_utf8_lossy(&request.inner.name).into_owned() - ); - let key = format!("{prefix}{lease_id:x}"); - let lock_success = AtomicBool::new(false); - let lock_inner = self.lock_inner(prefix, key.clone(), lease_id, &lock_success); - tokio::pin!(lock_inner); - - LockFuture { - key, - lock_success: &lock_success, - lock_client: self, - lock_inner, + let resp = self.try_acquire().await?; + #[allow(clippy::indexing_slicing)] + if let Some(Response::ResponseRange(ref lock_owner)) = resp.responses[1].response { + // if no key on prefix or the key is created by current Xutex, that indicates we have already held the lock + if lock_owner + .kvs + .get(0) + .map_or(false, |kv| kv.create_revision == self.rev) + { + self.header = resp.header; + return Ok(XutexGuard::new( + self.session.client.clone(), + self.key.clone(), + )); + } + } else { + unreachable!("owner_resp should be a Get response") } - .await - } - - /// The inner lock logic - async fn lock_inner( - &self, - prefix: String, - key: String, - lease_id: i64, - lock_success: &AtomicBool, - ) -> Result { - let txn = Self::create_acquire_txn(&prefix, lease_id); - let (cmd_res, sync_res) = self.propose(txn, false).await?; - let mut txn_res = Into::::into(cmd_res.into_inner()); - let my_rev = sync_res - .unwrap_or_else(|| unreachable!("sync_res always has value when use slow path")) - .revision(); - let owner_res = txn_res - .responses - .swap_remove(1) - .response - .and_then(|r| { - if let Response::ResponseRange(res) = r { - Some(res) - } else { - None - } - }) - .unwrap_or_else(|| unreachable!("owner_resp should be a Get response")); - let owner_key = owner_res.kvs; - let header = if owner_key - .get(0) - .map_or(false, |kv| kv.create_revision == my_rev) - { - owner_res.header - } else { - self.wait_delete(prefix, my_rev).await?; - let range_req = RangeRequest { - key: key.as_bytes().to_vec(), - ..Default::default() - }; - let result = self.propose(range_req, true).await; - match result { - Ok(res) => { - let res = Into::::into(res.0.into_inner()); - if res.kvs.is_empty() { - return Err(XlineClientError::RpcError(String::from("session expired"))); - } - res.header - } - Err(e) => { - let _ignore = self.delete_key(key.as_bytes()).await; - return Err(e); + self.session + .client + .wait_delete(self.prefix.clone(), self.rev) + .await?; + // make sure the session is no expired, and the owner key still exists. + let range_req = RangeRequest { + key: self.key.as_bytes().to_vec(), + ..Default::default() + }; + match self.session.client.propose(range_req, true).await { + Ok((cmd_res, _sync_res)) => { + let res = Into::::into(cmd_res.into_inner()); + if res.kvs.is_empty() { + return Err(XlineClientError::RpcError(String::from("session expired"))); } + self.header = res.header; + Ok(XutexGuard::new( + self.session.client.clone(), + self.key.clone(), + )) } - }; + Err(e) => { + self.session.client.delete_key(self.key.as_bytes()).await?; + Err(e) + } + } + } +} - // The `Release` ordering ensures that this store will not be reordered. - lock_success.store(true, Ordering::Release); +/// Client for Lock operations. +#[derive(Clone)] +pub struct LockClient { + /// The client running the CURP protocol, communicate with all servers. + curp_client: Arc, + /// The lease client + lease_client: LeaseClient, + /// The watch client + watch_client: WatchClient, + /// Auth token + token: Option, +} - Ok(LockResponse { - header, - key: key.into_bytes(), - }) +impl Debug for LockClient { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LockClient") + .field("watch_client", &self.watch_client) + .field("token", &self.token) + .finish() } +} - /// Takes a key returned by Lock and releases the hold on lock. The - /// next Lock caller waiting for the lock will then be woken up and given - /// ownership of the lock. - /// - /// # Errors - /// - /// This function will return an error if the inner CURP client encountered a propose failure - /// - /// # Examples - /// - /// ```no_run - /// use xline_client::{ - /// types::lock::{LockRequest, UnlockRequest}, - /// Client, ClientOptions, - /// }; - /// use anyhow::Result; - /// - /// #[tokio::main] - /// async fn main() -> Result<()> { - /// // the name and address of all curp members - /// let curp_members = ["10.0.0.1:2379", "10.0.0.2:2379", "10.0.0.3:2379"]; - /// - /// let mut client = Client::connect(curp_members, ClientOptions::default()) - /// .await? - /// .lock_client(); - /// - /// // acquire a lock first - /// - /// client.unlock(UnlockRequest::new("lock_key")).await?; - /// - /// Ok(()) - /// } - /// ``` +// These methods primarily originate from xline lock server, +// see also: `xline/src/server/lock_server.rs` +impl LockClient { + /// Creates a new `LockClient` #[inline] - pub async fn unlock(&self, request: UnlockRequest) -> Result { - let header = self.delete_key(&request.inner.key).await?; - Ok(UnlockResponse { header }) + pub fn new( + curp_client: Arc, + channel: Channel, + token: Option, + id_gen: Arc, + ) -> Self { + Self { + curp_client: Arc::clone(&curp_client), + lease_client: LeaseClient::new(curp_client, channel.clone(), token.clone(), id_gen), + watch_client: WatchClient::new(channel, token.clone()), + token, + } } /// Propose request and get result with fast/slow path @@ -262,50 +414,6 @@ impl LockClient { .map_err(Into::into) } - /// Create txn for try acquire lock - fn create_acquire_txn(prefix: &str, lease_id: i64) -> TxnRequest { - let key = format!("{prefix}{lease_id:x}"); - #[allow(clippy::as_conversions)] // this cast is always safe - let cmp = Compare { - result: CompareResult::Equal as i32, - target: CompareTarget::Create as i32, - key: key.as_bytes().to_vec(), - range_end: vec![], - target_union: Some(TargetUnion::CreateRevision(0)), - }; - let put = RequestOp { - request: Some(Request::RequestPut(PutRequest { - key: key.as_bytes().to_vec(), - value: vec![], - lease: lease_id, - ..Default::default() - })), - }; - let get = RequestOp { - request: Some(Request::RequestRange(RangeRequest { - key: key.as_bytes().to_vec(), - ..Default::default() - })), - }; - let range_end = KeyRange::get_prefix(prefix.as_bytes()); - #[allow(clippy::as_conversions)] // this cast is always safe - let get_owner = RequestOp { - request: Some(Request::RequestRange(RangeRequest { - key: prefix.as_bytes().to_vec(), - range_end, - sort_order: SortOrder::Ascend as i32, - sort_target: SortTarget::Create as i32, - limit: 1, - ..Default::default() - })), - }; - TxnRequest { - compare: vec![cmp], - success: vec![put, get_owner.clone()], - failure: vec![get, get_owner], - } - } - /// Wait until last key deleted async fn wait_delete(&self, pfx: String, my_rev: i64) -> Result<()> { let rev = my_rev.overflow_sub(1); @@ -344,65 +452,12 @@ impl LockClient { } /// Delete key - async fn delete_key(&self, key: &[u8]) -> Result> { + async fn delete_key(&self, key: &[u8]) -> Result<()> { let del_req = DeleteRangeRequest { key: key.into(), ..Default::default() }; - let (cmd_res, _sync_res) = self.propose(del_req, true).await?; - let res = Into::::into(cmd_res.into_inner()); - Ok(res.header) - } -} - -/// The future that will do the lock operation -/// This exists because we need to do some clean up after the lock operation has failed or being cancelled -struct LockFuture<'a> { - /// The key associated with the lock - key: String, - /// Whether the acquire attempt is success - lock_success: &'a AtomicBool, - /// The lock client - lock_client: &'a LockClient, - /// The inner lock future - lock_inner: Pin<&'a mut (dyn Future> + Send)>, -} - -impl Debug for LockFuture<'_> { - #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "key: {}, lock_success: {:?}, lock_client:{:?}", - self.key, self.lock_success, self.lock_client - ) - } -} - -impl Future for LockFuture<'_> { - type Output = Result; - - #[inline] - fn poll( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - self.lock_inner.poll_unpin(cx) - } -} - -impl Drop for LockFuture<'_> { - #[inline] - fn drop(&mut self) { - // We can safely use `Relaxed` ordering here as the if condition makes sure it - // won't be reordered. - if self.lock_success.load(Ordering::Relaxed) { - return; - } - let lock_client = self.lock_client.clone(); - let key = self.key.clone(); - let _ignore = tokio::spawn(async move { - let _ignore = lock_client.delete_key(key.as_bytes()).await; - }); + let (_cmd_res, _sync_res) = self.propose(del_req, true).await?; + Ok(()) } } diff --git a/crates/xline-client/src/clients/mod.rs b/crates/xline-client/src/clients/mod.rs index 836c72d6d..8a2ce51b3 100644 --- a/crates/xline-client/src/clients/mod.rs +++ b/crates/xline-client/src/clients/mod.rs @@ -3,7 +3,7 @@ pub use cluster::ClusterClient; pub use election::ElectionClient; pub use kv::KvClient; pub use lease::LeaseClient; -pub use lock::LockClient; +pub use lock::{LockClient, Session, Xutex}; pub use maintenance::MaintenanceClient; pub use watch::WatchClient; @@ -18,8 +18,11 @@ mod kv; /// Lease client. mod lease; /// Lock client. -mod lock; +pub mod lock; /// Maintenance client. mod maintenance; /// Watch client. mod watch; + +/// Default session ttl +pub const DEFAULT_SESSION_TTL: i64 = 60; diff --git a/crates/xline-client/src/types/lock.rs b/crates/xline-client/src/types/lock.rs deleted file mode 100644 index f150946d8..000000000 --- a/crates/xline-client/src/types/lock.rs +++ /dev/null @@ -1,77 +0,0 @@ -pub use xlineapi::{LockResponse, UnlockResponse}; - -/// Default session ttl -const DEFAULT_SESSION_TTL: i64 = 60; - -/// Request for `Lock` -#[derive(Debug, PartialEq)] -pub struct LockRequest { - /// The inner request - pub(crate) inner: xlineapi::LockRequest, - /// The ttl of the lease that attached to the lock - pub(crate) ttl: i64, -} - -impl LockRequest { - /// Creates a new `LockRequest` - #[inline] - #[must_use] - pub fn new(name: impl Into>) -> Self { - Self { - inner: xlineapi::LockRequest { - name: name.into(), - lease: 0, - }, - ttl: DEFAULT_SESSION_TTL, - } - } - - /// Set lease. - #[inline] - #[must_use] - pub const fn with_lease(mut self, lease: i64) -> Self { - self.inner.lease = lease; - self - } - - /// Set session TTL. - /// Will be ignored when lease id is set - #[inline] - #[must_use] - pub const fn with_ttl(mut self, ttl: i64) -> Self { - self.ttl = ttl; - self - } -} - -impl From for xlineapi::LockRequest { - #[inline] - fn from(req: LockRequest) -> Self { - req.inner - } -} - -/// Request for `Unlock` -#[derive(Debug)] -pub struct UnlockRequest { - /// The inner request - pub(crate) inner: xlineapi::UnlockRequest, -} - -impl UnlockRequest { - /// Creates a new `UnlockRequest` - #[inline] - #[must_use] - pub fn new(key: impl Into>) -> Self { - Self { - inner: xlineapi::UnlockRequest { key: key.into() }, - } - } -} - -impl From for xlineapi::UnlockRequest { - #[inline] - fn from(req: UnlockRequest) -> Self { - req.inner - } -} diff --git a/crates/xline-client/src/types/mod.rs b/crates/xline-client/src/types/mod.rs index 3b06de4e9..a3abb3b5f 100644 --- a/crates/xline-client/src/types/mod.rs +++ b/crates/xline-client/src/types/mod.rs @@ -6,8 +6,6 @@ pub mod cluster; pub mod kv; /// Lease type definitions pub mod lease; -/// Lock type definitions. -pub mod lock; /// Maintenance type definitions. pub mod maintenance; /// Watch type definitions. diff --git a/crates/xline-client/tests/it/lock.rs b/crates/xline-client/tests/it/lock.rs index 279b60bc5..897c69f1c 100644 --- a/crates/xline-client/tests/it/lock.rs +++ b/crates/xline-client/tests/it/lock.rs @@ -1,22 +1,17 @@ use std::time::Duration; use test_macros::abort_on_panic; -use xline_client::{ - error::Result, - types::lock::{LockRequest, UnlockRequest}, -}; +use xline_client::{clients::lock::Xutex, error::Result}; use super::common::get_cluster_client; #[tokio::test(flavor = "multi_thread")] async fn lock_unlock_should_success_in_normal_path() -> Result<()> { let (_cluster, client) = get_cluster_client().await.unwrap(); - let client = client.lock_client(); + let lock_client = client.lock_client(); + let mut xutex = Xutex::new(lock_client, "lock-test", None, None).await?; - let resp = client.lock(LockRequest::new("lock-test")).await?; - assert!(resp.key.starts_with(b"lock-test/")); - - client.unlock(UnlockRequest::new(resp.key)).await?; + assert!(xutex.lock_unsafe().await.is_ok()); Ok(()) } @@ -24,89 +19,55 @@ async fn lock_unlock_should_success_in_normal_path() -> Result<()> { #[abort_on_panic] async fn lock_contention_should_occur_when_acquire_by_two() -> Result<()> { let (_cluster, client) = get_cluster_client().await.unwrap(); - let client = client.lock_client(); - let client_c = client.clone(); + let lock_client = client.lock_client(); + let client_c = client.lock_client(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let resp = client.lock(LockRequest::new("lock-test")).await.unwrap(); + let mut xutex_1 = Xutex::new(lock_client, "lock-test", None, None).await?; + let lock_1 = xutex_1.lock_unsafe().await.unwrap(); + + let mut xutex_2 = Xutex::new(client_c, "lock-test", None, None).await?; let handle = tokio::spawn(async move { - let res = tokio::time::timeout( - Duration::from_secs(2), - client_c.lock(LockRequest::new("lock-test")), - ) - .await; - assert!(res.is_err()); + let lock_result = tokio::time::timeout(Duration::from_secs(2), xutex_2.lock_unsafe()).await; + assert!(lock_result.is_err()); let _ignore = tx.send(()); - let res = tokio::time::timeout( - Duration::from_millis(200), - client_c.lock(LockRequest::new("lock-test")), - ) - .await; - assert!(res.is_ok_and(|r| r.is_ok_and(|resp| resp.key.starts_with(b"lock-test/")))); + let lock_result = + tokio::time::timeout(Duration::from_millis(200), xutex_2.lock_unsafe()).await; + assert!(lock_result.is_ok()); }); rx.recv().await.unwrap(); - let _resp = client.unlock(UnlockRequest::new(resp.key)).await.unwrap(); + std::mem::drop(lock_1); handle.await.unwrap(); Ok(()) } -#[tokio::test(flavor = "multi_thread")] -#[abort_on_panic] -async fn lock_should_timeout_when_ttl_is_set() -> Result<()> { - let (_cluster, client) = get_cluster_client().await.unwrap(); - let client = client.lock_client(); - - let _resp = client - .lock(LockRequest::new("lock-test").with_ttl(1)) - .await - .unwrap(); - - let resp = tokio::time::timeout( - Duration::from_secs(2), - client.lock(LockRequest::new("lock-test")), - ) - .await - .expect("timeout when trying to lock")?; - - assert!(resp.key.starts_with(b"lock-test/")); - - Ok(()) -} - #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] async fn lock_should_unlock_after_cancelled() -> Result<()> { let (_cluster, client) = get_cluster_client().await.unwrap(); - let client = client.lock_client(); - let client_c = client.clone(); + let lock_client = client.lock_client(); + let client_c = lock_client.clone(); + let mut xutex_1 = Xutex::new(lock_client, "lock-test", None, None).await?; + let mut xutex_2 = Xutex::new(client_c, "lock-test", None, None).await?; // first acquire the lock - let resp = client.lock(LockRequest::new("lock-test")).await.unwrap(); + let lock_1 = xutex_1.lock_unsafe().await.unwrap(); // acquire the lock again and then cancel it - let res = tokio::time::timeout( - Duration::from_secs(1), - client_c.lock(LockRequest::new("lock-test")), - ) - .await; + let res = tokio::time::timeout(Duration::from_secs(1), xutex_2.lock_unsafe()).await; assert!(res.is_err()); // unlock the first one - client.unlock(UnlockRequest::new(resp.key)).await?; + std::mem::drop(lock_1); // try lock again, it should success - let resp = tokio::time::timeout( - Duration::from_secs(1), - client.lock(LockRequest::new("lock-test")), - ) - .await - .expect("timeout when trying to lock")?; - - assert!(resp.key.starts_with(b"lock-test/")); + let _session = tokio::time::timeout(Duration::from_secs(1), xutex_2.lock_unsafe()) + .await + .expect("timeout when trying to lock")?; Ok(()) } diff --git a/crates/xline-test-utils/Cargo.toml b/crates/xline-test-utils/Cargo.toml index 0b419f001..8c8c40e5d 100644 --- a/crates/xline-test-utils/Cargo.toml +++ b/crates/xline-test-utils/Cargo.toml @@ -11,8 +11,6 @@ license = "Apache-2.0" readme = "README.md" [dependencies] -anyhow = "1.0.83" -clap = { version = "4.5.7", features = ["derive"] } futures = "0.3.30" rand = "0.8.5" tokio = { version = "0.2.25", package = "madsim-tokio", features = [ diff --git a/crates/xline-test-utils/src/bin/validation_lock_client.rs b/crates/xline-test-utils/src/bin/validation_lock_client.rs deleted file mode 100644 index ab8f6ccec..000000000 --- a/crates/xline-test-utils/src/bin/validation_lock_client.rs +++ /dev/null @@ -1,87 +0,0 @@ -//! this binary is only used for the validation of lock service - -use anyhow::Result; -use clap::{Parser, Subcommand}; -use xline_client::{ - types::lock::{LockRequest, UnlockRequest}, - Client, ClientOptions, -}; - -#[derive(Parser, Debug, Clone, PartialEq, Eq)] -struct ClientArgs { - #[clap(short, long, value_delimiter = ',')] - endpoints: Vec, - #[clap(subcommand)] - command: Commands, -} - -/// Types of sub command -#[derive(Subcommand, Debug, Clone, PartialEq, Eq)] -pub enum Commands { - /// Lock args - Lock { - /// Lock name - #[clap(value_parser)] - name: String, - }, - /// UnLock args - Unlock { - /// Lock name - #[clap(value_parser)] - key: String, - }, -} - -#[tokio::main] -async fn main() -> Result<()> { - let args: ClientArgs = ClientArgs::parse(); - let endpoints = if args.endpoints.is_empty() { - vec!["http://127.0.0.1:2379".to_owned()] - } else { - args.endpoints - }; - let client = Client::connect(endpoints, ClientOptions::default()) - .await? - .lock_client(); - match args.command { - Commands::Lock { name } => { - let lock_res = client.lock(LockRequest::new(name)).await?; - println!("{}", String::from_utf8_lossy(&lock_res.key)) - } - Commands::Unlock { key } => { - let _unlock_res = client.unlock(UnlockRequest::new(key)).await?; - println!("unlock success"); - } - }; - Ok(()) -} - -#[cfg(test)] -mod test { - use super::*; - #[test] - fn it_works() { - let args: ClientArgs = ClientArgs::parse_from([ - "lock_client", - "--endpoints", - "http://127.0.0.1:1234", - "lock", - "test", - ]); - assert_eq!(args.endpoints, vec!["http://127.0.0.1:1234"]); - assert_eq!( - args.command, - Commands::Lock { - name: "test".to_owned() - } - ); - let args2: ClientArgs = ClientArgs::parse_from(["lock_client", "unlock", "test"]); - assert_eq!(args2.endpoints, Vec::::new()); - assert_eq!( - args2.command, - Commands::Unlock { - key: "test".to_owned() - } - ); - } -} diff --git a/crates/xline-test-utils/src/lib.rs b/crates/xline-test-utils/src/lib.rs index e1bfd24de..624b7f32b 100644 --- a/crates/xline-test-utils/src/lib.rs +++ b/crates/xline-test-utils/src/lib.rs @@ -18,7 +18,7 @@ use xline_client::types::auth::{ AuthRoleAddRequest, AuthRoleGrantPermissionRequest, AuthUserAddRequest, AuthUserGrantRoleRequest, Permission, PermissionType, }; -pub use xline_client::{types, Client, ClientOptions}; +pub use xline_client::{clients, types, Client, ClientOptions}; /// Cluster pub struct Cluster { diff --git a/crates/xline/tests/it/lock_test.rs b/crates/xline/tests/it/lock_test.rs index 97b17d6f4..d89231f03 100644 --- a/crates/xline/tests/it/lock_test.rs +++ b/crates/xline/tests/it/lock_test.rs @@ -1,14 +1,8 @@ use std::{error::Error, time::Duration}; use test_macros::abort_on_panic; -use tokio::time::{self, timeout}; -use xline_test_utils::{ - types::{ - lease::LeaseGrantRequest, - lock::{LockRequest, UnlockRequest}, - }, - Cluster, -}; +use tokio::time::{sleep, Instant}; +use xline_test_utils::{clients::Xutex, Cluster}; #[tokio::test(flavor = "multi_thread")] #[abort_on_panic] @@ -21,46 +15,20 @@ async fn test_lock() -> Result<(), Box> { let lock_handle = tokio::spawn({ let c = lock_client.clone(); async move { - let res = c.lock(LockRequest::new("test")).await.unwrap(); - time::sleep(Duration::from_secs(3)).await; - let _res = c.unlock(UnlockRequest::new(res.key)).await.unwrap(); + let mut xutex = Xutex::new(c, "test", None, None).await.unwrap(); + let _lock = xutex.lock_unsafe().await.unwrap(); + sleep(Duration::from_secs(3)).await; } }); - time::sleep(Duration::from_secs(1)).await; - let now = time::Instant::now(); - let res = lock_client.lock(LockRequest::new("test")).await?; + sleep(Duration::from_secs(1)).await; + let now = Instant::now(); + + let mut xutex = Xutex::new(lock_client, "test", None, None).await?; + let _lock = xutex.lock_unsafe().await?; let elapsed = now.elapsed(); - assert!(res.key.starts_with(b"test")); assert!(elapsed >= Duration::from_secs(1)); let _ignore = lock_handle.await; Ok(()) } - -#[tokio::test(flavor = "multi_thread")] -#[abort_on_panic] -async fn test_lock_timeout() -> Result<(), Box> { - let mut cluster = Cluster::new(3).await; - cluster.start().await; - let client = cluster.client().await; - let lock_client = client.lock_client(); - - let lease_id = client - .lease_client() - .grant(LeaseGrantRequest::new(1)) - .await? - .id; - let _res = lock_client - .lock(LockRequest::new("test").with_lease(lease_id)) - .await?; - - let res = timeout( - Duration::from_secs(3), - lock_client.lock(LockRequest::new("test")), - ) - .await??; - assert!(res.key.starts_with(b"test")); - - Ok(()) -} diff --git a/crates/xlinectl/src/command/lease/keep_alive.rs b/crates/xlinectl/src/command/lease/keep_alive.rs index 7e65c0339..67a208b21 100644 --- a/crates/xlinectl/src/command/lease/keep_alive.rs +++ b/crates/xlinectl/src/command/lease/keep_alive.rs @@ -44,7 +44,7 @@ pub(super) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result } else { tokio::select! { _ = ctrl_c() => {} - result = keep_alive_loop(keeper, stream) => { + result = keep_alive_loop(keeper, stream, true) => { return result; } } @@ -57,11 +57,14 @@ pub(super) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result async fn keep_alive_loop( mut keeper: LeaseKeeper, mut stream: Streaming, + verbose: bool, ) -> Result<()> { loop { keeper.keep_alive()?; if let Some(resp) = stream.message().await? { - resp.print(); + if verbose { + resp.print(); + } if resp.ttl < 0 { return Err(XlineClientError::InvalidArgs(String::from( "lease keepalive response has negative ttl", diff --git a/crates/xlinectl/src/command/lease/mod.rs b/crates/xlinectl/src/command/lease/mod.rs index 7a3e06fe0..ef0287644 100644 --- a/crates/xlinectl/src/command/lease/mod.rs +++ b/crates/xlinectl/src/command/lease/mod.rs @@ -6,7 +6,7 @@ use crate::handle_matches; /// `grant` command mod grant; /// `keep_alive` command -mod keep_alive; +pub(crate) mod keep_alive; /// `list` command mod list; /// `revoke` command diff --git a/crates/xlinectl/src/command/lock.rs b/crates/xlinectl/src/command/lock.rs index 219aae1f0..09214b1af 100644 --- a/crates/xlinectl/src/command/lock.rs +++ b/crates/xlinectl/src/command/lock.rs @@ -1,12 +1,6 @@ use clap::{arg, ArgMatches, Command}; use tokio::signal; -use xline_client::{ - error::Result, - types::lock::{LockRequest, UnlockRequest}, - Client, -}; - -use crate::utils::printer::Printer; +use xline_client::{clients::Xutex, error::Result, Client}; /// Definition of `lock` command pub(crate) fn command() -> Command { @@ -15,46 +9,15 @@ pub(crate) fn command() -> Command { .arg(arg!( "name of the lock")) } -/// Build request from matches -pub(crate) fn build_request(matches: &ArgMatches) -> LockRequest { - let name = matches.get_one::("lockname").expect("required"); - LockRequest::new(name.as_bytes()) -} - /// Execute the command pub(crate) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result<()> { - let req = build_request(matches); - - let resp = client.lock_client().lock(req).await?; - - resp.print(); + let prefix = matches.get_one::("lockname").expect("required"); + let mut xutex = Xutex::new(client.lock_client(), prefix, None, None).await?; + let xutex_guard = xutex.lock_unsafe().await?; + println!("{}", xutex_guard.key()); signal::ctrl_c().await.expect("failed to listen for event"); - - println!("releasing the lock"); - - let unlock_req = UnlockRequest::new(resp.key); - let _unlock_resp = client.lock_client().unlock(unlock_req).await?; - + // let res = lock_resp.unlock().await; + println!("releasing the lock, "); Ok(()) } - -#[cfg(test)] -mod tests { - use super::*; - use crate::test_case_struct; - - test_case_struct!(LockRequest); - - #[test] - fn command_parse_should_be_valid() { - let test_cases = vec![TestCase::new( - vec!["lock", "my_lock"], - Some(LockRequest::new("my_lock")), - )]; - - for case in test_cases { - case.run_test(); - } - } -} diff --git a/scripts/validation_test.sh b/scripts/validation_test.sh index f53a8d9c7..1fda4cf67 100755 --- a/scripts/validation_test.sh +++ b/scripts/validation_test.sh @@ -5,9 +5,6 @@ source $DIR/log.sh QUICK_START="${DIR}/quick_start.sh" ETCDCTL="docker exec -i client etcdctl --endpoints=http://172.20.0.3:2379,http://172.20.0.4:2379" -LOCK_CLIENT="docker exec -i client /mnt/validation_lock_client --endpoints=http://172.20.0.3:2379,http://172.20.0.4:2379,http://172.20.0.5:2379" -export LOG_PATH=/var/log/xline -export LOG_LEVEL=debug bash ${QUICK_START} @@ -278,17 +275,6 @@ lock_validation() { log::info "lock validation test passed" } -lock_rpc_validation() { - log::info "lock rpc validation test running..." - - run "${LOCK_CLIENT} lock mutex" - check_positive "mutex.*" - run "${LOCK_CLIENT} unlock ${res}" - check_positive "unlock success" - - log::info "lock rpc validation test passed" -} - # validate maintenance requests maintenance_validation() { # snapshot save request only works on one endpoint @@ -341,6 +327,5 @@ watch_validation lease_validation auth_validation lock_validation -lock_rpc_validation maintenance_validation cluster_validation