Skip to content

Commit

Permalink
chore: fixup! resolve comments
Browse files Browse the repository at this point in the history
Signed-off-by: Phoeniix Zhao <Phoenix500526@163.com>
  • Loading branch information
Phoenix500526 committed May 23, 2024
1 parent b848a11 commit e0ddaf9
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 122 deletions.
10 changes: 2 additions & 8 deletions crates/xline-client/examples/lock.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use anyhow::Result;
use xline_client::{
clients::{Session, Xutex},
Client, ClientOptions,
};
use xline_client::{clients::Xutex, Client, ClientOptions};

#[tokio::main]
async fn main() -> Result<()> {
Expand All @@ -13,10 +10,7 @@ async fn main() -> Result<()> {
.await?
.lock_client();

// create a session
let session = Session::new(client).build().await?;

let mut xutex = Xutex::new(session, "lock-test");
let mut xutex = Xutex::new(client, "lock-test", None, None).await?;
// when the `xutex_guard` drop, the lock will be unlocked.
let xutex_guard = xutex.lock().await?;

Expand Down
141 changes: 57 additions & 84 deletions crates/xline-client/src/clients/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,85 +30,10 @@ pub struct Session {
client: LockClient,
/// lease id
lease_id: i64,
/// lease lifetime
ttl: i64,
/// `keep_alive` task will auto-renew the lease
keep_alive: Option<JoinHandle<Result<()>>>,
}

impl Session {
/// Creates a new session.
#[inline]
#[must_use]
pub fn new(client: LockClient) -> Self {
Self {
client,
lease_id: 0,
ttl: DEFAULT_SESSION_TTL,
keep_alive: None,
}
}

/// Set session TTL.
#[inline]
#[must_use]
pub const fn with_ttl(mut self, ttl: i64) -> Self {
self.ttl = ttl;
self
}

/// Set session lease.
#[inline]
#[must_use]
pub const fn with_lease(mut self, lease_id: i64) -> Self {
self.lease_id = lease_id;
self
}

/// Crate an auto-renew session.
///
/// # Errors
/// return errors when lease client grant failed
#[inline]
pub async fn build(mut self) -> Result<Self> {
let ttl = self.ttl;
self.lease_id = if self.lease_id == 0 {
let lease_response = self
.client
.lease_client
.grant(LeaseGrantRequest::new(ttl))
.await?;
lease_response.id
} else {
self.lease_id
};
let mut lease_client = self.client.lease_client.clone();
let lease_id = self.lease_id;
self.keep_alive = Some(tokio::spawn(async move {
/// The renew interval factor of which value equals 60% of one second.
const RENEW_INTERVAL_FACTOR: u64 = 600;
let (mut keeper, mut stream) = lease_client
.keep_alive(LeaseKeepAliveRequest::new(lease_id))
.await?;
loop {
keeper.keep_alive()?;
if let Some(resp) = stream.message().await? {
if resp.ttl < 0 {
return Err(XlineClientError::InvalidArgs(String::from(
"lease keepalive response has negative ttl",
)));
}
sleep(Duration::from_millis(
resp.ttl.unsigned_abs().overflow_mul(RENEW_INTERVAL_FACTOR),
))
.await;
}
}
}));
Ok(self)
}
}

impl Drop for Session {
#[inline]
fn drop(&mut self) {
Expand Down Expand Up @@ -171,20 +96,69 @@ impl AsyncDrop for XutexGuard {

impl Xutex {
/// Create an Xutex
///
/// # Errors
///
/// Return errors when the lease client failed to grant a lease
#[inline]
#[must_use]
pub fn new(session: Session, prefix: &str) -> Self {
Self {
pub async fn new(
client: LockClient,
prefix: &str,
ttl: Option<i64>,
lease_id: Option<i64>,
) -> Result<Self> {
let ttl = ttl.unwrap_or(DEFAULT_SESSION_TTL);
let lease_id = if let Some(id) = lease_id {
id
} else {
let lease_response = client
.lease_client
.grant(LeaseGrantRequest::new(ttl))
.await?;
lease_response.id
};
let mut lease_client = client.lease_client.clone();
let keep_alive = Some(tokio::spawn(async move {
/// The renew interval factor of which value equals 60% of one second.
const RENEW_INTERVAL_FACTOR: u64 = 600;
let (mut keeper, mut stream) = lease_client
.keep_alive(LeaseKeepAliveRequest::new(lease_id))
.await?;
loop {
keeper.keep_alive()?;
if let Some(resp) = stream.message().await? {
if resp.ttl < 0 {
return Err(XlineClientError::InvalidArgs(String::from(
"lease keepalive response has negative ttl",
)));
}
sleep(Duration::from_millis(
resp.ttl.unsigned_abs().overflow_mul(RENEW_INTERVAL_FACTOR),
))
.await;
}
}
}));

let session = Session {
client,
lease_id,
keep_alive,
};

Ok(Self {
session,
prefix: format!("{prefix}/"),
key: String::new(),
rev: -1,
header: None,
}
})
}

/// try to acquire lock
async fn try_acquire(&mut self, prefix: String, lease_id: i64) -> Result<TxnResponse> {
async fn try_acquire(&mut self) -> Result<TxnResponse> {
let lease_id = self.session.lease_id;
let prefix = self.prefix.as_str();
self.key = format!("{prefix}{lease_id:x}");
#[allow(clippy::as_conversions)] // this cast is always safe
let cmp = Compare {
Expand Down Expand Up @@ -233,6 +207,7 @@ impl Xutex {
.revision()
} else {
#[allow(clippy::indexing_slicing)]
// it's safe to do since the txn response must have two responses.
if let Some(Response::ResponseRange(ref res)) = resp.responses[0].response {
res.kvs[0].create_revision
} else {
Expand Down Expand Up @@ -260,7 +235,7 @@ impl Xutex {
/// # Examples
///
/// ```no_run
/// use xline_client::{
/// use xline_client::
/// clients::{Session, Xutex},
/// types::lock::{LockRequest, UnlockRequest},
/// Client, ClientOptions,
Expand Down Expand Up @@ -288,9 +263,7 @@ impl Xutex {
/// ```
#[inline]
pub async fn lock(&mut self) -> Result<AsyncDropper<XutexGuard>> {
let resp = self
.try_acquire(self.prefix.clone(), self.session.lease_id)
.await?;
let resp = self.try_acquire().await?;
#[allow(clippy::indexing_slicing)]
if let Some(Response::ResponseRange(ref lock_owner)) = resp.responses[1].response {
// if no key on prefix or the key is created by current Xutex, that indicates we have already held the lock
Expand Down
20 changes: 6 additions & 14 deletions crates/xline-client/tests/it/lock.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
use std::time::Duration;

use test_macros::abort_on_panic;
use xline_client::{
clients::lock::{Session, Xutex},
error::Result,
};
use xline_client::{clients::lock::Xutex, error::Result};

use super::common::get_cluster_client;

#[tokio::test(flavor = "multi_thread")]
async fn lock_unlock_should_success_in_normal_path() -> Result<()> {
let (_cluster, client) = get_cluster_client().await.unwrap();
let lock_client = client.lock_client();
let session = Session::new(lock_client).build().await?;
let mut xutex = Xutex::new(session, "lock-test");
let mut xutex = Xutex::new(lock_client, "lock-test", None, None).await?;

assert!(xutex.lock().await.is_ok());
Ok(())
Expand All @@ -27,12 +23,10 @@ async fn lock_contention_should_occur_when_acquire_by_two() -> Result<()> {
let client_c = client.lock_client();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

let session_1 = Session::new(lock_client).build().await?;
let mut xutex_1 = Xutex::new(session_1, "lock-test");
let mut xutex_1 = Xutex::new(lock_client, "lock-test", None, None).await?;
let lock_1 = xutex_1.lock().await.unwrap();

let session_2 = Session::new(client_c).build().await?;
let mut xutex_2 = Xutex::new(session_2, "lock-test");
let mut xutex_2 = Xutex::new(client_c, "lock-test", None, None).await?;

let handle = tokio::spawn(async move {
let lock_result = tokio::time::timeout(Duration::from_secs(2), xutex_2.lock()).await;
Expand All @@ -57,10 +51,8 @@ async fn lock_should_unlock_after_cancelled() -> Result<()> {
let (_cluster, client) = get_cluster_client().await.unwrap();
let lock_client = client.lock_client();
let client_c = lock_client.clone();
let session_1 = Session::new(lock_client).build().await?;
let mut xutex_1 = Xutex::new(session_1, "lock-test");
let session_2 = Session::new(client_c).build().await?;
let mut xutex_2 = Xutex::new(session_2, "lock-test");
let mut xutex_1 = Xutex::new(lock_client, "lock-test", None, None).await?;
let mut xutex_2 = Xutex::new(client_c, "lock-test", None, None).await?;
// first acquire the lock
let lock_1 = xutex_1.lock().await.unwrap();

Expand Down
11 changes: 3 additions & 8 deletions crates/xline/tests/it/lock_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ use std::{error::Error, time::Duration};

use test_macros::abort_on_panic;
use tokio::time::{sleep, Instant};
use xline_test_utils::{
clients::{Session, Xutex},
Cluster,
};
use xline_test_utils::{clients::Xutex, Cluster};

#[tokio::test(flavor = "multi_thread")]
#[abort_on_panic]
Expand All @@ -18,8 +15,7 @@ async fn test_lock() -> Result<(), Box<dyn Error>> {
let lock_handle = tokio::spawn({
let c = lock_client.clone();
async move {
let session = Session::new(c).build().await.unwrap();
let mut xutex = Xutex::new(session, "test");
let mut xutex = Xutex::new(c, "test", None, None).await.unwrap();
let _lock = xutex.lock().await.unwrap();
sleep(Duration::from_secs(3)).await;
}
Expand All @@ -28,8 +24,7 @@ async fn test_lock() -> Result<(), Box<dyn Error>> {
sleep(Duration::from_secs(1)).await;
let now = Instant::now();

let session = Session::new(lock_client).build().await?;
let mut xutex = Xutex::new(session, "test");
let mut xutex = Xutex::new(lock_client, "test", None, None).await?;
let _lock = xutex.lock().await?;
let elapsed = now.elapsed();
assert!(elapsed >= Duration::from_secs(1));
Expand Down
2 changes: 1 addition & 1 deletion crates/xlinectl/src/command/lease/keep_alive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub(super) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result
}

/// keep alive forever unless encounter error
pub(crate) async fn keep_alive_loop(
async fn keep_alive_loop(
mut keeper: LeaseKeeper,
mut stream: Streaming<LeaseKeepAliveResponse>,
verbose: bool,
Expand Down
9 changes: 2 additions & 7 deletions crates/xlinectl/src/command/lock.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use clap::{arg, ArgMatches, Command};
use tokio::signal;
use xline_client::{
clients::{Session, Xutex},
error::Result,
Client,
};
use xline_client::{clients::Xutex, error::Result, Client};

/// Definition of `lock` command
pub(crate) fn command() -> Command {
Expand All @@ -16,8 +12,7 @@ pub(crate) fn command() -> Command {
/// Execute the command
pub(crate) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result<()> {
let prefix = matches.get_one::<String>("lockname").expect("required");
let session = Session::new(client.lock_client()).build().await?;
let mut xutex = Xutex::new(session, prefix);
let mut xutex = Xutex::new(client.lock_client(), prefix, None, None).await?;

let xutex_guard = xutex.lock().await?;
println!("{}", xutex_guard.key());
Expand Down

0 comments on commit e0ddaf9

Please sign in to comment.