Skip to content

Commit

Permalink
enhancement(prometheus source): Update instrumentation (vectordotdev#…
Browse files Browse the repository at this point in the history
…3317)

* Add instrumentation

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>

* Rename to request_duration_nanoseconds

Signed-off-by: Kruno Tomola Fabro <krunotf@gmail.com>
Signed-off-by: Brian Menges <brian.menges@anaplan.com>
  • Loading branch information
ktff authored and Brian Menges committed Dec 9, 2020
1 parent dcd0db7 commit 9fc065e
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 9 deletions.
46 changes: 41 additions & 5 deletions src/internal_events/prometheus.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,53 @@
use super::InternalEvent;
use crate::sources::prometheus::parser::ParserError;
use metrics::counter;
use metrics::{counter, timing};
use std::time::Instant;

#[derive(Debug)]
pub struct PrometheusRequestCompleted;
pub struct PrometheusEventReceived {
pub byte_size: usize,
pub count: usize,
}

impl InternalEvent for PrometheusEventReceived {
fn emit_logs(&self) {
debug!(message = "scraped events.", ?self.count);
}

fn emit_metrics(&self) {
counter!(
"events_processed", self.count as u64,
"component_kind" => "source",
"component_type" => "prometheus",
);
counter!(
"bytes_processed", self.byte_size as u64,
"component_kind" => "source",
"component_type" => "prometheus",
);
}
}

#[derive(Debug)]
pub struct PrometheusRequestCompleted {
pub start: Instant,
pub end: Instant,
}

impl InternalEvent for PrometheusRequestCompleted {
fn emit_logs(&self) {
debug!(message = "request completed.");
}

fn emit_metrics(&self) {
// TODO: make this a timer
counter!("requests_completed", 1,
"component_kind" => "source",
"component_type" => "prometheus",
);
timing!("request_duration_nanoseconds", self.start,self.end,
"component_kind" => "source",
"component_type" => "prometheus",
);
}
}

Expand All @@ -22,7 +58,7 @@ pub struct PrometheusParseError {

impl InternalEvent for PrometheusParseError {
fn emit_logs(&self) {
error!(message = "parsing error", error = ?self.error);
error!(message = "parsing error.", error = ?self.error);
}

fn emit_metrics(&self) {
Expand All @@ -40,7 +76,7 @@ pub struct PrometheusHttpError {

impl InternalEvent for PrometheusHttpError {
fn emit_logs(&self) {
error!(message = "http request processing error", error = %self.error);
error!(message = "http request processing error.", error = %self.error);
}

fn emit_metrics(&self) {
Expand Down
23 changes: 19 additions & 4 deletions src/sources/prometheus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::{
hyper::body_to_bytes,
internal_events::{PrometheusHttpError, PrometheusParseError, PrometheusRequestCompleted},
internal_events::{
PrometheusEventReceived, PrometheusHttpError, PrometheusParseError,
PrometheusRequestCompleted,
},
shutdown::ShutdownSignal,
topology::config::GlobalOptions,
Event, Pipeline,
Expand All @@ -14,7 +17,7 @@ use hyper::{Body, Client, Request};
use hyper_openssl::HttpsConnector;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use std::time::Duration;
use std::time::{Duration, Instant};

pub mod parser;

Expand Down Expand Up @@ -73,22 +76,34 @@ fn prometheus(
.body(Body::empty())
.expect("error creating request");

let start = Instant::now();
client
.request(request)
.and_then(|response| body_to_bytes(response.into_body()))
.into_stream()
.filter_map(|response| {
.filter_map(move |response| {
future::ready(match response {
Ok(body) => {
emit!(PrometheusRequestCompleted);
emit!(PrometheusRequestCompleted {
start,
end: Instant::now()
});

let byte_size = body.len();
let packet = String::from_utf8_lossy(&body);
let metrics = parser::parse(&packet)
.map_err(|error| {
emit!(PrometheusParseError { error });
})
.unwrap_or_default();

if !metrics.is_empty() {
emit!(PrometheusEventReceived {
byte_size,
count: metrics.len()
});
}

Some(stream::iter(metrics).map(Event::Metric).map(Ok))
}
Err(error) => {
Expand Down

0 comments on commit 9fc065e

Please sign in to comment.