Skip to content

Commit

Permalink
enhancement(observability): Add internal events for http source (#3264
Browse files Browse the repository at this point in the history
)

* Add internal events for HTTP source

Added HTTPEventsReceived and HTTPBadRequest.

Signed-off-by: Duy Do <juchiast@gmail.com>
  • Loading branch information
juchiast committed Aug 4, 2020
1 parent cde9547 commit 059e160
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 3 deletions.
54 changes: 54 additions & 0 deletions src/internal_events/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub struct HTTPEventsReceived {
pub events_count: usize,
pub byte_size: usize,
}

impl InternalEvent for HTTPEventsReceived {
fn emit_logs(&self) {
trace!(
message = "sending events.",
events_count = %self.events_count,
byte_size = %self.byte_size,
);
}

fn emit_metrics(&self) {
counter!("events_processed", self.events_count as u64,
"component_kind" => "source",
"component_type" => "http",
);
counter!("bytes_processed", self.byte_size as u64,
"component_kind" => "source",
"component_type" => "http",
);
}
}

#[derive(Debug)]
pub struct HTTPBadRequest<'a> {
pub error_code: u16,
pub error_message: &'a str,
}

impl<'a> InternalEvent for HTTPBadRequest<'a> {
fn emit_logs(&self) {
warn!(
message = "received bad request.",
code = %self.error_code,
message = %self.error_message,
rate_limit_secs = 10,
);
}

fn emit_metrics(&self) {
counter!(
"http_bad_requests", 1,
"component_kind" => "source",
"component_type" => "http",
);
}
}
2 changes: 2 additions & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod aws_kinesis_streams;
mod blackhole;
mod elasticsearch;
mod file;
mod http;
#[cfg(all(feature = "sources-journald", feature = "unix"))]
mod journald;
mod json;
Expand Down Expand Up @@ -32,6 +33,7 @@ pub use self::aws_kinesis_streams::*;
pub use self::blackhole::*;
pub use self::elasticsearch::*;
pub use self::file::*;
pub use self::http::*;
#[cfg(all(feature = "sources-journald", feature = "unix"))]
pub(crate) use self::journald::*;
pub use self::json::*;
Expand Down
19 changes: 16 additions & 3 deletions src/sources/util/http.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::event::Event;
use crate::{
event::Event,
internal_events::{HTTPBadRequest, HTTPEventsReceived},
shutdown::ShutdownSignal,
tls::{MaybeTlsSettings, TlsConfig},
Pipeline,
};
use async_trait::async_trait;
use bytes05::Bytes;
use futures::{compat::Future01CompatExt, FutureExt, TryFutureExt};
use futures01::Sink;
use serde::Serialize;
Expand Down Expand Up @@ -73,15 +75,20 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
.and(warp::path::end())
.and(warp::header::headers_cloned())
.and(warp::body::bytes())
.and_then(move |headers: HeaderMap, body| {
.and_then(move |headers: HeaderMap, body: Bytes| {
info!("Handling http request: {:?}", headers);

let this = self.clone();
let out = out.clone();

async move {
let body_size = body.len();
match this.build_event(body, headers) {
Ok(events) => {
emit!(HTTPEventsReceived {
events_count: events.len(),
byte_size: body_size,
});
out.send_all(futures01::stream::iter_ok(events))
.compat()
.map_err(move |e: futures01::sync::mpsc::SendError<Event>| {
Expand All @@ -94,7 +101,13 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
.map_ok(|_| warp::reply())
.await
}
Err(err) => Err(warp::reject::custom(err)),
Err(err) => {
emit!(HTTPBadRequest {
error_code: err.code,
error_message: err.message.as_str(),
});
Err(warp::reject::custom(err))
}
}
}
});
Expand Down

0 comments on commit 059e160

Please sign in to comment.