Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new sink): Adding greptimedb metrics sink #17198

Merged
merged 37 commits into from Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
e82ed07
feat: add greptimedb sink (wip)
sunng87 Feb 27, 2023
5e951fb
feat: configure greptimedb as git dependency
sunng87 Mar 7, 2023
4efa4c0
feat: transforming vector metric to greptimedb insert request
sunng87 Mar 15, 2023
52094b8
feat: address issues in latest greptimedb client
sunng87 Mar 21, 2023
a080f30
feat: add grpc auth support
sunng87 Mar 21, 2023
9e8d354
refactor: default value for catalog and schema, rename column fn
sunng87 Mar 21, 2023
7c13cea
refactor: small tweaks
sunng87 Mar 22, 2023
6f7add6
test: add integration test wip
sunng87 Mar 24, 2023
c51c2ef
feat: update greptimedb grpc client, dbname and healthcheck for grpc
sunng87 Mar 31, 2023
e766b62
feat: change default timestmap column name to ts
sunng87 Apr 6, 2023
2c5cafc
test: add integration tests for greptimedb
sunng87 Apr 11, 2023
64704f7
test: correct column count in test cases
sunng87 May 4, 2023
945cf90
test: resolve lint and test errors
sunng87 May 4, 2023
9c37da8
chore: switch to compact greptimedb rust client
sunng87 May 5, 2023
a888bf9
docs: add website docs
sunng87 May 22, 2023
2bfbce3
chore: update 3rd-party license file
sunng87 May 22, 2023
b4774ff
fix: add greptimedb-integration-tests feature
sunng87 May 24, 2023
31107a4
chore: remove network section from integration tests
sunng87 May 25, 2023
2dc19f5
fix: resolve new lint warnings
sunng87 Jun 2, 2023
352bd13
test: fix compose file by adding command
sunng87 Jun 7, 2023
b320d61
feat: use latest client protocol
sunng87 Jun 6, 2023
26a7a61
refactor: use stream sink apis
sunng87 Jun 11, 2023
828bebc
fix: integration tests
sunng87 Jun 12, 2023
8fb9943
feat: add retry logic back
sunng87 Jun 12, 2023
542a820
fix: change default column name to val instead of value
sunng87 Jun 15, 2023
2d39446
refactor: minor tunes
sunng87 Jun 26, 2023
156a35e
docs: update cue files
sunng87 Jun 26, 2023
3cdeb51
feat: add some tlsconfig support for custom ca and client cert
sunng87 Jul 1, 2023
930f97d
docs: add greptime.md for website
sunng87 Jul 1, 2023
1e6b6df
Merge remote-tracking branch 'upstream/master' into feature/greptimedb
sunng87 Jul 1, 2023
d590c7b
refactor: adopt latest master changes
sunng87 Jul 1, 2023
7cde107
fix: avoid panic for tlsconfig error
sunng87 Jul 3, 2023
74ba665
Merge branch 'master' into feature/greptimedb
sunng87 Jul 4, 2023
c16cc6d
Merge branch 'master' into feature/greptimedb
sunng87 Jul 6, 2023
2968854
Apply suggestions from code review
sunng87 Jul 8, 2023
289f4a8
Update src/sinks/greptimedb/service.rs
sunng87 Jul 8, 2023
e247b68
refactor: address review issues
sunng87 Jul 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/actions/spelling/allow.txt
Expand Up @@ -295,6 +295,9 @@ gpg
gql
grafana
graphiql
greptime
greptimecloud
greptimedb
gvisor
gws
hadoop
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/changes.yml
Expand Up @@ -70,6 +70,8 @@ on:
value: ${{ jobs.int_tests.outputs.fluent }}
gcp:
value: ${{ jobs.int_tests.outputs.gcp }}
greptimedb:
value: ${{ jobs.int_tests.outputs.greptimedb }}
humio:
value: ${{ jobs.int_tests.outputs.humio }}
http-client:
Expand Down Expand Up @@ -194,6 +196,7 @@ jobs:
eventstoredb: ${{ steps.filter.outputs.eventstoredb }}
fluent: ${{ steps.filter.outputs.fluent }}
gcp: ${{ steps.filter.outputs.gcp }}
greptimedb: ${{ steps.filter.outputs.greptimedb }}
humio: ${{ steps.filter.outputs.humio }}
http-client: ${{ steps.filter.outputs.http-client }}
influxdb: ${{ steps.filter.outputs.influxdb }}
Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/integration-comment.yml
Expand Up @@ -221,6 +221,14 @@ jobs:
max_attempts: 3
command: bash scripts/ci-integration-test.sh gcp

