Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add keep alive logic for xlinectl lock command #666

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 12 additions & 22 deletions crates/xline-client/src/clients/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ use xlineapi::{
};

use crate::{
clients::{lease::LeaseClient, watch::WatchClient},
clients::watch::WatchClient,
error::{Result, XlineClientError},
lease_gen::LeaseIdGenerator,
types::{
lease::LeaseGrantRequest,
lock::{LockRequest, UnlockRequest},
watch::WatchRequest,
},
Expand All @@ -35,8 +33,6 @@ use crate::{
pub struct LockClient {
/// The client running the CURP protocol, communicate with all servers.
curp_client: Arc<CurpClient>,
/// The lease client
lease_client: LeaseClient,
/// The watch client
watch_client: WatchClient,
/// Auth token
Expand All @@ -47,7 +43,6 @@ impl Debug for LockClient {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LockClient")
.field("lease_client", &self.lease_client)
.field("watch_client", &self.watch_client)
.field("token", &self.token)
.finish()
Expand All @@ -59,15 +54,9 @@ impl Debug for LockClient {
impl LockClient {
/// Creates a new `LockClient`
#[inline]
pub fn new(
curp_client: Arc<CurpClient>,
channel: Channel,
token: Option<String>,
id_gen: Arc<LeaseIdGenerator>,
) -> Self {
pub fn new(curp_client: Arc<CurpClient>, channel: Channel, token: Option<String>) -> Self {
Self {
curp_client: Arc::clone(&curp_client),
lease_client: LeaseClient::new(curp_client, channel.clone(), token.clone(), id_gen),
curp_client,
watch_client: WatchClient::new(channel, token.clone()),
token,
}
Expand All @@ -84,6 +73,10 @@ impl LockClient {
///
/// This function will return an error if the inner CURP client encountered a propose failure
///
/// # Panics
///
/// Panic if the given `LockRequest.inner.lease` less than or equal to 0
///
bsbds marked this conversation as resolved.
Show resolved Hide resolved
/// # Examples
///
/// ```no_run
Expand Down Expand Up @@ -116,14 +109,11 @@ impl LockClient {
/// ```
#[inline]
pub async fn lock(&self, request: LockRequest) -> Result<LockResponse> {
let mut lease_id = request.inner.lease;
if lease_id == 0 {
let resp = self
.lease_client
.grant(LeaseGrantRequest::new(request.ttl))
.await?;
lease_id = resp.id;
}
assert!(
request.inner.lease > 0,
"The LockRequest.lease_id should larger than 0"
);
let lease_id = request.inner.lease;
let prefix = format!(
"{}/",
String::from_utf8_lossy(&request.inner.name).into_owned()
Expand Down
7 changes: 1 addition & 6 deletions crates/xline-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,7 @@ impl Client {
token.clone(),
Arc::clone(&id_gen),
);
let lock = LockClient::new(
Arc::clone(&curp_client),
channel.clone(),
token.clone(),
id_gen,
);
let lock = LockClient::new(Arc::clone(&curp_client), channel.clone(), token.clone());
let auth = AuthClient::new(curp_client, channel.clone(), token.clone());
let maintenance = MaintenanceClient::new(channel.clone(), token.clone());
let cluster = ClusterClient::new(channel.clone(), token.clone());
Expand Down
2 changes: 1 addition & 1 deletion crates/xline-client/src/types/lock.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub use xlineapi::{LockResponse, UnlockResponse};

/// Default session ttl
const DEFAULT_SESSION_TTL: i64 = 60;
pub const DEFAULT_SESSION_TTL: i64 = 60;

/// Request for `Lock`
#[derive(Debug, PartialEq)]
Expand Down
76 changes: 56 additions & 20 deletions crates/xline-client/tests/it/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,81 @@ use std::time::Duration;

use test_macros::abort_on_panic;
use xline_client::{
clients::LeaseClient,
error::Result,
types::lock::{LockRequest, UnlockRequest},
types::{
lease::LeaseGrantRequest,
lock::{LockRequest, UnlockRequest, DEFAULT_SESSION_TTL},
},
};

use super::common::get_cluster_client;

async fn gen_lease_id(client: LeaseClient, ttl: i64) -> i64 {
client
.grant(LeaseGrantRequest::new(ttl))
.await
.expect("grant lease should be success")
.id
}

#[tokio::test(flavor = "multi_thread")]
async fn lock_unlock_should_success_in_normal_path() -> Result<()> {
let (_cluster, client) = get_cluster_client().await.unwrap();
let client = client.lock_client();
let lock_client = client.lock_client();
let lease_id = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await;

let resp = client.lock(LockRequest::new("lock-test")).await?;
let resp = lock_client
.lock(LockRequest::new("lock-test").with_lease(lease_id))
.await?;
assert!(resp.key.starts_with(b"lock-test/"));

client.unlock(UnlockRequest::new(resp.key)).await?;
lock_client.unlock(UnlockRequest::new(resp.key)).await?;
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
#[abort_on_panic]
async fn lock_contention_should_occur_when_acquire_by_two() -> Result<()> {
let (_cluster, client) = get_cluster_client().await.unwrap();
let client = client.lock_client();
let lock_client = client.lock_client();
let lease_id_1 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await;
let lease_id_2 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await;

let client_c = client.clone();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

let resp = client.lock(LockRequest::new("lock-test")).await.unwrap();
let resp = lock_client
.lock(LockRequest::new("lock-test").with_lease(lease_id_1))
.await
.unwrap();

let handle = tokio::spawn(async move {
let res = tokio::time::timeout(
Duration::from_secs(2),
client_c.lock(LockRequest::new("lock-test")),
client_c
.lock_client()
.lock(LockRequest::new("lock-test").with_lease(lease_id_2)),
)
.await;
assert!(res.is_err());
let _ignore = tx.send(());

let lease_id_3 = gen_lease_id(client_c.lease_client(), DEFAULT_SESSION_TTL).await;
let res = tokio::time::timeout(
Duration::from_millis(200),
client_c.lock(LockRequest::new("lock-test")),
client_c
.lock_client()
.lock(LockRequest::new("lock-test").with_lease(lease_id_3)),
)
.await;
assert!(res.is_ok_and(|r| r.is_ok_and(|resp| resp.key.starts_with(b"lock-test/"))));
});

rx.recv().await.unwrap();
let _resp = client.unlock(UnlockRequest::new(resp.key)).await.unwrap();
let _resp = lock_client
.unlock(UnlockRequest::new(resp.key))
.await
.unwrap();

handle.await.unwrap();

Expand All @@ -59,16 +87,18 @@ async fn lock_contention_should_occur_when_acquire_by_two() -> Result<()> {
#[abort_on_panic]
async fn lock_should_timeout_when_ttl_is_set() -> Result<()> {
let (_cluster, client) = get_cluster_client().await.unwrap();
let client = client.lock_client();
let lock_client = client.lock_client();
let lease_id_1 = gen_lease_id(client.lease_client(), 1).await;

let _resp = client
.lock(LockRequest::new("lock-test").with_ttl(1))
let _resp = lock_client
.lock(LockRequest::new("lock-test").with_lease(lease_id_1))
.await
.unwrap();

let lease_id_2 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await;
let resp = tokio::time::timeout(
Duration::from_secs(2),
client.lock(LockRequest::new("lock-test")),
lock_client.lock(LockRequest::new("lock-test").with_lease(lease_id_2)),
)
.await
.expect("timeout when trying to lock")?;
Expand All @@ -82,26 +112,32 @@ async fn lock_should_timeout_when_ttl_is_set() -> Result<()> {
#[abort_on_panic]
async fn lock_should_unlock_after_cancelled() -> Result<()> {
let (_cluster, client) = get_cluster_client().await.unwrap();
let client = client.lock_client();
let client_c = client.clone();
let lock_client = client.lock_client();
let client_c = lock_client.clone();
let lease_id_1 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await;
let lease_id_2 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await;
let lease_id_3 = gen_lease_id(client.lease_client(), DEFAULT_SESSION_TTL).await;
// first acquire the lock
let resp = client.lock(LockRequest::new("lock-test")).await.unwrap();
let resp = lock_client
.lock(LockRequest::new("lock-test").with_lease(lease_id_1))
.await
.unwrap();

// acquire the lock again and then cancel it
let res = tokio::time::timeout(
Duration::from_secs(1),
client_c.lock(LockRequest::new("lock-test")),
client_c.lock(LockRequest::new("lock-test").with_lease(lease_id_2)),
)
.await;
assert!(res.is_err());

// unlock the first one
client.unlock(UnlockRequest::new(resp.key)).await?;
lock_client.unlock(UnlockRequest::new(resp.key)).await?;

// try lock again, it should success
let resp = tokio::time::timeout(
Duration::from_secs(1),
client.lock(LockRequest::new("lock-test")),
lock_client.lock(LockRequest::new("lock-test").with_lease(lease_id_3)),
)
.await
.expect("timeout when trying to lock")?;
Expand Down
20 changes: 15 additions & 5 deletions crates/xline-test-utils/src/bin/validation_lock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
use anyhow::Result;
use clap::{Parser, Subcommand};
use xline_client::{
types::lock::{LockRequest, UnlockRequest},
types::{
lease::LeaseGrantRequest,
lock::{LockRequest, UnlockRequest, DEFAULT_SESSION_TTL},
},
Client, ClientOptions,
};

Expand Down Expand Up @@ -40,16 +43,23 @@ async fn main() -> Result<()> {
} else {
args.endpoints
};
let client = Client::connect(endpoints, ClientOptions::default())
let client = Client::connect(endpoints, ClientOptions::default()).await?;
let lock_client = client.lock_client();
let lease_client = client.lease_client();
let lease_id = lease_client
.grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL))
.await?
.lock_client();
.id;

match args.command {
Commands::Lock { name } => {
let lock_res = client.lock(LockRequest::new(name)).await?;
let lock_res = lock_client
.lock(LockRequest::new(name).with_lease(lease_id))
.await?;
println!("{}", String::from_utf8_lossy(&lock_res.key))
}
Commands::Unlock { key } => {
let _unlock_res = client.unlock(UnlockRequest::new(key)).await?;
let _unlock_res = lock_client.unlock(UnlockRequest::new(key)).await?;
println!("unlock success");
}
};
Expand Down
32 changes: 26 additions & 6 deletions crates/xline/tests/it/lock_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tokio::time::{self, timeout};
use xline_test_utils::{
types::{
lease::LeaseGrantRequest,
lock::{LockRequest, UnlockRequest},
lock::{LockRequest, UnlockRequest, DEFAULT_SESSION_TTL},
},
Cluster,
};
Expand All @@ -17,19 +17,34 @@ async fn test_lock() -> Result<(), Box<dyn Error>> {
cluster.start().await;
let client = cluster.client().await;
let lock_client = client.lock_client();
let lease_id_1 = client
.lease_client()
.grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL))
.await?
.id;

let lock_handle = tokio::spawn({
let c = lock_client.clone();
async move {
let res = c.lock(LockRequest::new("test")).await.unwrap();
let res = c
.lock(LockRequest::new("test").with_lease(lease_id_1))
.await
.unwrap();
time::sleep(Duration::from_secs(3)).await;
let _res = c.unlock(UnlockRequest::new(res.key)).await.unwrap();
}
});

time::sleep(Duration::from_secs(1)).await;
let now = time::Instant::now();
let res = lock_client.lock(LockRequest::new("test")).await?;
let lease_id_2 = client
.lease_client()
.grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL))
.await?
.id;
let res = lock_client
.lock(LockRequest::new("test").with_lease(lease_id_2))
.await?;
let elapsed = now.elapsed();
assert!(res.key.starts_with(b"test"));
assert!(elapsed >= Duration::from_secs(1));
Expand All @@ -46,18 +61,23 @@ async fn test_lock_timeout() -> Result<(), Box<dyn Error>> {
let client = cluster.client().await;
let lock_client = client.lock_client();

let lease_id = client
let lease_id_1 = client
.lease_client()
.grant(LeaseGrantRequest::new(1))
.await?
.id;
let _res = lock_client
.lock(LockRequest::new("test").with_lease(lease_id))
.lock(LockRequest::new("test").with_lease(lease_id_1))
.await?;
let lease_id_2 = client
.lease_client()
.grant(LeaseGrantRequest::new(DEFAULT_SESSION_TTL))
.await?
.id;

let res = timeout(
Duration::from_secs(3),
lock_client.lock(LockRequest::new("test")),
lock_client.lock(LockRequest::new("test").with_lease(lease_id_2)),
)
.await??;
assert!(res.key.starts_with(b"test"));
Expand Down
9 changes: 6 additions & 3 deletions crates/xlinectl/src/command/lease/keep_alive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub(super) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result
} else {
tokio::select! {
_ = ctrl_c() => {}
result = keep_alive_loop(keeper, stream) => {
result = keep_alive_loop(keeper, stream, true) => {
return result;
}
}
Expand All @@ -54,14 +54,17 @@ pub(super) async fn execute(client: &mut Client, matches: &ArgMatches) -> Result
}

/// keep alive forever unless encounter error
async fn keep_alive_loop(
pub(crate) async fn keep_alive_loop(
mut keeper: LeaseKeeper,
mut stream: Streaming<LeaseKeepAliveResponse>,
verbose: bool,
) -> Result<()> {
loop {
keeper.keep_alive()?;
if let Some(resp) = stream.message().await? {
resp.print();
if verbose {
resp.print();
}
if resp.ttl < 0 {
return Err(XlineClientError::InvalidArgs(String::from(
"lease keepalive response has negative ttl",
Expand Down
Loading
Loading