Skip to content

Commit

Permalink
Merge pull request #207 from szarykott/async_source
Browse files Browse the repository at this point in the history
Add AsyncSource with tests, docs and examples
  • Loading branch information
matthiasbeyer committed Jul 10, 2021
2 parents 11602b0 + 33c6432 commit 74a0a80
Show file tree
Hide file tree
Showing 9 changed files with 516 additions and 38 deletions.
14 changes: 11 additions & 3 deletions .github/workflows/msrv.yml
Expand Up @@ -9,7 +9,7 @@ jobs:
strategy:
matrix:
rust:
- 1.44.0
- 1.46.0
- stable
- beta
- nightly
Expand Down Expand Up @@ -44,7 +44,7 @@ jobs:
strategy:
matrix:
rust:
- 1.44.0
- 1.46.0
- stable
- beta
- nightly
Expand All @@ -59,10 +59,18 @@ jobs:
override: true

- name: Run cargo test
if: matrix.rust != 'nightly'
if: matrix.rust != 'nightly' && matrix.rust != '1.46.0'
uses: actions-rs/cargo@v1
with:
command: test

- name: Run cargo test (nightly)
if: matrix.rust == '1.46.0'
continue-on-error: true
uses: actions-rs/cargo@v1
with:
command: test
args: --tests

- name: Run cargo test (nightly)
if: matrix.rust == 'nightly'
Expand Down
5 changes: 5 additions & 0 deletions Cargo.toml
Expand Up @@ -23,6 +23,7 @@ ini = ["rust-ini"]
json5 = ["json5_rs"]

[dependencies]
async-trait = "0.1.50"
lazy_static = "1.0"
serde = "1.0.8"
nom = "6"
Expand All @@ -39,3 +40,7 @@ json5_rs = { version = "0.3", optional = true, package = "json5" }
serde_derive = "1.0.8"
float-cmp = "0.8"
chrono = { version = "0.4", features = ["serde"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "fs", "io-util", "time"]}
warp = "0.3.1"
futures = "0.3.15"
reqwest = "0.11.3"
72 changes: 72 additions & 0 deletions examples/async_source/main.rs
@@ -0,0 +1,72 @@
use std::{collections::HashMap, error::Error};

use config::{builder::AsyncState, AsyncSource, ConfigBuilder, ConfigError, FileFormat};

use async_trait::async_trait;
use futures::{select, FutureExt};
use warp::Filter;

// Example below presents sample configuration server and client.
//
// Server serves simple configuration on HTTP endpoint.
// Client consumes it using custom HTTP AsyncSource built on top of reqwest.

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
select! {
r = run_server().fuse() => r,
r = run_client().fuse() => r
}
}

async fn run_server() -> Result<(), Box<dyn Error>> {
let service = warp::path("configuration").map(|| r#"{ "value" : 123 }"#);

println!("Running server on localhost:5001");

warp::serve(service).bind(([127, 0, 0, 1], 5001)).await;

Ok(())
}

async fn run_client() -> Result<(), Box<dyn Error>> {
// Good enough for an example to allow server to start
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

let config = ConfigBuilder::<AsyncState>::default()
.add_async_source(HttpSource {
uri: "http://localhost:5001/configuration".into(),
format: FileFormat::Json,
})
.build()
.await?;

println!("Config value is {}", config.get::<String>("value")?);

Ok(())
}

// Actual implementation of AsyncSource can be found below

#[derive(Debug)]
struct HttpSource {
uri: String,
format: FileFormat,
}

#[async_trait]
impl AsyncSource for HttpSource {
async fn collect(&self) -> Result<HashMap<String, config::Value>, ConfigError> {
reqwest::get(&self.uri)
.await
.map_err(|e| ConfigError::Foreign(Box::new(e)))? // error conversion is possible from custom AsyncSource impls
.text()
.await
.map_err(|e| ConfigError::Foreign(Box::new(e)))
.and_then(|text| {
self.format
.parse(Some(&self.uri), &text)
.map_err(|e| ConfigError::Foreign(e))
})
}
}

0 comments on commit 74a0a80

Please sign in to comment.