enhancement(aws_s3 source): Send content_md5 when writing objects (#7936)

* enhancement(aws_s3 source): Send content_md5 when writing objects

This is required when the bucket has [object
locking](https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lock.html)
enabled, but also gives a bit more confidence that the object makes it
to S3 uncorrupted.
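
For reference, S3 expects the Content-MD5 value to be the base64
encoding of the raw 128-bit MD5 digest of the request body (per RFC
1864), not the hex digest. A minimal standalone sketch of the
computation, assuming the `md-5` and `base64` crates this change adds:

```rust
use md5::{Digest, Md5};

/// Content-MD5 value for a request body: base64 of the raw 16-byte digest.
fn content_md5(body: &[u8]) -> String {
    base64::encode(Md5::digest(body))
}

fn main() {
    // Well-known value: the MD5 of an empty body, base64-encoded.
    assert_eq!(content_md5(b""), "1B2M2Y8AsgTpgAmY7PhCfg==");
    println!("{}", content_md5(b"some log line\n"));
}
```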

Unfortunately the test I added doesn't actually validate that Vector
works with object-locked buckets due to [a gap in
localstack](localstack/localstack#4166), but
I did verify it manually by creating a bucket with object locking
enabled, observing the failure, making the implementation change, and
observing the success.
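
A rough sketch of that kind of manual check, assuming rusoto against
real S3 (the bucket name, lock mode, and retention date are
illustrative, not necessarily the exact steps used); localstack accepts
the write either way, per the issue above:

```rust
use rusoto_core::Region;
use rusoto_s3::{CreateBucketRequest, PutObjectRequest, S3Client, S3};

#[tokio::main]
async fn main() {
    let client = S3Client::new(Region::UsEast1);
    // Illustrative name; object locking must be enabled at creation time.
    let bucket = "vector-object-lock-repro".to_string();

    client
        .create_bucket(CreateBucketRequest {
            bucket: bucket.clone(),
            object_lock_enabled_for_bucket: Some(true),
            ..Default::default()
        })
        .await
        .unwrap();

    // A PutObject carrying object lock parameters. rusoto does not add a
    // Content-MD5 header automatically, so real S3 rejects this request
    // with an InvalidRequest error — the failure Vector hit before this
    // change populated `content_md5` on the request.
    let result = client
        .put_object(PutObjectRequest {
            bucket,
            key: "test.log".to_string(),
            body: Some(b"hello world\n".to_vec().into()),
            object_lock_mode: Some("GOVERNANCE".to_string()),
            object_lock_retain_until_date: Some("2030-01-01T00:00:00Z".to_string()),
            ..Default::default()
        })
        .await;

    println!("{:?}", result.err());
}
```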

Closes: #7310

Signed-off-by: Jesse Szwedko <jesse@szwedko.me>
jszwedko committed Jun 21, 2021
1 parent 635ea82 commit 444a166
Showing 3 changed files with 104 additions and 32 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -223,6 +223,7 @@ listenfd = { version = "0.3.3", default-features = false, optional = true }
logfmt = { version = "0.0.2", default-features = false, optional = true }
lru = { version = "0.6.5", default-features = false, optional = true }
maxminddb = { version = "0.18.0", default-features = false, optional = true }
md-5 = { version = "0.9", optional = true }
mongodb = { version = "2.0.0-beta.1", default-features = false, features = ["tokio-runtime"], optional = true }
async-nats = { version = "0.9.18", default-features = false, optional = true }
nom = { version = "6.1.2", default-features = false, optional = true }
@@ -581,7 +582,7 @@ sinks-aws_cloudwatch_logs = ["rusoto", "rusoto_logs"]
sinks-aws_cloudwatch_metrics = ["rusoto", "rusoto_cloudwatch"]
sinks-aws_kinesis_firehose = ["rusoto", "rusoto_firehose"]
sinks-aws_kinesis_streams = ["rusoto", "rusoto_kinesis"]
sinks-aws_s3 = ["bytesize", "rusoto", "rusoto_s3", "uuid"]
sinks-aws_s3 = ["base64", "bytesize", "md-5", "rusoto", "rusoto_s3", "uuid"]
sinks-aws_sqs = ["rusoto", "rusoto_sqs"]
sinks-azure_monitor_logs = ["bytesize"]
sinks-blackhole = []
132 changes: 101 additions & 31 deletions src/sinks/aws_s3.rs
@@ -19,6 +19,7 @@ use chrono::Utc;
use futures::{future::BoxFuture, stream, FutureExt, SinkExt, StreamExt};
use http::StatusCode;
use lazy_static::lazy_static;
use md5::Digest;
use rusoto_core::RusotoError;
use rusoto_s3::{
HeadBucketRequest, PutObjectError, PutObjectOutput, PutObjectRequest, S3Client, S3,
@@ -289,6 +290,8 @@ impl Service<Request> for S3Sink {
.content_type
.or_else(|| Some("text/x-log".to_owned()));

let content_md5 = base64::encode(md5::Md5::digest(&request.body));

let mut tagging = url::form_urlencoded::Serializer::new(String::new());
if let Some(tags) = options.tags {
for (p, v) in tags {
@@ -313,6 +316,7 @@
ssekms_key_id: options.ssekms_key_id,
storage_class: options.storage_class.map(to_string),
tagging: Some(tagging),
content_md5: Some(content_md5),
..Default::default()
};

@@ -586,13 +590,15 @@ mod integration_tests {
use std::io::{BufRead, BufReader};
use vector_core::event::{BatchNotifier, BatchStatus, BatchStatusReceiver, LogEvent};

const BUCKET: &str = "router-tests";

#[tokio::test]
async fn s3_insert_message_into() {
let cx = SinkContext::new_test();

let config = config(1000000).await;
let bucket = uuid::Uuid::new_v4().to_string();

create_bucket(&bucket, false).await;

let config = config(&bucket, 1000000);
let prefix = config.key_prefix.clone();
let client = config.create_client().unwrap();
let sink = config.new(client, cx).unwrap();
@@ -601,13 +607,13 @@ mod integration_tests {
sink.run(events).await.unwrap();
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let keys = get_keys(prefix.unwrap()).await;
let keys = get_keys(&bucket, prefix.unwrap()).await;
assert_eq!(keys.len(), 1);

let key = keys[0].clone();
assert!(key.ends_with(".log"));

let obj = get_object(key).await;
let obj = get_object(&bucket, key).await;
assert_eq!(obj.content_encoding, Some("identity".to_string()));

let response_lines = get_lines(obj).await;
@@ -618,11 +624,15 @@
async fn s3_rotate_files_after_the_buffer_size_is_reached() {
let cx = SinkContext::new_test();

let bucket = uuid::Uuid::new_v4().to_string();

create_bucket(&bucket, false).await;

let config = S3SinkConfig {
key_prefix: Some(format!("{}/{}", random_string(10), "{{i}}")),
filename_time_format: Some("waitsforfullbatch".into()),
filename_append_uuid: Some(false),
..config(1010).await
..config(&bucket, 1010)
};
let prefix = config.key_prefix.clone();
let client = config.create_client().unwrap();
@@ -645,12 +655,12 @@

sink.run(stream::iter(events)).await.unwrap();

let keys = get_keys(prefix.unwrap()).await;
let keys = get_keys(&bucket, prefix.unwrap()).await;
assert_eq!(keys.len(), 3);

let response_lines = stream::iter(keys)
.fold(Vec::new(), |mut acc, key| async {
acc.push(get_lines(get_object(key).await).await);
acc.push(get_lines(get_object(&bucket, key).await).await);
acc
})
.await;
@@ -664,10 +674,14 @@
async fn s3_gzip() {
let cx = SinkContext::new_test();

let bucket = uuid::Uuid::new_v4().to_string();

create_bucket(&bucket, false).await;

let config = S3SinkConfig {
compression: Compression::gzip_default(),
filename_time_format: Some("%s%f".into()),
..config(10000).await
..config(&bucket, 10000)
};

let prefix = config.key_prefix.clone();
@@ -678,13 +692,13 @@
sink.run(events).await.unwrap();
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let keys = get_keys(prefix.unwrap()).await;
let keys = get_keys(&bucket, prefix.unwrap()).await;
assert_eq!(keys.len(), 6);

let response_lines = stream::iter(keys).fold(Vec::new(), |mut acc, key| async {
assert!(key.ends_with(".log.gz"));

let obj = get_object(key).await;
let obj = get_object(&bucket, key).await;
assert_eq!(obj.content_encoding, Some("gzip".to_string()));

acc.append(&mut get_gzipped_lines(obj).await);
@@ -694,11 +708,66 @@
assert_eq!(lines, response_lines.await);
}

// NOTE: this test doesn't actually validate anything because localstack doesn't enforce the
// required Content-MD5 header on the request for buckets with object lock enabled
// https://github.com/localstack/localstack/issues/4166
#[tokio::test]
async fn s3_insert_message_into_object_lock() {
let cx = SinkContext::new_test();

let bucket = uuid::Uuid::new_v4().to_string();

create_bucket(&bucket, true).await;

client()
.put_object_lock_configuration(rusoto_s3::PutObjectLockConfigurationRequest {
bucket: bucket.to_string(),
object_lock_configuration: Some(rusoto_s3::ObjectLockConfiguration {
object_lock_enabled: Some(String::from("Enabled")),
rule: Some(rusoto_s3::ObjectLockRule {
default_retention: Some(rusoto_s3::DefaultRetention {
days: Some(1),
mode: Some(String::from("GOVERNANCE")),
years: None,
}),
}),
}),
..Default::default()
})
.await
.unwrap();

let config = config(&bucket, 1000000);
let prefix = config.key_prefix.clone();
let client = config.create_client().unwrap();
let sink = config.new(client, cx).unwrap();

let (lines, events, mut receiver) = make_events_batch(100, 10);
sink.run(events).await.unwrap();
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let keys = get_keys(&bucket, prefix.unwrap()).await;
assert_eq!(keys.len(), 1);

let key = keys[0].clone();
assert!(key.ends_with(".log"));

let obj = get_object(&bucket, key).await;
assert_eq!(obj.content_encoding, Some("identity".to_string()));

let response_lines = get_lines(obj).await;
assert_eq!(lines, response_lines);
}

#[tokio::test]
async fn acknowledges_failures() {
let cx = SinkContext::new_test();

let mut config = config(1).await;
let bucket = uuid::Uuid::new_v4().to_string();

create_bucket(&bucket, false).await;

let mut config = config(&bucket, 1);
// Break the bucket name
config.bucket = format!("BREAK{}IT", config.bucket);
let prefix = config.key_prefix.clone();
@@ -709,23 +778,25 @@
sink.run(events).await.unwrap();
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Errored));

let objects = list_objects(prefix.unwrap()).await;
let objects = list_objects(&bucket, prefix.unwrap()).await;
assert_eq!(objects, None);
}

#[tokio::test]
async fn s3_healthchecks() {
let config = config(1).await;
let bucket = uuid::Uuid::new_v4().to_string();

create_bucket(&bucket, false).await;

let config = config(&bucket, 1);
let client = config.create_client().unwrap();
config.healthcheck(client).await.unwrap();
}

#[tokio::test]
async fn s3_healthchecks_invalid_bucket() {
let config = S3SinkConfig {
bucket: "asdflkjadskdaadsfadf".to_string(),
..config(1).await
};
let config = config("s3_healthchecks_invalid_bucket", 1);

let client = config.create_client().unwrap();
assert_downcast_matches!(
config.healthcheck(client).await.unwrap_err(),
@@ -749,11 +820,9 @@
S3Client::new_with(d, p, region)
}

async fn config(batch_size: usize) -> S3SinkConfig {
ensure_bucket(&client()).await;

fn config(bucket: &str, batch_size: usize) -> S3SinkConfig {
S3SinkConfig {
bucket: BUCKET.to_string(),
bucket: bucket.to_string(),
key_prefix: Some(random_string(10) + "/date=%F/"),
filename_time_format: None,
filename_append_uuid: None,
@@ -785,15 +854,16 @@
(lines, events, receiver)
}

async fn ensure_bucket(client: &S3Client) {
async fn create_bucket(bucket: &str, object_lock_enabled: bool) {
use rusoto_s3::{CreateBucketError, CreateBucketRequest};

let req = CreateBucketRequest {
bucket: BUCKET.to_string(),
bucket: bucket.to_string(),
object_lock_enabled_for_bucket: Some(object_lock_enabled),
..Default::default()
};

match client.create_bucket(req).await {
match client().create_bucket(req).await {
Ok(_) | Err(RusotoError::Service(CreateBucketError::BucketAlreadyOwnedByYou(_))) => {}
Err(e) => match e {
RusotoError::Unknown(resp) => {
@@ -805,12 +875,12 @@
}
}

async fn list_objects(prefix: String) -> Option<Vec<rusoto_s3::Object>> {
async fn list_objects(bucket: &str, prefix: String) -> Option<Vec<rusoto_s3::Object>> {
let prefix = prefix.split('/').next().unwrap().to_string();

client()
.list_objects_v2(rusoto_s3::ListObjectsV2Request {
bucket: BUCKET.to_string(),
bucket: bucket.to_string(),
prefix: Some(prefix),
..Default::default()
})
@@ -819,19 +889,19 @@
.contents
}

async fn get_keys(prefix: String) -> Vec<String> {
list_objects(prefix)
async fn get_keys(bucket: &str, prefix: String) -> Vec<String> {
list_objects(bucket, prefix)
.await
.unwrap()
.into_iter()
.map(|obj| obj.key.unwrap())
.collect()
}

async fn get_object(key: String) -> rusoto_s3::GetObjectOutput {
async fn get_object(bucket: &str, key: String) -> rusoto_s3::GetObjectOutput {
client()
.get_object(rusoto_s3::GetObjectRequest {
bucket: BUCKET.to_string(),
bucket: bucket.to_string(),
key,
..Default::default()
})
