From 90e354cdc4db6fd5747644996c553f863c60b6e1 Mon Sep 17 00:00:00 2001 From: Alex Gavrisco Date: Sat, 9 May 2020 22:49:11 +0300 Subject: [PATCH 1/9] Instrument splunk_hec sink with metrics Signed-off-by: Alex Gavrisco --- src/internal_events/mod.rs | 2 ++ src/internal_events/splunk_hec.rs | 48 +++++++++++++++++++++++++++++++ src/sinks/splunk_hec.rs | 21 ++++++++++++-- 3 files changed, 68 insertions(+), 3 deletions(-) create mode 100644 src/internal_events/splunk_hec.rs diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index ab3b97306ce5a..e61bcbd152c5f 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -6,6 +6,7 @@ mod lua; #[cfg(feature = "sources-prometheus")] mod prometheus; mod regex; +mod splunk_hec; mod syslog; mod tcp; mod udp; @@ -20,6 +21,7 @@ pub use self::lua::*; #[cfg(feature = "sources-prometheus")] pub use self::prometheus::*; pub use self::regex::*; +pub use self::splunk_hec::*; pub use self::syslog::*; pub use self::tcp::*; pub use self::udp::*; diff --git a/src/internal_events/splunk_hec.rs b/src/internal_events/splunk_hec.rs new file mode 100644 index 0000000000000..0224c5623e2ff --- /dev/null +++ b/src/internal_events/splunk_hec.rs @@ -0,0 +1,48 @@ +use super::InternalEvent; +use metrics::counter; +use serde_json::Error; + +#[derive(Debug)] +pub struct SplunkEventSent { + pub byte_size: usize, +} + +impl InternalEvent for SplunkEventSent { + fn emit_metrics(&self) { + counter!( + "events_processed", 1, + "component_kind" => "sink", + "component_type" => "splunk_hec", + ); + counter!( + "bytes_processed", self.byte_size as u64, + "component_kind" => "sink", + "component_type" => "splunk_hec", + ); + } +} + + +#[derive(Debug)] +pub struct SplunkEventEncodeError { + pub error: Error, +} + + +impl InternalEvent for SplunkEventEncodeError { + fn emit_logs(&self) { + error!( + message = "Error encoding Splunk HEC event to json", + error = ?self.error, + rate_limit_secs = 30, + ); + } + + fn emit_metrics(&self) { + counter!( + "encode_errors", 1, + "component_kind" => "sink", + "component_type" => "splunk_hec", + ); + } +} diff --git a/src/sinks/splunk_hec.rs b/src/sinks/splunk_hec.rs index 5ce688f9ec4ad..f12d334c176e7 100644 --- a/src/sinks/splunk_hec.rs +++ b/src/sinks/splunk_hec.rs @@ -1,6 +1,7 @@ use crate::{ dns::Resolver, event::{self, Event, LogEvent, Value}, + internal_events::{SplunkEventEncodeError, SplunkEventSent}, sinks::util::{ encoding::{EncodingConfigWithDefault, EncodingConfiguration}, http::{BatchedHttpSink, HttpClient, HttpSink}, @@ -159,9 +160,23 @@ impl HttpSink for HecSinkConfig { body["sourcetype"] = json!(sourcetype); } - serde_json::to_vec(&body) - .map_err(|e| error!("Error encoding json body: {}", e)) - .ok() + match serde_json::to_vec(&body) { + Ok(value) => { + emit!(SplunkEventSent { + byte_size: value.len() + }); + Some(value) + } + Err(e) => { + emit!(SplunkEventEncodeError { + error: e + }); + None + } + } + // serde_json::to_vec(&body) + // .map_err(|e| error!("Error encoding json body: {}", e)) + // .ok() } fn build_request(&self, events: Self::Output) -> http::Request> { From 97896d559a4d1d37e35e6571bfa60f7e302298b1 Mon Sep 17 00:00:00 2001 From: Alex Gavrisco Date: Sat, 9 May 2020 22:50:15 +0300 Subject: [PATCH 2/9] Fix formatting Signed-off-by: Alex Gavrisco --- src/internal_events/splunk_hec.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/internal_events/splunk_hec.rs b/src/internal_events/splunk_hec.rs index 0224c5623e2ff..9d086dfd5239f 100644 --- a/src/internal_events/splunk_hec.rs +++ b/src/internal_events/splunk_hec.rs @@ -22,13 +22,11 @@ impl InternalEvent for SplunkEventSent { } } - #[derive(Debug)] pub struct SplunkEventEncodeError { pub error: Error, } - impl InternalEvent for SplunkEventEncodeError { fn emit_logs(&self) { error!( From 3ff9b84b05175ed649ffe65f685f054048ce37e1 Mon Sep 17 00:00:00 2001 From: Alex Gavrisco Date: Fri, 15 May 2020 20:07:52 +0300 Subject: [PATCH 3/9] Instrument aws_kinesis_streams sink with counter metrics Signed-off-by: Alex Gavrisco --- src/internal_events/aws_kinesis_streams.rs | 22 ++++++++++++++++++++++ src/internal_events/mod.rs | 2 ++ src/sinks/aws_kinesis_streams.rs | 4 ++++ 3 files changed, 28 insertions(+) create mode 100644 src/internal_events/aws_kinesis_streams.rs diff --git a/src/internal_events/aws_kinesis_streams.rs b/src/internal_events/aws_kinesis_streams.rs new file mode 100644 index 0000000000000..0f74655ac91fa --- /dev/null +++ b/src/internal_events/aws_kinesis_streams.rs @@ -0,0 +1,22 @@ +use super::InternalEvent; +use metrics::counter; + +#[derive(Debug)] +pub struct AwsKinesisStreamsEventSent { + pub byte_size: usize, +} + +impl InternalEvent for AwsKinesisStreamsEventSent { + fn emit_metrics(&self) { + counter!( + "events_processed", 1, + "component_kind" => "sink", + "component_type" => "aws_kinesis_streams", + ); + counter!( + "bytes_processed", self.byte_size as u64, + "component_kind" => "sink", + "component_type" => "aws_kinesis_streams", + ); + } +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index e61bcbd152c5f..f48d53189906b 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -1,3 +1,4 @@ +mod aws_kinesis_streams; mod blackhole; mod elasticsearch; mod file; @@ -13,6 +14,7 @@ mod udp; mod unix; mod vector; +pub use self::aws_kinesis_streams::*; pub use self::blackhole::*; pub use self::elasticsearch::*; pub use self::file::*; diff --git a/src/sinks/aws_kinesis_streams.rs b/src/sinks/aws_kinesis_streams.rs index 9ec1150a4d2b0..915edbb01d703 100644 --- a/src/sinks/aws_kinesis_streams.rs +++ b/src/sinks/aws_kinesis_streams.rs @@ -1,6 +1,7 @@ use crate::{ dns::Resolver, event::{self, Event}, + internal_events::AwsKinesisStreamsEventSent, region::RegionOrEndpoint, sinks::util::{ encoding::{EncodingConfig, EncodingConfiguration}, @@ -251,6 +252,9 @@ fn encode_event( let data = Bytes::from(data); + emit!(AwsKinesisStreamsEventSent { + byte_size: data.len() + }); Some(PutRecordsRequestEntry { data, partition_key, From 4fdcf83a09e69b0d16300d4b54e561a8818bcf28 Mon Sep 17 00:00:00 2001 From: Alex Gavrisco Date: Fri, 15 May 2020 21:30:01 +0300 Subject: [PATCH 4/9] Instrument "add_fields" transform with counter metric Signed-off-by: Alex Gavrisco --- src/internal_events/add_fields.rs | 14 ++++++++++++++ src/internal_events/mod.rs | 2 ++ src/transforms/add_fields.rs | 3 ++- 3 files changed, 18 insertions(+), 1 deletion(-) create mode 100644 src/internal_events/add_fields.rs diff --git a/src/internal_events/add_fields.rs b/src/internal_events/add_fields.rs new file mode 100644 index 0000000000000..74fcceb6eedef --- /dev/null +++ b/src/internal_events/add_fields.rs @@ -0,0 +1,14 @@ +use super::InternalEvent; +use metrics::counter; + +#[derive(Debug)] +pub struct AddFieldsEventProcessed; + +impl InternalEvent for RegexEventProcessed { + fn emit_metrics(&self) { + counter!("events_processed", 1, + "component_kind" => "transform", + "component_type" => "add_fields", + ); + } +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index f48d53189906b..2c548d5863040 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -1,3 +1,4 @@ +mod add_fields; mod aws_kinesis_streams; mod blackhole; mod elasticsearch; @@ -14,6 +15,7 @@ mod udp; mod unix; mod vector; +pub use self::add_fields::*; pub use self::aws_kinesis_streams::*; pub use self::blackhole::*; pub use self::elasticsearch::*; diff --git a/src/transforms/add_fields.rs b/src/transforms/add_fields.rs index a060fe2fd1ed0..85b6f85a3105f 100644 --- a/src/transforms/add_fields.rs +++ b/src/transforms/add_fields.rs @@ -2,6 +2,7 @@ use super::Transform; use crate::serde::Fields; use crate::{ event::{Event, Value}, + internal_events::AddFieldsEventProcessed, template::Template, topology::config::{DataType, TransformConfig, TransformContext, TransformDescription}, }; @@ -124,7 +125,7 @@ impl Transform for AddFields { } } } - + emit!(AddFieldsEventProcessed); Some(event) } } From 4c95dcbc42a0a7b4c9b4114885ec6fd6dff2f897 Mon Sep 17 00:00:00 2001 From: Alex Gavrisco Date: Fri, 15 May 2020 21:43:21 +0300 Subject: [PATCH 5/9] Fix typo Signed-off-by: Alex Gavrisco --- src/internal_events/add_fields.rs | 2 +- src/transforms/add_fields.rs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/internal_events/add_fields.rs b/src/internal_events/add_fields.rs index 74fcceb6eedef..b31727b8db7e1 100644 --- a/src/internal_events/add_fields.rs +++ b/src/internal_events/add_fields.rs @@ -4,7 +4,7 @@ use metrics::counter; #[derive(Debug)] pub struct AddFieldsEventProcessed; -impl InternalEvent for RegexEventProcessed { +impl InternalEvent for AddFieldsEventProcessed { fn emit_metrics(&self) { counter!("events_processed", 1, "component_kind" => "transform", diff --git a/src/transforms/add_fields.rs b/src/transforms/add_fields.rs index 85b6f85a3105f..ba368cca760bf 100644 --- a/src/transforms/add_fields.rs +++ b/src/transforms/add_fields.rs @@ -90,6 +90,8 @@ impl AddFields { impl Transform for AddFields { fn transform(&mut self, mut event: Event) -> Option { + emit!(AddFieldsEventProcessed); + for (key, value_or_template) in self.fields.clone() { let value = match value_or_template { TemplateOrValue::Template(v) => match v.render_string(&event) { @@ -125,7 +127,7 @@ impl Transform for AddFields { } } } - emit!(AddFieldsEventProcessed); + Some(event) } } From bfd0d28f58162a8e4311e43069e9f9d6609da153 Mon Sep 17 00:00:00 2001 From: Alex Gavrisco Date: Sat, 16 May 2020 16:54:29 +0300 Subject: [PATCH 6/9] Instrument json_parser with metrics Signed-off-by: Alex Gavrisco --- src/internal_events/json.rs | 39 +++++++++++++++++++++++++++++++++++ src/internal_events/mod.rs | 2 ++ src/transforms/json_parser.rs | 13 ++++++------ 3 files changed, 48 insertions(+), 6 deletions(-) create mode 100644 src/internal_events/json.rs diff --git a/src/internal_events/json.rs b/src/internal_events/json.rs new file mode 100644 index 0000000000000..6d7f11c166d4a --- /dev/null +++ b/src/internal_events/json.rs @@ -0,0 +1,39 @@ +use super::InternalEvent; +use metrics::counter; + +#[derive(Debug)] +pub struct JsonEventProcessed; + +impl InternalEvent for JsonEventProcessed { + fn emit_metrics(&self) { + counter!("events_processed", 1, + "component_kind" => "transform", + "component_type" => "json_parser", + ); + } +} + +#[derive(Debug)] +pub struct JsonFailedParse<'a> { + pub field: &'a Atom, + pub error: Error, +} + +impl InternalEvent for JsonFailedParse { + fn emit_logs(&self) { + warn!( + message = "Event failed to parse as JSON", + field = %self.field(), + %error, + rate_limit_secs = 30 + ) + } + + fn emit_metrics(&self) { + counter!("processing_error", 1, + "component_kind" => "transform", + "component_type" => "json_parser", + "error_type" => "failed_parse", + ); + } +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 2c548d5863040..532039b9bf098 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -3,6 +3,7 @@ mod aws_kinesis_streams; mod blackhole; mod elasticsearch; mod file; +mod json; #[cfg(feature = "transforms-lua")] mod lua; #[cfg(feature = "sources-prometheus")] @@ -20,6 +21,7 @@ pub use self::aws_kinesis_streams::*; pub use self::blackhole::*; pub use self::elasticsearch::*; pub use self::file::*; +pub use self::json::*; #[cfg(feature = "transforms-lua")] pub use self::lua::*; #[cfg(feature = "sources-prometheus")] diff --git a/src/transforms/json_parser.rs b/src/transforms/json_parser.rs index 09ec49d5c1b54..2cb21b52c8ce2 100644 --- a/src/transforms/json_parser.rs +++ b/src/transforms/json_parser.rs @@ -1,6 +1,7 @@ use super::Transform; use crate::{ event::{self, Event}, + internal_events::{JsonParserEventProcessed, JsonFailedParse}, topology::config::{DataType, TransformConfig, TransformContext, TransformDescription}, }; use serde::{Deserialize, Serialize}; @@ -74,16 +75,16 @@ impl Transform for JsonParser { let log = event.as_mut_log(); let to_parse = log.get(&self.field).map(|s| s.as_bytes()); + emit!(JsonParserEventProcessed); + let parsed = to_parse .and_then(|to_parse| { serde_json::from_slice::(to_parse.as_ref()) .map_err(|error| { - warn!( - message = "Event failed to parse as JSON", - field = self.field.as_ref(), - %error, - rate_limit_secs = 30 - ) + emit!(JsonFailedParse { + field: &self.field, + error: error + }) }) .ok() }) From d9c17f91f71f97f537e0a2169553951b755720c4 Mon Sep 17 00:00:00 2001 From: Alex Gavrisco Date: Sat, 16 May 2020 16:56:22 +0300 Subject: [PATCH 7/9] Fix formatting Signed-off-by: Alex Gavrisco --- src/sinks/splunk_hec.rs | 4 +--- src/transforms/json_parser.rs | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/sinks/splunk_hec.rs b/src/sinks/splunk_hec.rs index f12d334c176e7..f8362ce0ab53c 100644 --- a/src/sinks/splunk_hec.rs +++ b/src/sinks/splunk_hec.rs @@ -168,9 +168,7 @@ impl HttpSink for HecSinkConfig { Some(value) } Err(e) => { - emit!(SplunkEventEncodeError { - error: e - }); + emit!(SplunkEventEncodeError { error: e }); None } } diff --git a/src/transforms/json_parser.rs b/src/transforms/json_parser.rs index 2cb21b52c8ce2..0a36d997f8c24 100644 --- a/src/transforms/json_parser.rs +++ b/src/transforms/json_parser.rs @@ -1,7 +1,7 @@ use super::Transform; use crate::{ event::{self, Event}, - internal_events::{JsonParserEventProcessed, JsonFailedParse}, + internal_events::{JsonFailedParse, JsonParserEventProcessed}, topology::config::{DataType, TransformConfig, TransformContext, TransformDescription}, }; use serde::{Deserialize, Serialize}; From c9dbd1563b37a1f489931f6a1dfc8b391319b104 Mon Sep 17 00:00:00 2001 From: Alex Gavrisco Date: Sat, 16 May 2020 17:38:56 +0300 Subject: [PATCH 8/9] Fix typos Signed-off-by: Alex Gavrisco --- src/internal_events/json.rs | 8 +++++--- src/transforms/json_parser.rs | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/internal_events/json.rs b/src/internal_events/json.rs index 6d7f11c166d4a..ced6d9200fd20 100644 --- a/src/internal_events/json.rs +++ b/src/internal_events/json.rs @@ -1,5 +1,7 @@ use super::InternalEvent; use metrics::counter; +use serde_json::Error; +use string_cache::DefaultAtom as Atom; #[derive(Debug)] pub struct JsonEventProcessed; @@ -19,12 +21,12 @@ pub struct JsonFailedParse<'a> { pub error: Error, } -impl InternalEvent for JsonFailedParse { +impl InternalEvent for JsonFailedParse<'_> { fn emit_logs(&self) { warn!( message = "Event failed to parse as JSON", - field = %self.field(), - %error, + field = %self.field, + %self.error, rate_limit_secs = 30 ) } diff --git a/src/transforms/json_parser.rs b/src/transforms/json_parser.rs index 0a36d997f8c24..f195d9940124f 100644 --- a/src/transforms/json_parser.rs +++ b/src/transforms/json_parser.rs @@ -1,7 +1,7 @@ use super::Transform; use crate::{ event::{self, Event}, - internal_events::{JsonFailedParse, JsonParserEventProcessed}, + internal_events::{JsonEventProcessed, JsonFailedParse}, topology::config::{DataType, TransformConfig, TransformContext, TransformDescription}, }; use serde::{Deserialize, Serialize}; @@ -75,7 +75,7 @@ impl Transform for JsonParser { let log = event.as_mut_log(); let to_parse = log.get(&self.field).map(|s| s.as_bytes()); - emit!(JsonParserEventProcessed); + emit!(JsonEventProcessed); let parsed = to_parse .and_then(|to_parse| { From bf0a9f6415d90b511a559a198dc39b3a12f898d3 Mon Sep 17 00:00:00 2001 From: Alex Gavrisco Date: Sat, 16 May 2020 18:46:24 +0300 Subject: [PATCH 9/9] Clean up comments Signed-off-by: Alex Gavrisco --- src/sinks/splunk_hec.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/sinks/splunk_hec.rs b/src/sinks/splunk_hec.rs index f8362ce0ab53c..03589317161b6 100644 --- a/src/sinks/splunk_hec.rs +++ b/src/sinks/splunk_hec.rs @@ -172,9 +172,6 @@ impl HttpSink for HecSinkConfig { None } } - // serde_json::to_vec(&body) - // .map_err(|e| error!("Error encoding json body: {}", e)) - // .ok() } fn build_request(&self, events: Self::Output) -> http::Request> {