Skip to content

Commit

Permalink
feat(internal_metrics source): instrument "kafka" source
Browse files Browse the repository at this point in the history
ref: #2007
Signed-off-by: Jean Mertz <git@jeanmertz.com>
  • Loading branch information
JeanMertz committed Jul 24, 2020
1 parent fe69741 commit a3be286
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 3 deletions.
64 changes: 64 additions & 0 deletions src/internal_events/kafka.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use super::InternalEvent;
use metrics::counter;

/// Internal event emitted by the `kafka` source for every message
/// successfully read from the consumer.
#[derive(Debug)]
pub struct KafkaEventReceived {
    // Size of the received message payload in bytes; used to drive the
    // `bytes_processed` counter.
    pub byte_size: usize,
}

impl InternalEvent for KafkaEventReceived {
    /// Trace-level log for each received event, rate-limited so a busy
    /// topic does not flood the log output.
    fn emit_logs(&self) {
        trace!(message = "received one event.", rate_limit_secs = 10);
    }

    /// Increment the per-event and per-byte counters, tagged as coming
    /// from the "kafka" source component.
    fn emit_metrics(&self) {
        counter!("events_processed", 1, "component_kind" => "source", "component_type" => "kafka");
        counter!("bytes_processed", self.byte_size as u64, "component_kind" => "source", "component_type" => "kafka");
    }
}

/// Internal event emitted when storing the consumer offset for a
/// processed message fails.
#[derive(Debug)]
pub struct KafkaOffsetUpdateFailed {
    // Underlying `rdkafka` client error returned by the offset-store call.
    pub error: rdkafka::error::KafkaError,
}

impl InternalEvent for KafkaOffsetUpdateFailed {
    /// Error-level log including the underlying client error.
    fn emit_logs(&self) {
        error!(message = "unable to update consumer offset.", error = %self.error);
    }

    /// Count the failed offset update, tagged as the "kafka" source.
    fn emit_metrics(&self) {
        counter!("consumer_offset_updates_failed", 1, "component_kind" => "source", "component_type" => "kafka");
    }
}

/// Internal event emitted when reading a message from Kafka fails.
#[derive(Debug)]
pub struct KafkaEventFailed {
    // Underlying `rdkafka` consumer error produced while reading.
    pub error: rdkafka::error::KafkaError,
}

impl InternalEvent for KafkaEventFailed {
    /// Error-level log including the underlying client error.
    fn emit_logs(&self) {
        error!(message = "failed to read message.", error = %self.error);
    }

    /// Count the failed read, tagged as the "kafka" source.
    fn emit_metrics(&self) {
        counter!("events_failed", 1, "component_kind" => "source", "component_type" => "kafka");
    }
}
2 changes: 2 additions & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod blackhole;
mod elasticsearch;
mod file;
mod json;
mod kafka;
#[cfg(feature = "sources-kubernetes-logs")]
mod kubernetes_logs;
#[cfg(feature = "transforms-lua")]
Expand All @@ -28,6 +29,7 @@ pub use self::blackhole::*;
pub use self::elasticsearch::*;
pub use self::file::*;
pub use self::json::*;
pub use self::kafka::*;
#[cfg(feature = "sources-kubernetes-logs")]
pub use self::kubernetes_logs::*;
#[cfg(feature = "transforms-lua")]
Expand Down
13 changes: 10 additions & 3 deletions src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
event::{self, Event},
kafka::KafkaAuthConfig,
shutdown::ShutdownSignal,
internal_events::{KafkaEventFailed, KafkaEventReceived, KafkaOffsetUpdateFailed},
topology::config::{DataType, GlobalOptions, SourceConfig, SourceDescription},
};
use bytes::Bytes;
Expand Down Expand Up @@ -113,10 +114,13 @@ fn kafka_source(

async move {
match message {
Err(e) => {
Err(error!(message = "Error reading message from Kafka", error = ?e))
Err(error) => {
emit!(KafkaEventFailed{ error: error.clone() });
Err(error!(message = "Error reading message from Kafka", error = ?error))
}
Ok(msg) => {
emit!(KafkaEventReceived{ byte_size: msg.payload_len() });

let payload = match msg.payload_view::<[u8]>() {
None => return Err(()), // skip messages with empty payload
Some(Err(e)) => {
Expand Down Expand Up @@ -156,7 +160,10 @@ fn kafka_source(
}
}

consumer.store_offset(&msg).map_err(|e| error!(message = "Cannot store offset for the message", error = ?e))?;
consumer.store_offset(&msg).map_err(|error| {
emit!(KafkaOffsetUpdateFailed { error: error.clone() });
error!(message = "Cannot store offset for the message", error = ?error)
})?;
Ok(event)
}
}
Expand Down

0 comments on commit a3be286

Please sign in to comment.