Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(new_relic sink): Multiple fixes related to metrics #18151

Merged
merged 26 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
05025b1
Convert tags into attributes.
asllop Sep 15, 2022
4cdef4e
Merge branch 'vectordotdev:master' into master
asllop Sep 15, 2022
087d4bf
Decompose metric into parts to reuse tags instead of cloning.
asllop Sep 16, 2022
2dd2668
Remove unnecessary blocks.
asllop Sep 16, 2022
6020efb
Set metric type.
asllop Sep 22, 2022
9e7d57c
Fix field name typo.
asllop Sep 22, 2022
567f236
Remove unnecessary brackets.
asllop Sep 23, 2022
c3b4784
Merge branch 'master' into master
asllop Sep 23, 2022
b755769
Cargo fmt.
asllop Oct 20, 2022
67943c9
Merge branch 'master' of https://github.com/asllop/vector
asllop Oct 20, 2022
48f16d8
Update src/sinks/new_relic/model.rs
asllop Nov 3, 2022
af14a7d
Update src/sinks/new_relic/model.rs
asllop Nov 3, 2022
137943c
Fix typo.
asllop Nov 3, 2022
2a2282c
Add tests for newly supported metrics.
asllop Nov 3, 2022
b72faed
Simplify metric type cases.
asllop Nov 3, 2022
84a8d61
Merge branch 'vectordotdev:master' into master
asllop Nov 4, 2022
41b41ab
Cargo fmt.
asllop Dec 12, 2022
f247c95
Merge remote-tracking branch 'origin/master' into asllop/master
jszwedko Aug 3, 2023
62addd3
Update to handle new tag model
jszwedko Aug 3, 2023
1f2bc83
PR feedback
jszwedko Aug 4, 2023
f256b33
refactor
dsmith3197 Aug 11, 2023
0ecfa81
add dropped event metrics
dsmith3197 Aug 22, 2023
c3769db
track unsupported metric types as well
dsmith3197 Aug 22, 2023
494b12e
merge master
dsmith3197 Aug 22, 2023
c11c958
feedback
dsmith3197 Sep 11, 2023
4e4943f
more feedback
dsmith3197 Sep 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 62 additions & 51 deletions src/sinks/new_relic/model.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use std::{collections::HashMap, convert::TryFrom, fmt::Debug, time::SystemTime};
use std::{
collections::{BTreeMap, HashMap},
convert::TryFrom,
fmt::Debug,
time::SystemTime,
};

use chrono::{DateTime, Utc};
use ordered_float::NotNan;
use serde::{Deserialize, Serialize};

use super::NewRelicSinkError;
use crate::event::{Event, MetricValue, Value};
use crate::event::{Event, MetricKind, MetricValue, Value};

