Skip to content

Commit

Permalink
enhancement(internal_metrics source): instrument "stdin" source (vect…
Browse files Browse the repository at this point in the history
…ordotdev#3151)

Signed-off-by: Brian Menges <brian.menges@anaplan.com>
  • Loading branch information
JeanMertz authored and Brian Menges committed Dec 9, 2020
1 parent 928f742 commit 4302813
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod lua;
mod prometheus;
mod regex;
mod splunk_hec;
mod stdin;
mod syslog;
mod tcp;
mod udp;
Expand All @@ -36,6 +37,7 @@ pub use self::lua::*;
pub use self::prometheus::*;
pub use self::regex::*;
pub use self::splunk_hec::*;
pub use self::stdin::*;
pub use self::syslog::*;
pub use self::tcp::*;
pub use self::udp::*;
Expand Down
45 changes: 45 additions & 0 deletions src/internal_events/stdin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub struct StdinEventReceived {
pub byte_size: usize,
}

impl InternalEvent for StdinEventReceived {
fn emit_logs(&self) {
trace!(message = "received one event.", rate_limit_secs = 10);
}

fn emit_metrics(&self) {
counter!(
"events_processed", 1,
"component_kind" => "source",
"component_type" => "stdin",
);
counter!(
"bytes_processed", self.byte_size as u64,
"component_kind" => "source",
"component_type" => "stdin",
);
}
}

#[derive(Debug)]
pub struct StdinReadFailed {
pub error: std::io::Error,
}

impl InternalEvent for StdinReadFailed {
fn emit_logs(&self) {
error!(message = "unable to read from source.", error = %self.error);
}

fn emit_metrics(&self) {
counter!(
"stdin_reads_failed", 1,
"component_kind" => "source",
"component_type" => "stdin",
);
}
}
12 changes: 9 additions & 3 deletions src/sources/stdin.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
event::{self, Event},
internal_events::{StdinEventReceived, StdinReadFailed},
shutdown::ShutdownSignal,
stream::StreamExt01,
topology::config::{DataType, GlobalOptions, SourceConfig, SourceDescription},
Expand Down Expand Up @@ -124,8 +125,8 @@ where

for line in stdin.lines() {
match line {
Err(e) => {
error!(message = "Unable to read from source.", error = %e);
Err(error) => {
emit!(StdinReadFailed { error });
break;
}
Ok(line) => {
Expand Down Expand Up @@ -163,7 +164,12 @@ where
Ok(Box::new(
Compat::new(receiver)
.take_until(shutdown)
.map(move |line| create_event(line, &host_key, &hostname))
.map(move |line| {
emit!(StdinEventReceived {
byte_size: line.len(),
});
create_event(line, &host_key, &hostname)
})
.map_err(|e| error!("error reading line: {:?}", e))
.forward(
out.sink_map_err(|e| error!(message = "Unable to send event to out.", error = %e)),
Expand Down

0 comments on commit 4302813

Please sign in to comment.