From 6f66dd16d2c6cd7d28aac4aff53e3e1977ba293f Mon Sep 17 00:00:00 2001 From: Matthias Wahl Date: Fri, 6 May 2022 10:09:43 +0200 Subject: [PATCH 1/8] Use unique docker container names per integration test Signed-off-by: Matthias Wahl --- tremor-cli/tests/integration/elastic-empty/after.yaml | 2 +- tremor-cli/tests/integration/elastic-empty/before.yaml | 2 +- tremor-cli/tests/integration/elastic-verify-gd/after.yaml | 2 +- tremor-cli/tests/integration/elastic-verify-gd/before.yaml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tremor-cli/tests/integration/elastic-empty/after.yaml b/tremor-cli/tests/integration/elastic-empty/after.yaml index f46dcd215c..079640c3ce 100644 --- a/tremor-cli/tests/integration/elastic-empty/after.yaml +++ b/tremor-cli/tests/integration/elastic-empty/after.yaml @@ -4,6 +4,6 @@ "args": [ "rm", "-f", - "tremor-elasticsearch-integration-blabla-unique" + "tremor-elasticsearch-integration-blabla-empty" ] } \ No newline at end of file diff --git a/tremor-cli/tests/integration/elastic-empty/before.yaml b/tremor-cli/tests/integration/elastic-empty/before.yaml index ead463bd39..696d07f9c1 100644 --- a/tremor-cli/tests/integration/elastic-empty/before.yaml +++ b/tremor-cli/tests/integration/elastic-empty/before.yaml @@ -4,7 +4,7 @@ "args": [ "run", "--name", - "tremor-elasticsearch-integration-blabla-unique", + "tremor-elasticsearch-integration-blabla-empty", "-p127.0.0.1:9200:9200", "-p9300:9300", "-e", diff --git a/tremor-cli/tests/integration/elastic-verify-gd/after.yaml b/tremor-cli/tests/integration/elastic-verify-gd/after.yaml index f46dcd215c..5c4e3f945e 100644 --- a/tremor-cli/tests/integration/elastic-verify-gd/after.yaml +++ b/tremor-cli/tests/integration/elastic-verify-gd/after.yaml @@ -4,6 +4,6 @@ "args": [ "rm", "-f", - "tremor-elasticsearch-integration-blabla-unique" + "tremor-elasticsearch-integration-blabla-gd" ] } \ No newline at end of file diff --git a/tremor-cli/tests/integration/elastic-verify-gd/before.yaml b/tremor-cli/tests/integration/elastic-verify-gd/before.yaml index ead463bd39..7ad80a0c26 100644 --- a/tremor-cli/tests/integration/elastic-verify-gd/before.yaml +++ b/tremor-cli/tests/integration/elastic-verify-gd/before.yaml @@ -4,7 +4,7 @@ "args": [ "run", "--name", - "tremor-elasticsearch-integration-blabla-unique", + "tremor-elasticsearch-integration-blabla-gd", "-p127.0.0.1:9200:9200", "-p9300:9300", "-e", From b45019d4ba99904a8222c43acb0ce10aac270924 Mon Sep 17 00:00:00 2001 From: Matthias Wahl Date: Fri, 6 May 2022 10:11:04 +0200 Subject: [PATCH 2/8] Add metrics interval for connector too in docker default config. Signed-off-by: Matthias Wahl --- docker/config/docker.troy | 1 + 1 file changed, 1 insertion(+) diff --git a/docker/config/docker.troy b/docker/config/docker.troy index 291a63f3bc..16e2c624cb 100644 --- a/docker/config/docker.troy +++ b/docker/config/docker.troy @@ -23,6 +23,7 @@ flow define connector metronome from metronome with + metrics_interval_s = 5, config = { "interval": nanos::from_seconds(1) } From 8bdfdc94a642a24693254696bd29022a523d8b7d Mon Sep 17 00:00:00 2001 From: Matthias Wahl Date: Fri, 6 May 2022 10:11:48 +0200 Subject: [PATCH 3/8] Change metronome payload to remove wording and improve error msg Signed-off-by: Matthias Wahl --- src/connectors/impls/metronome.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/connectors/impls/metronome.rs b/src/connectors/impls/metronome.rs index 0ccde90c30..0c734d9785 100644 --- a/src/connectors/impls/metronome.rs +++ b/src/connectors/impls/metronome.rs @@ -35,7 +35,7 @@ impl ConnectorBuilder for Builder { "metronome".into() } - async fn build(&self, _id: &str, raw_config: &ConnectorConfig) -> Result> { + async fn build(&self, id: &str, raw_config: &ConnectorConfig) -> Result> { if let Some(raw) = &raw_config.config { let config = Config::new(raw)?; let origin_uri = EventOriginUri { @@ -50,7 +50,7 @@ impl ConnectorBuilder for Builder { origin_uri, })) } else { - Err(ErrorKind::MissingConfiguration(String::from("metronome")).into()) + Err(ErrorKind::MissingConfiguration(id.to_string()).into()) } } } @@ -101,7 +101,7 @@ impl Source for MetronomeSource { self.next = nanotime() + self.interval_ns; Ok(true) } - async fn pull_data(&mut self, pull_id: &mut u64, _ctx: &SourceContext) -> Result { + async fn pull_data(&mut self, pull_id: &mut u64, ctx: &SourceContext) -> Result { let now = nanotime(); // we need to wait here before we continue to fulfill the interval conditions if now < self.next { @@ -111,7 +111,7 @@ impl Source for MetronomeSource { *pull_id = self.id; self.id += 1; let data = literal!({ - "onramp": "metronome", + "connector": "metronome", "ingest_ns": now, "id": *pull_id }); From cafe73359bdb4ad83344c42194dac552918ece57 Mon Sep 17 00:00:00 2001 From: Matthias Wahl Date: Fri, 6 May 2022 10:12:13 +0200 Subject: [PATCH 4/8] Update redpanda docker image to include fixed jepsen stuff Signed-off-by: Matthias Wahl --- src/connectors/tests/kafka.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connectors/tests/kafka.rs b/src/connectors/tests/kafka.rs index ef73e72482..de1d3800e0 100644 --- a/src/connectors/tests/kafka.rs +++ b/src/connectors/tests/kafka.rs @@ -22,7 +22,7 @@ use testcontainers::{ }; const IMAGE: &str = "docker.vectorized.io/vectorized/redpanda"; -const VERSION: &str = "v21.11.10"; +const VERSION: &str = "v21.11.15"; async fn redpanda_container<'d>(docker: &'d DockerCli) -> Result> { let kafka_port = find_free_tcp_port().await?; From 840b7fd7ea4d3fb923996da3ee8c6e06ef9923a0 Mon Sep 17 00:00:00 2001 From: Matthias Wahl Date: Fri, 6 May 2022 10:13:05 +0200 Subject: [PATCH 5/8] Remove misleading error message upon failed drain process. Users don't necessarily need to know, they will get a timeout error message anyways. Signed-off-by: Matthias Wahl --- src/system/flow_supervisor.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/system/flow_supervisor.rs b/src/system/flow_supervisor.rs index bb72402210..025db35d19 100644 --- a/src/system/flow_supervisor.rs +++ b/src/system/flow_supervisor.rs @@ -173,13 +173,10 @@ impl FlowSupervisor { let rx_futures = std::iter::repeat_with(|| rx.recv()).take(alive_flows); for result in futures::future::join_all(rx_futures).await { match result { - Err(_) => { - error!("Error receiving from Draining process."); - } Ok(Err(e)) => { error!("Error during Draining: {}", e); } - Ok(Ok(())) => {} + Err(_) | Ok(Ok(())) => {} } } info!("Flows drained."); From 9a5a4bec9f8879d7706b1332a6983dcbd705e190 Mon Sep 17 00:00:00 2001 From: Matthias Wahl Date: Fri, 6 May 2022 11:18:33 +0200 Subject: [PATCH 6/8] Fix warning Signed-off-by: Matthias Wahl --- src/connectors/impls/metronome.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connectors/impls/metronome.rs b/src/connectors/impls/metronome.rs index 0c734d9785..972ca3e5c5 100644 --- a/src/connectors/impls/metronome.rs +++ b/src/connectors/impls/metronome.rs @@ -101,7 +101,7 @@ impl Source for MetronomeSource { self.next = nanotime() + self.interval_ns; Ok(true) } - async fn pull_data(&mut self, pull_id: &mut u64, ctx: &SourceContext) -> Result { + async fn pull_data(&mut self, pull_id: &mut u64, _ctx: &SourceContext) -> Result { let now = nanotime(); // we need to wait here before we continue to fulfill the interval conditions if now < self.next { From 8b40854be1aa55984ae11d81a14c6ab52a8191eb Mon Sep 17 00:00:00 2001 From: Matthias Wahl Date: Fri, 6 May 2022 12:13:08 +0200 Subject: [PATCH 7/8] Fix system-metrics integration test assertion Signed-off-by: Matthias Wahl --- tremor-cli/tests/integration/system-metrics/assert.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tremor-cli/tests/integration/system-metrics/assert.yaml b/tremor-cli/tests/integration/system-metrics/assert.yaml index cb188a885c..c1ed841540 100644 --- a/tremor-cli/tests/integration/system-metrics/assert.yaml +++ b/tremor-cli/tests/integration/system-metrics/assert.yaml @@ -4,7 +4,7 @@ asserts: - source: events.log contains: - | - {"id":10,"ingest_ns":null,"onramp":"metronome"} + {"id":10,"ingest_ns":null,"connector":"metronome"} - source: fg.out.log contains: - | From ec79225f745a005a91473b64d75de6d6bec467d9 Mon Sep 17 00:00:00 2001 From: Matthias Wahl Date: Mon, 9 May 2022 10:21:26 +0200 Subject: [PATCH 8/8] Cover changed line. I hate you coverage. Signed-off-by: Matthias Wahl --- src/connectors/impls/metronome.rs | 32 +++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/connectors/impls/metronome.rs b/src/connectors/impls/metronome.rs index 972ca3e5c5..59840b0b29 100644 --- a/src/connectors/impls/metronome.rs +++ b/src/connectors/impls/metronome.rs @@ -131,3 +131,35 @@ impl Source for MetronomeSource { false } } + +#[cfg(test)] +mod tests { + + use crate::{ + config::Reconnect, + connectors::ConnectorBuilder, + errors::{Error, Kind as ErrorKind, Result}, + }; + #[async_std::test] + async fn missing_config() -> Result<()> { + let builder = super::Builder::default(); + let connector_config = super::ConnectorConfig { + connector_type: builder.connector_type(), + codec: None, + config: None, + preprocessors: None, + postprocessors: None, + reconnect: Reconnect::None, + metrics_interval_s: Some(5), + }; + assert!(matches!( + builder + .build("snot", &connector_config) + .await + .err() + .unwrap(), + Error(ErrorKind::MissingConfiguration(_), _) + )); + Ok(()) + } +}