From 5e8170fe069b8d1df287fab51d9b80a3f4b73700 Mon Sep 17 00:00:00 2001 From: Duy Do Date: Thu, 30 Jul 2020 15:06:18 +0700 Subject: [PATCH 1/4] Add internal events for HTTP source Signed-off-by: Duy Do --- src/internal_events/http.rs | 54 +++++++++++++++++++++++++++++++++++++ src/internal_events/mod.rs | 2 ++ src/sources/util/http.rs | 19 ++++++++++--- 3 files changed, 72 insertions(+), 3 deletions(-) create mode 100644 src/internal_events/http.rs diff --git a/src/internal_events/http.rs b/src/internal_events/http.rs new file mode 100644 index 0000000000000..bbd03d8465445 --- /dev/null +++ b/src/internal_events/http.rs @@ -0,0 +1,54 @@ +use super::InternalEvent; +use metrics::counter; + +#[derive(Debug)] +pub struct HTTPEventsReceived { + pub events_count: usize, + pub byte_size: usize, +} + +impl InternalEvent for HTTPEventsReceived { + fn emit_logs(&self) { + debug!( + message = "sending events.", + events_count = %self.events_count, + byte_size = %self.byte_size, + ); + } + + fn emit_metrics(&self) { + counter!("events_processed", self.events_count as u64, + "component_kind" => "source", + "component_type" => "http", + ); + counter!("bytes_processed", self.byte_size as u64, + "component_kind" => "source", + "component_type" => "http", + ); + } +} + +#[derive(Debug)] +pub struct HTTPBadRequestReceived<'a> { + pub error_code: u16, + pub error_message: &'a str, +} + +impl<'a> InternalEvent for HTTPBadRequestReceived<'a> { + fn emit_logs(&self) { + warn!( + message = "received bad request.", + code = %self.error_code, + message = %self.error_message, + rate_limit_secs = 10, + ); + } + + fn emit_metrics(&self) { + counter!( + "http_bad_requests", 1, + "component_kind" => "source", + "component_type" => "http", + ); + } +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index f4804b0b26763..63e1ce97f6bf3 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -3,6 +3,7 @@ mod aws_kinesis_streams; mod blackhole; mod elasticsearch; mod file; +mod http; #[cfg(all(feature = "sources-journald", feature = "unix"))] mod journald; mod json; @@ -30,6 +31,7 @@ pub use self::aws_kinesis_streams::*; pub use self::blackhole::*; pub use self::elasticsearch::*; pub use self::file::*; +pub use self::http::*; #[cfg(all(feature = "sources-journald", feature = "unix"))] pub(crate) use self::journald::*; pub use self::json::*; diff --git a/src/sources/util/http.rs b/src/sources/util/http.rs index 3d013e7155b80..29668d6242714 100644 --- a/src/sources/util/http.rs +++ b/src/sources/util/http.rs @@ -1,8 +1,10 @@ -use crate::event::Event; use crate::{ + event::Event, + internal_events::{HTTPBadRequestReceived, HTTPEventsReceived}, shutdown::ShutdownSignal, tls::{MaybeTlsSettings, TlsConfig}, }; +use bytes05::Bytes; use futures::{ compat::{AsyncRead01CompatExt, Future01CompatExt, Stream01CompatExt}, FutureExt, TryFutureExt, TryStreamExt, @@ -74,15 +76,20 @@ pub trait HttpSource: Clone + Send + Sync + 'static { .and(warp::path::end()) .and(warp::header::headers_cloned()) .and(warp::body::bytes()) - .and_then(move |headers: HeaderMap, body| { + .and_then(move |headers: HeaderMap, body: Bytes| { info!("Handling http request: {:?}", headers); let this = self.clone(); let out = out.clone(); async move { + let body_size = body.len(); match this.build_event(body, headers) { Ok(events) => { + emit!(HTTPEventsReceived { + events_count: events.len(), + byte_size: body_size, + }); out.send_all(futures01::stream::iter_ok(events)) .compat() .map_err(move |e: mpsc::SendError| { @@ -95,7 +102,13 @@ pub trait HttpSource: Clone + Send + Sync + 'static { .map_ok(|_| warp::reply()) .await } - Err(err) => Err(warp::reject::custom(err)), + Err(err) => { + emit!(HTTPBadRequestReceived { + error_code: err.code, + error_message: err.message.as_str(), + }); + Err(warp::reject::custom(err)) + } } } }); From 53c6a534ebec2a2f7509d422a176d63ceaf8d429 Mon Sep 17 00:00:00 2001 From: Duy Do Date: Thu, 30 Jul 2020 15:19:09 +0700 Subject: [PATCH 2/4] Rename event Signed-off-by: Duy Do --- src/internal_events/http.rs | 4 ++-- src/sources/util/http.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/internal_events/http.rs b/src/internal_events/http.rs index bbd03d8465445..b03f591f01169 100644 --- a/src/internal_events/http.rs +++ b/src/internal_events/http.rs @@ -29,12 +29,12 @@ impl InternalEvent for HTTPEventsReceived { } #[derive(Debug)] -pub struct HTTPBadRequestReceived<'a> { +pub struct HTTPBadRequestError<'a> { pub error_code: u16, pub error_message: &'a str, } -impl<'a> InternalEvent for HTTPBadRequestReceived<'a> { +impl<'a> InternalEvent for HTTPBadRequestError<'a> { fn emit_logs(&self) { warn!( message = "received bad request.", diff --git a/src/sources/util/http.rs b/src/sources/util/http.rs index 29668d6242714..3c7edfeeb5692 100644 --- a/src/sources/util/http.rs +++ b/src/sources/util/http.rs @@ -1,6 +1,6 @@ use crate::{ event::Event, - internal_events::{HTTPBadRequestReceived, HTTPEventsReceived}, + internal_events::{HTTPBadRequestError, HTTPEventsReceived}, shutdown::ShutdownSignal, tls::{MaybeTlsSettings, TlsConfig}, }; @@ -103,7 +103,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static { .await } Err(err) => { - emit!(HTTPBadRequestReceived { + emit!(HTTPBadRequestError { error_code: err.code, error_message: err.message.as_str(), }); From 7e67341c421cb9954f23808213d20a99c7a2ec8a Mon Sep 17 00:00:00 2001 From: Duy Do Date: Thu, 30 Jul 2020 22:17:56 +0700 Subject: [PATCH 3/4] Remove Error in name Signed-off-by: Duy Do --- src/internal_events/http.rs | 4 ++-- src/sources/util/http.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/internal_events/http.rs b/src/internal_events/http.rs index b03f591f01169..6c5491c62f837 100644 --- a/src/internal_events/http.rs +++ b/src/internal_events/http.rs @@ -29,12 +29,12 @@ impl InternalEvent for HTTPEventsReceived { } #[derive(Debug)] -pub struct HTTPBadRequestError<'a> { +pub struct HTTPBadRequest<'a> { pub error_code: u16, pub error_message: &'a str, } -impl<'a> InternalEvent for HTTPBadRequestError<'a> { +impl<'a> InternalEvent for HTTPBadRequest<'a> { fn emit_logs(&self) { warn!( message = "received bad request.", diff --git a/src/sources/util/http.rs b/src/sources/util/http.rs index 3c7edfeeb5692..f0730dbef15c6 100644 --- a/src/sources/util/http.rs +++ b/src/sources/util/http.rs @@ -1,6 +1,6 @@ use crate::{ event::Event, - internal_events::{HTTPBadRequestError, HTTPEventsReceived}, + internal_events::{HTTPBadRequest, HTTPEventsReceived}, shutdown::ShutdownSignal, tls::{MaybeTlsSettings, TlsConfig}, }; @@ -103,7 +103,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static { .await } Err(err) => { - emit!(HTTPBadRequestError { + emit!(HTTPBadRequest { error_code: err.code, error_message: err.message.as_str(), }); From 675c2e4a20a92045bfbb7687d183420f96f03652 Mon Sep 17 00:00:00 2001 From: Duy Do Date: Fri, 31 Jul 2020 23:00:24 +0700 Subject: [PATCH 4/4] debug! -> trace! Signed-off-by: Duy Do --- src/internal_events/http.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/internal_events/http.rs b/src/internal_events/http.rs index 6c5491c62f837..98991ae4eb89b 100644 --- a/src/internal_events/http.rs +++ b/src/internal_events/http.rs @@ -9,7 +9,7 @@ pub struct HTTPEventsReceived { impl InternalEvent for HTTPEventsReceived { fn emit_logs(&self) { - debug!( + trace!( message = "sending events.", events_count = %self.events_count, byte_size = %self.byte_size,