Skip to content

Commit

Permalink
fix: add keep alive logic for xlinectl lock command
Browse files Browse the repository at this point in the history
Closes: #664
Signed-off-by: Phoeniix Zhao <Phoenix500526@163.com>
  • Loading branch information
Phoenix500526 committed Mar 1, 2024
1 parent a44f710 commit 2369dcb
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 78 deletions.
34 changes: 12 additions & 22 deletions crates/xline-client/src/clients/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ use xlineapi::{
};

use crate::{
clients::{lease::LeaseClient, watch::WatchClient},
clients::watch::WatchClient,
error::{Result, XlineClientError},
lease_gen::LeaseIdGenerator,
types::{
lease::LeaseGrantRequest,
lock::{LockRequest, UnlockRequest},
watch::WatchRequest,
},
Expand All @@ -35,8 +33,6 @@ use crate::{
pub struct LockClient {
/// The client running the CURP protocol, communicate with all servers.
curp_client: Arc<CurpClient>,
/// The lease client
lease_client: LeaseClient,
/// The watch client
watch_client: WatchClient,
/// Auth token
Expand All @@ -47,7 +43,6 @@ impl Debug for LockClient {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LockClient")
.field("lease_client", &self.lease_client)
.field("watch_client", &self.watch_client)
.field("token", &self.token)
.finish()
Expand All @@ -59,15 +54,9 @@ impl Debug for LockClient {
impl LockClient {
/// Creates a new `LockClient`
#[inline]
pub fn new(
curp_client: Arc<CurpClient>,
channel: Channel,
token: Option<String>,
id_gen: Arc<LeaseIdGenerator>,
) -> Self {
pub fn new(curp_client: Arc<CurpClient>, channel: Channel, token: Option<String>) -> Self {
Self {
curp_client: Arc::clone(&curp_client),
lease_client: LeaseClient::new(curp_client, channel.clone(), token.clone(), id_gen),
curp_client,
watch_client: WatchClient::new(channel, token.clone()),
token,
}
Expand All @@ -84,6 +73,10 @@ impl LockClient {
///
/// This function will return an error if the inner CURP client encountered a propose failure
///
/// # Panics
///
/// Panic if the given `LockRequest.inner.lease` less than or equal to 0
///
/// # Examples
///
/// ```no_run
Expand Down Expand Up @@ -116,14 +109,11 @@ impl LockClient {
/// ```
#[inline]
pub async fn lock(&self, request: LockRequest) -> Result<LockResponse> {
let mut lease_id = request.inner.lease;
if lease_id == 0 {
let resp = self
.lease_client
.grant(LeaseGrantRequest::new(request.ttl))
.await?;
lease_id = resp.id;
}
assert!(
request.inner.lease > 0,
"The LockRequest.lease_id should larger than 0"
);
let lease_id = request.inner.lease;
let prefix = format!(
"{}/",
String::from_utf8_lossy(&request.inner.name).into_owned()
Expand Down
7 changes: 1 addition & 6 deletions crates/xline-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,7 @@ impl Client {
token.clone(),
Arc::clone(&id_gen),
);
let lock = LockClient::new(
Arc::clone(&curp_client),
channel.clone(),
token.clone(),
id_gen,
);
let lock = LockClient::new(Arc::clone(&curp_client), channel.clone(), token.clone());
let auth = AuthClient::new(curp_client, channel.clone(), token.clone());
let maintenance = MaintenanceClient::new(channel.clone(), token.clone());
let cluster = ClusterClient::new(channel.clone(), token.clone());
Expand Down
2 changes: 1 addition & 1 deletion crates/xline-client/src/types/lock.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub use xlineapi::{LockResponse, UnlockResponse};

/// Default session ttl
const DEFAULT_SESSION_TTL: i64 = 60;
pub const DEFAULT_SESSION_TTL: i64 = 60;

/// Request for `Lock`
#[derive(Debug, PartialEq)]
Expand Down
76 changes: 56 additions & 20 deletions crates/xline-client/tests/it/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,81 @@ use std::time::Duration;

use test_macros::abort_on_panic;
use xline_client::{
clients::LeaseClient,
error::Result,
types::lock::{LockRequest, UnlockRequest},
types::{
lease::LeaseGrantRequest,
lock::{LockRequest, UnlockRequest, DEFAULT_SESSION_TTL},
},
};

use super::common::get_cluster_client;

async fn gen_lease_id(client: LeaseClient, ttl: i64) -> i64 {
client
.grant(LeaseGrantRequest::new(ttl))
.await
.expect("grant lease should be success")
.id
}

#[tokio::test(flavor = "multi_thread")]
async fn lock_unlock_should_success_in_normal_path() -> Result<()> {
let (_cluster, client) = get_cluster_client().await.unwrap();
let client = client.lock_client();
let lock_client = client.lock_client();
let lease_id = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await;

let resp = client.lock(LockRequest::new("lock-test")).await?;
let resp = lock_client
.lock(LockRequest::new("lock-test").with_lease(lease_id))
.await?;
assert!(resp.key.starts_with(b"lock-test/"));

client.unlock(UnlockRequest::new(resp.key)).await?;
lock_client.unlock(UnlockRequest::new(resp.key)).await?;
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
#[abort_on_panic]
async fn lock_contention_should_occur_when_acquire_by_two() -> Result<()> {
let (_cluster, client) = get_cluster_client().await.unwrap();
let client = client.lock_client();
let lock_client = client.lock_client();
let lease_id_1 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await;
let lease_id_2 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await;

let client_c = client.clone();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

let resp = client.lock(LockRequest::new("lock-test")).await.unwrap();
let resp = lock_client
.lock(LockRequest::new("lock-test").with_lease(lease_id_1))
.await
.unwrap();

let handle = tokio::spawn(async move {
let res = tokio::time::timeout(
Duration::from_secs(2),
client_c.lock(LockRequest::new("lock-test")),
client_c
.lock_client()
.lock(LockRequest::new("lock-test").with_lease(lease_id_2)),
)
.await;
assert!(res.is_err());
let _ignore = tx.send(());

let lease_id_3 = gen_lease_id(client_c.lease_client(), DEFAULT_SESSION_TTL).await;
let res = tokio::time::timeout(
Duration::from_millis(200),
client_c.lock(LockRequest::new("lock-test")),
client_c
.lock_client()
.lock(LockRequest::new("lock-test").with_lease(lease_id_3)),
)
.await;
assert!(res.is_ok_and(|r| r.is_ok_and(|resp| resp.key.starts_with(b"lock-test/"))));
});

rx.recv().await.unwrap();
let _resp = client.unlock(UnlockRequest::new(resp.key)).await.unwrap();
let _resp = lock_client
.unlock(UnlockRequest::new(resp.key))
.await
.unwrap();

handle.await.unwrap();

Expand All @@ -59,16 +87,18 @@ async fn lock_contention_should_occur_when_acquire_by_two() -> Result<()> {
#[abort_on_panic]
async fn lock_should_timeout_when_ttl_is_set() -> Result<()> {
let (_cluster, client) = get_cluster_client().await.unwrap();
let client = client.lock_client();
let lock_client = client.lock_client();
let lease_id_1 = gen_lease_id(client.lease_client(), 1).await;

let _resp = client
.lock(LockRequest::new("lock-test").with_ttl(1))
let _resp = lock_client
.lock(LockRequest::new("lock-test").with_lease(lease_id_1))
.await
.unwrap();

let lease_id_2 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await;
let resp = tokio::time::timeout(
Duration::from_secs(2),
client.lock(LockRequest::new("lock-test")),
lock_client.lock(LockRequest::new("lock-test").with_lease(lease_id_2)),
)
.await
.expect("timeout when trying to lock")?;
Expand All @@ -82,26 +112,32 @@ async fn lock_should_timeout_when_ttl_is_set() -> Result<()> {
#[abort_on_panic]
async fn lock_should_unlock_after_cancelled() -> Result<()> {
let (_cluster, client) = get_cluster_client().await.unwrap();
let client = client.lock_client();
let client_c = client.clone();
let lock_client = client.lock_client();
let client_c = lock_client.clone();
let lease_id_1 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await;
let lease_id_2 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await;
let lease_id_3 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await;
// first acquire the lock
let resp = client.lock(LockRequest::new("lock-test")).await.unwrap();
let resp = lock_client
.lock(LockRequest::new("lock-test").with_lease(lease_id_1))
.await
.unwrap();

// acquire the lock again and then cancel it
let res = tokio::time::timeout(
Duration::from_secs(1),
client_c.lock(LockRequest::new("lock-test")),
client_c.lock(LockRequest::new("lock-test").with_lease(lease_id_2)),
)
.await;
assert!(res.is_err());

// unlock the first one
client.unlock(UnlockRequest::new(resp.key)).await?;
lock_client.unlock(UnlockRequest::new(resp.key)).await?;

// try lock again, it should success
let resp = tokio::time::timeout(
Duration::from_secs(1),
client.lock(LockRequest::new("lock-test")),
lock_client.lock(LockRequest::new("lock-test").with_lease(lease_id_3)),
)
.await
.expect("timeout when trying to lock")?;
Expand Down
20 changes: 15 additions & 5 deletions crates/xline-test-utils/src/bin/validation_lock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
use anyhow::Result;
use clap::{Parser, Subcommand};
use xline_client::{
types::lock::{LockRequest, UnlockRequest},
types::{
lease::LeaseGrantRequest,
lock::{LockRequest, UnlockRequest, DEFAULT_SESSION_TTL},
},
Client, ClientOptions,
};

Expand Down Expand Up @@ -40,16 +43,23 @@ async fn main() -> Result<()> {
} else {
args.endpoints
};
let client = Client::connect(endpoints, ClientOptions::default())
let client = Client::connect(endpoints, ClientOptions::default()).await?;
let lock_client = client.lock_client();
let lease_client = client.lease_client();
let lease_id = lease_client
.grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL))
.await?
.lock_client();
.id;

match args.command {
Commands::Lock { name } => {
let lock_res = client.lock(LockRequest::new(name)).await?;
let lock_res = lock_client
.lock(LockRequest::new(name).with_lease(lease_id))
.await?;
println!("{}", String::from_utf8_lossy(&lock_res.key))
}
Commands::Unlock { key } => {
let _unlock_res = client.unlock(UnlockRequest::new(key)).await?;
let _unlock_res = lock_client.unlock(UnlockRequest::new(key)).await?;
println!("unlock success");
}
};
Expand Down
32 changes: 26 additions & 6 deletions crates/xline/tests/it/lock_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tokio::time::{self, timeout};
use xline_test_utils::{
types::{
lease::LeaseGrantRequest,
lock::{LockRequest, UnlockRequest},
lock::{LockRequest, UnlockRequest, DEFAULT_SESSION_TTL},
},
Cluster,
};
Expand All @@ -17,19 +17,34 @@ async fn test_lock() -> Result<(), Box<dyn Error>> {
cluster.start().await;
let client = cluster.client().await;
let lock_client = client.lock_client();
let lease_id_1 = client
.lease_client()
.grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL))
.await?
.id;

let lock_handle = tokio::spawn({
let c = lock_client.clone();
async move {
let res = c.lock(LockRequest::new("test")).await.unwrap();
let res = c
.lock(LockRequest::new("test").with_lease(lease_id_1))
.await
.unwrap();
time::sleep(Duration::from_secs(3)).await;
let _res = c.unlock(UnlockRequest::new(res.key)).await.unwrap();
}
});

time::sleep(Duration::from_secs(1)).await;
let now = time::Instant::now();
let res = lock_client.lock(LockRequest::new("test")).await?;
let lease_id_2 = client
.lease_client()
.grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL))
.await?
.id;
let res = lock_client
.lock(LockRequest::new("test").with_lease(lease_id_2))
.await?;
let elapsed = now.elapsed();
assert!(res.key.starts_with(b"test"));
assert!(elapsed >= Duration::from_secs(1));
Expand All @@ -46,18 +61,23 @@ async fn test_lock_timeout() -> Result<(), Box<dyn Error>> {
let client = cluster.client().await;
let lock_client = client.lock_client();

let lease_id = client
let lease_id_1 = client
.lease_client()
.grant(LeaseGrantRequest::new(1))
.await?
.id;
let _res = lock_client
.lock(LockRequest::new("test").with_lease(lease_id))
.lock(LockRequest::new("test").with_lease(lease_id_1))
.await?;
let lease_id_2 = client
.lease_client()
.grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL))
.await?
.id;

let res = timeout(
Duration::from_secs(3),
lock_client.lock(LockRequest::new("test")),
lock_client.lock(LockRequest::new("test").with_lease(lease_id_2)),
)
.await??;
assert!(res.key.starts_with(b"test"));
Expand Down
9 changes: 6 additions & 3 deletions crates/xlinectl/src/command/lease/keep_alive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(super) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result
} else {
tokio::select! {
_ = ctrl_c() => {}
result = keep_alive_loop(keeper, stream) => {
result = keep_alive_loop(keeper, stream, true) => {
return result;
}
}
Expand All @@ -53,14 +53,17 @@ pub(super) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result
}

/// keep alive forever unless encounter error
async fn keep_alive_loop(
pub(crate) async fn keep_alive_loop(
mut keeper: LeaseKeeper,
mut stream: Streaming<LeaseKeepAliveResponse>,
verbose: bool,
) -> Result<()> {
loop {
keeper.keep_alive()?;
if let Some(resp) = stream.message().await? {
resp.print();
if verbose {
resp.print();
}
if resp.ttl < 0 {
return Err(XlineClientError::InvalidArgs(String::from(
"lease keepalive response has negative ttl",
Expand Down
Loading

0 comments on commit 2369dcb

Please sign in to comment.