Skip to content

Commit

Permalink
refactor: address review issues
Browse files Browse the repository at this point in the history
  • Loading branch information
sunng87 committed Jul 8, 2023
1 parent 289f4a8 commit e247b68
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 30 deletions.
19 changes: 14 additions & 5 deletions src/sinks/greptimedb/batch.rs
Expand Up @@ -3,6 +3,13 @@ use vector_core::{
stream::batcher::limiter::ItemBatchSize,
};

use super::request_builder::{
DISTRIBUTION_QUANTILES, DISTRIBUTION_STAT_FIELD_COUNT, SUMMARY_STAT_FIELD_COUNT,
};

/// Width in bytes of a single `f64` field value, used when estimating value sizes.
const F64_BYTE_SIZE: usize = std::mem::size_of::<f64>();
/// Width in bytes of a single `i64`; added once per item for the timestamp.
const I64_BYTE_SIZE: usize = std::mem::size_of::<i64>();

/// Stateless batch sizer for the GreptimeDB sink.
///
/// The accompanying `impl` estimates an item's size in bytes; the visible part adds the
/// timestamp width and a per-variant estimate for the metric value's fields.
/// NOTE(review): presumably used as the batcher's `ItemBatchSize` — confirm against the
/// trait impl, which is not visible here.
#[derive(Default)]
pub(super) struct GreptimeDBBatchSizer;

Expand All @@ -20,14 +27,16 @@ impl GreptimeDBBatchSizer {
.sum()
})
.unwrap_or(0)
// timestamp
+ I64_BYTE_SIZE
+
// value size
match item.value() {
MetricValue::Counter { .. } | MetricValue::Gauge { .. } | MetricValue::Set { ..} => 8,
MetricValue::Distribution { .. } => 8 * 10,
MetricValue::AggregatedHistogram { buckets, .. } => 8 * (buckets.len() + 2),
MetricValue::AggregatedSummary { quantiles, .. } => 8 * (quantiles.len() + 2),
MetricValue::Sketch { .. } => 8 * 10,
MetricValue::Counter { .. } | MetricValue::Gauge { .. } | MetricValue::Set { ..} => F64_BYTE_SIZE,
MetricValue::Distribution { .. } => F64_BYTE_SIZE * (DISTRIBUTION_QUANTILES.len() + DISTRIBUTION_STAT_FIELD_COUNT),
MetricValue::AggregatedHistogram { buckets, .. } => F64_BYTE_SIZE * (buckets.len() + SUMMARY_STAT_FIELD_COUNT),
MetricValue::AggregatedSummary { quantiles, .. } => F64_BYTE_SIZE * (quantiles.len() + SUMMARY_STAT_FIELD_COUNT),
MetricValue::Sketch { .. } => F64_BYTE_SIZE * (DISTRIBUTION_QUANTILES.len() + DISTRIBUTION_STAT_FIELD_COUNT),
}
}
}
Expand Down
83 changes: 61 additions & 22 deletions src/sinks/greptimedb/request_builder.rs
Expand Up @@ -7,6 +7,10 @@ use vector_core::metrics::AgentDDSketch;

use crate::sinks::util::statistic::DistributionStatistic;

/// Quantiles reported for distribution and sketch metrics: passed to
/// `DistributionStatistic::from_samples` and iterated when encoding a sketch.
pub(super) const DISTRIBUTION_QUANTILES: [f64; 5] = [0.5, 0.75, 0.90, 0.95, 0.99];
/// Number of non-quantile statistic columns counted for a distribution or sketch.
/// NOTE(review): presumably min, max, avg, sum and count — confirm against the encoders.
pub(super) const DISTRIBUTION_STAT_FIELD_COUNT: usize = 5;
/// Number of extra columns counted for aggregated histograms and summaries on top of their
/// buckets/quantiles.
/// NOTE(review): presumably `count` and `sum` — confirm against the encoders.
pub(super) const SUMMARY_STAT_FIELD_COUNT: usize = 2;

