Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 253 additions & 0 deletions rfcs/2020-09-29-4155-aws-s3-source.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
## RFC #4155 - 2020-09-29 - AWS S3 source

This RFC describes a new `aws_s3` source for ingesting events stored as objects
in S3 via SQS bucket notifications. This will permit, for example, ingesting
CloudTrail events, VPC flow logs, and ELB logs which can be written to S3, as
well as custom logging pipelines writing there.

## Scope

This RFC will cover the implementation of an `aws_s3` source that will rely on
SQS for bucket notifications.

It will not cover the [proposed polling
strategy](https://github.com/timberio/vector/issues/1017#issuecomment-694287111),
but will leave space for it to be added later in a backwards compatible fashion.

It will not cover parsing of events within the objects. Vector's current
approach for this is to delegate to transforms. For example, it is likely we'll
want to add a transform to extract CloudTrail events stored as objects in S3.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we close this in favor of a transform? If so, we should create a transform issue to cover that future work.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be a feature that still needs to exist.
I am not sure if customers would want to configure their AWS services to invoke additional costs in order to work-around a limitation in their log router. Not that an S3 bucket of CloudTrail logs would be particularly expensive, but it's the principle of the matter. (that said, my org is already routing CloudTrail to s3)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For CloudTrail, specifically, I think the only options for ingestion by vector are shipping to S3 or shipping to CloudWatch Logs. Are you aware of any other options? LookupEvents is limited to 2 req/s which seems like it would make it infeasible for scraping.

I think the naive aws_cloudwatch_logs source (#3077) is still relevant apart from CloudTrail.

We may want a transform specifically to handle CloudTrail events in S3, but I'm not 100% sure. Just using the json_parser transform seems like it might be sufficient. Again, this relates back to the idea of Vector config macros that would allow users to more easily configure ingesting CloudTrail events from S3 without needing to string together a source with relevant transforms.


Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding Metrics to the scope?

One incredibly useful feature of this would be to see :

  • The rate of logs-files being processed.
  • The rate of new messages(logs) arriving in SQS

So that I can quickly calculate if the Vector ingest is keeping up with the load, or if I need to introduce more replicas.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like the CloudWatch Metrics for SQS is the best place to get queue metrics and would let you know if the consumers are keeping up, but I'll add some internal event (from which internal metrics are derived) notes to this RFC.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added notes on internal events.

## Motivation

There are some types of logs that AWS can write to S3. For example:

* CloudTrail logs (can also go to CloudWatch Logs but with a size limitation)
* AWS load balancer access logs (only S3)
* VPC flow logs (can also go to CloudWatch logs)
Comment thread
jszwedko marked this conversation as resolved.

Even if some AWS service logs _can_ go to CloudWatch Logs, I think it will be
common that operators will want to just send them to S3 as it is cheaper storage
if they don't need to regularly access them.

## Internal Proposal

A new `aws_s3` source will be added that will poll SQS for bucket notifications
and ingest new objects from S3. It will wait to delete the SQS message until the
object has been fully processed to guarantee "at least" once delivery to the
pipeline (though it can still be lost after that until we tackle [end-to-end
acks](https://github.com/timberio/vector/issues/3922).

This approach will allow ingestion to be easily horizontally scaled by spinning
up extra `vector` instances, relying on SQS to handle the fan-out.

Example SQS bucket notification message:

```json
{
"Records":[
{
"eventVersion":"2.2",
"eventSource":"aws:s3",
"awsRegion":"us-west-2",
"eventTime":The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when Amazon S3 finished processing the request,
"eventName":"event-type",
"userIdentity":{
"principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
},
"requestParameters":{
"sourceIPAddress":"ip-address-where-request-came-from"
},
"responseElements":{
"x-amz-request-id":"Amazon S3 generated request ID",
"x-amz-id-2":"Amazon S3 host that processed the request"
},
"s3":{
"s3SchemaVersion":"1.0",
"configurationId":"ID found in the bucket notification configuration",
"bucket":{
"name":"bucket-name",
"ownerIdentity":{
"principalId":"Amazon-customer-ID-of-the-bucket-owner"
},
"arn":"bucket-ARN"
},
"object":{
"key":"object-key",
"size":object-size,
"eTag":"object eTag",
"versionId":"object version if bucket is versioning-enabled, otherwise null",
"sequencer": "a string representation of a hexadecimal value used to determine event sequence,
only used with PUTs and DELETEs"
}
},
"glacierEventData": {
"restoreEventData": {
"lifecycleRestorationExpiryTime": "The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, of Restore Expiry",
"lifecycleRestoreStorageClass": "Source storage class for restore"
}
}
}
]
}
```

A S3 client will be initialized when an SQS message is received using the
`s3.bucket.arn`. It will be cached so we don't need to recerate the client for
messages referring to the same bucket.

Events will be published for:

* When SQS message is receieved
* Number of records in message
* When SQS message is processed (success or failure)
* Number of success / failure / unprocessed records
* When object is processed
* Bytes

These may be revised when the source is actually implemented.

## Doc-level Proposal

We will create a user guide based on
[AWS's](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/enable-event-notifications.html#s3-event-notification-destinations)
for configuring bucket notifications to publish object created events to SQS.

The new source configuration will look like:

```toml
[sources.my_source_id]
# General
type = "aws_s3" # required
compression = "auto" # optional; compression format of the objects; one of: ["auto", "none", "gzip" "lz4" "snappy" "zstd"]; default

strategy = "sqs" # optional, default, one of ["sqs"]

sqs.queue_url = "https://sqs.us-east-1.amazonaws.com/1234/test-s3-queue"
sqs.poll_secs = 30 # minimum poll interval, optional, default
sqs.visibility_timeout_secs = 300 # default visibility timeout for SQS message; if vector does not process the message in this time, the SQS message will be available to be reprocessed
sqs.delete_message = true # whether to delete the message after processing; false is useful for debugging; default
Comment thread
binarylogic marked this conversation as resolved.
```

Comment thread
jszwedko marked this conversation as resolved.
[A
comment](https://github.com/timberio/vector/issues/1017#issuecomment-699125610)
mentions desiring the ability to filter objects by prefix, but I think this is
better configured when setting up bucket notifications to avoid publishing
notifications for these objects at all.

### Log metadata

The `LastModified` value for the object will be used as the `timestamp`.

The object key, bucket, and region will be associated with the log as fields:
`key`, `bucket`, and `region`.

All [custom S3 object metadata
key/values](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html#object-metadata)
will be set as fields on the log event.
Comment on lines +145 to +147
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this prior art? I'm curious which other tools do this.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FluentD does this: https://github.com/fluent/fluent-plugin-s3 (see add_object_metadata). We could make it conditional, but I don't see a downside of just doing it by default.


### Object compression

It is common to compress objects stored in S3 as part of the upload process. To
facilitate this, we will allow the user to indicate the compression format in
the config, as show above, but we will also support an "auto" value that will
Comment thread
jszwedko marked this conversation as resolved.
instruct Vector to automatically try to decompress the objects via:

* If `Content-Type` of the object is set to a known compression mime-type, use
this, otherwise:
* If the file extension matches a known compression file extension, use this,
otherwise:
* Do no decoding and treat it as if the encoding was "none"

## Prior Art

* [fluentd](https://github.com/fluent/fluent-plugin-s3) (uses SQS approach)
* [filebeat](https://www.elastic.co/guide/en/beats/filebeat/master/filebeat-input-s3.html) (uses SQS approach)
* [logstash](https://github.com/logstash-plugins/logstash-input-s3) (uses
polling approach)

## Drawbacks

Maintenance burden of an additional source.

## Alternatives

There are a few alternate approaches ([some also noted by
James](https://github.com/timberio/vector/issues/1017#issuecomment-699125610)).

### SNS notifications

AWS S3 can be configured to also send push-based notifications via SNS. This
would require running Vector in a way where AWS can send requests to it.

This may be a worthwhile approach in the future as it would result in faster
object ingestion than polling SQS, but I think SQS's pull-based model is likely
to be much more easy to use out-of-the-box as it does not require exposing
a `vector` instance to incoming network requests and SQS consumers are more
easily parallelizable (SNS would require a load balancer).

I'd recommend tabling this one for now.

### Polling

As mentioned in
[#1017](https://github.com/timberio/vector/issues/1017#issuecomment-694287111),
another approach is to long-poll the bucket; listing all of the objects, and
processing any new ones.

This approach is liable to take a long time to process a bucket (hours or days
with large buckets) and so is not the most generally applicable approach. It can
work for small buckets or if a tightly scoped object prefix is used.

This RFC will leave room for this, and possibly other, approaches to be added in
the future.

### Storing S3 object metadata in DynamoDB

One recommendation in [a
comment](https://github.com/timberio/vector/issues/1017#issuecomment-699125610)
was to [store object created messages in
DynamoDB](https://aws.amazon.com/blogs/big-data/building-and-maintaining-an-amazon-s3-metadata-index-without-servers/)
(via Lambda). This could certainly work, but is a more complicated pipeline and
I'm unsure how common it would be.

I'd recommend tabling this one and see if anyone asks for it. I consider this
approach representative of more complex S3 pipelines (e.g. you could imagine
having the Lambda write the metadata to RDS instead).

### Running Vector as a Lambda

It is possible to have [AWS SQS directly call
a Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html) so a user
could attempt to run Vector in a Lambda. We've generally tried to stay away from
recommending running Vector in a Lambda due to the fact that Lambda's are
largely stateless and some Vector components are stateful.

## Outstanding Questions

* We could attempt to make the compression handling more robust through falling
back to ["magic bytes"](https://en.wikipedia.org/wiki/List_of_file_signatures)
of the object to see if it _appears_ to be compressed. We may want to have
the user opt into this behavior though as I could see it causing issues in
edge cases where the files are not compressed but happen to match the magic
bytes.

## Plan Of Attack

* [ ] Submit a PR with the initial implementation of source

## Future work
Comment thread
jszwedko marked this conversation as resolved.

* Allow deletion, or changing the storage class, of S3 objects after they've
been processed
* Conditionally pull object tags as additional metadata (requires an extra AWS
API request per object)
* Enhancing parallelization within vector. For example, could process N messages
at a time
* Update message visibility timeout if an object is still actively being
processed by an active vector process to avoid it being reprocessed by another
`vector`. Maybe this would be optional?
* Possibly adding rate limit parameters. I don't think they'd really be useful
in this case though given [the high number of requests/s S3
allows](https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html).
* Consider reusing compression handling logic with `file` source