Skip to content

Commit

Permalink
Merge ec79225 into 6043f58
Browse files Browse the repository at this point in the history
  • Loading branch information
mfelsche committed May 9, 2022
2 parents 6043f58 + ec79225 commit 738b3e8
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 13 deletions.
1 change: 1 addition & 0 deletions docker/config/docker.troy
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ flow

define connector metronome from metronome
with
metrics_interval_s = 5,
config = {
"interval": nanos::from_seconds(1)
}
Expand Down
38 changes: 35 additions & 3 deletions src/connectors/impls/metronome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl ConnectorBuilder for Builder {
"metronome".into()
}

async fn build(&self, _id: &str, raw_config: &ConnectorConfig) -> Result<Box<dyn Connector>> {
async fn build(&self, id: &str, raw_config: &ConnectorConfig) -> Result<Box<dyn Connector>> {
if let Some(raw) = &raw_config.config {
let config = Config::new(raw)?;
let origin_uri = EventOriginUri {
Expand All @@ -50,7 +50,7 @@ impl ConnectorBuilder for Builder {
origin_uri,
}))
} else {
Err(ErrorKind::MissingConfiguration(String::from("metronome")).into())
Err(ErrorKind::MissingConfiguration(id.to_string()).into())
}
}
}
Expand Down Expand Up @@ -111,7 +111,7 @@ impl Source for MetronomeSource {
*pull_id = self.id;
self.id += 1;
let data = literal!({
"onramp": "metronome",
"connector": "metronome",
"ingest_ns": now,
"id": *pull_id
});
Expand All @@ -131,3 +131,35 @@ impl Source for MetronomeSource {
false
}
}

#[cfg(test)]
mod tests {

use crate::{
config::Reconnect,
connectors::ConnectorBuilder,
errors::{Error, Kind as ErrorKind, Result},
};
#[async_std::test]
async fn missing_config() -> Result<()> {
let builder = super::Builder::default();
let connector_config = super::ConnectorConfig {
connector_type: builder.connector_type(),
codec: None,
config: None,
preprocessors: None,
postprocessors: None,
reconnect: Reconnect::None,
metrics_interval_s: Some(5),
};
assert!(matches!(
builder
.build("snot", &connector_config)
.await
.err()
.unwrap(),
Error(ErrorKind::MissingConfiguration(_), _)
));
Ok(())
}
}
2 changes: 1 addition & 1 deletion src/connectors/tests/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use testcontainers::{
};

const IMAGE: &str = "docker.vectorized.io/vectorized/redpanda";
const VERSION: &str = "v21.11.10";
const VERSION: &str = "v21.11.15";

async fn redpanda_container<'d>(docker: &'d DockerCli) -> Result<Container<'d, GenericImage>> {
let kafka_port = find_free_tcp_port().await?;
Expand Down
5 changes: 1 addition & 4 deletions src/system/flow_supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,10 @@ impl FlowSupervisor {
let rx_futures = std::iter::repeat_with(|| rx.recv()).take(alive_flows);
for result in futures::future::join_all(rx_futures).await {
match result {
Err(_) => {
error!("Error receiving from Draining process.");
}
Ok(Err(e)) => {
error!("Error during Draining: {}", e);
}
Ok(Ok(())) => {}
Err(_) | Ok(Ok(())) => {}
}
}
info!("Flows drained.");
Expand Down
2 changes: 1 addition & 1 deletion tremor-cli/tests/integration/elastic-empty/after.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
"args": [
"rm",
"-f",
"tremor-elasticsearch-integration-blabla-unique"
"tremor-elasticsearch-integration-blabla-empty"
]
}
2 changes: 1 addition & 1 deletion tremor-cli/tests/integration/elastic-empty/before.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"args": [
"run",
"--name",
"tremor-elasticsearch-integration-blabla-unique",
"tremor-elasticsearch-integration-blabla-empty",
"-p127.0.0.1:9200:9200",
"-p9300:9300",
"-e",
Expand Down
2 changes: 1 addition & 1 deletion tremor-cli/tests/integration/elastic-verify-gd/after.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
"args": [
"rm",
"-f",
"tremor-elasticsearch-integration-blabla-unique"
"tremor-elasticsearch-integration-blabla-gd"
]
}
2 changes: 1 addition & 1 deletion tremor-cli/tests/integration/elastic-verify-gd/before.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"args": [
"run",
"--name",
"tremor-elasticsearch-integration-blabla-unique",
"tremor-elasticsearch-integration-blabla-gd",
"-p127.0.0.1:9200:9200",
"-p9300:9300",
"-e",
Expand Down
2 changes: 1 addition & 1 deletion tremor-cli/tests/integration/system-metrics/assert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ asserts:
- source: events.log
contains:
- |
{"id":10,"ingest_ns":null,"onramp":"metronome"}
{"id":10,"ingest_ns":null,"connector":"metronome"}
- source: fg.out.log
contains:
- |
Expand Down

0 comments on commit 738b3e8

Please sign in to comment.