- name: greptimedb
if: ${{ contains(github.event.comment.body, '/ci-run-integration-greptimedb') || contains(github.event.comment.body, '/ci-run-all') }}
uses: nick-fields/retry@v2
with:
timeout_minutes: 30
max_attempts: 3
command: bash scripts/ci-integration-test.sh greptimedb

- name: humio
if: ${{ contains(github.event.comment.body, '/ci-run-integration-humio') || contains(github.event.comment.body, '/ci-run-all') }}
uses: nick-fields/retry@v2
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/integration.yml
Expand Up @@ -67,6 +67,7 @@ jobs:
|| needs.changes.outputs.eventstoredb == 'true'
|| needs.changes.outputs.fluent == 'true'
|| needs.changes.outputs.gcp == 'true'
|| needs.changes.outputs.greptimedb == 'true'
|| needs.changes.outputs.humio == 'true'
|| needs.changes.outputs.http-client == 'true'
|| needs.changes.outputs.influxdb == 'true'
Expand Down Expand Up @@ -233,6 +234,14 @@ jobs:
max_attempts: 3
command: bash scripts/ci-integration-test.sh gcp

- if: ${{ github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.greptimedb == 'true' }}
name: greptimedb
uses: nick-fields/retry@v2
with:
timeout_minutes: 30
max_attempts: 3
command: bash scripts/ci-integration-test.sh greptimedb

