Skip to content

Commit

Permalink
fix(tests): Handle early return from influxdb create database (#5928)
Browse files Browse the repository at this point in the history
* Fix tracing in prometheus remote_write sink integration tests

* Use the nanosecond timestamp to create influx database name

* Don't bother dropping the old database, the name is unique

* fix(tests): Handle early return from influxdb create database

Some times, the influxdb v1 server will return early from creating the
database, providing an OK response code before the database is actually
created. Subsequent sink calls will then encounter a NOT_FOUND response
when trying to write to the database, resulting in a spurious failure.
This tests for database existence after creating it by doing an empty
write, and looping until that returns NO_CONTENT.

Signed-off-by: Bruce Guenter <bruce@timber.io>
  • Loading branch information
Bruce Guenter committed Jan 8, 2021
1 parent 2cce30c commit 08bb58f
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 18 deletions.
52 changes: 34 additions & 18 deletions src/sinks/influxdb/mod.rs
Expand Up @@ -309,19 +309,16 @@ pub(in crate::sinks) fn encode_uri(
#[allow(dead_code)]
pub mod test_util {
use super::*;
use chrono::offset::TimeZone;
use chrono::{offset::TimeZone, Utc};
use std::fs::File;
use std::io::Read;
use std::sync::atomic::{AtomicUsize, Ordering};

pub(crate) const ORG: &str = "my-org";
pub(crate) const BUCKET: &str = "my-bucket";
pub(crate) const TOKEN: &str = "my-token";

static DATABASE_NUM: AtomicUsize = AtomicUsize::new(0);

pub(crate) fn next_database() -> String {
format!("testdb{}", DATABASE_NUM.fetch_add(1, Ordering::Relaxed))
format!("testdb{}", Utc::now().timestamp_nanos())
}

pub(crate) fn ts() -> DateTime<Utc> {
Expand Down Expand Up @@ -369,20 +366,22 @@ pub mod test_util {
(measurement, split[0], split[1].to_string(), split[2])
}

pub(crate) async fn query_v1(endpoint: &str, query: &str) -> reqwest::Response {
fn client() -> reqwest::Client {
let mut test_ca = Vec::<u8>::new();
File::open("tests/data/Vector_CA.crt")
.unwrap()
.read_to_end(&mut test_ca)
.unwrap();
let test_ca = reqwest::Certificate::from_pem(&test_ca).unwrap();

let client = reqwest::Client::builder()
reqwest::Client::builder()
.add_root_certificate(test_ca)
.build()
.unwrap();
.unwrap()
}

client
pub(crate) async fn query_v1(endpoint: &str, query: &str) -> reqwest::Response {
client()
.get(&format!("{}/query", endpoint))
.query(&[("q", query)])
.send()
Expand All @@ -392,25 +391,42 @@ pub mod test_util {

pub(crate) async fn onboarding_v1(endpoint: &str) -> String {
let database = next_database();
query_v1(endpoint, &format!("drop database {}", database)).await;
let status = query_v1(endpoint, &format!("create database {}", database))
.await
.status();
assert!(
status == http::StatusCode::OK,
format!("UnexpectedStatus: {}", status)
);
assert_eq!(status, http::StatusCode::OK, "UnexpectedStatus: {}", status);
// Some times InfluxDB will return OK before it can actually
// accept writes to the database, leading to test failures. Test
// this with empty writes and loop if it reports the database
// does not exist yet.
crate::test_util::wait_for(|| {
let write_url = format!("{}/write?db={}", endpoint, &database);
async move {
match client()
.post(&write_url)
.header("Content-Type", "text/plain")
.header("Authorization", &format!("Token {}", TOKEN))
.body("")
.send()
.await
.unwrap()
.status()
{
http::StatusCode::NO_CONTENT => true,
http::StatusCode::NOT_FOUND => false,
status => panic!("Unexpected status: {}", status),
}
}
})
.await;
database
}

pub(crate) async fn cleanup_v1(endpoint: &str, database: &str) {
let status = query_v1(endpoint, &format!("drop database {}", database))
.await
.status();
assert!(
status == http::StatusCode::OK,
format!("UnexpectedStatus: {}", status)
);
assert_eq!(status, http::StatusCode::OK, "UnexpectedStatus: {}", status);
}

pub(crate) async fn onboarding_v2() {
Expand Down
2 changes: 2 additions & 0 deletions src/sinks/prometheus/remote_write.rs
Expand Up @@ -218,6 +218,8 @@ mod integration_tests {
}

async fn insert_metrics(url: &str) {
crate::test_util::trace_init();

let database = onboarding_v1(url).await;

let cx = SinkContext::new_test();
Expand Down

0 comments on commit 08bb58f

Please sign in to comment.