Skip to content

Commit

Permalink
feat(new sink): Adding greptimedb metrics sink (#17198)
Browse files Browse the repository at this point in the history
This patch adds [greptimedb](https://github.com/greptimeteam/greptimedb)
sink.

For now, we use greptimedb's git repository for adding its client. We
will eventually split it from repo and publish it to crates.io, I hope
this is not a road blocker at the moment.

TODOs:

- [x] website docs
- [x] spellcheck cleanup
- [x] integration test verification

---------

Co-authored-by: Doug Smith <dsmith3197@users.noreply.github.com>
  • Loading branch information
sunng87 and dsmith3197 committed Jul 11, 2023
1 parent 5b1219f commit da36a5e
Show file tree
Hide file tree
Showing 22 changed files with 1,465 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .github/actions/spelling/allow.txt
Expand Up @@ -296,6 +296,9 @@ gpg
gql
grafana
graphiql
greptime
greptimecloud
greptimedb
gvisor
gws
hadoop
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/changes.yml
Expand Up @@ -70,6 +70,8 @@ on:
value: ${{ jobs.int_tests.outputs.fluent }}
gcp:
value: ${{ jobs.int_tests.outputs.gcp }}
greptimedb:
value: ${{ jobs.int_tests.outputs.greptimedb }}
humio:
value: ${{ jobs.int_tests.outputs.humio }}
http-client:
Expand Down Expand Up @@ -194,6 +196,7 @@ jobs:
eventstoredb: ${{ steps.filter.outputs.eventstoredb }}
fluent: ${{ steps.filter.outputs.fluent }}
gcp: ${{ steps.filter.outputs.gcp }}
greptimedb: ${{ steps.filter.outputs.greptimedb }}
humio: ${{ steps.filter.outputs.humio }}
http-client: ${{ steps.filter.outputs.http-client }}
influxdb: ${{ steps.filter.outputs.influxdb }}
Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/integration-comment.yml
Expand Up @@ -221,6 +221,14 @@ jobs:
max_attempts: 3
command: bash scripts/ci-integration-test.sh gcp

- name: greptimedb
if: ${{ contains(github.event.comment.body, '/ci-run-integration-greptimedb') || contains(github.event.comment.body, '/ci-run-all') }}
uses: nick-fields/retry@v2
with:
timeout_minutes: 30
max_attempts: 3
command: bash scripts/ci-integration-test.sh greptimedb

- name: humio
if: ${{ contains(github.event.comment.body, '/ci-run-integration-humio') || contains(github.event.comment.body, '/ci-run-all') }}
uses: nick-fields/retry@v2
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/integration.yml
Expand Up @@ -67,6 +67,7 @@ jobs:
|| needs.changes.outputs.eventstoredb == 'true'
|| needs.changes.outputs.fluent == 'true'
|| needs.changes.outputs.gcp == 'true'
|| needs.changes.outputs.greptimedb == 'true'
|| needs.changes.outputs.humio == 'true'
|| needs.changes.outputs.http-client == 'true'
|| needs.changes.outputs.influxdb == 'true'
Expand Down Expand Up @@ -251,6 +252,14 @@ jobs:
max_attempts: 3
command: bash scripts/ci-integration-test.sh gcp

- if: ${{ github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.greptimedb == 'true' }}
name: greptimedb
uses: nick-fields/retry@v2
with:
timeout_minutes: 30
max_attempts: 3
command: bash scripts/ci-integration-test.sh greptimedb

- if: ${{ github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.humio == 'true' }}
name: humio
uses: nick-fields/retry@v2
Expand Down
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.toml
Expand Up @@ -233,6 +233,9 @@ tui = { version = "0.19.0", optional = true, default-features = false, features
hex = { version = "0.4.3", default-features = false, optional = true }
sha2 = { version = "0.10.7", default-features = false, optional = true }

# GreptimeDB
greptimedb-client = { git = "https://github.com/GreptimeTeam/greptimedb-client-rust.git", rev = "bc32362adf0df17a41a95bae4221d6d8f1775656", optional = true }

# External libs
arc-swap = { version = "1.6", default-features = false, optional = true }
async-compression = { version = "0.4.1", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true }
Expand Down Expand Up @@ -646,6 +649,7 @@ sinks-metrics = [
"sinks-blackhole",
"sinks-console",
"sinks-datadog_metrics",
"sinks-greptimedb",
"sinks-humio",
"sinks-influxdb",
"sinks-kafka",
Expand Down Expand Up @@ -679,6 +683,7 @@ sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serd
sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"]
sinks-file = ["dep:async-compression"]
sinks-gcp = ["dep:base64", "gcp"]
sinks-greptimedb = ["dep:greptimedb-client"]
sinks-honeycomb = []
sinks-http = []
sinks-humio = ["sinks-splunk_hec", "transforms-metric_to_log"]
Expand Down Expand Up @@ -739,6 +744,7 @@ all-integration-tests = [
"gcp-cloud-storage-integration-tests",
"gcp-integration-tests",
"gcp-pubsub-integration-tests",
"greptimedb-integration-tests",
"http-client-integration-tests",
"humio-integration-tests",
"influxdb-integration-tests",
Expand Down Expand Up @@ -800,6 +806,7 @@ fluent-integration-tests = ["docker", "sources-fluent"]
gcp-cloud-storage-integration-tests = ["sinks-gcp"]
gcp-integration-tests = ["sinks-gcp"]
gcp-pubsub-integration-tests = ["sinks-gcp", "sources-gcp_pubsub"]
greptimedb-integration-tests = ["sinks-greptimedb"]
humio-integration-tests = ["sinks-humio"]
http-client-integration-tests = ["sources-http_client"]
influxdb-integration-tests = ["sinks-influxdb"]
Expand Down
2 changes: 2 additions & 0 deletions LICENSE-3rdparty.csv
Expand Up @@ -224,6 +224,8 @@ graphql-parser,https://github.com/graphql-rust/graphql-parser,MIT OR Apache-2.0,
graphql_client,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT,Tom Houlé <tom@tomhoule.com>
graphql_client_codegen,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT,Tom Houlé <tom@tomhoule.com>
graphql_query_derive,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT,Tom Houlé <tom@tomhoule.com>
greptime-proto,https://github.com/GreptimeTeam/greptime-proto,Apache-2.0,The greptime-proto Authors
greptimedb-client,https://github.com/GreptimeTeam/greptimedb-client-rust,Apache-2.0,The greptimedb-client Authors
grok,https://github.com/daschl/grok,Apache-2.0,Michael Nitschinger <michael@nitschinger.at>
h2,https://github.com/hyperium/h2,MIT,"Carl Lerche <me@carllerche.com>, Sean McArthur <sean@seanmonstar.com>"
hash_hasher,https://github.com/Fraser999/Hash-Hasher,Apache-2.0 OR MIT,Fraser Hutchison <fraser.hutchison@maidsafe.net>
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -332,7 +332,7 @@ test-behavior: test-behavior-transforms test-behavior-formats test-behavior-conf
test-integration: ## Runs all integration tests
test-integration: test-integration-amqp test-integration-appsignal test-integration-aws test-integration-axiom test-integration-azure test-integration-chronicle test-integration-clickhouse
test-integration: test-integration-databend test-integration-docker-logs test-integration-elasticsearch
test-integration: test-integration-eventstoredb test-integration-fluent test-integration-gcp test-integration-humio test-integration-http-client test-integration-influxdb
test-integration: test-integration-eventstoredb test-integration-fluent test-integration-gcp test-integration-greptimedb test-integration-humio test-integration-http-client test-integration-influxdb
test-integration: test-integration-kafka test-integration-logstash test-integration-loki test-integration-mongodb test-integration-nats
test-integration: test-integration-nginx test-integration-opentelemetry test-integration-postgres test-integration-prometheus test-integration-pulsar
test-integration: test-integration-redis test-integration-splunk test-integration-dnstap test-integration-datadog-agent test-integration-datadog-logs
Expand Down
8 changes: 8 additions & 0 deletions scripts/integration/greptimedb/compose.yaml
@@ -0,0 +1,8 @@
version: '3'

services:
greptimedb:
image: docker.io/greptime/greptimedb:${CONFIG_VERSION}
command: "standalone start --http-addr=0.0.0.0:4000 --rpc-addr=0.0.0.0:4001"
healthcheck:
test: "curl -f localhost:4000/health || exit 1"
12 changes: 12 additions & 0 deletions scripts/integration/greptimedb/test.yaml
@@ -0,0 +1,12 @@
features:
- greptimedb-integration-tests

test_filter: '::greptimedb::'

runner:
env:
GREPTIMEDB_ENDPOINT: greptimedb:4001
GREPTIMEDB_HTTP: http://greptimedb:4000

matrix:
version: ['latest']
48 changes: 48 additions & 0 deletions src/sinks/greptimedb/batch.rs
@@ -0,0 +1,48 @@
use vector_core::{
event::{Metric, MetricValue},
stream::batcher::limiter::ItemBatchSize,
};

use super::request_builder::{
DISTRIBUTION_QUANTILES, DISTRIBUTION_STAT_FIELD_COUNT, SUMMARY_STAT_FIELD_COUNT,
};

const F64_BYTE_SIZE: usize = 8;
const I64_BYTE_SIZE: usize = 8;

#[derive(Default)]
pub(super) struct GreptimeDBBatchSizer;

impl GreptimeDBBatchSizer {
pub(super) fn estimated_size_of(&self, item: &Metric) -> usize {
// Metric name.
item.series().name().name().len()
// Metric namespace, with an additional 1 to account for the namespace separator.
+ item.series().name().namespace().map(|s| s.len() + 1).unwrap_or(0)
// Metric tags, with an additional 1 per tag to account for the tag key/value separator.
+ item.series().tags().map(|t| {
t.iter_all().map(|(k, v)| {
k.len() + 1 + v.map(|v| v.len()).unwrap_or(0)
})
.sum()
})
.unwrap_or(0)
// timestamp
+ I64_BYTE_SIZE
+
// value size
match item.value() {
MetricValue::Counter { .. } | MetricValue::Gauge { .. } | MetricValue::Set { ..} => F64_BYTE_SIZE,
MetricValue::Distribution { .. } => F64_BYTE_SIZE * (DISTRIBUTION_QUANTILES.len() + DISTRIBUTION_STAT_FIELD_COUNT),
MetricValue::AggregatedHistogram { buckets, .. } => F64_BYTE_SIZE * (buckets.len() + SUMMARY_STAT_FIELD_COUNT),
MetricValue::AggregatedSummary { quantiles, .. } => F64_BYTE_SIZE * (quantiles.len() + SUMMARY_STAT_FIELD_COUNT),
MetricValue::Sketch { .. } => F64_BYTE_SIZE * (DISTRIBUTION_QUANTILES.len() + DISTRIBUTION_STAT_FIELD_COUNT),
}
}
}

impl ItemBatchSize<Metric> for GreptimeDBBatchSizer {
fn size(&self, item: &Metric) -> usize {
self.estimated_size_of(item)
}
}
88 changes: 88 additions & 0 deletions src/sinks/greptimedb/integration_tests.rs
@@ -0,0 +1,88 @@
use chrono::{DateTime, Duration, Utc};
use futures::stream;
use vector_core::event::{Event, Metric, MetricKind, MetricValue};
use vector_core::metric_tags;

use crate::sinks::util::test::load_sink;
use crate::{
config::{SinkConfig, SinkContext},
test_util::{
components::{run_and_assert_sink_compliance, SINK_TAGS},
trace_init,
},
};

use super::GreptimeDBConfig;

#[tokio::test]
async fn test_greptimedb_sink() {
trace_init();
let cfg = format!(
r#"endpoint= "{}"
"#,
std::env::var("GREPTIMEDB_ENDPOINT").unwrap_or_else(|_| "localhost:4001".to_owned())
);

let (config, _) = load_sink::<GreptimeDBConfig>(&cfg).unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let query_client = query_client();

// Drop the table and data inside
let _ = query_client
.get(&format!(
"{}/v1/sql",
std::env::var("GREPTIMEDB_HTTP").unwrap_or_else(|_| "http://localhost:4000".to_owned())
))
.query(&[("sql", "DROP TABLE ns_my_counter")])
.send()
.await
.unwrap();

let base_time = Utc::now();
let events: Vec<_> = (0..10).map(|idx| create_event(idx, base_time)).collect();
run_and_assert_sink_compliance(sink, stream::iter(events), &SINK_TAGS).await;

let query_response = query_client
.get(&format!(
"{}/v1/sql",
std::env::var("GREPTIMEDB_HTTP").unwrap_or_else(|_| "http://localhost:4000".to_owned())
))
.query(&[("sql", "SELECT region, val FROM ns_my_counter")])
.send()
.await
.unwrap()
.text()
.await
.expect("Fetch json from greptimedb failed");
let result: serde_json::Value =
serde_json::from_str(&query_response).expect("Invalid json returned from greptimedb query");
assert_eq!(
result
.pointer("/output/0/records/rows")
.and_then(|v| v.as_array())
.expect("Error getting greptimedb response array")
.len(),
10
)
}

fn query_client() -> reqwest::Client {
reqwest::Client::builder().build().unwrap()
}

fn create_event(i: i32, base_time: DateTime<Utc>) -> Event {
Event::Metric(
Metric::new(
"my_counter".to_owned(),
MetricKind::Incremental,
MetricValue::Counter { value: i as f64 },
)
.with_namespace(Some("ns"))
.with_tags(Some(metric_tags!(
"region" => "us-west-1",
"production" => "true",
)))
.with_timestamp(Some(base_time + Duration::seconds(i as i64))),
)
}

0 comments on commit da36a5e

Please sign in to comment.