# RFC 4155 - 2020-09-29 - AWS S3 source

This RFC describes a new `aws_s3` source for ingesting events stored as objects in S3, via SQS bucket notifications. This will permit, for example, ingesting CloudTrail events, VPC flow logs, and ELB logs that AWS can write to S3, as well as the output of custom logging pipelines that write there.

## Scope

This RFC will cover the implementation of an `aws_s3` source that relies on SQS for bucket notifications.

It will not cover the proposed polling strategy, but will leave space for it to be added later in a backwards-compatible fashion.

It will not cover parsing of events within the objects. Vector's current approach for this is to delegate to transforms. For example, it is likely we'll want to add a transform to extract CloudTrail events stored as objects in S3.

## Motivation

There are some types of logs that AWS can write to S3. For example:

- CloudTrail logs (can also go to CloudWatch Logs, but with a size limitation)
- AWS load balancer access logs (only S3)
- VPC flow logs (can also go to CloudWatch Logs)

Even though some AWS service logs can go to CloudWatch Logs, I expect it will be common for operators to send them to S3 instead, since S3 is cheaper storage when the logs don't need to be accessed regularly.

## Internal Proposal

A new `aws_s3` source will be added that will poll SQS for bucket notifications and ingest new objects from S3. It will wait to delete the SQS message until the object has been fully processed, to guarantee at-least-once delivery to the pipeline (though events can still be lost after that point until we tackle end-to-end acknowledgements).

This approach will allow ingestion to be easily horizontally scaled by spinning up extra vector instances, relying on SQS to handle the fan-out.
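
To make the flow concrete, here is a minimal sketch of the receive/process/delete loop, assuming the rusoto crates Vector already uses for its AWS integrations; `process_notification` is a hypothetical placeholder for fetching and forwarding the referenced object:

```rust
use rusoto_sqs::{DeleteMessageRequest, ReceiveMessageRequest, Sqs, SqsClient};

// Hypothetical stand-in: parse the notification, fetch the object from
// S3, decode it into events, and forward them down the pipeline.
async fn process_notification(_body: &str) -> Result<(), Box<dyn std::error::Error>> {
    Ok(())
}

async fn poll_queue(client: &SqsClient, queue_url: &str) -> Result<(), Box<dyn std::error::Error>> {
    let response = client
        .receive_message(ReceiveMessageRequest {
            queue_url: queue_url.to_owned(),
            max_number_of_messages: Some(10),
            wait_time_seconds: Some(20), // long poll
            ..Default::default()
        })
        .await?;

    for message in response.messages.unwrap_or_default() {
        let body = message.body.as_deref().unwrap_or("");
        // Process the object fully *before* deleting the SQS message; a
        // crash before the delete means redelivery (at-least-once)
        // rather than loss.
        process_notification(body).await?;
        if let Some(receipt_handle) = message.receipt_handle {
            client
                .delete_message(DeleteMessageRequest {
                    queue_url: queue_url.to_owned(),
                    receipt_handle,
                })
                .await?;
        }
    }
    Ok(())
}
```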

Example SQS bucket notification message:

```json
{
   "Records":[
      {
         "eventVersion":"2.2",
         "eventSource":"aws:s3",
         "awsRegion":"us-west-2",
         "eventTime":"The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when Amazon S3 finished processing the request",
         "eventName":"event-type",
         "userIdentity":{
            "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
         },
         "requestParameters":{
            "sourceIPAddress":"ip-address-where-request-came-from"
         },
         "responseElements":{
            "x-amz-request-id":"Amazon S3 generated request ID",
            "x-amz-id-2":"Amazon S3 host that processed the request"
         },
         "s3":{
            "s3SchemaVersion":"1.0",
            "configurationId":"ID found in the bucket notification configuration",
            "bucket":{
               "name":"bucket-name",
               "ownerIdentity":{
                  "principalId":"Amazon-customer-ID-of-the-bucket-owner"
               },
               "arn":"bucket-ARN"
            },
            "object":{
               "key":"object-key",
               "size":"object-size",
               "eTag":"object eTag",
               "versionId":"object version if bucket is versioning-enabled, otherwise null",
               "sequencer":"a string representation of a hexadecimal value used to determine event sequence, only used with PUTs and DELETEs"
            }
         },
         "glacierEventData":{
            "restoreEventData":{
               "lifecycleRestorationExpiryTime":"The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, of Restore Expiry",
               "lifecycleRestoreStorageClass":"Source storage class for restore"
            }
         }
      }
   ]
}
```
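
Of these, the source only needs a handful of fields. A minimal sketch of deserializing them with serde (struct names are illustrative, not final):

```rust
use serde::Deserialize;

// Only the fields the source needs; serde ignores the rest by default.
#[derive(Debug, Deserialize)]
struct S3Event {
    #[serde(rename = "Records")]
    records: Vec<S3EventRecord>,
}

#[derive(Debug, Deserialize)]
struct S3EventRecord {
    #[serde(rename = "awsRegion")]
    aws_region: String,
    #[serde(rename = "eventName")]
    event_name: String,
    s3: S3Message,
}

#[derive(Debug, Deserialize)]
struct S3Message {
    bucket: S3Bucket,
    object: S3Object,
}

#[derive(Debug, Deserialize)]
struct S3Bucket {
    name: String,
    arn: String,
}

#[derive(Debug, Deserialize)]
struct S3Object {
    key: String,
    size: u64,
}

fn parse_notification(body: &str) -> serde_json::Result<S3Event> {
    serde_json::from_str(body)
}
```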

An S3 client will be initialized when an SQS message is received, using the `s3.bucket.arn` from the notification. The client will be cached so we don't need to recreate it for messages referring to the same bucket.
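
One possible shape for that cache, assuming the rusoto clients Vector already uses (rusoto constructs clients per region, so this sketch keys on the region carried in the notification):

```rust
use std::collections::HashMap;

use rusoto_core::Region;
use rusoto_s3::S3Client;

// One client per region, reused across notifications for the same
// bucket/region so we don't pay client construction costs repeatedly.
#[derive(Default)]
struct S3ClientCache {
    clients: HashMap<String, S3Client>,
}

impl S3ClientCache {
    fn get_or_create(&mut self, region: Region) -> &S3Client {
        self.clients
            .entry(region.name().to_owned())
            .or_insert_with(|| S3Client::new(region))
    }
}
```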

Events will be published for:

- When an SQS message is received
  - Number of records in the message
- When an SQS message is processed (success or failure)
  - Number of successful / failed / unprocessed records
- When an object is processed
  - Number of bytes processed
These may be revised when the source is actually implemented.
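
Setting aside Vector's internal-event plumbing, a minimal sketch of these instrumentation points using the `metrics` crate facade directly; the metric names are placeholders, not final:

```rust
use metrics::counter;

// Placeholder metric names; the real source would emit these through
// Vector's internal-event machinery.
fn emit_sqs_message_received(records: usize) {
    counter!("sqs_messages_received_total", 1);
    counter!("sqs_records_received_total", records as u64);
}

fn emit_sqs_message_processed(succeeded: u64, failed: u64) {
    counter!("sqs_records_succeeded_total", succeeded);
    counter!("sqs_records_failed_total", failed);
}

fn emit_object_processed(bytes: usize) {
    counter!("s3_objects_processed_total", 1);
    counter!("s3_bytes_processed_total", bytes as u64);
}
```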

## Doc-level Proposal

We will create a user guide, based on AWS's documentation, for configuring bucket notifications to publish object-created events to SQS.

The new source configuration will look like:

```toml
[sources.my_source_id]
  # General
  type = "aws_s3" # required
  compression = "auto" # optional; compression format of the objects; one of ["auto", "none", "gzip", "lz4", "snappy", "zstd"]; default

  strategy = "sqs" # optional, default; one of ["sqs"]

  sqs.queue_url = "https://sqs.us-east-1.amazonaws.com/1234/test-s3-queue"
  sqs.poll_secs = 30 # minimum poll interval; optional, default
  sqs.visibility_timeout_secs = 300 # visibility timeout for SQS messages; if Vector does not process the message in this time, the SQS message will become available for reprocessing; optional, default
  sqs.delete_message = true # whether to delete the message after processing; false is useful for debugging; optional, default
```
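
For illustration, these options might deserialize into config structs along these lines (a sketch; field names and defaults mirror the TOML above and are not final):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct AwsS3Config {
    #[serde(default = "default_compression")]
    compression: String,
    #[serde(default = "default_strategy")]
    strategy: String,
    sqs: SqsConfig,
}

#[derive(Debug, Deserialize)]
struct SqsConfig {
    queue_url: String, // required
    #[serde(default = "default_poll_secs")]
    poll_secs: u64,
    #[serde(default = "default_visibility_timeout_secs")]
    visibility_timeout_secs: u64,
    #[serde(default = "default_true")]
    delete_message: bool,
}

fn default_compression() -> String { "auto".into() }
fn default_strategy() -> String { "sqs".into() }
fn default_poll_secs() -> u64 { 30 }
fn default_visibility_timeout_secs() -> u64 { 300 }
fn default_true() -> bool { true }
```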

A comment mentioned wanting the ability to filter objects by prefix, but I think this is better configured when setting up the bucket notifications, so that notifications for unwanted objects are never published at all.

### Log metadata

The `LastModified` value of the object will be used as the timestamp.

The object key, bucket, and region will be attached to the log as the fields `key`, `bucket`, and `region`.

All custom S3 object metadata key/value pairs will be set as fields on the log event.
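
A hypothetical helper showing the shape of that annotation; the `BTreeMap` here is a stand-in for Vector's internal log-event type:

```rust
use std::collections::BTreeMap;

// Attach the standard fields plus any custom x-amz-meta-* key/values.
fn annotate(
    log: &mut BTreeMap<String, String>,
    bucket: &str,
    key: &str,
    region: &str,
    object_metadata: &BTreeMap<String, String>,
) {
    log.insert("bucket".into(), bucket.into());
    log.insert("key".into(), key.into());
    log.insert("region".into(), region.into());
    // Custom object metadata is copied through verbatim.
    for (k, v) in object_metadata {
        log.insert(k.clone(), v.clone());
    }
}
```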

### Object compression

It is common to compress objects stored in S3 as part of the upload process. To facilitate this, we will allow the user to indicate the compression format in the config, as shown above, but we will also support an "auto" value that will instruct Vector to automatically try to decompress the objects via the following checks (see the sketch after this list):

- If the Content-Type of the object is set to a known compression MIME type, use it; otherwise:
- If the file extension matches a known compression file extension, use it; otherwise:
- Do no decoding and treat the object as if the compression were "none"
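
A sketch of that resolution order; the MIME types and file extensions shown are illustrative rather than an exhaustive mapping:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Compression {
    None,
    Gzip,
    Lz4,
    Snappy,
    Zstd,
}

// Trust Content-Type first, then the file extension, then fall back
// to treating the object as uncompressed.
fn detect_compression(content_type: Option<&str>, key: &str) -> Compression {
    match content_type {
        Some("application/gzip") | Some("application/x-gzip") => return Compression::Gzip,
        Some("application/zstd") => return Compression::Zstd,
        _ => {}
    }
    match key.rsplit('.').next() {
        Some("gz") | Some("gzip") => Compression::Gzip,
        Some("lz4") => Compression::Lz4,
        Some("sz") | Some("snappy") => Compression::Snappy,
        Some("zst") | Some("zstd") => Compression::Zstd,
        _ => Compression::None,
    }
}
```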

## Prior Art

## Drawbacks

Maintenance burden of an additional source.

## Alternatives

There are a few alternate approaches (some also noted by James).

### SNS notifications

AWS S3 can be configured to also send push-based notifications via SNS. This would require running Vector in a way where AWS can send requests to it.

This may be a worthwhile approach in the future, as it would result in faster object ingestion than polling SQS, but I think SQS's pull-based model is likely to be much easier to use out of the box: it does not require exposing a Vector instance to incoming network requests, and SQS consumers are more easily parallelized (SNS would require a load balancer).

I'd recommend tabling this one for now.

### Polling

As mentioned in #1017, another approach is to long-poll the bucket: listing all of the objects and processing any new ones.

This approach is liable to take a long time on large buckets (hours or days), and so is not generally applicable. It can work for small buckets, or when a tightly scoped object prefix is used.

This RFC will leave room for this, and possibly other, approaches to be added in the future.

### Storing S3 object metadata in DynamoDB

One recommendation in a comment was to store object-created messages in DynamoDB (via Lambda). This could certainly work, but it is a more complicated pipeline, and I'm unsure how common it would be.

I'd recommend tabling this one and seeing if anyone asks for it. I consider this approach representative of more complex S3 pipelines (e.g. you could imagine having the Lambda write the metadata to RDS instead).

### Running Vector as a Lambda

It is possible to have AWS SQS directly invoke a Lambda, so a user could attempt to run Vector in a Lambda. We've generally tried to stay away from recommending running Vector in a Lambda because Lambdas are largely stateless and some Vector components are stateful.

## Outstanding Questions

- We could attempt to make the compression handling more robust by falling back to checking the "magic bytes" of the object to see whether it appears to be compressed. We may want to have users opt into this behavior, though, as I could see it causing issues in edge cases where files are not compressed but happen to match the magic bytes.
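
As an illustration, such sniffing could be a minimal sketch like the following; the byte sequences are the standard magic numbers for gzip, zstd, and the LZ4 frame format:

```rust
// Inspect the first bytes of the object and guess the compression
// format from well-known magic numbers; None means "looks uncompressed".
fn sniff_compression(header: &[u8]) -> Option<&'static str> {
    match header {
        [0x1f, 0x8b, ..] => Some("gzip"),
        [0x28, 0xb5, 0x2f, 0xfd, ..] => Some("zstd"),
        [0x04, 0x22, 0x4d, 0x18, ..] => Some("lz4"), // LZ4 frame format
        _ => None,
    }
}
```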

## Plan Of Attack

- Submit a PR with the initial implementation of the source

## Future work

- Allow deletion, or changing the storage class, of S3 objects after they've been processed
- Conditionally pull object tags as additional metadata (requires an extra AWS API request per object)
- Enhance parallelization within Vector; for example, process N messages at a time
- Update the message visibility timeout while an object is still being actively processed, to avoid it being reprocessed by another Vector instance. Maybe this would be optional?
- Possibly add rate-limit parameters. I don't think they'd be very useful in this case, though, given the high number of requests/s S3 allows.
- Consider sharing the compression-handling logic with the file source