diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c4ad43b..72234b6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,6 +33,14 @@ jobs: sdk_tests: name: Client SDK Tests runs-on: ubuntu-latest + strategy: + matrix: + reductstore_version: [ "main", "latest" ] + include: + - reductstore_version: main + features: "default test-api-110" + - reductstore_version: latest + features: "default" needs: - rust_fmt steps: @@ -42,9 +50,9 @@ jobs: - name: Run Database run: docker run --network=host -v ${PWD}/misc:/misc --env RS_API_TOKEN=TOKEN - --env RS_LICENSE_PATH=/misc/lic.key -d ${{env.REGISTRY_IMAGE}} + --env RS_LICENSE_PATH=/misc/lic.key -d reduct/store:${{ matrix.reductstore_version }} - name: Run Client SDK tests - run: RS_API_TOKEN=TOKEN cargo test -- --test-threads=1 + run: RS_API_TOKEN=TOKEN cargo test --features ${{ matrix.features }} -- --test-threads=1 sdk_examples: name: Client SDK Examples diff --git a/CHANGELOG.md b/CHANGELOG.md index b16aa54..421d706 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- RS-261: Add `each_n` and `each_s` query parameters, [PR-11](https://github.com/reductstore/reduct-rs/pull/11) + ### Changed - Use IntoBody and `IntoBytes to write data, [PR-10](https://github.com/reductstore/reduct-rs/pull/10) diff --git a/Cargo.toml b/Cargo.toml index 9e25344..ff416ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,10 @@ readme = "README.md" keywords = ["database", "time-series", "client", "sdk", "reductstore"] categories = ["database"] +[features] +default = [] +test-api-110 = [] # Test API 1.10 + [lib] crate-type = ["lib"] diff --git a/README.md b/README.md index 3e7ba61..92a344c 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ database for unstructured data. ## Features -* Supports the [ReductStore HTTP API v1.8](https://reduct.store/docs/http-api) +* Supports the [ReductStore HTTP API v1.10](https://reduct.store/docs/http-api) * Built on top of [reqwest](https://github.com/seanmonstar/reqwest) * Asynchronous API diff --git a/src/bucket.rs b/src/bucket.rs index 8a1ca87..d5dc83b 100644 --- a/src/bucket.rs +++ b/src/bucket.rs @@ -503,6 +503,36 @@ mod tests { assert!(query.next().await.is_none()); } + #[rstest] + #[tokio::test] + #[cfg_attr(not(feature = "test-api-110"), ignore)] + async fn test_query_each_second(#[future] bucket: Bucket) { + let bucket: Bucket = bucket.await; + let query = bucket.query("entry-2").each_s(0.002).send().await.unwrap(); + + pin_mut!(query); + let rec = query.next().await.unwrap().unwrap(); + assert_eq!(rec.timestamp_us(), 2000); + let rec = query.next().await.unwrap().unwrap(); + assert_eq!(rec.timestamp_us(), 4000); + assert!(query.next().await.is_none()); + } + + #[rstest] + #[tokio::test] + #[cfg_attr(not(feature = "test-api-110"), ignore)] + async fn test_query_each_minute(#[future] bucket: Bucket) { + let bucket: Bucket = bucket.await; + let query = bucket.query("entry-2").each_n(2).send().await.unwrap(); + + pin_mut!(query); + let rec = query.next().await.unwrap().unwrap(); + assert_eq!(rec.timestamp_us(), 2000); + let rec = query.next().await.unwrap().unwrap(); + assert_eq!(rec.timestamp_us(), 4000); + assert!(query.next().await.is_none()); + } + #[rstest] #[tokio::test] async fn test_limit_query(#[future] bucket: Bucket) { diff --git a/src/client.rs b/src/client.rs index cadf0fb..32e390e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -730,6 +730,21 @@ pub(crate) mod tests { .await .unwrap(); + bucket + .write_record("entry-2") + .timestamp_us(3000) + .data("0") + .send() + .await + .unwrap(); + bucket + .write_record("entry-2") + .timestamp_us(4000) + .data("0") + .send() + .await + .unwrap(); + let bucket = client .create_bucket("test-bucket-2") .settings(bucket_settings) diff --git a/src/record/query.rs b/src/record/query.rs index 4a8d3d1..9b69be1 100644 --- a/src/record/query.rs +++ b/src/record/query.rs @@ -27,10 +27,13 @@ pub struct QueryBuilder { stop: Option, include: Option, exclude: Option, + each_s: Option, + each_n: Option, + limit: Option, + ttl: Option, continuous: bool, head_only: bool, - limit: Option, bucket: String, entry: String, @@ -44,10 +47,13 @@ impl QueryBuilder { stop: None, include: None, exclude: None, + each_s: None, + each_n: None, + limit: None, + ttl: None, continuous: false, head_only: false, - limit: None, bucket, entry, @@ -117,6 +123,27 @@ impl QueryBuilder { self } + /// Set S, to return a record every S seconds. + /// default: return all records + pub fn each_s(mut self, each_s: f64) -> Self { + self.each_s = Some(each_s); + self + } + + /// Set N, to return every N records. + /// default: return all records + pub fn each_n(mut self, each_n: u64) -> Self { + self.each_n = Some(each_n); + self + } + + /// Set a limit for the query. + /// default: unlimited + pub fn limit(mut self, limit: u64) -> Self { + self.limit = Some(limit); + self + } + /// Set TTL for the query. pub fn ttl(mut self, ttl: Duration) -> Self { self.ttl = Some(ttl); @@ -136,27 +163,20 @@ impl QueryBuilder { self } - /// Set a limit for the query. - /// default: unlimited - pub fn limit(mut self, limit: u64) -> Self { - self.limit = Some(limit); - self - } - /// Send the query request. pub async fn send( self, ) -> Result>, ReductError> { let mut url = format!("/b/{}/{}/q?", self.bucket, self.entry); + // filter parameters if let Some(start) = self.start { url.push_str(&format!("start={}", start)); } + if let Some(stop) = self.stop { url.push_str(&format!("&stop={}", stop)); } - if let Some(ttl) = self.ttl { - url.push_str(&format!("&ttl={}", ttl.as_secs())); - } + if let Some(include) = self.include { for (key, value) in include { url.push_str(&format!("&include-{}={}", key, value)); @@ -167,13 +187,28 @@ impl QueryBuilder { url.push_str(&format!("&exclude-{}={}", key, value)); } } - if self.continuous { - url.push_str("&continuous=true"); + + if let Some(each_s) = self.each_s { + url.push_str(&format!("&each_s={}", each_s)); } + + if let Some(each_n) = self.each_n { + url.push_str(&format!("&each_n={}", each_n)); + } + if let Some(limit) = self.limit { url.push_str(&format!("&limit={}", limit)); } + // control parameters + if self.continuous { + url.push_str("&continuous=true"); + } + + if let Some(ttl) = self.ttl { + url.push_str(&format!("&ttl={}", ttl.as_secs())); + } + let response = self .client .send_and_receive_json::<(), QueryInfo>(Method::GET, &url, None) diff --git a/src/record/write_record.rs b/src/record/write_record.rs index 1be2f0b..3141e63 100644 --- a/src/record/write_record.rs +++ b/src/record/write_record.rs @@ -84,9 +84,9 @@ impl WriteRecordBuilder { /// # Arguments /// /// * `data` - The data to write. - pub fn data(mut self, data: B) -> Self + pub fn data(mut self, data: D) -> Self where - B: Into, + D: Into, { self.data = Some(data.into()); self