fix(aws_cloudwatch_logs sink): Handle and enforce batch.max_bytes (vectordotdev#2916)

* Make the check for batch.max_bytes being set explicit

Signed-off-by: Bruce Guenter <bruce@timber.io>

* Implement enhanced VecBuffer2 with length tracking

Signed-off-by: Bruce Guenter <bruce@timber.io>

* Move cloudwatch log encoding into the partition function

This will allow us to determine the length of logs that are added to the
buffer.

Signed-off-by: Bruce Guenter <bruce@timber.io>

* Use new length buffer in aws_cloudwatch_logs to enforce byte length

Also fix default batch event limit

Signed-off-by: Bruce Guenter <bruce@timber.io>
bruceg committed Jun 30, 2020
1 parent abd8d80 commit cd74040
Showing 13 changed files with 270 additions and 78 deletions.
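
The heart of the change is byte-length accounting in the batch buffer: each encoded CloudWatch event reports its own size, and the buffer refuses events that would push the batch past batch.max_bytes. The new defaults (1,048,576 bytes, 10,000 events) match the PutLogEvents API limits. Below is a minimal sketch of that idea; the names LengthBuffer and FakeEvent are illustrative, not Vector's actual types, and only the shape of the Length trait is taken from the diff that follows.

// Sketch of length-tracked batching; illustrative, not Vector's implementation.
trait Length {
    fn len(&self) -> usize;
}

// A batch that tracks its cumulative byte size and refuses items that
// would push it past `max_bytes` or `max_events`.
struct LengthBuffer<T> {
    items: Vec<T>,
    bytes: usize,
    max_bytes: usize,
    max_events: usize,
}

impl<T: Length> LengthBuffer<T> {
    fn new(max_bytes: usize, max_events: usize) -> Self {
        Self { items: Vec::new(), bytes: 0, max_bytes, max_events }
    }

    // Hands the item back when it does not fit, signaling that the
    // current batch should be flushed before retrying.
    fn push(&mut self, item: T) -> Result<(), T> {
        let item_len = item.len();
        if self.items.len() >= self.max_events || self.bytes + item_len > self.max_bytes {
            return Err(item);
        }
        self.bytes += item_len;
        self.items.push(item);
        Ok(())
    }
}

// Stand-in for InputLogEvent: message bytes plus CloudWatch's fixed
// 26-byte per-event overhead, mirroring the impl Length in the diff.
struct FakeEvent(String);

impl Length for FakeEvent {
    fn len(&self) -> usize {
        self.0.len() + 26
    }
}

fn main() {
    let mut batch = LengthBuffer::new(100, 10_000);
    assert!(batch.push(FakeEvent("hello world".into())).is_ok()); // 11 + 26 = 37 bytes
    assert!(batch.push(FakeEvent("x".repeat(60))).is_err()); // 37 + 86 > 100: flush first
}
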
.meta/sinks/aws_cloudwatch_logs.toml.erb (2 changes: 1 addition & 1 deletion)
@@ -28,7 +28,7 @@ write_to_description = "[Amazon Web Service's CloudWatch Logs service][urls.aws_
<%= render("_partials/fields/_component_options.toml", type: "sink", name: "aws_cloudwatch_logs") %>
<%= render("_partials/fields/_batch_options.toml", namespace: "sinks.aws_cloudwatch_logs.options", common: false, max_bytes: nil, max_events: 1000, timeout_secs: 1) %>
<%= render("_partials/fields/_batch_options.toml", namespace: "sinks.aws_cloudwatch_logs.options", common: false, max_bytes: 1048576, max_events: 10000, timeout_secs: 1) %>
<%= render(
"_partials/fields/_buffer_options.toml",
config/vector.spec.toml (12 changes: 10 additions & 2 deletions)
@@ -2939,13 +2939,21 @@ require('custom_module')
#

[sinks.aws_cloudwatch_logs.batch]
# The maximum size of a batch, in bytes, before it is flushed.
#
# * optional
# * default: 1048576
# * type: uint
# * unit: bytes
max_bytes = 1048576

# The maximum size of a batch, in events, before it is flushed.
#
# * optional
# * default: 1000
# * default: 10000
# * type: uint
# * unit: events
max_events = 1000
max_events = 10000

# The maximum age of a batch before it is flushed.
#
src/sinks/aws_cloudwatch_logs/mod.rs (151 changes: 85 additions & 66 deletions)
@@ -7,8 +7,8 @@ use crate::{
sinks::util::{
encoding::{EncodingConfig, EncodingConfiguration},
retries::{FixedRetryPolicy, RetryLogic},
rusoto, BatchConfig, BatchSettings, PartitionBatchSink, PartitionBuffer,
PartitionInnerBuffer, TowerRequestConfig, TowerRequestSettings, VecBuffer,
rusoto, BatchConfig, BatchSettings, Length, PartitionBatchSink, PartitionBuffer,
PartitionInnerBuffer, TowerRequestConfig, TowerRequestSettings, VecBuffer2,
},
template::Template,
topology::config::{DataType, SinkConfig, SinkContext},
@@ -96,7 +96,6 @@ lazy_static! {

pub struct CloudwatchLogsSvc {
client: CloudWatchLogsClient,
encoding: EncodingConfig<Encoding>,
stream_name: String,
group_name: String,
create_missing_group: bool,
@@ -110,11 +109,11 @@ type Svc = Buffer<
RateLimit<
Retry<
FixedRetryPolicy<CloudwatchRetryLogic>,
Buffer<Timeout<CloudwatchLogsSvc>, Vec<Event>>,
Buffer<Timeout<CloudwatchLogsSvc>, Vec<InputLogEvent>>,
>,
>,
>,
Vec<Event>,
Vec<InputLogEvent>,
>;

pub struct CloudwatchLogsPartitionSvc {
@@ -145,10 +144,12 @@ pub enum CloudwatchError {
#[typetag::serde(name = "aws_cloudwatch_logs")]
impl SinkConfig for CloudwatchLogsSinkConfig {
fn build(&self, cx: SinkContext) -> crate::Result<(super::RouterSink, super::Healthcheck)> {
let batch = self
.batch
.use_size_as_events()?
.get_settings_or_default(BatchSettings::default().events(1000).timeout(1));
let batch = self.batch.use_size_as_events()?.get_settings_or_default(
BatchSettings::default()
.bytes(1_048_576)
.events(10_000)
.timeout(1),
);
let request = self.request.unwrap_with(&REQUEST_DEFAULTS);

let log_group = self.group_name.clone();
@@ -161,11 +162,14 @@ impl SinkConfig for CloudwatchLogsSinkConfig {
cx.resolver(),
)?);

let encoding = self.encoding.clone();
let sink = {
let buffer = PartitionBuffer::new(VecBuffer::new(batch.size));
let buffer = PartitionBuffer::new(VecBuffer2::new(batch.size));
let svc_sink = PartitionBatchSink::new(svc, buffer, batch.timeout, cx.acker())
.sink_map_err(|e| error!("Fatal cloudwatchlogs sink error: {}", e))
.with_flat_map(move |event| iter_ok(partition(event, &log_group, &log_stream)));
.with_flat_map(move |event| {
iter_ok(partition_encode(event, &encoding, &log_group, &log_stream))
});
Box::new(svc_sink)
};

@@ -196,7 +200,9 @@ impl CloudwatchLogsPartitionSvc {
}
}

impl Service<PartitionInnerBuffer<Vec<Event>, CloudwatchKey>> for CloudwatchLogsPartitionSvc {
impl Service<PartitionInnerBuffer<Vec<InputLogEvent>, CloudwatchKey>>
for CloudwatchLogsPartitionSvc
{
type Response = ();
type Error = crate::Error;
type Future = Box<dyn Future<Item = Self::Response, Error = Self::Error> + Send + 'static>;
@@ -205,7 +211,10 @@ impl Service<PartitionInnerBuffer<Vec<Event>, CloudwatchKey>> for CloudwatchLogs
Ok(().into())
}

fn call(&mut self, req: PartitionInnerBuffer<Vec<Event>, CloudwatchKey>) -> Self::Future {
fn call(
&mut self,
req: PartitionInnerBuffer<Vec<InputLogEvent>, CloudwatchKey>,
) -> Self::Future {
let (events, key) = req.into_parts();

let svc = if let Some(svc) = &mut self.clients.get_mut(&key) {
@@ -262,7 +271,6 @@ impl CloudwatchLogsSvc {

Ok(CloudwatchLogsSvc {
client,
encoding: config.encoding.clone(),
stream_name,
group_name,
create_missing_group,
@@ -272,47 +280,14 @@
})
}

pub(self) fn encode_log(
&self,
mut log: LogEvent,
) -> Result<InputLogEvent, CloudwatchLogsError> {
let timestamp = match log.remove(&event::log_schema().timestamp_key()) {
Some(Value::Timestamp(ts)) => ts.timestamp_millis(),
_ => Utc::now().timestamp_millis(),
};

let message = match self.encoding.codec() {
Encoding::Json => serde_json::to_string(&log).unwrap(),
Encoding::Text => log
.get(&event::log_schema().message_key())
.map(|v| v.to_string_lossy())
.unwrap_or_else(|| "".into()),
};

match message.len() {
length if length <= MAX_MESSAGE_SIZE => Ok(InputLogEvent { message, timestamp }),
length => Err(CloudwatchLogsError::EventTooLong { length }),
}
}

pub fn process_events(&self, events: Vec<Event>) -> Vec<Vec<InputLogEvent>> {
pub fn process_events(&self, events: Vec<InputLogEvent>) -> Vec<Vec<InputLogEvent>> {
let now = Utc::now();
// Acceptable range of Event timestamps.
let age_range = (now - Duration::days(14)).timestamp_millis()
..(now + Duration::hours(2)).timestamp_millis();

let mut events = events
.into_iter()
.map(|mut e| {
self.encoding.apply_rules(&mut e);
e
})
.map(|e| e.into_log())
.filter_map(|e| {
self.encode_log(e)
.map_err(|error| error!(message = "Could not encode event", %error, rate_limit_secs = 5))
.ok()
})
.filter(|e| age_range.contains(&e.timestamp))
.collect::<Vec<_>>();

@@ -360,7 +335,7 @@ impl CloudwatchLogsSvc {
}
}

impl Service<Vec<Event>> for CloudwatchLogsSvc {
impl Service<Vec<InputLogEvent>> for CloudwatchLogsSvc {
type Response = ();
type Error = CloudwatchError;
type Future = request::CloudwatchFuture;
@@ -387,7 +362,7 @@ impl Service<Vec<Event>> for CloudwatchLogsSvc {
}
}

fn call(&mut self, req: Vec<Event>) -> Self::Future {
fn call(&mut self, req: Vec<InputLogEvent>) -> Self::Future {
if self.token_rx.is_none() {
let event_batches = self.process_events(req);

@@ -410,17 +385,47 @@
}
}

impl Length for InputLogEvent {
fn len(&self) -> usize {
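// 26 bytes is CloudWatch Logs' documented fixed overhead per log event, counted on top of the UTF-8 message size.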
self.message.len() + 26
}
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct CloudwatchKey {
group: Bytes,
stream: Bytes,
}

fn partition(
event: Event,
fn encode_log(
mut log: LogEvent,
encoding: &EncodingConfig<Encoding>,
) -> Result<InputLogEvent, CloudwatchLogsError> {
let timestamp = match log.remove(&event::log_schema().timestamp_key()) {
Some(Value::Timestamp(ts)) => ts.timestamp_millis(),
_ => Utc::now().timestamp_millis(),
};

let message = match encoding.codec() {
Encoding::Json => serde_json::to_string(&log).unwrap(),
Encoding::Text => log
.get(&event::log_schema().message_key())
.map(|v| v.to_string_lossy())
.unwrap_or_else(|| "".into()),
};

match message.len() {
length if length <= MAX_MESSAGE_SIZE => Ok(InputLogEvent { message, timestamp }),
length => Err(CloudwatchLogsError::EventTooLong { length }),
}
}

fn partition_encode(
mut event: Event,
encoding: &EncodingConfig<Encoding>,
group: &Template,
stream: &Template,
) -> Option<PartitionInnerBuffer<Event, CloudwatchKey>> {
) -> Option<PartitionInnerBuffer<InputLogEvent, CloudwatchKey>> {
let group = match group.render(&event) {
Ok(b) => b,
Err(missing_keys) => {
@@ -447,6 +452,11 @@

let key = CloudwatchKey { stream, group };

encoding.apply_rules(&mut event);
let event = encode_log(event.into_log(), encoding)
.map_err(|error| error!(message = "Could not encode event", %error, rate_limit_secs = 5))
.ok()?;

Some(PartitionInnerBuffer::new(event, key))
}

@@ -666,8 +676,11 @@ mod tests {
let event = Event::from("hello world");
let stream = Template::try_from("stream").unwrap();
let group = "group".try_into().unwrap();
let encoding = Encoding::Text.into();

let (_event, key) = partition(event, &group, &stream).unwrap().into_parts();
let (_event, key) = partition_encode(event, &encoding, &group, &stream)
.unwrap()
.into_parts();

let expected = CloudwatchKey {
stream: "stream".into(),
@@ -685,8 +698,11 @@

let stream = Template::try_from("{{log_stream}}").unwrap();
let group = "group".try_into().unwrap();
let encoding = Encoding::Text.into();

let (_event, key) = partition(event, &group, &stream).unwrap().into_parts();
let (_event, key) = partition_encode(event, &encoding, &group, &stream)
.unwrap()
.into_parts();

let expected = CloudwatchKey {
stream: "stream".into(),
@@ -704,8 +720,11 @@

let stream = Template::try_from("abcd-{{log_stream}}").unwrap();
let group = "group".try_into().unwrap();
let encoding = Encoding::Text.into();

let (_event, key) = partition(event, &group, &stream).unwrap().into_parts();
let (_event, key) = partition_encode(event, &encoding, &group, &stream)
.unwrap()
.into_parts();

let expected = CloudwatchKey {
stream: "abcd-stream".into(),
@@ -723,8 +742,11 @@

let stream = Template::try_from("{{log_stream}}-abcd").unwrap();
let group = "group".try_into().unwrap();
let encoding = Encoding::Text.into();

let (_event, key) = partition(event, &group, &stream).unwrap().into_parts();
let (_event, key) = partition_encode(event, &encoding, &group, &stream)
.unwrap()
.into_parts();

let expected = CloudwatchKey {
stream: "stream-abcd".into(),
@@ -740,8 +762,9 @@

let stream = Template::try_from("{{log_stream}}").unwrap();
let group = "group".try_into().unwrap();
let encoding = Encoding::Text.into();

let stream_val = partition(event, &group, &stream);
let stream_val = partition_encode(event, &encoding, &group, &stream);

assert!(stream_val.is_none());
}
@@ -763,9 +786,7 @@
fn cloudwatch_encoded_event_retains_timestamp() {
let mut event = Event::from("hello world").into_log();
event.insert("key", "value");
let encoded = svc(default_config(Encoding::Json))
.encode_log(event.clone())
.unwrap();
let encoded = encode_log(event.clone(), &Encoding::Json.into()).unwrap();

let ts = if let Value::Timestamp(ts) = event[&event::log_schema().timestamp_key()] {
ts.timestamp_millis()
@@ -778,20 +799,18 @@

#[test]
fn cloudwatch_encode_log_as_json() {
let config = default_config(Encoding::Json);
let mut event = Event::from("hello world").into_log();
event.insert("key", "value");
let encoded = svc(config).encode_log(event.clone()).unwrap();
let encoded = encode_log(event.clone(), &Encoding::Json.into()).unwrap();
let map: HashMap<Atom, String> = serde_json::from_str(&encoded.message[..]).unwrap();
assert!(map.get(&event::log_schema().timestamp_key()).is_none());
}

#[test]
fn cloudwatch_encode_log_as_text() {
let config = default_config(Encoding::Text);
let mut event = Event::from("hello world").into_log();
event.insert("key", "value");
let encoded = svc(config).encode_log(event.clone()).unwrap();
let encoded = encode_log(event.clone(), &Encoding::Text.into()).unwrap();
assert_eq!(encoded.message, "hello world");
}

@@ -806,7 +825,7 @@
event
.as_mut_log()
.insert(&event::log_schema().timestamp_key(), timestamp);
event
encode_log(event.into_log(), &Encoding::Text.into()).unwrap()
})
.collect();

src/sinks/aws_cloudwatch_metrics.rs (1 change: 1 addition & 0 deletions)
@@ -85,6 +85,7 @@ impl CloudWatchMetricsSvc {

let batch = config
.batch
.disallow_max_bytes()?
.use_size_as_events()?
.get_settings_or_default(BatchSettings::default().events(20).timeout(1));
let request = config.request.unwrap_with(&REQUEST_DEFAULTS);
src/sinks/aws_kinesis_firehose.rs (1 change: 1 addition & 0 deletions)
@@ -99,6 +99,7 @@ impl KinesisFirehoseService {

let batch = config
.batch
.disallow_max_bytes()?
.use_size_as_events()?
.get_settings_or_default(BatchSettings::default().events(500).timeout(1));
let request = config.request.unwrap_with(&REQUEST_DEFAULTS);
src/sinks/aws_kinesis_streams.rs (1 change: 1 addition & 0 deletions)
@@ -104,6 +104,7 @@ impl KinesisService {

let batch = config
.batch
.disallow_max_bytes()?
.use_size_as_events()?
.get_settings_or_default(BatchSettings::default().events(500).timeout(1));
let request = config.request.unwrap_with(&REQUEST_DEFAULTS);
src/sinks/datadog/metrics.rs (1 change: 1 addition & 0 deletions)
@@ -111,6 +111,7 @@ impl SinkConfig for DatadogConfig {

let batch = self
.batch
.disallow_max_bytes()?
.use_size_as_events()?
.get_settings_or_default(BatchSettings::default().events(20).timeout(1));
let request = self.request.unwrap_with(&REQUEST_DEFAULTS);
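
The event-count-only sinks above now call disallow_max_bytes() so that a configured batch.max_bytes fails loudly instead of being silently ignored. The guard's implementation is not part of this diff; the following is a hypothetical sketch of such a check, with a simplified BatchConfig and a plain String error standing in for Vector's real types.

// Hypothetical sketch of the guard; the real BatchConfig::disallow_max_bytes
// lives in src/sinks/util and is not shown in this diff.
#[derive(Default)]
struct BatchConfig {
    max_bytes: Option<usize>,
    max_events: Option<usize>,
    timeout_secs: Option<u64>,
}

impl BatchConfig {
    // Errors out when batch.max_bytes is configured on a sink that can
    // only batch by event count, instead of silently ignoring it.
    fn disallow_max_bytes(self) -> Result<Self, String> {
        if self.max_bytes.is_some() {
            Err("batch.max_bytes is not supported by this sink".into())
        } else {
            Ok(self)
        }
    }
}

fn main() {
    let config = BatchConfig { max_bytes: Some(1024), ..Default::default() };
    assert!(config.disallow_max_bytes().is_err());
}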