#[derive(Debug)]
pub enum NewRelicApiModel {
Expand All @@ -21,30 +26,9 @@ type DataStore = HashMap<String, Vec<KeyValData>>;
pub struct MetricsApiModel(pub Vec<DataStore>);

impl MetricsApiModel {
pub fn new(metric_array: Vec<(Value, Value, Value)>) -> Self {
let mut metric_data_array = vec![];
for (m_name, m_value, m_timestamp) in metric_array {
let mut metric_data = KeyValData::new();
metric_data.insert("name".to_owned(), m_name);
metric_data.insert("value".to_owned(), m_value);
match m_timestamp {
Value::Timestamp(ts) => {
metric_data.insert("timestamp".to_owned(), Value::from(ts.timestamp()));
}
Value::Integer(i) => {
metric_data.insert("timestamp".to_owned(), Value::from(i));
}
_ => {
metric_data.insert(
"timestamp".to_owned(),
Value::from(DateTime::<Utc>::from(SystemTime::now()).timestamp()),
);
}
}
metric_data_array.push(metric_data);
}
pub fn new(metric_array: Vec<KeyValData>) -> Self {
let mut metric_store = DataStore::new();
metric_store.insert("metrics".to_owned(), metric_data_array);
metric_store.insert("metrics".to_owned(), metric_array);
Self(vec![metric_store])
}
}
Expand All @@ -57,39 +41,66 @@ impl TryFrom<Vec<Event>> for MetricsApiModel {

for buf_event in buf_events {
if let Event::Metric(metric) = buf_event {
// Future improvement: put metric type. If type = count, NR metric model requires an interval.ms field, that is not provided by the Vector Metric model.
match metric.value() {
MetricValue::Gauge { value } => {
metric_array.push((
Value::from(metric.name().to_owned()),
Value::from(
NotNan::new(*value).map_err(|_| {
NewRelicSinkError::new("NaN value not supported")
})?,
),
Value::from(metric.timestamp()),
));
}
MetricValue::Counter { value } => {
metric_array.push((
Value::from(metric.name().to_owned()),
Value::from(
NotNan::new(*value).map_err(|_| {
NewRelicSinkError::new("NaN value not supported")
})?,
),
Value::from(metric.timestamp()),
));
// Generate Value::Object() from BTreeMap<String, String>
let (series, data, _) = metric.into_parts();
let attr = series.tags.map(|tags| {
Value::from(
tags.iter_single()
.map(|(key, value)| (key.to_string(), Value::from(value)))
.collect::<BTreeMap<_, _>>(),
)
});

dsmith3197 marked this conversation as resolved.
Show resolved Hide resolved
// We only handle gauge and counter metrics
if let MetricValue::Gauge { value } | MetricValue::Counter { value } = data.value {
let mut metric_data = KeyValData::new();
// Set name, value, and timestamp
metric_data.insert("name".to_owned(), Value::from(series.name.name));
metric_data.insert(
"value".to_owned(),
Value::from(
NotNan::new(value)
.map_err(|_| NewRelicSinkError::new("NaN value not supported"))?,
),
);
metric_data.insert(
"timestamp".to_owned(),
Value::from(
data.time
.timestamp
.unwrap_or_else(|| DateTime::<Utc>::from(SystemTime::now()))
.timestamp(),
),
);
if let Some(attr) = attr {
metric_data.insert("attributes".to_owned(), attr);
}
_ => {
// Unrecognized metric type
// Set type and type related attributes
if let (MetricValue::Counter { .. }, MetricKind::Incremental) =
(data.value, data.kind)
{
if let Some(interval_ms) = data.time.interval_ms {
metric_data.insert(
"interval.ms".to_owned(),
Value::from(interval_ms.get() as i64),
);
} else {
// Incremental counter without an interval is worthless, skip this metric
continue;
}
metric_data.insert("type".to_owned(), Value::from("count"));
} else {
// Anything that's not an incremental counter is considered a gauge, that is gauge and absolute counters metrics.
metric_data.insert("type".to_owned(), Value::from("gauge"));
}
dsmith3197 marked this conversation as resolved.
Show resolved Hide resolved

dsmith3197 marked this conversation as resolved.
Show resolved Hide resolved
metric_array.push(metric_data);
}
dsmith3197 marked this conversation as resolved.
Show resolved Hide resolved
}
}

if !metric_array.is_empty() {
Ok(MetricsApiModel::new(metric_array))
Ok(Self::new(metric_array))
} else {
Err(NewRelicSinkError::new("No valid metrics to generate"))
}
Expand Down
33 changes: 30 additions & 3 deletions src/sinks/new_relic/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, convert::TryFrom, time::SystemTime};
use std::{collections::HashMap, convert::TryFrom, num::NonZeroU32, time::SystemTime};

use chrono::{DateTime, Utc};
use futures::{future::ready, stream};
Expand Down Expand Up @@ -211,7 +211,7 @@ fn generate_metric_api_model() {
MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
let metrics = model.0[0]
.get("metrics")
.expect("Logs data store not present");
.expect("Metric data store not present");

assert_eq!(metrics.len(), 1);
assert!(metrics[0].get("name").is_some());
Expand All @@ -235,7 +235,7 @@ fn generate_metric_api_model() {
MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
let metrics = model.0[0]
.get("metrics")
.expect("Logs data store not present");
.expect("Metric data store not present");

assert_eq!(metrics.len(), 1);
assert!(metrics[0].get("name").is_some());
Expand All @@ -246,4 +246,31 @@ fn generate_metric_api_model() {
assert!(metrics[0].get("value").is_some());
assert_eq!(metrics[0].get("value").unwrap(), &Value::from(100.0));
assert!(metrics[0].get("timestamp").is_some());

// Incremental counter
let m = Metric::new(
"my_metric",
MetricKind::Incremental,
MetricValue::Counter { value: 100.0 },
)
.with_timestamp(Some(DateTime::<Utc>::from(SystemTime::now())))
.with_interval_ms(NonZeroU32::new(1000));
let event = Event::Metric(m);
let model =
MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
let metrics = model.0[0]
.get("metrics")
.expect("Metric data store not present");

assert_eq!(metrics.len(), 1);
assert!(metrics[0].get("name").is_some());
assert_eq!(
metrics[0].get("name").unwrap().to_string_lossy(),
"my_metric".to_owned()
);
assert!(metrics[0].get("value").is_some());
assert_eq!(metrics[0].get("value").unwrap(), &Value::from(100.0));
assert!(metrics[0].get("timestamp").is_some());
assert!(metrics[0].get("interval.ms").is_some());
assert_eq!(metrics[0].get("interval.ms").unwrap(), &Value::from(1000));
}
Loading