diff --git a/src/sinks/util/unix.rs b/src/sinks/util/unix.rs index 014ad52678e66..9e8e25dd8f89d 100644 --- a/src/sinks/util/unix.rs +++ b/src/sinks/util/unix.rs @@ -8,6 +8,7 @@ use bytes::Bytes; use futures01::{ future, stream::iter_ok, try_ready, Async, AsyncSink, Future, Poll, Sink, StartSend, }; +use metrics::counter; use serde::{Deserialize, Serialize}; use snafu::Snafu; use std::io; @@ -114,6 +115,7 @@ impl UnixSink { self.path.to_str().unwrap(), err ); + counter!("sinks.unix_socket_connection_failures", 1); UnixSinkState::Backoff(self.next_delay()) } Ok(Async::Ready(stream)) => { @@ -121,6 +123,7 @@ impl UnixSink { message = "connected", path = &field::display(self.path.to_str().unwrap()) ); + counter!("sinks.unix_socket_connections_established", 1); self.backoff = Self::fresh_backoff(); let out = FramedWrite::new(stream, BytesCodec::new()); UnixSinkState::Open(out) @@ -150,6 +153,7 @@ impl Sink for UnixSink { type SinkError = (); fn start_send(&mut self, line: Self::SinkItem) -> StartSend { + let byte_count = line.len() as u64; match self.poll_connection() { Ok(Async::NotReady) => Ok(AsyncSink::NotReady(line)), Err(_) => { @@ -160,10 +164,15 @@ impl Sink for UnixSink { let path = self.path.to_str().unwrap(); debug!(message = "disconnected.", path = &field::display(path)); error!("Error in connection {}: {}", path, err); + counter!("sinks.unix_socket_errors", 1); self.state = UnixSinkState::Disconnected; Ok(AsyncSink::Ready) } - Ok(res) => Ok(res), + Ok(res) => { + counter!("sinks.unix_socket_events_sent", 1); + counter!("sinks.unix_socket_bytes_sent", byte_count); + Ok(res) + } }, } } @@ -182,6 +191,7 @@ impl Sink for UnixSink { let path = self.path.to_str().unwrap(); debug!(message = "disconnected.", path = &field::display(&path)); error!("Error in connection {}: {}", path, err); + counter!("sinks.unix_socket_errors", 1); self.state = UnixSinkState::Disconnected; Ok(Async::Ready(())) } diff --git a/src/sinks/vector.rs b/src/sinks/vector.rs index e2d3d7d352149..9bf2d253cd2b3 100644 --- a/src/sinks/vector.rs +++ b/src/sinks/vector.rs @@ -8,6 +8,7 @@ use crate::{ }; use bytes::{BufMut, Bytes, BytesMut}; use futures01::{stream::iter_ok, Sink}; +use metrics::counter; use prost::Message; use serde::{Deserialize, Serialize}; use snafu::Snafu; @@ -75,6 +76,9 @@ fn encode_event(event: Event) -> Option { let event_len = event.encoded_len() as u32; let full_len = event_len + 4; + counter!("sinks.vector.events", 1); + counter!("sinks.vector.total_bytes", full_len as u64); + let mut out = BytesMut::with_capacity(full_len as usize); out.put_u32_be(event_len); event.encode(&mut out).unwrap(); diff --git a/src/sources/vector.rs b/src/sources/vector.rs index c1bf7835de7c0..e86327ec4da23 100644 --- a/src/sources/vector.rs +++ b/src/sources/vector.rs @@ -7,6 +7,7 @@ use crate::{ }; use bytes::{Bytes, BytesMut}; use futures01::sync::mpsc; +use metrics::counter; use prost::Message; use serde::{Deserialize, Serialize}; use tokio::codec::LengthDelimitedCodec; @@ -79,10 +80,12 @@ impl TcpSource for VectorSource { message = "Received one event.", event = field::debug(&event) ); + counter!("sources.vector.events", 1); Some(event) } Err(e) => { error!("failed to parse protobuf message: {:?}", e); + counter!("sources.vector.parse_errors", 1); None } } diff --git a/src/transforms/lua.rs b/src/transforms/lua.rs index ce7640ab0b971..a7eef792bc547 100644 --- a/src/transforms/lua.rs +++ b/src/transforms/lua.rs @@ -3,9 +3,10 @@ use crate::{ event::{Event, Value}, topology::config::{DataType, TransformConfig, TransformContext, TransformDescription}, }; -use metrics::{counter, gauge}; +use metrics::{counter, gauge, timing}; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; +use std::time::Instant; #[derive(Debug, Snafu)] enum BuildError { @@ -93,6 +94,7 @@ impl Lua { } fn process(&mut self, event: Event) -> Result, rlua::Error> { + let start = Instant::now(); let result = self.lua.context(|ctx| { let globals = ctx.globals(); @@ -109,6 +111,7 @@ impl Lua { self.lua.gc_collect()?; self.invocations_after_gc = 0; } + timing!("transforms.lua.processing_duration", start, Instant::now()); result }