Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RS-101: Support Replication API in Client SDK #391

Merged
merged 5 commits into from Jan 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- reduct-cli: `bucket` command to manage buckets, [PR-367](https://github.com/reductstore/reductstore/pull/367)
- reductstore: Data replication, [PR-377](https://github.com/reductstore/reductstore/pull/377)
- reductstore: CRUD API and diagnostics for replication, [PR-380](https://github.com/reductstore/reductstore/pull/380)
- reduct-rs: Implement replication API, [PR-391](https://github.com/reductstore/reductstore/pull/391)

### Fixed

Expand Down
2 changes: 1 addition & 1 deletion reduct_base/src/msg/replication_api.rs
Expand Up @@ -51,7 +51,7 @@ pub struct ReplicationList {

/// Replication settings
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct ReplicationFullInfo {
pub struct FullReplicationInfo {
/// Info
pub info: ReplicationInfo,
/// Settings
Expand Down
253 changes: 248 additions & 5 deletions reduct_rs/src/client.rs
@@ -1,4 +1,4 @@
// Copyright 2023 ReductStore
// Copyright 2023-2024 ReductStore
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
Expand All @@ -13,6 +13,11 @@ use crate::http_client::HttpClient;
use crate::Bucket;
use reduct_base::error::{ErrorCode, ReductError};

use reduct_base::msg::replication_api::{
FullReplicationInfo, ReplicationInfo, ReplicationList, ReplicationSettings,
};

use crate::replication::ReplicationBuilder;
use reduct_base::msg::server_api::{BucketInfoList, ServerInfo};
use reduct_base::msg::token_api::{Permissions, Token, TokenCreateResponse, TokenList};

Expand Down Expand Up @@ -140,7 +145,7 @@ impl ReductClient {
/// The server info
pub async fn server_info(&self) -> Result<ServerInfo> {
self.http_client
.send_and_receive_json::<(), ServerInfo>(reqwest::Method::GET, "/info", None)
.send_and_receive_json::<(), ServerInfo>(Method::GET, "/info", None)
.await
}

Expand Down Expand Up @@ -205,7 +210,7 @@ impl ReductClient {
/// The token or HttpError
pub async fn me(&self) -> Result<Token> {
self.http_client
.send_and_receive_json::<(), Token>(reqwest::Method::GET, "/me", None)
.send_and_receive_json::<(), Token>(Method::GET, "/me", None)
.await
}

Expand All @@ -223,7 +228,7 @@ impl ReductClient {
let token = self
.http_client
.send_and_receive_json::<Permissions, TokenCreateResponse>(
reqwest::Method::POST,
Method::POST,
&format!("/tokens/{}", name),
Some(permissions),
)
Expand Down Expand Up @@ -256,10 +261,117 @@ impl ReductClient {
pub async fn list_tokens(&self) -> Result<Vec<Token>> {
let list = self
.http_client
.send_and_receive_json::<(), TokenList>(reqwest::Method::GET, "/tokens", None)
.send_and_receive_json::<(), TokenList>(Method::GET, "/tokens", None)
.await?;
Ok(list.tokens)
}

/// Get list of replications
///
/// # Returns
///
/// The list of replications or an error
pub async fn list_replications(&self) -> Result<Vec<ReplicationInfo>> {
let list = self
.http_client
.send_and_receive_json::<(), ReplicationList>(Method::GET, "/replications", None)
.await?;
Ok(list.replications)
}

/// Get full replication info
///
/// # Arguments
///
/// * `name` - The name of the replication
///
/// # Returns
///
/// The replication info or an error
pub async fn get_replication(&self, name: &str) -> Result<FullReplicationInfo> {
let info = self
.http_client
.send_and_receive_json::<(), FullReplicationInfo>(
Method::GET,
&format!("/replications/{}", name),
None,
)
.await?;
Ok(info)
}

/// Create a replication
///
/// # Arguments
///
/// * `name` - The name of the replication
///
/// # Returns
///
/// a replication builder to set the replication settings
///
/// # Example
///
/// ```no_run
/// use reduct_rs::ReductClient;
///
/// #[tokio::main]
/// async fn main() {
/// let client = ReductClient::builder()
/// .url("http://127.0.0.1:8383")
/// .api_token("my-api-token")
/// .build();
///
/// client.create_replication("test-replication")
/// .src_bucket("test-bucket-1")
/// .dst_bucket("test-bucket-2")
/// .dst_host("https://play.reduct.store")
/// .dst_token("reductstore")
/// .send()
/// .await
/// .unwrap();
/// }
///
pub fn create_replication(&self, name: &str) -> ReplicationBuilder {
ReplicationBuilder::new(name.to_string(), Arc::clone(&self.http_client))
}

/// Update a replication
///
/// # Arguments
///
/// * `name` - The name of the replication
/// * `settings` - The replication settings
///
/// # Returns
///
/// Ok if the replication was updated, otherwise an error
pub async fn update_replication(
&self,
name: &str,
settings: ReplicationSettings,
) -> Result<()> {
self.http_client
.send_json(Method::PUT, &format!("/replications/{}", name), settings)
.await
}

/// Delete a replication
///
/// # Arguments
///
/// * `name` - The name of the replication
///
/// # Returns
///
/// Ok if the replication was deleted, otherwise an error
pub async fn delete_replication(&self, name: &str) -> Result<()> {
let request = self
.http_client
.request(Method::DELETE, &format!("/replications/{}", name));
self.http_client.send_request(request).await?;
Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -377,6 +489,131 @@ pub(crate) mod tests {
}
}

mod replication_api {
use super::*;
use reduct_base::msg::diagnostics::Diagnostics;
use reduct_base::msg::replication_api::ReplicationSettings;

#[rstest]
#[tokio::test]
async fn test_list_replications(#[future] client: ReductClient) {
let replications = client.await.list_replications().await.unwrap();
assert!(replications.is_empty());
}

#[rstest]
#[tokio::test]
async fn test_create_replication(
#[future] client: ReductClient,
settings: ReplicationSettings,
) {
let client = client.await;
client
.create_replication("test-replication")
.src_bucket(settings.src_bucket.as_str())
.dst_bucket(settings.dst_bucket.as_str())
.dst_host(settings.dst_host.as_str())
.dst_token(settings.dst_token.as_str())
.entries(settings.entries.clone())
.include(settings.include.clone())
.exclude(settings.exclude.clone())
.send()
.await
.unwrap();
let replications = client.list_replications().await.unwrap();
assert_eq!(replications.len(), 1);
}

#[rstest]
#[tokio::test]
async fn test_get_replication(
#[future] client: ReductClient,
settings: ReplicationSettings,
) {
let client = client.await;
client
.create_replication("test-replication")
.set_settings(settings.clone())
.send()
.await
.unwrap();
let replication = client.get_replication("test-replication").await.unwrap();
assert_eq!(
replication.info,
ReplicationInfo {
name: "test-replication".to_string(),
is_active: false,
is_provisioned: false,
pending_records: 0,
}
);

assert_eq!(
replication.settings,
ReplicationSettings {
dst_token: "***".to_string(),
..settings
}
);
assert_eq!(replication.diagnostics, Diagnostics::default());
}

#[rstest]
#[tokio::test]
async fn test_update_replication(
#[future] client: ReductClient,
settings: ReplicationSettings,
) {
let client = client.await;
client
.create_replication("test-replication")
.set_settings(settings.clone())
.send()
.await
.unwrap();
let replication = client.get_replication("test-replication").await.unwrap();

assert_eq!(
replication.settings,
ReplicationSettings {
dst_token: "***".to_string(),
..settings
}
);
}

#[rstest]
#[tokio::test]
async fn test_delete_replication(
#[future] client: ReductClient,
settings: ReplicationSettings,
) {
let client = client.await;
client
.create_replication("test-replication")
.set_settings(settings.clone())
.send()
.await
.unwrap();
client.delete_replication("test-replication").await.unwrap();
let replications = client.list_replications().await.unwrap();
assert!(replications.is_empty());
}

#[fixture]
fn settings() -> ReplicationSettings {
ReplicationSettings {
src_bucket: "test-bucket-1".to_string(),
dst_bucket: "test-bucket-2".to_string(),
dst_host: "http://127.0.0.1:8383".to_string(),
dst_token: std::env::var("RS_API_TOKEN").unwrap_or("".to_string()),
entries: vec![],
include: Labels::default(),
exclude: Labels::default(),
}
}
}

#[fixture]
pub(crate) fn bucket_settings() -> BucketSettings {
BucketSettings {
Expand Down Expand Up @@ -407,6 +644,12 @@ pub(crate) mod tests {
}
}

for replication in client.list_replications().await.unwrap() {
if replication.name.starts_with("test-replication") {
client.delete_replication(&replication.name).await.unwrap();
}
}

let bucket = client
.create_bucket("test-bucket-1")
.settings(bucket_settings.clone())
Expand Down
1 change: 1 addition & 0 deletions reduct_rs/src/lib.rs
Expand Up @@ -7,6 +7,7 @@ mod bucket;
mod client;
mod http_client;
mod record;
mod replication;

pub use bucket::Bucket;
pub use client::ReductClient;
Expand Down