feat(new source): aws_s3 source (vectordotdev#4779)

See RFC: https://github.com/timberio/vector/blob/master/rfcs/2020-09-29-4155-aws-s3-source.md
Fixes vectordotdev#1017

This is the initial implementation of an `aws_s3` source that relies on AWS SQS bucket notifications to learn of new S3 objects to consume. See the RFC for a discussion of other approaches and why this one was chosen. The source has an enum `strategy` option so that additional approaches (like SNS or long polling) can be supported later.

The basic flow is:

User setup:
* bucket is created
* queue is created
* bucket is configured to notify the queue for ObjectCreated events
* vector is configured with the `aws_s3` source using the queue configuration, whereupon it processes the ObjectCreated events and reads each referenced S3 object.

Example configuration:

```toml
[sources.s3]
  type = "aws_s3"

  region = "us-east-1"
  strategy = "sqs"
  compression = "gzip"

  sqs.queue_name = "jesse-test-flow-logs"
  sqs.delete_message = false

[sinks.console]
  type = "console"
  inputs = ["s3"]
  encoding.codec = "json"
```

The commits can be viewed in order, but the resulting diff probably isn't too bad either (it's mostly just `sources/aws_s3`). It may be worth looking at the added cue documentation first.

The source also behaves much like the `file` source in that it emits one event per line, and it supports the same `multiline` configuration that the `file` source supports. **Note** there is a rough edge here: the `multiline` config includes a `timeout_ms` option that isn't really applicable when reading a complete S3 object, but it is applied just the same.
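For illustration, a config that folds indented continuation lines (e.g. stack traces) into a single event might look like the sketch below. It assumes the `multiline` table accepts the same options the `file` source documents (`start_pattern`, `mode`, `condition_pattern`, `timeout_ms`); the queue name is a placeholder:

```toml
[sources.s3]
  type = "aws_s3"

  region = "us-east-1"
  strategy = "sqs"

  sqs.queue_name = "my-app-logs"  # placeholder queue name

  # Fold lines that begin with whitespace into the preceding event,
  # mirroring the `file` source's multiline behavior.
  multiline.start_pattern = "^[^\\s]"
  multiline.mode = "continue_through"
  multiline.condition_pattern = "^[\\s]+"
  multiline.timeout_ms = 1000  # required, but largely moot for a complete S3 object
```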

Future work:
1. Additional codec support (like `application/ndjson`). For now, this acts very much like the `file` source. This could be folded into the general work around codecs: vectordotdev#4278
2. Additional compression formats (Snappy, LZ4, Zip). These were requested in the original issue. I started with just the formats supported out-of-the-box by the `async_compression` crate we are using.
3. (potential) Multi-region support. Currently we only support reading from a queue and a bucket in the same region. I expect this to cover most cases, since AWS requires the bucket to publish notifications to a queue in the same region. One could forward messages from a queue in one region to a queue in another, but this seems unlikely. I'd prefer to wait and see if anyone asks for multi-region support, especially given that fluentbit and filebeat have the same restriction.
4. Concurrent processing. Right now one message is processed at a time, which leads to predictable behavior, but we may see performance improvements from processing multiple objects at once. The benefit should be vetted, though; the process may be limited by incoming network bandwidth anyway.
5. Refreshing the message visibility timeout. Right now the visibility timeout is set once, when the message is retrieved, but we could refresh it while still processing a message that nears the end of its timeout, to avoid another Vector instance picking it up. This would give users the best of both worlds: short visibility timeouts to quickly reprocess messages when a `vector` instance falls over, while still avoiding concurrent processing of large objects whose processing time exceeds the timeout (see the sketch after this list).
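To make the visibility-timeout tradeoff in item 5 concrete, here is a sketch of a short-timeout setup. The `sqs.visibility_timeout_secs` option name is an assumption for illustration; see the added cue documentation for the options the source actually exposes:

```toml
[sources.s3]
  type = "aws_s3"

  region = "us-east-1"
  strategy = "sqs"

  sqs.queue_name = "my-app-logs"  # placeholder queue name

  # Assumed option name, for illustration only. A short timeout lets another
  # consumer pick the message up quickly if this instance falls over, at the
  # risk of duplicate processing when reading an object takes longer than this.
  sqs.visibility_timeout_secs = 30
```

With the refresh described in item 5, a timeout this short could be kept without risking duplicate reads of large objects.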

I'll create issues for 2 and 5. I think the others can be left until we observe their necessity.

Signed-off-by: Brian Menges <brian.menges@anaplan.com>
jszwedko authored and Brian Menges committed Dec 9, 2020
1 parent 7f275ce commit 7c912cf
Showing 15 changed files with 1,628 additions and 159 deletions.
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
@@ -26,6 +26,7 @@
/docs/reference/components/sinks/tcp.cue @bruceg @lukesteensen

/docs/reference/components/sources/apache_metrics.cue @jszwedko
+/docs/reference/components/sources/aws_s3.cue @jszwedko
/docs/reference/components/sources/docker.cue @fanatid
/docs/reference/components/sources/generator.cue @bruceg
/docs/reference/components/sources/host_metrics.cue @bruceg
@@ -132,6 +133,7 @@

/src/sources/apache_metrics/ @jszwedko
/src/sources/aws_kinesis_firehose/ @jszwedko
+/src/sources/aws_s3.rs @jszwedko
/src/sources/docker.rs @fanatid
/src/sources/generator.rs @bruceg
/src/sources/host_metrics.rs @bruceg
60 changes: 58 additions & 2 deletions Cargo.lock

8 changes: 6 additions & 2 deletions Cargo.toml
@@ -79,6 +79,7 @@ metrics-tracing-context = { version = "0.1.0-alpha" }
rusoto_core = { version = "0.45.0", features = ["encoding"], optional = true }
rusoto_es = { version = "0.45.0", optional = true }
rusoto_s3 = { version = "0.45.0", optional = true }
+rusoto_sqs = { version = "0.45.0", optional = true }
rusoto_logs = { version = "0.45.0", optional = true }
rusoto_cloudwatch = { version = "0.45.0", optional = true }
rusoto_kinesis = { version = "0.45.0", optional = true }
@@ -130,7 +131,7 @@ hyper-openssl = "0.8"
openssl = "0.10.30"
openssl-probe = "0.1.2"
flate2 = "1.0.19"
-async-compression = { version = "0.3.5", features = ["tokio-02", "gzip"] }
+async-compression = { version = "0.3.5", features = ["tokio-02", "gzip", "zstd"] }
structopt = "0.3.19"
indexmap = {version = "1.5.1", features = ["serde-1"]}
http = "0.2"
@@ -145,6 +146,7 @@ headers = "0.3"
rdkafka = { version = "0.24.0", features = ["libz", "ssl", "zstd"], optional = true }
hostname = "0.3.1"
seahash = { version = "3.0.6", optional = true }
+semver = { version = "0.11.0", features = ["serde"] }
jemallocator = { version = "0.3.0", optional = true }
lazy_static = "1.3.0"
rlua = { git = "https://github.com/kyren/rlua", optional = true }
@@ -297,6 +299,7 @@ api-client = [
sources = [
"sources-apache_metrics",
"sources-aws_kinesis_firehose",
"sources-aws_s3",
"sources-docker",
"sources-file",
"sources-generator",
@@ -318,6 +321,7 @@ sources = [
]
sources-apache_metrics = []
sources-aws_kinesis_firehose = ["base64", "tls", "warp"]
+sources-aws_s3 = ["rusoto_core", "rusoto_credential", "rusoto_signature", "rusoto_sts", "rusoto_s3", "rusoto_sqs"]
sources-docker = ["bollard"]
sources-file = ["bytesize", "file-source"]
sources-generator = []
@@ -502,7 +506,7 @@ aws-cloudwatch-metrics-integration-tests = ["sinks-aws_cloudwatch_metrics"]
aws-ec2-metadata-integration-tests = ["transforms-aws_ec2_metadata"]
aws-kinesis-firehose-integration-tests = ["sinks-aws_kinesis_firehose", "sinks-elasticsearch", "rusoto_es"]
aws-kinesis-streams-integration-tests = ["sinks-aws_kinesis_streams"]
-aws-s3-integration-tests = ["sinks-aws_s3"]
+aws-s3-integration-tests = ["sources-aws_s3", "sinks-aws_s3"]
clickhouse-integration-tests = ["sinks-clickhouse", "warp"]
docker-integration-tests = ["sources-docker", "unix"]
es-integration-tests = ["sinks-elasticsearch"]
4 changes: 2 additions & 2 deletions Makefile
@@ -314,7 +314,7 @@ ifeq ($(CONTAINER_TOOL),podman)
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws --name vector_ec2_metadata \
timberiodev/mock-ec2-metadata:latest
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws --name vector_localstack_aws \
-  -e SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose \
+  -e SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose,sqs \
localstack/localstack-full:0.11.6
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws --name vector_mockwatchlogs \
-e RUST_LOG=trace luciofranco/mockwatchlogs:latest
@@ -324,7 +324,7 @@ else
timberiodev/mock-ec2-metadata:latest
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws --name vector_localstack_aws \
-p 4566:4566 -p 4571:4571 \
-  -e SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose \
+  -e SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose,sqs \
localstack/localstack-full:0.11.6
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws -p 6000:6000 --name vector_mockwatchlogs \
-e RUST_LOG=trace luciofranco/mockwatchlogs:latest
153 changes: 153 additions & 0 deletions docs/reference/components/aws.cue
@@ -0,0 +1,153 @@
package metadata

import (
    "strings"
)

components: [Kind=string]: [Name=string]: {
    if Kind == "sink" || Kind == "source" {
        if strings.HasPrefix(Name, "aws_") {
            configuration: {
                assume_role: {
                    category:    "Auth"
                    common:      false
                    description: "The ARN of an [IAM role](\(urls.aws_iam_role)) to assume at startup."
                    required:    false
                    type: string: {
                        default: null
                        examples: ["arn:aws:iam::123456789098:role/my_role"]
                    }
                }

                endpoint: {
                    common:        false
                    description:   "Custom endpoint for use with AWS-compatible services. Providing a value for this option will make `region` moot."
                    relevant_when: "region = null"
                    required:      false
                    type: string: {
                        default: null
                        examples: ["127.0.0.0:5000/path/to/service"]
                    }
                }

                region: {
                    description:   "The [AWS region](\(urls.aws_regions)) of the target service. If `endpoint` is provided it will override this value since the endpoint includes the region."
                    required:      true
                    relevant_when: "endpoint = null"
                    type: string: {
                        examples: ["us-east-1"]
                    }
                }
            }

            env_vars: {
                AWS_ACCESS_KEY_ID: {
                    description: "The AWS access key id. Used for AWS authentication when communicating with AWS services."
                    type: string: {
                        default: null
                        examples: ["AKIAIOSFODNN7EXAMPLE"]
                    }
                }

                AWS_CONFIG_FILE: {
                    description: "Specifies the location of the file that the AWS CLI uses to store configuration profiles."
                    type: string: {
                        default: "~/.aws/config"
                    }
                }

                AWS_CREDENTIAL_EXPIRATION: {
                    description: "Expiration time in RFC 3339 format. If unset, credentials won't expire."
                    type: string: {
                        default: null
                        examples: ["1996-12-19T16:39:57-08:00"]
                    }
                }

                AWS_DEFAULT_REGION: {
                    description:   "The default [AWS region](\(urls.aws_regions))."
                    relevant_when: "endpoint = null"
                    type: string: {
                        default: null
                        examples: ["us-east-1"]
                    }
                }

                AWS_PROFILE: {
                    description: "Specifies the name of the CLI profile with the credentials and options to use. This can be the name of a profile stored in a credentials or config file."
                    type: string: {
                        default: "default"
                        examples: ["my-custom-profile"]
                    }
                }

                AWS_ROLE_SESSION_NAME: {
                    description: "Specifies a name to associate with the role session. This value appears in CloudTrail logs for commands performed by the user of this profile."
                    type: string: {
                        default: null
                        examples: ["vector-session"]
                    }
                }

                AWS_SECRET_ACCESS_KEY: {
                    description: "The AWS secret access key. Used for AWS authentication when communicating with AWS services."
                    type: string: {
                        default: null
                        examples: ["wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"]
                    }
                }

                AWS_SHARED_CREDENTIALS_FILE: {
                    description: "Specifies the location of the file that the AWS CLI uses to store access keys."
                    type: string: {
                        default: "~/.aws/credentials"
                    }
                }

                AWS_SESSION_TOKEN: {
                    description: "The AWS session token. Used for AWS authentication when communicating with AWS services."
                    type: string: {
                        default: null
                        examples: ["AQoDYXdzEJr..."]
                    }
                }
            }

            how_it_works: {
                aws_authentication: {
                    title: "AWS Authentication"
                    body: """
                        Vector checks for AWS credentials in the following order:

                        1. Environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`.
                        2. The [`credential_process` command](\(urls.aws_credential_process)) in the AWS config file (usually located at `~/.aws/config`).
                        3. The [AWS credentials file](\(urls.aws_credentials_file)) (usually located at `~/.aws/credentials`).
                        4. The [IAM instance profile](\(urls.iam_instance_profile)) (will only work if running on an EC2 instance with an instance profile/role).

                        If credentials are not found, the [healthcheck](#healthchecks) will fail and an
                        error will be [logged][docs.monitoring#logs].
                        """
                    sub_sections: [
                        {
                            title: "Obtaining an access key"
                            body: """
                                In general, we recommend using instance profiles/roles whenever possible. In
                                cases where this is not possible you can generate an AWS access key for any user
                                within your AWS account. AWS provides a [detailed guide](\(urls.aws_access_keys)) on
                                how to do this.
                                """
                        },
                        {
                            title: "Assuming roles"
                            body: """
                                Vector can assume an AWS IAM role via the [`assume_role`](#assume_role) option. This is an
                                optional setting that is helpful for a variety of use cases, such as cross
                                account access.
                                """
                        },
                    ]
                }
            }
        }
    }
}
