Skip to content

Commit

Permalink
Merge 0c174c6 into 58da7c4
Browse files Browse the repository at this point in the history
  • Loading branch information
Licenser committed May 10, 2022
2 parents 58da7c4 + 0c174c6 commit d4ccf07
Show file tree
Hide file tree
Showing 15 changed files with 288 additions and 11 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,5 @@ tremor-script/current_counterexample.eqc
/docs
/tremor-script/docs
lalrpop-docgen
.vscode/configurationCache.log
.vscode/dryrun.log
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## Unreleased

### Fixes
- http headers to allow strings and arrays

## [0.12.0-rc.2]

### New features
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ aws-smithy-http = "0.41"
# gcp
googapis = { version = "0.6", default-features = false, features = [
"google-pubsub-v1",
"google-cloud-bigquery-storage-v1"
"google-cloud-bigquery-storage-v1",
] }
gouth = { version = "0.2" }
http = "0.2.7"
Expand Down
10 changes: 8 additions & 2 deletions src/connectors/impls/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,15 @@ use crate::connectors::utils::tls::{tls_client_config, TLSClientConfig};
const CONNECTOR_TYPE: &str = "http_client";
const DEFAULT_CODEC: &str = "json";

#[derive(Deserialize, Debug, Clone)]
#[serde(transparent)]
pub(super) struct Header(
#[serde(with = "either::serde_untagged")] pub(super) Either<Vec<String>, String>,
);

#[derive(Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct Config {
pub(crate) struct Config {
/// Target URL
#[serde(default = "Default::default")]
pub(super) url: Url,
Expand All @@ -48,7 +54,7 @@ pub struct Config {
pub(super) concurrency: usize,
/// Default HTTP headers
#[serde(default = "Default::default")]
pub(super) headers: HashMap<String, Vec<String>>,
pub(super) headers: HashMap<String, Header>,
/// Default HTTP method
#[serde(default = "default_method")]
pub(super) method: Method,
Expand Down
46 changes: 43 additions & 3 deletions src/connectors/impls/http/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use super::client;
use super::utils::{FixedBodyReader, RequestId, StreamingBodyReader};
use crate::connectors::{prelude::*, utils::mime::MimeCodecMap};
use async_std::channel::{unbounded, Sender};
use either::Either;
use http_types::headers::HeaderValues;
use http_types::Response;
use http_types::{
Expand Down Expand Up @@ -49,7 +50,7 @@ impl HttpRequestBuilder {
meta: Option<&Value>,
codec_map: &MimeCodecMap,
config: &client::Config,
configured_codec: &String,
configured_codec: &str,
) -> Result<Self> {
let request_meta = meta.get("request");
let method = if let Some(method_v) = request_meta.get("method") {
Expand All @@ -75,8 +76,15 @@ impl HttpRequestBuilder {

// first insert config headers
for (config_header_name, config_header_values) in &config.headers {
for header_value in config_header_values {
request.append_header(config_header_name.as_str(), header_value.as_str());
match &config_header_values.0 {
Either::Left(config_header_values) => {
for header_value in config_header_values {
request.append_header(config_header_name.as_str(), header_value.as_str());
}
}
Either::Right(header_value) => {
request.append_header(config_header_name.as_str(), header_value.as_str());
}
}
}
// build headers
Expand Down Expand Up @@ -312,3 +320,35 @@ pub(super) fn extract_response_meta(response: &Response) -> Value<'static> {
.map(|version| meta.try_insert("version", version.to_string()));
meta
}

#[cfg(test)]
mod test {
use super::*;
#[async_std::test]
async fn builder() -> Result<()> {
let request_id = RequestId::new(42);
let meta = None;
let codec_map = MimeCodecMap::default();
let c = literal!({"headers": {
"cake": ["black forst", "cheese"],
"pie": "key lime"
}});
let mut s = EventSerializer::new(
None,
CodecReq::Optional("json"),
vec![],
&ConnectorType("http".into()),
"http",
)?;
let config = client::Config::new(&c)?;
let configured_codec = "json";

let mut b =
HttpRequestBuilder::new(request_id, meta, &codec_map, &config, configured_codec)?;

let r = b.finalize(&mut s).await?.unwrap();
assert_eq!(r.header("pie").unwrap().iter().count(), 1);
assert_eq!(r.header("cake").unwrap().iter().count(), 2);
Ok(())
}
}
4 changes: 2 additions & 2 deletions src/connectors/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ pub(crate) fn builder(
) -> Result<SinkManagerBuilder> {
// resolve codec and processors
let postprocessor_configs = config.postprocessors.clone().unwrap_or_default();
let serializer = EventSerializer::build(
let serializer = EventSerializer::new(
config.codec.clone(),
connector_codec_requirement,
postprocessor_configs,
Expand Down Expand Up @@ -440,7 +440,7 @@ pub(crate) struct EventSerializer {
}

impl EventSerializer {
fn build(
pub(crate) fn new(
codec_config: Option<CodecConfig>,
default_codec: CodecReq,
postprocessor_configs: Vec<PostprocessorConfig>,
Expand Down
4 changes: 2 additions & 2 deletions src/connectors/sink/concurrency_cap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ pub(crate) struct ConcurrencyCap {
}

impl ConcurrencyCap {
pub(crate) fn new(max: usize, reply_tx: Sender<AsyncSinkReply>) -> Self {
pub(crate) fn new(cap: usize, reply_tx: Sender<AsyncSinkReply>) -> Self {
Self {
cap: max,
cap,
reply_tx,
counter: Arc::new(AtomicUsize::new(0)),
}
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ where
Control::Continue
}
CbAction::Open => {
debug!("{ctx} Circuit Breaker: Open.");
info!("{ctx} Circuit Breaker: Open.");
self.cb_open_received += 1;
ctx.swallow_err(self.source.on_cb_open(ctx).await, "on_cb_open failed");
// avoid a race condition where the necessary start routine wasnt executed
Expand Down

0 comments on commit d4ccf07

Please sign in to comment.