Skip to content

Commit

Permalink
instrument some more stuff
Browse files Browse the repository at this point in the history
Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
  • Loading branch information
lukesteensen committed Mar 13, 2020
1 parent 239c671 commit 4d48d69
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 2 deletions.
12 changes: 11 additions & 1 deletion src/sinks/util/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use bytes::Bytes;
use futures01::{
future, stream::iter_ok, try_ready, Async, AsyncSink, Future, Poll, Sink, StartSend,
};
use metrics::counter;
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use std::io;
Expand Down Expand Up @@ -114,13 +115,15 @@ impl UnixSink {
self.path.to_str().unwrap(),
err
);
counter!("sinks.unix_socket_connection_failures", 1);
UnixSinkState::Backoff(self.next_delay())
}
Ok(Async::Ready(stream)) => {
debug!(
message = "connected",
path = &field::display(self.path.to_str().unwrap())
);
counter!("sinks.unix_socket_connections_established", 1);
self.backoff = Self::fresh_backoff();
let out = FramedWrite::new(stream, BytesCodec::new());
UnixSinkState::Open(out)
Expand Down Expand Up @@ -150,6 +153,7 @@ impl Sink for UnixSink {
type SinkError = ();

fn start_send(&mut self, line: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
let byte_count = line.len() as u64;
match self.poll_connection() {
Ok(Async::NotReady) => Ok(AsyncSink::NotReady(line)),
Err(_) => {
Expand All @@ -160,10 +164,15 @@ impl Sink for UnixSink {
let path = self.path.to_str().unwrap();
debug!(message = "disconnected.", path = &field::display(path));
error!("Error in connection {}: {}", path, err);
counter!("sinks.unix_socket_errors", 1);
self.state = UnixSinkState::Disconnected;
Ok(AsyncSink::Ready)
}
Ok(res) => Ok(res),
Ok(res) => {
counter!("sinks.unix_socket_events_sent", 1);
counter!("sinks.unix_socket_bytes_sent", byte_count);
Ok(res)
}
},
}
}
Expand All @@ -182,6 +191,7 @@ impl Sink for UnixSink {
let path = self.path.to_str().unwrap();
debug!(message = "disconnected.", path = &field::display(&path));
error!("Error in connection {}: {}", path, err);
counter!("sinks.unix_socket_errors", 1);
self.state = UnixSinkState::Disconnected;
Ok(Async::Ready(()))
}
Expand Down
4 changes: 4 additions & 0 deletions src/sinks/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
};
use bytes::{BufMut, Bytes, BytesMut};
use futures01::{stream::iter_ok, Sink};
use metrics::counter;
use prost::Message;
use serde::{Deserialize, Serialize};
use snafu::Snafu;
Expand Down Expand Up @@ -75,6 +76,9 @@ fn encode_event(event: Event) -> Option<Bytes> {
let event_len = event.encoded_len() as u32;
let full_len = event_len + 4;

counter!("sinks.vector.events", 1);
counter!("sinks.vector.total_bytes", full_len as u64);

let mut out = BytesMut::with_capacity(full_len as usize);
out.put_u32_be(event_len);
event.encode(&mut out).unwrap();
Expand Down
3 changes: 3 additions & 0 deletions src/sources/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
};
use bytes::{Bytes, BytesMut};
use futures01::sync::mpsc;
use metrics::counter;
use prost::Message;
use serde::{Deserialize, Serialize};
use tokio::codec::LengthDelimitedCodec;
Expand Down Expand Up @@ -79,10 +80,12 @@ impl TcpSource for VectorSource {
message = "Received one event.",
event = field::debug(&event)
);
counter!("sources.vector.events", 1);
Some(event)
}
Err(e) => {
error!("failed to parse protobuf message: {:?}", e);
counter!("sources.vector.parse_errors", 1);
None
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/transforms/lua.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use crate::{
event::{Event, Value},
topology::config::{DataType, TransformConfig, TransformContext, TransformDescription},
};
use metrics::{counter, gauge};
use metrics::{counter, gauge, timing};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use std::time::Instant;

#[derive(Debug, Snafu)]
enum BuildError {
Expand Down Expand Up @@ -93,6 +94,7 @@ impl Lua {
}

fn process(&mut self, event: Event) -> Result<Option<Event>, rlua::Error> {
let start = Instant::now();
let result = self.lua.context(|ctx| {
let globals = ctx.globals();

Expand All @@ -109,6 +111,7 @@ impl Lua {
self.lua.gc_collect()?;
self.invocations_after_gc = 0;
}
timing!("transforms.lua.processing_duration", start, Instant::now());

result
}
Expand Down

0 comments on commit 4d48d69

Please sign in to comment.