- if: ${{ github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.humio == 'true' }}
name: humio
uses: nick-fields/retry@v2
Expand Down
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.toml
Expand Up @@ -233,6 +233,9 @@ tui = { version = "0.19.0", optional = true, default-features = false, features
hex = { version = "0.4.3", default-features = false, optional = true }
sha2 = { version = "0.10.7", default-features = false, optional = true }

# GreptimeDB
greptimedb-client = { git = "https://github.com/GreptimeTeam/greptimedb-client-rust.git", rev = "bc32362adf0df17a41a95bae4221d6d8f1775656", optional = true }

# External libs
arc-swap = { version = "1.6", default-features = false, optional = true }
async-compression = { version = "0.4.0", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true }
Expand Down Expand Up @@ -646,6 +649,7 @@ sinks-metrics = [
"sinks-blackhole",
"sinks-console",
"sinks-datadog_metrics",
"sinks-greptimedb",
"sinks-humio",
"sinks-influxdb",
"sinks-kafka",
Expand Down Expand Up @@ -679,6 +683,7 @@ sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serd
sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"]
sinks-file = ["dep:async-compression"]
sinks-gcp = ["dep:base64", "gcp"]
sinks-greptimedb = ["dep:greptimedb-client"]
sinks-honeycomb = []
sinks-http = []
sinks-humio = ["sinks-splunk_hec", "transforms-metric_to_log"]
Expand Down Expand Up @@ -739,6 +744,7 @@ all-integration-tests = [
"gcp-cloud-storage-integration-tests",
"gcp-integration-tests",
"gcp-pubsub-integration-tests",
"greptimedb-integration-tests",
"http-client-integration-tests",
"humio-integration-tests",
"influxdb-integration-tests",
Expand Down Expand Up @@ -800,6 +806,7 @@ fluent-integration-tests = ["docker", "sources-fluent"]
gcp-cloud-storage-integration-tests = ["sinks-gcp"]
gcp-integration-tests = ["sinks-gcp"]
gcp-pubsub-integration-tests = ["sinks-gcp", "sources-gcp_pubsub"]
greptimedb-integration-tests = ["sinks-greptimedb"]
humio-integration-tests = ["sinks-humio"]
http-client-integration-tests = ["sources-http_client"]
influxdb-integration-tests = ["sinks-influxdb"]
Expand Down
2 changes: 2 additions & 0 deletions LICENSE-3rdparty.csv
Expand Up @@ -225,6 +225,8 @@ graphql-parser,https://github.com/graphql-rust/graphql-parser,MIT OR Apache-2.0,
graphql_client,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT,Tom Houlé <tom@tomhoule.com>
graphql_client_codegen,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT,Tom Houlé <tom@tomhoule.com>
graphql_query_derive,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT,Tom Houlé <tom@tomhoule.com>
greptime-proto,https://github.com/GreptimeTeam/greptime-proto,Apache-2.0,The greptime-proto Authors
greptimedb-client,https://github.com/GreptimeTeam/greptimedb-client-rust,Apache-2.0,The greptimedb-client Authors
grok,https://github.com/daschl/grok,Apache-2.0,Michael Nitschinger <michael@nitschinger.at>
h2,https://github.com/hyperium/h2,MIT,"Carl Lerche <me@carllerche.com>, Sean McArthur <sean@seanmonstar.com>"
hash_hasher,https://github.com/Fraser999/Hash-Hasher,Apache-2.0 OR MIT,Fraser Hutchison <fraser.hutchison@maidsafe.net>
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -332,7 +332,7 @@ test-behavior: test-behavior-transforms test-behavior-formats test-behavior-conf
test-integration: ## Runs all integration tests
test-integration: test-integration-amqp test-integration-appsignal test-integration-aws test-integration-axiom test-integration-azure test-integration-chronicle test-integration-clickhouse
test-integration: test-integration-databend test-integration-docker-logs test-integration-elasticsearch
test-integration: test-integration-eventstoredb test-integration-fluent test-integration-gcp test-integration-humio test-integration-http-client test-integration-influxdb
test-integration: test-integration-eventstoredb test-integration-fluent test-integration-gcp test-integration-greptimedb test-integration-humio test-integration-http-client test-integration-influxdb
test-integration: test-integration-kafka test-integration-logstash test-integration-loki test-integration-mongodb test-integration-nats
test-integration: test-integration-nginx test-integration-opentelemetry test-integration-postgres test-integration-prometheus test-integration-pulsar
test-integration: test-integration-redis test-integration-splunk test-integration-dnstap test-integration-datadog-agent test-integration-datadog-logs
Expand Down
8 changes: 8 additions & 0 deletions scripts/integration/greptimedb/compose.yaml
@@ -0,0 +1,8 @@
version: '3'

services:
greptimedb:
image: docker.io/greptime/greptimedb:${CONFIG_VERSION}
command: "standalone start --http-addr=0.0.0.0:4000 --rpc-addr=0.0.0.0:4001"
healthcheck:
test: "curl -f localhost:4000/health || exit 1"
12 changes: 12 additions & 0 deletions scripts/integration/greptimedb/test.yaml
@@ -0,0 +1,12 @@
features:
- greptimedb-integration-tests

test_filter: '::greptimedb::'

runner:
env:
GREPTIMEDB_ENDPOINT: greptimedb:4001
GREPTIMEDB_HTTP: http://greptimedb:4000

matrix:
version: ['latest']
39 changes: 39 additions & 0 deletions src/sinks/greptimedb/batch.rs
@@ -0,0 +1,39 @@
use vector_core::{
event::{Metric, MetricValue},
stream::batcher::limiter::ItemBatchSize,
};

#[derive(Default)]
pub(super) struct GreptimeDBBatchSizer;

impl GreptimeDBBatchSizer {
pub(super) fn estimated_size_of(&self, item: &Metric) -> usize {
// Metric name.
item.series().name().name().len()
// Metric namespace, with an additional 1 to account for the namespace separator.
+ item.series().name().namespace().map(|s| s.len() + 1).unwrap_or(0)
// Metric tags, with an additional 1 per tag to account for the tag key/value separator.
+ item.series().tags().map(|t| {
t.iter_all().map(|(k, v)| {
k.len() + 1 + v.map(|v| v.len()).unwrap_or(0)
})
.sum()
})
.unwrap_or(0)
+
// value size
match item.value() {
MetricValue::Counter { .. } | MetricValue::Gauge { .. } | MetricValue::Set { ..} => 8,
MetricValue::Distribution { .. } => 8 * 10,
MetricValue::AggregatedHistogram { buckets, .. } => 8 * (buckets.len() + 2),
MetricValue::AggregatedSummary { quantiles, .. } => 8 * (quantiles.len() + 2),
MetricValue::Sketch { .. } => 8 * 10,
sunng87 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

impl ItemBatchSize<Metric> for GreptimeDBBatchSizer {
fn size(&self, item: &Metric) -> usize {
self.estimated_size_of(item)
}
}
88 changes: 88 additions & 0 deletions src/sinks/greptimedb/integration_tests.rs
@@ -0,0 +1,88 @@
use chrono::{DateTime, Duration, Utc};
use futures::stream;
use vector_core::event::{Event, Metric, MetricKind, MetricValue};
use vector_core::metric_tags;

use crate::sinks::util::test::load_sink;
use crate::{
config::{SinkConfig, SinkContext},
test_util::{
components::{run_and_assert_sink_compliance, SINK_TAGS},
trace_init,
},
};

use super::GreptimeDBConfig;

#[tokio::test]
async fn test_greptimedb_sink() {
trace_init();
let cfg = format!(
r#"endpoint= "{}"
"#,
std::env::var("GREPTIMEDB_ENDPOINT").unwrap_or_else(|_| "localhost:4001".to_owned())
);

let (config, _) = load_sink::<GreptimeDBConfig>(&cfg).unwrap();
let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

let query_client = query_client();

// Drop the table and data inside
let _ = query_client
.get(&format!(
"{}/v1/sql",
std::env::var("GREPTIMEDB_HTTP").unwrap_or_else(|_| "http://localhost:4000".to_owned())
))
.query(&[("sql", "DROP TABLE ns_my_counter")])
.send()
.await
.unwrap();

let base_time = Utc::now();
let events: Vec<_> = (0..10).map(|idx| create_event(idx, base_time)).collect();
run_and_assert_sink_compliance(sink, stream::iter(events), &SINK_TAGS).await;

let query_response = query_client
.get(&format!(
"{}/v1/sql",
std::env::var("GREPTIMEDB_HTTP").unwrap_or_else(|_| "http://localhost:4000".to_owned())
))
.query(&[("sql", "SELECT region, val FROM ns_my_counter")])
.send()
.await
.unwrap()
.text()
.await
.expect("Fetch json from greptimedb failed");
let result: serde_json::Value =
serde_json::from_str(&query_response).expect("Invalid json returned from greptimedb query");
assert_eq!(
result
.pointer("/output/0/records/rows")
.and_then(|v| v.as_array())
.expect("Error getting greptimedb response array")
.len(),
10
)
}

fn query_client() -> reqwest::Client {
reqwest::Client::builder().build().unwrap()
}

fn create_event(i: i32, base_time: DateTime<Utc>) -> Event {
Event::Metric(
Metric::new(
"my_counter".to_owned(),
MetricKind::Incremental,
MetricValue::Counter { value: i as f64 },
)
.with_namespace(Some("ns"))
.with_tags(Some(metric_tags!(
"region" => "us-west-1",
"production" => "true",
)))
.with_timestamp(Some(base_time + Duration::seconds(i as i64))),
)
}