
feat(new source): aws_s3 source #4779

Merged: jszwedko merged 49 commits into master from aws-s3-source on Nov 10, 2020
Conversation

@jszwedko (Member) commented Oct 27, 2020

See RFC: https://github.com/timberio/vector/blob/master/rfcs/2020-09-29-4155-aws-s3-source.md
Fixes #1017

This is the initial implementation of an aws_s3 source that relies on AWS SQS bucket notifications to learn of new S3 objects to consume. See the RFC for discussion of other approaches and why this one was chosen. The source has an enum strategy configuration to allow additional approaches (like SNS or long polling) to be supported later.

The basic flow is:

User setup:

  • bucket is created
  • queue is created
  • bucket is configured to notify the queue for ObjectCreated events
  • vector is configured with the aws_s3 source using the queue configuration whereupon it will process the ObjectCreated events to read each S3 object.

Example configuration:

```toml
[sources.s3]
  type = "aws_s3"

  region = "us-east-1"
  strategy = "sqs"
  compression = "gzip"

  sqs.queue_name = "jesse-test-flow-logs"
  sqs.delete_message = false

[sinks.console]
  type = "console"
  inputs = ["s3"]
  encoding.codec = "json"
```

The commits can be viewed in order, but the resulting diff probably isn't too bad either (it's mostly just sources/aws_s3). It may be worth looking at the added cue documentation first.

The source also behaves very much like the file source in that it emits one event per line, and it supports the same multiline configuration that the file source supports. Note there is a rough edge here: the multiline config supports a timeout_ms option that isn't really applicable here but is applied just the same.

Future work:

  1. Additional codec support (like application/ndjson). For now, this acts very much like the file source. This could be looped into the general work around codecs ("Decouple metrics parsers as codecs", #4278).
  2. Additional compression formats (Snappy, LZ4, Zip). This was requested in the original issue. I started with just the formats supported out-of-the-box by the async_compression crate we are using.
  3. (potential) Multi-region support. Currently we only support reading from a queue and a bucket in the same region. I expect this will cover most cases since AWS requires the bucket to publish notifications to a queue in the same region. One could forward messages from a queue in one region to another, but this seems unlikely. I'd prefer to wait and see if anyone asks for multi-region support, especially given that fluentbit and filebeat have the same restriction.
  4. Concurrent processing. Right now one message is processed at a time, which leads to predictable behavior, but we may see some performance improvement by processing multiple objects at once. The benefit should be vetted, though; the process may be limited by incoming network bandwidth anyway.
  5. Refresh the message visibility timeout. Right now the visibility timeout is set once, when the message is retrieved, but we could refresh it if we are still processing a message as it nears the end of the timeout, to avoid another vector instance picking the message up. This would let users have the best of both worlds: short visibility timeouts to quickly reprocess messages when a vector instance falls over, while still avoiding concurrent processing of large objects whose processing time exceeds the visibility timeout. (A rough sketch of this follows below.)

I'll create issues for 2 and 5. I think the others can be left until we observe their necessity.
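
Not part of this PR, but to make item 5 concrete: a rough sketch of a background refresh task, assuming the rusoto_sqs API (the Sqs trait and ChangeMessageVisibilityRequest) and a hypothetical helper name.

```rust
use rusoto_sqs::{ChangeMessageVisibilityRequest, Sqs, SqsClient};
use std::time::Duration;
use tracing::warn;

/// Hypothetical helper (not in this PR): keep extending the visibility timeout
/// of an in-flight SQS message while its S3 object is still being processed.
/// The caller would abort this task (e.g. via `JoinHandle::abort`) once the
/// object has been read and the message acked or released.
async fn keep_message_invisible(
    client: SqsClient,
    queue_url: String,
    receipt_handle: String,
    visibility_timeout_secs: i64,
) {
    // Refresh at roughly half the timeout so it never lapses mid-read.
    let period = Duration::from_secs(((visibility_timeout_secs as u64) / 2).max(1));
    let mut interval = tokio::time::interval(period);

    loop {
        interval.tick().await;

        let request = ChangeMessageVisibilityRequest {
            queue_url: queue_url.clone(),
            receipt_handle: receipt_handle.clone(),
            visibility_timeout: visibility_timeout_secs,
        };

        if let Err(error) = client.change_message_visibility(request).await {
            // If the refresh fails (e.g. the message was already deleted), stop trying.
            warn!(message = "Unable to refresh SQS message visibility timeout.", %error);
            break;
        }
    }
}
```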

@jszwedko (Member Author) left a comment
I think this could probably be better refactored by having the SQS bits implement an interface to drive a top-level object reader by having it simply return S3 keys to read, but I figured that was better left until we actually add another "strategy" which could then refactor this a bit to share the logic doing the object reads and event creation.
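
For illustration only (none of these names exist in the PR), the kind of interface that refactor might introduce: a strategy that only yields object locations, with a shared reader owning S3 access and event creation.

```rust
use async_trait::async_trait;

/// A location discovered by some notification strategy (hypothetical type).
struct ObjectToRead {
    bucket: String,
    key: String,
}

/// Hypothetical strategy interface: SQS (and later SNS or polling) would only
/// discover objects; a shared reader would fetch, decompress, and emit events.
#[async_trait]
trait NotificationStrategy {
    type Error: std::error::Error + Send + Sync;

    /// Wait for the next batch of S3 objects to read.
    async fn next_objects(&mut self) -> Result<Vec<ObjectToRead>, Self::Error>;

    /// Called once the objects were read successfully, so the strategy can
    /// acknowledge (e.g. delete the SQS message).
    async fn acknowledge(&mut self, objects: &[ObjectToRead]) -> Result<(), Self::Error>;
}
```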

Review threads:
docs/reference/components/aws.cue
src/sources/aws_s3/mod.rs
src/sources/aws_s3/sqs.rs
```rust
body,
);

// Record the read error saw to propagate up later so we avoid ack'ing the SQS
```
jszwedko (Member Author):

Ideas here also welcome 😄

Contributor:

I'm not really sure what the issue is here?

jszwedko (Member Author):

It just feels a little messy to me to record intermediate state rather than just streaming. We ended up having a conversation about it in discord though: https://discordapp.com/channels/742820443487993987/751107681217019944/771486734110162964

My conclusion from that conversation is that it'd be nice to use more TryStreams and have send_all stop at the first error and return it so that I could flow all errors and then handle them in one place.
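
As a standalone illustration of that shape (not code from this PR; the error type and lines are made up), futures::TryStreamExt::try_for_each stops at the first Err and returns it, so all read errors can funnel to one place:

```rust
use futures::stream::{self, TryStreamExt};

#[derive(Debug)]
struct ReadError(String);

#[tokio::main]
async fn main() -> Result<(), ReadError> {
    // Stand-in for a stream of decoded lines that can fail partway through an object.
    let lines = stream::iter(vec![
        Ok("line one".to_owned()),
        Ok("line two".to_owned()),
        Err(ReadError("gzip stream truncated".to_owned())),
        Ok("never reached".to_owned()),
    ]);

    // Processing stops at the first Err, which bubbles up here, where we could
    // decide not to ack the SQS message.
    lines
        .try_for_each(|line| async move {
            println!("forwarding event: {}", line);
            Ok(())
        })
        .await
}
```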

```rust
#[cfg(feature = "aws-s3-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use super::{sqs, AwsS3Config, Compression, Strategy};
```
jszwedko (Member Author):

I abstained from using use super::* here to see how it felt. I think I like the explicitness better.

Contributor:

I would personally use use super::* in tests, especially since basically all the use statements below are redundant to what's already imported in the parent module.

But, it's a minor nit, so I'm fine leaving this experiment in.

jszwedko (Member Author):

Yeah, I'm not 100% sold on it. Some of the imports are duplicated, but the integration test also has a much smaller set of imports, and I found it easier to see where certain types come from rather than having to check first the integration_tests imports and then super's imports.

@jszwedko jszwedko requested a review from bruceg October 27, 2020 19:25
Commit notes:

  • A number of outstanding TODOs.
  • Also brings up-to-date with master changes.
  • I ended up dropping a few types that were not already supported by async_compression. I'll be creating separate issues to add support for them.
  • Also propagate up some errors. I still need to emit them.
  • Also resolve a couple of TODOs that are stale. Just create the Resolver pending #4685. Remove comment about FIFO handling as it seems to be the same from the consumer side as a standard queue.
  • We only want to process ObjectCreated events.
  • Also remove stale TODO.
  • Ensure that it is a positive i64.
  • This is a failsafe that I don't think will actually be hit in normal cases.
  • Also added comment to unwrap() use.
  • Also removed future work notes to attach to the PR. Removed TODO for validating shutdown as it does appear to be structured well.
  • Resolving remaining TODO. This appears to be the pattern in other internal_events (specifically prometheus's). Also fix spelling error.
  • Matches other AWS components.
  • Will add examples and pull in AWS config.
  • To apply it to both sinks and sources using AWS.
@jszwedko (Member Author):

For the S3 event record version validation, I ended up pulling in the semver crate to handle the deserialization and comparison. It's a small crate and we were already indirectly depending on it:

```
$ cargo tree -i semver
semver v0.9.0
├── cargo-lock v4.0.1
│   └── built v0.4.3
│       [build-dependencies]
│       └── vector v0.11.0 (/home/jesse/workspace/vector)
└── rustc_version v0.2.3
    [build-dependencies]
    ├── cast v0.2.3
    │   ├── criterion v0.3.3
    │   │   [dev-dependencies]
    │   │   └── vector v0.11.0 (/home/jesse/workspace/vector)
    │   └── criterion-plot v0.4.3
    │       └── criterion v0.3.3 (*)
    ├── raw-cpuid v8.1.1
    │   └── heim-virt v0.1.0-beta.1
    │       └── heim v0.1.0-beta.3
    │           └── vector v0.11.0 (/home/jesse/workspace/vector)
    ├── rusoto_core v0.45.0
    │   ├── rusoto_cloudwatch v0.45.0
    │   │   └── vector v0.11.0 (/home/jesse/workspace/vector)
    │   ├── rusoto_firehose v0.45.0
    │   │   └── vector v0.11.0 (/home/jesse/workspace/vector)
    │   ├── rusoto_kinesis v0.45.0
    │   │   └── vector v0.11.0 (/home/jesse/workspace/vector)
    │   ├── rusoto_logs v0.45.0
    │   │   └── vector v0.11.0 (/home/jesse/workspace/vector)
    │   ├── rusoto_s3 v0.45.0
    │   │   └── vector v0.11.0 (/home/jesse/workspace/vector)
    │   ├── rusoto_sts v0.45.0
    │   │   └── vector v0.11.0 (/home/jesse/workspace/vector)
    │   └── vector v0.11.0 (/home/jesse/workspace/vector)
    └── rusoto_signature v0.45.0
        ├── rusoto_core v0.45.0 (*)
        └── vector v0.11.0 (/home/jesse/workspace/vector)
```
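
Not code from the PR, but a small example of the kind of deserialize-and-compare check the crate enables (the "supported" version used here is only illustrative):

```rust
use semver::Version;

fn main() {
    // eventVersion as it might appear in an S3 notification record.
    let record_version = Version::parse("2.1.0").unwrap();

    // Illustrative floor for the record format we know how to handle.
    let supported = Version::parse("2.0.0").unwrap();

    // semver::Version implements Ord, so validation is a plain comparison plus
    // a same-major check.
    assert!(record_version >= supported && record_version.major == supported.major);
}
```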

Commit note: To avoid acking SQS.
@jszwedko (Member Author):

Noting for myself that I need to update this to not allow timeout_secs to apply for the multiline aggregation.

@JeanMertz (Contributor) left a comment

Sweet!

Left some comments, nothing major, except for maybe Windows newlines.

Review threads:
docs/reference/components/sources/aws_s3.cue
src/internal_events/aws_s3.rs
src/sources/aws_s3/sqs.rs
```rust
// https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html#supported-notification-event-types
//
// we could use enums here, but that seems overly brittle as deserialization would break if they add
// new event types or names
```
Contributor:

Can't we still use enums, but have a catch all that at least logs if something is received that's unexpected (or just ignore "other")?

Still, this seems fine by me as well 👍

jszwedko (Member Author):

👍 I'll investigate this.

jszwedko (Member Author):

I came back around the other way on this one 😄 I'd like to, at least, recognize any new ObjectCreated events without needing to add new enum kinds. I'll just leave this as-is until we find a need.
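
For reference, the string-based check being described here amounts to something like the following (field and type names are illustrative, not necessarily the ones used in the PR):

```rust
use serde::Deserialize;

// Keep the event name as a free-form string so new ObjectCreated:* variants
// (or entirely new event types) never fail deserialization.
#[derive(Debug, Deserialize)]
struct S3EventRecord {
    #[serde(rename = "eventName")]
    event_name: String,
}

impl S3EventRecord {
    /// https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html#supported-notification-event-types
    fn is_object_created(&self) -> bool {
        // Matches "ObjectCreated:Put", "ObjectCreated:CompleteMultipartUpload",
        // and any variants AWS adds later; everything else is skipped.
        self.event_name.starts_with("ObjectCreated")
    }
}

fn main() {
    let record: S3EventRecord =
        serde_json::from_str(r#"{ "eventName": "ObjectCreated:Put" }"#).unwrap();
    assert!(record.is_object_created());
}
```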

@jszwedko (Member Author) commented Nov 3, 2020

@bruceg do you mind giving this one another look?

@bruceg (Member) left a comment

Just one suggestion about a long function, nothing blocking though.

Comment on lines +214 to +246
```rust
let message_id = message
    .message_id
    .clone()
    .unwrap_or_else(|| "<unknown>".to_owned());

match self.handle_sqs_message(message, out.clone()).await {
    Ok(()) => {
        emit!(SqsMessageProcessingSucceeded {
            message_id: &message_id
        });
        if self.delete_message {
            match self.delete_message(receipt_handle).await {
                Ok(_) => {
                    emit!(SqsMessageDeleteSucceeded {
                        message_id: &message_id
                    });
                }
                Err(err) => {
                    emit!(SqsMessageDeleteFailed {
                        error: &err,
                        message_id: &message_id,
                    });
                }
            }
        }
    }
    Err(err) => {
        emit!(SqsMessageProcessingFailed {
            message_id: &message_id,
            error: &err,
        });
    }
}
```
bruceg (Member):

This function is already pretty long. I would tend to suggest moving this all either into handle_sqs_message or into a new wrapper function, and then moving the call to it into the Some(ref handle) above so you can drop the continue.

Commit note: I mistakenly thought derivative default would handle this.
@jszwedko (Member Author) commented Nov 6, 2020

Note for myself that I broke the deserialization of SQS messages in a recent commit by relying on semver's serde support, which requires a fully formed version number (e.g. 2.1.0 rather than 2.1). I opened dtolnay/semver#219, but I'll probably back this out and do custom parsing for now until upstream support is added (if it is at all).
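
A tiny sketch of the kind of hand-rolled leniency this could fall back to (names made up; not the code that landed):

```rust
/// Parse "major.minor" or "major.minor.patch", defaulting a missing patch
/// component to 0, so "2.1" compares equal to "2.1.0".
fn parse_lenient_version(s: &str) -> Option<(u64, u64, u64)> {
    let mut parts = s.split('.');
    let major = parts.next()?.parse().ok()?;
    let minor = parts.next()?.parse().ok()?;
    let patch = match parts.next() {
        Some(part) => part.parse().ok()?,
        None => 0,
    };
    // Reject extra components like "2.1.0.4".
    if parts.next().is_some() {
        return None;
    }
    Some((major, minor, patch))
}

fn main() {
    assert_eq!(parse_lenient_version("2.1"), Some((2, 1, 0)));
    assert_eq!(parse_lenient_version("2.1"), parse_lenient_version("2.1.0"));
    assert_eq!(parse_lenient_version("not-a-version"), None);
}
```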

Commit note: Since semver requires the patch version.
@jszwedko (Member Author):

Windows failure is the memory issue that also exists on master.

@jszwedko jszwedko merged commit 30dc1b8 into master Nov 10, 2020
@jszwedko jszwedko deleted the aws-s3-source branch November 10, 2020 22:43
mengesb pushed a commit to jacobbraaten/vector that referenced this pull request Dec 9, 2020
Successfully merging this pull request may close these issues: New aws_s3 source (#1017)

3 participants