diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index f44edb4857b2b..2a3207b489aff 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -12,6 +12,7 @@ mod lua; mod prometheus; mod regex; mod splunk_hec; +mod stdin; mod syslog; mod tcp; mod udp; @@ -36,6 +37,7 @@ pub use self::lua::*; pub use self::prometheus::*; pub use self::regex::*; pub use self::splunk_hec::*; +pub use self::stdin::*; pub use self::syslog::*; pub use self::tcp::*; pub use self::udp::*; diff --git a/src/internal_events/stdin.rs b/src/internal_events/stdin.rs new file mode 100644 index 0000000000000..e0574e1f67706 --- /dev/null +++ b/src/internal_events/stdin.rs @@ -0,0 +1,45 @@ +use super::InternalEvent; +use metrics::counter; + +#[derive(Debug)] +pub struct StdinEventReceived { + pub byte_size: usize, +} + +impl InternalEvent for StdinEventReceived { + fn emit_logs(&self) { + trace!(message = "received one event.", rate_limit_secs = 10); + } + + fn emit_metrics(&self) { + counter!( + "events_processed", 1, + "component_kind" => "source", + "component_type" => "stdin", + ); + counter!( + "bytes_processed", self.byte_size as u64, + "component_kind" => "source", + "component_type" => "stdin", + ); + } +} + +#[derive(Debug)] +pub struct StdinReadFailed { + pub error: std::io::Error, +} + +impl InternalEvent for StdinReadFailed { + fn emit_logs(&self) { + error!(message = "unable to read from source.", error = %self.error); + } + + fn emit_metrics(&self) { + counter!( + "stdin_reads_failed", 1, + "component_kind" => "source", + "component_type" => "stdin", + ); + } +} diff --git a/src/sources/stdin.rs b/src/sources/stdin.rs index 7f0fc78b58a63..cfc2dac2d936d 100644 --- a/src/sources/stdin.rs +++ b/src/sources/stdin.rs @@ -1,5 +1,6 @@ use crate::{ event::{self, Event}, + internal_events::{StdinEventReceived, StdinReadFailed}, shutdown::ShutdownSignal, stream::StreamExt01, topology::config::{DataType, GlobalOptions, SourceConfig, SourceDescription}, @@ -124,8 +125,8 @@ where for line in stdin.lines() { match line { - Err(e) => { - error!(message = "Unable to read from source.", error = %e); + Err(error) => { + emit!(StdinReadFailed { error }); break; } Ok(line) => { @@ -163,7 +164,12 @@ where Ok(Box::new( Compat::new(receiver) .take_until(shutdown) - .map(move |line| create_event(line, &host_key, &hostname)) + .map(move |line| { + emit!(StdinEventReceived { + byte_size: line.len(), + }); + create_event(line, &host_key, &hostname) + }) .map_err(|e| error!("error reading line: {:?}", e)) .forward( out.sink_map_err(|e| error!(message = "Unable to send event to out.", error = %e)),