fn f64_field(name: &str, value: f64) -> Column {
Column {
column_name: name.to_owned(),
Expand Down Expand Up @@ -112,9 +116,7 @@ pub(super) fn metric_to_insert_request(metric: Metric) -> InsertRequest {
}

fn encode_distribution(samples: &[Sample], columns: &mut Vec<Column>) {
if let Some(stats) =
DistributionStatistic::from_samples(samples, &[0.5, 0.75, 0.90, 0.95, 0.99])
{
if let Some(stats) = DistributionStatistic::from_samples(samples, &DISTRIBUTION_QUANTILES) {
columns.push(f64_field("min", stats.min));
columns.push(f64_field("max", stats.max));
columns.push(f64_field("avg", stats.avg));
Expand Down Expand Up @@ -159,20 +161,11 @@ fn encode_sketch(sketch: &AgentDDSketch, columns: &mut Vec<Column>) {
columns.push(f64_field("avg", avg));
}

if let Some(quantile) = sketch.quantile(0.5) {
columns.push(f64_field("p50", quantile));
}
if let Some(quantile) = sketch.quantile(0.75) {
columns.push(f64_field("p75", quantile));
}
if let Some(quantile) = sketch.quantile(0.90) {
columns.push(f64_field("p90", quantile));
}
if let Some(quantile) = sketch.quantile(0.95) {
columns.push(f64_field("p95", quantile));
}
if let Some(quantile) = sketch.quantile(0.99) {
columns.push(f64_field("p99", quantile));
for q in DISTRIBUTION_QUANTILES {
if let Some(quantile) = sketch.quantile(q) {
let column_name = format!("p{:02}", q * 100f64);
columns.push(f64_field(&column_name, quantile));
}
}
}

Expand Down Expand Up @@ -265,7 +258,10 @@ mod tests {
},
);
let insert = metric_to_insert_request(metric);
assert_eq!(insert.columns.len(), 11);
assert_eq!(
insert.columns.len(),
1 + DISTRIBUTION_STAT_FIELD_COUNT + DISTRIBUTION_QUANTILES.len()
);

assert_eq!(get_column(&insert.columns, "max"), 3.0);
assert_eq!(get_column(&insert.columns, "min"), 1.0);
Expand All @@ -281,17 +277,22 @@ mod tests {

#[test]
fn test_histogram() {
let buckets = vector_core::buckets![1.0 => 1, 2.0 => 2, 3.0 => 1];
let buckets_len = buckets.len();
let metric = Metric::new(
"cpu_seconds_total",
MetricKind::Incremental,
MetricValue::AggregatedHistogram {
buckets: vector_core::buckets![1.0 => 1, 2.0 => 2, 3.0 => 1],
buckets,
count: 4,
sum: 8.0,
},
);
let insert = metric_to_insert_request(metric);
assert_eq!(insert.columns.len(), 6);
assert_eq!(
insert.columns.len(),
1 + SUMMARY_STAT_FIELD_COUNT + buckets_len
);

assert_eq!(get_column(&insert.columns, "b1"), 1.0);
assert_eq!(get_column(&insert.columns, "b2"), 2.0);
Expand All @@ -302,23 +303,61 @@ mod tests {

/// Checks that an aggregated summary is encoded with one column per quantile plus the
/// `count` and `sum` columns, and that each column carries the expected value.
#[test]
fn test_summary() {
    let quantiles = vector_core::quantiles![0.01 => 1.5, 0.5 => 2.0, 0.99 => 3.0];
    // Capture the length up front: `quantiles` is moved into the metric value below.
    let quantiles_len = quantiles.len();
    let metric = Metric::new(
        "cpu_seconds_total",
        MetricKind::Incremental,
        MetricValue::AggregatedSummary {
            quantiles,
            count: 6,
            sum: 12.0,
        },
    );

    let insert = metric_to_insert_request(metric);
    // Derive the expected column count from the inputs instead of hard-coding it.
    // NOTE(review): the leading `1` is presumably the timestamp column — confirm.
    assert_eq!(
        insert.columns.len(),
        1 + SUMMARY_STAT_FIELD_COUNT + quantiles_len
    );

    assert_eq!(get_column(&insert.columns, "p01"), 1.5);
    assert_eq!(get_column(&insert.columns, "p50"), 2.0);
    assert_eq!(get_column(&insert.columns, "p99"), 3.0);
    assert_eq!(get_column(&insert.columns, "count"), 6.0);
    assert_eq!(get_column(&insert.columns, "sum"), 12.0);
}

/// Feeds the values `0..10` into an `AgentDDSketch`, converts the resulting sketch metric
/// into an insert request, and checks the column count, the quantile columns and the
/// summary statistics.
#[test]
fn test_sketch() {
    let sample_count = 10;
    let mut dd_sketch = AgentDDSketch::with_agent_defaults();
    for value in 0..sample_count {
        dd_sketch.insert(value as f64);
    }

    let sketch_value = MetricValue::Sketch {
        sketch: MetricSketch::AgentDDSketch(dd_sketch),
    };
    let request = metric_to_insert_request(Metric::new(
        "cpu_seconds_total",
        MetricKind::Incremental,
        sketch_value,
    ));
    let columns = &request.columns;

    assert_eq!(
        columns.len(),
        1 + DISTRIBUTION_QUANTILES.len() + DISTRIBUTION_STAT_FIELD_COUNT
    );

    // Quantiles come from the sketch, so only bounds are asserted for them.
    assert!(get_column(columns, "p50") <= 4.0);
    let p95 = get_column(columns, "p95");
    assert!(p95 > 8.0);
    assert!(p95 <= 9.0);
    let p99 = get_column(columns, "p99");
    assert!(p99 > 8.0);
    assert!(p99 <= 9.0);

    // The remaining statistics are asserted exactly.
    assert_eq!(get_column(columns, "count"), sample_count as f64);
    assert_eq!(get_column(columns, "sum"), 45.0);
    assert_eq!(get_column(columns, "max"), 9.0);
    assert_eq!(get_column(columns, "min"), 0.0);
}
}
10 changes: 7 additions & 3 deletions src/sinks/greptimedb/service.rs
Expand Up @@ -143,9 +143,13 @@ impl GreptimeDBService {
.as_ref()
.ok_or(GreptimeDBConfigError::TlsMissingKey)?;

if tls_config.key_pass.is_some() {
if tls_config.key_pass.is_some()
|| tls_config.alpn_protocols.is_some()
|| tls_config.verify_certificate.is_some()
|| tls_config.verify_hostname.is_some()
{
warn!(
message = "TLS key file with password is not supported by greptimedb client at the moment."
message = "TlsConfig: key_pass, alpn_protocols, verify_certificate and verify_hostname are not supported by greptimedb client at the moment."
);
}

Expand Down Expand Up @@ -174,7 +178,7 @@ impl Service<GreptimeDBRequest> for GreptimeDBService {
let client = Arc::clone(&self.client);

Box::pin(async move {
let metadata = req.get_metadata().clone();
let metadata = req.metadata;
let result = client.insert(req.items).await?;

Ok(GreptimeDBBatchOutput {
Expand Down

0 comments on commit e247b68

Please sign in to comment.