Skip to content

Commit

Permalink
RS-261: Add each_n and each_s query parameters (#11)
Browse files Browse the repository at this point in the history
* add each_n and each_s query parameters

* add tests

* test different api

* fix feature list

* update README
  • Loading branch information
atimin committed May 17, 2024
1 parent 46f21da commit c42de00
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 19 deletions.
12 changes: 10 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ jobs:
sdk_tests:
name: Client SDK Tests
runs-on: ubuntu-latest
strategy:
matrix:
reductstore_version: [ "main", "latest" ]
include:
- reductstore_version: main
features: "default test-api-110"
- reductstore_version: latest
features: "default"
needs:
- rust_fmt
steps:
Expand All @@ -42,9 +50,9 @@ jobs:
- name: Run Database
run: docker run --network=host -v ${PWD}/misc:/misc
--env RS_API_TOKEN=TOKEN
--env RS_LICENSE_PATH=/misc/lic.key -d ${{env.REGISTRY_IMAGE}}
--env RS_LICENSE_PATH=/misc/lic.key -d reduct/store:${{ matrix.reductstore_version }}
- name: Run Client SDK tests
run: RS_API_TOKEN=TOKEN cargo test -- --test-threads=1
run: RS_API_TOKEN=TOKEN cargo test --features ${{ matrix.features }} -- --test-threads=1

sdk_examples:
name: Client SDK Examples
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- RS-261: Add `each_n` and `each_s` query parameters, [PR-11](https://github.com/reductstore/reduct-rs/pull/11)

### Changed

- Use IntoBody and `IntoBytes to write data, [PR-10](https://github.com/reductstore/reduct-rs/pull/10)
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ readme = "README.md"
keywords = ["database", "time-series", "client", "sdk", "reductstore"]
categories = ["database"]

[features]
default = []
test-api-110 = [] # Test API 1.10


[lib]
crate-type = ["lib"]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ database for unstructured data.

## Features

* Supports the [ReductStore HTTP API v1.8](https://reduct.store/docs/http-api)
* Supports the [ReductStore HTTP API v1.10](https://reduct.store/docs/http-api)
* Built on top of [reqwest](https://github.com/seanmonstar/reqwest)
* Asynchronous API

Expand Down
30 changes: 30 additions & 0 deletions src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,36 @@ mod tests {
assert!(query.next().await.is_none());
}

#[rstest]
#[tokio::test]
#[cfg_attr(not(feature = "test-api-110"), ignore)]
async fn test_query_each_second(#[future] bucket: Bucket) {
let bucket: Bucket = bucket.await;
let query = bucket.query("entry-2").each_s(0.002).send().await.unwrap();

pin_mut!(query);
let rec = query.next().await.unwrap().unwrap();
assert_eq!(rec.timestamp_us(), 2000);
let rec = query.next().await.unwrap().unwrap();
assert_eq!(rec.timestamp_us(), 4000);
assert!(query.next().await.is_none());
}

#[rstest]
#[tokio::test]
#[cfg_attr(not(feature = "test-api-110"), ignore)]
async fn test_query_each_minute(#[future] bucket: Bucket) {
let bucket: Bucket = bucket.await;
let query = bucket.query("entry-2").each_n(2).send().await.unwrap();

pin_mut!(query);
let rec = query.next().await.unwrap().unwrap();
assert_eq!(rec.timestamp_us(), 2000);
let rec = query.next().await.unwrap().unwrap();
assert_eq!(rec.timestamp_us(), 4000);
assert!(query.next().await.is_none());
}

#[rstest]
#[tokio::test]
async fn test_limit_query(#[future] bucket: Bucket) {
Expand Down
15 changes: 15 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,21 @@ pub(crate) mod tests {
.await
.unwrap();

bucket
.write_record("entry-2")
.timestamp_us(3000)
.data("0")
.send()
.await
.unwrap();
bucket
.write_record("entry-2")
.timestamp_us(4000)
.data("0")
.send()
.await
.unwrap();

let bucket = client
.create_bucket("test-bucket-2")
.settings(bucket_settings)
Expand Down
63 changes: 49 additions & 14 deletions src/record/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ pub struct QueryBuilder {
stop: Option<u64>,
include: Option<Labels>,
exclude: Option<Labels>,
each_s: Option<f64>,
each_n: Option<u64>,
limit: Option<u64>,

ttl: Option<Duration>,
continuous: bool,
head_only: bool,
limit: Option<u64>,

bucket: String,
entry: String,
Expand All @@ -44,10 +47,13 @@ impl QueryBuilder {
stop: None,
include: None,
exclude: None,
each_s: None,
each_n: None,
limit: None,

ttl: None,
continuous: false,
head_only: false,
limit: None,

bucket,
entry,
Expand Down Expand Up @@ -117,6 +123,27 @@ impl QueryBuilder {
self
}

/// Set S, to return a record every S seconds.
/// default: return all records
pub fn each_s(mut self, each_s: f64) -> Self {
self.each_s = Some(each_s);
self
}

/// Set N, to return every N records.
/// default: return all records
pub fn each_n(mut self, each_n: u64) -> Self {
self.each_n = Some(each_n);
self
}

/// Set a limit for the query.
/// default: unlimited
pub fn limit(mut self, limit: u64) -> Self {
self.limit = Some(limit);
self
}

/// Set TTL for the query.
pub fn ttl(mut self, ttl: Duration) -> Self {
self.ttl = Some(ttl);
Expand All @@ -136,27 +163,20 @@ impl QueryBuilder {
self
}

/// Set a limit for the query.
/// default: unlimited
pub fn limit(mut self, limit: u64) -> Self {
self.limit = Some(limit);
self
}

/// Send the query request.
pub async fn send(
self,
) -> Result<impl Stream<Item = Result<Record, ReductError>>, ReductError> {
let mut url = format!("/b/{}/{}/q?", self.bucket, self.entry);
// filter parameters
if let Some(start) = self.start {
url.push_str(&format!("start={}", start));
}

if let Some(stop) = self.stop {
url.push_str(&format!("&stop={}", stop));
}
if let Some(ttl) = self.ttl {
url.push_str(&format!("&ttl={}", ttl.as_secs()));
}

if let Some(include) = self.include {
for (key, value) in include {
url.push_str(&format!("&include-{}={}", key, value));
Expand All @@ -167,13 +187,28 @@ impl QueryBuilder {
url.push_str(&format!("&exclude-{}={}", key, value));
}
}
if self.continuous {
url.push_str("&continuous=true");

if let Some(each_s) = self.each_s {
url.push_str(&format!("&each_s={}", each_s));
}

if let Some(each_n) = self.each_n {
url.push_str(&format!("&each_n={}", each_n));
}

if let Some(limit) = self.limit {
url.push_str(&format!("&limit={}", limit));
}

// control parameters
if self.continuous {
url.push_str("&continuous=true");
}

if let Some(ttl) = self.ttl {
url.push_str(&format!("&ttl={}", ttl.as_secs()));
}

let response = self
.client
.send_and_receive_json::<(), QueryInfo>(Method::GET, &url, None)
Expand Down
4 changes: 2 additions & 2 deletions src/record/write_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ impl WriteRecordBuilder {
/// # Arguments
///
/// * `data` - The data to write.
pub fn data<B>(mut self, data: B) -> Self
pub fn data<D>(mut self, data: D) -> Self
where
B: Into<Body>,
D: Into<Body>,
{
self.data = Some(data.into());
self
Expand Down

0 comments on commit c42de00

Please sign in to comment.