From 53bb072d86636738fa31d61c4d196a7feab923cb Mon Sep 17 00:00:00 2001
From: Jesse Szwedko
Date: Tue, 29 Sep 2020 15:18:19 -0400
Subject: [PATCH 1/5] chore(rfcs): Add AWS S3 source RFC

Signed-off-by: Jesse Szwedko
---
 rfcs/2020-09-29-4155-aws-s3-source.md | 186 ++++++++++++++++++++++++++
 1 file changed, 186 insertions(+)
 create mode 100644 rfcs/2020-09-29-4155-aws-s3-source.md

diff --git a/rfcs/2020-09-29-4155-aws-s3-source.md b/rfcs/2020-09-29-4155-aws-s3-source.md
new file mode 100644
index 0000000000000..758ca668450b7
--- /dev/null
+++ b/rfcs/2020-09-29-4155-aws-s3-source.md
@@ -0,0 +1,186 @@
+## RFC #4155 - 2020-09-29 - AWS S3 source

This RFC describes a new `aws_s3` source for ingesting events stored as objects
in S3 via SQS bucket notifications. This will permit, for example, ingesting
CloudTrail events, VPC flow logs, and ELB logs, all of which can be written to
S3, as well as custom logging pipelines that write there.

## Scope

This RFC will cover the implementation of an `aws_s3` source that will rely on
SQS for bucket notifications.

It will not cover the [proposed polling
strategy](https://github.com/timberio/vector/issues/1017#issuecomment-694287111),
but will leave space for it to be added later in a backwards-compatible fashion.

It will not cover parsing of events within the objects. Vector's current
approach for this is to delegate to transforms. For example, it is likely we'll
want to add a transform to extract CloudTrail events stored as objects in S3.

## Motivation

There are some types of logs that AWS can write to S3. For example:

* CloudTrail logs (can also go to CloudWatch Logs but with a size limitation)
* AWS load balancer access logs (only S3)
* VPC flow logs (can also go to CloudWatch Logs)

Even if some AWS service logs _can_ go to CloudWatch Logs, I think it will be
common for operators to send them straight to S3, as it is cheaper storage if
they don't need to access the logs regularly.

## Internal Proposal

A new `aws_s3` source will be added that will poll SQS for bucket notifications
and ingest new objects from S3. It will wait to delete the SQS message until the
object has been fully processed to guarantee at-least-once delivery to the
pipeline (though events can still be lost after that until we tackle [end-to-end
acks](https://github.com/timberio/vector/issues/3922)).

This approach will allow ingestion to be easily horizontally scaled by spinning
up extra `vector` instances, relying on SQS to handle the fan-out.

## Doc-level Proposal

We will create a user guide based on
[AWS's](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/enable-event-notifications.html#s3-event-notification-destinations)
for configuring bucket notifications to publish object-created events to SQS.
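
As a sketch, the notification configuration the guide would have users apply
(for example via `aws s3api put-bucket-notification-configuration`) might look
like the following; the configuration ID and queue ARN here are illustrative:

```json
{
  "QueueConfigurations": [
    {
      "Id": "vector-object-created",
      "QueueArn": "arn:aws:sqs:us-east-1:123456789012:test-s3-queue",
      "Events": ["s3:ObjectCreated:*"]
    }
  ]
}
```

The guide will also need to cover granting S3 permission to publish to the
queue, i.e. an SQS queue policy allowing the `s3.amazonaws.com` service
principal to perform `sqs:SendMessage`.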

The new source configuration will look like:

```toml
[sources.my_source_id]
  # General
  type = "aws_s3" # required
  bucket = "my-bucket" # required
  region = "us-east-1" # required, required when endpoint = ""
  compression = "auto" # optional; compression format of the objects; one of: ["auto", "none", "gzip", "lz4", "snappy", "zstd"]; default

  strategy = "sqs" # optional, default, one of ["sqs"]

  sqs.queue_url = "https://sqs.us-east-1.amazonaws.com/1234/test-s3-queue"
  sqs.poll_secs = 30 # minimum poll interval, optional, default
  sqs.visibility_timeout_secs = 300 # default visibility timeout for SQS message; if vector does not process the message in this time, the SQS message will be available to be reprocessed
  sqs.delete_message = true # whether to delete the message after processing; false is; useful for debugging; default
```

[A
comment](https://github.com/timberio/vector/issues/1017#issuecomment-699125610)
mentions wanting the ability to filter objects by prefix, but I think this is
better configured when setting up the bucket notifications, to avoid publishing
notifications for those objects at all.

### Log metadata

The `LastModified` value for the object will be used as the `timestamp`.

The object key, bucket, and region will be associated with the log as fields:
`key`, `bucket`, and `region`.

All [custom S3 object metadata
key/values](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html#object-metadata)
will be set as fields on the log event.

### Object compression

It is common to compress objects stored in S3 as part of the upload process. To
facilitate this, we will allow the user to indicate the compression format in
the config, as shown above, but we will also support an "auto" value that will
instruct Vector to automatically try to decompress the objects via:

* If the `Content-Type` of the object is set to a known compression mime-type,
  use this, otherwise:
* If the file extension matches a known compression file extension, use this,
  otherwise:
* Do no decompression and treat the object as if the compression were "none"

## Prior Art

* [fluentd](https://github.com/fluent/fluent-plugin-s3)
* [filebeat](https://www.elastic.co/guide/en/beats/filebeat/master/filebeat-input-s3.html)
* [logstash](https://github.com/logstash-plugins/logstash-input-s3)

## Drawbacks

Maintenance burden of an additional source.

## Alternatives

There are a few alternate approaches ([some also noted by
James](https://github.com/timberio/vector/issues/1017#issuecomment-699125610)).

### SNS notifications

AWS S3 can be configured to also send push-based notifications via SNS. This
would require running Vector in a way where AWS can send requests to it.

This may be a worthwhile approach in the future as it would result in faster
object ingestion than polling SQS, but I think SQS's pull-based model is likely
to be much easier to use out-of-the-box: it does not require exposing
a `vector` instance to incoming network requests, and SQS consumers are more
easily parallelizable (SNS would require a load balancer).

I'd recommend tabling this one for now.

### Polling

As mentioned in
[#1017](https://github.com/timberio/vector/issues/1017#issuecomment-694287111),
another approach is to poll the bucket, listing all of the objects and
processing any new ones.

This approach is liable to take a long time to process a bucket (hours or days
with large buckets) and so is not the most generally applicable approach. It can
work for small buckets or if a tightly scoped object prefix is used.
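
For illustration, a minimal sketch of what a single polling pass might look
like, assuming the `rusoto_s3` crate Vector already uses for its S3 sink
(checkpoint persistence is elided, and the names are illustrative rather than
a committed design):

```rust
use rusoto_s3::{ListObjectsV2Request, S3, S3Client};

// One full pass over the bucket. Every pass has to page through the entire
// listing (at most 1,000 keys per request), which is why large buckets can
// take hours or days to scan.
async fn polling_pass(
    client: &S3Client,
    bucket: &str,
    prefix: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut continuation_token = None;
    loop {
        let response = client
            .list_objects_v2(ListObjectsV2Request {
                bucket: bucket.to_owned(),
                prefix: prefix.clone(),
                continuation_token: continuation_token.take(),
                ..Default::default()
            })
            .await?;

        for object in response.contents.unwrap_or_default() {
            // Compare `object.key`/`object.last_modified` against a persisted
            // checkpoint here and ingest anything that is new.
            let _ = (&object.key, &object.last_modified);
        }

        match response.next_continuation_token {
            // More pages remain in this listing pass.
            Some(token) => continuation_token = Some(token),
            // Listing exhausted; sleep until the next pass.
            None => return Ok(()),
        }
    }
}
```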

This RFC will leave room for this, and possibly other, approaches to be added in
the future.

### Storing S3 object metadata in DynamoDB

One recommendation in [a
comment](https://github.com/timberio/vector/issues/1017#issuecomment-699125610)
was to [store object-created messages in
DynamoDB](https://aws.amazon.com/blogs/big-data/building-and-maintaining-an-amazon-s3-metadata-index-without-servers/)
(via Lambda). This could certainly work, but it is a more complicated pipeline
and I'm unsure how common it would be.

I'd recommend tabling this one and seeing if anyone asks for it. I consider this
approach representative of more complex S3 pipelines (e.g. you could imagine
having the Lambda write the metadata to RDS instead).

### Running Vector as a Lambda

It is possible to have [AWS SQS directly call
a Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html), so a user
could attempt to run Vector in a Lambda. We've generally tried to stay away from
recommending running Vector in a Lambda because Lambdas are
largely stateless and some Vector components are stateful.

## Outstanding Questions

* We could attempt to make the compression handling more robust by falling
  back to the ["magic bytes"](https://en.wikipedia.org/wiki/List_of_file_signatures)
  of the object to see if it _appears_ to be compressed. We may want to have
  the user opt into this behavior, though, as I could see it causing issues in
  edge cases where files that are not compressed happen to match the magic
  bytes.

## Plan Of Attack

* [ ] Submit a PR with the initial implementation of the source

## Future work

* Perhaps allow deletion of S3 objects after they've been processed
* Conditionally pull object tags as additional metadata (requires an extra AWS
  API request per object)
* Enhancing parallelization within Vector. For example, we could process N messages
  at a time
* Update message visibility timeout if an object is still actively being
  processed by an active vector process to avoid it being reprocessed by another
  `vector`. Maybe this would be optional?
* Possibly adding rate limit parameters. I don't think they'd really be useful
   in this case though given [the high number of requests/s S3
   allows](https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html).
From a8a90660107cb9d55b6ad6e1a2ccd2be16844360 Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Tue, 29 Sep 2020 15:37:24 -0400 Subject: [PATCH 2/5] Clarifying prior art approaches Signed-off-by: Jesse Szwedko --- rfcs/2020-09-29-4155-aws-s3-source.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rfcs/2020-09-29-4155-aws-s3-source.md b/rfcs/2020-09-29-4155-aws-s3-source.md index 758ca668450b7..2f3eecdb8d422 100644 --- a/rfcs/2020-09-29-4155-aws-s3-source.md +++ b/rfcs/2020-09-29-4155-aws-s3-source.md @@ -97,9 +97,10 @@ instruct Vector to automatically try to decompress the objects via: ## Prior Art -* [fluentd](https://github.com/fluent/fluent-plugin-s3) -* [filebeat](https://www.elastic.co/guide/en/beats/filebeat/master/filebeat-input-s3.html) -* [logstash](https://github.com/logstash-plugins/logstash-input-s3) +* [fluentd](https://github.com/fluent/fluent-plugin-s3) (uses SQS approach) +* [filebeat](https://www.elastic.co/guide/en/beats/filebeat/master/filebeat-input-s3.html) (uses SQS approach) +* [logstash](https://github.com/logstash-plugins/logstash-input-s3) (uses + polling approach) ## Drawbacks From b693da8b8b124eaba9ee958dfd3a147b18b7ac0d Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Tue, 29 Sep 2020 15:38:54 -0400 Subject: [PATCH 3/5] Stray ; Signed-off-by: Jesse Szwedko --- rfcs/2020-09-29-4155-aws-s3-source.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rfcs/2020-09-29-4155-aws-s3-source.md b/rfcs/2020-09-29-4155-aws-s3-source.md index 2f3eecdb8d422..9d714f1b64e09 100644 --- a/rfcs/2020-09-29-4155-aws-s3-source.md +++ b/rfcs/2020-09-29-4155-aws-s3-source.md @@ -62,7 +62,7 @@ The new source configuration will look like: sqs.queue_url = "https://sqs.us-east-1.amazonaws.com/1234/test-s3-queue" sqs.poll_secs = 30 # minimum poll interval, optional, default sqs.visibility_timeout_secs = 300 # default visibility timeout for SQS message; if vector does not process the message in this time, the SQS message will be available to be reprocessed - sqs.delete_message = true # whether to delete the message after processing; false is; useful for debugging; default + sqs.delete_message = true # whether to delete the message after processing; false is useful for debugging; default ``` [A From a6c782180eaa03913c36e3f280de3a1b7bce53fd Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Wed, 30 Sep 2020 09:57:59 -0400 Subject: [PATCH 4/5] tabs to spaces Signed-off-by: Jesse Szwedko --- rfcs/2020-09-29-4155-aws-s3-source.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rfcs/2020-09-29-4155-aws-s3-source.md b/rfcs/2020-09-29-4155-aws-s3-source.md index 9d714f1b64e09..37d387a1825de 100644 --- a/rfcs/2020-09-29-4155-aws-s3-source.md +++ b/rfcs/2020-09-29-4155-aws-s3-source.md @@ -183,5 +183,5 @@ largely stateless and some Vector components are stateful. processed by an active vector process to avoid it being reprocessed by another `vector`. Maybe this would be optional? * Possibly adding rate limit parameters. I don't think they'd really be useful - in this case though given [the high number of requests/s S3 - allows](https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html). + in this case though given [the high number of requests/s S3 + allows](https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html). 
From c90aa50acd73cd854a6370f58f44035727e8fa0a Mon Sep 17 00:00:00 2001
From: Jesse Szwedko
Date: Wed, 30 Sep 2020 12:42:38 -0400
Subject: [PATCH 5/5] Feedback

Signed-off-by: Jesse Szwedko
---
 rfcs/2020-09-29-4155-aws-s3-source.md | 72 +++++++++++++++++++++++++--
 1 file changed, 69 insertions(+), 3 deletions(-)

diff --git a/rfcs/2020-09-29-4155-aws-s3-source.md b/rfcs/2020-09-29-4155-aws-s3-source.md
index 37d387a1825de..79700c90f3102 100644
--- a/rfcs/2020-09-29-4155-aws-s3-source.md
+++ b/rfcs/2020-09-29-4155-aws-s3-source.md
@@ -41,6 +41,72 @@ acks](https://github.com/timberio/vector/issues/3922)).
 This approach will allow ingestion to be easily horizontally scaled by spinning
 up extra `vector` instances, relying on SQS to handle the fan-out.
 
+Example SQS bucket notification message:
+
+```json
+{
+   "Records":[
+      {
+         "eventVersion":"2.2",
+         "eventSource":"aws:s3",
+         "awsRegion":"us-west-2",
+         "eventTime":"The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when Amazon S3 finished processing the request",
+         "eventName":"event-type",
+         "userIdentity":{
+            "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
+         },
+         "requestParameters":{
+            "sourceIPAddress":"ip-address-where-request-came-from"
+         },
+         "responseElements":{
+            "x-amz-request-id":"Amazon S3 generated request ID",
+            "x-amz-id-2":"Amazon S3 host that processed the request"
+         },
+         "s3":{
+            "s3SchemaVersion":"1.0",
+            "configurationId":"ID found in the bucket notification configuration",
+            "bucket":{
+               "name":"bucket-name",
+               "ownerIdentity":{
+                  "principalId":"Amazon-customer-ID-of-the-bucket-owner"
+               },
+               "arn":"bucket-ARN"
+            },
+            "object":{
+               "key":"object-key",
+               "size":"object-size",
+               "eTag":"object eTag",
+               "versionId":"object version if bucket is versioning-enabled, otherwise null",
+               "sequencer":"a string representation of a hexadecimal value used to determine event sequence, only used with PUTs and DELETEs"
+            }
+         },
+         "glacierEventData": {
+            "restoreEventData": {
+               "lifecycleRestorationExpiryTime": "The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, of Restore Expiry",
+               "lifecycleRestoreStorageClass": "Source storage class for restore"
+            }
+         }
+      }
+   ]
+}
+```
+
+An S3 client will be initialized, using the `s3.bucket.arn` from the
+notification, when an SQS message is received. The client will be cached so we
+don't need to recreate it for messages referring to the same bucket.
+
+Events will be published for:
+
+* When an SQS message is received
+  * Number of records in message
+* When an SQS message is processed (success or failure)
+  * Number of success / failure / unprocessed records
+* When an object is processed
+  * Bytes
+
+These may be revised when the source is actually implemented.
+
 ## Doc-level Proposal
 
 We will create a user guide based on
@@ -53,8 +119,6 @@ The new source configuration will look like:
 [sources.my_source_id]
   # General
   type = "aws_s3" # required
-  bucket = "my-bucket" # required
-  region = "us-east-1" # required, required when endpoint = ""
   compression = "auto" # optional; compression format of the objects; one of: ["auto", "none", "gzip", "lz4", "snappy", "zstd"]; default
 
   strategy = "sqs" # optional, default, one of ["sqs"]
 
   sqs.queue_url = "https://sqs.us-east-1.amazonaws.com/1234/test-s3-queue"
@@ -174,7 +238,8 @@ largely stateless and some Vector components are stateful.
 ## Future work
 
-* Perhaps allow deletion of S3 objects after they've been processed
+* Allow deletion, or changing the storage class, of S3 objects after they've
+  been processed
 * Conditionally pull object tags as additional metadata (requires an extra AWS
   API request per object)
 * Enhancing parallelization within Vector. For example, we could process N messages
   at a time
 * Update message visibility timeout if an object is still actively being
   processed by an active vector process to avoid it being reprocessed by another
   `vector`. Maybe this would be optional?
 * Possibly adding rate limit parameters. I don't think they'd really be useful
   in this case though given [the high number of requests/s S3
   allows](https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html).
+* Consider reusing the compression handling logic with the `file` source