
Decouple metrics parsers as codecs #4278

Closed
binarylogic opened this issue Oct 2, 2020 · 1 comment
Labels
domain: codecs - Anything related to Vector's codecs (encoding/decoding)
domain: metrics - Anything related to Vector's metrics events
type: enhancement - A value-adding code change that enhances its existing functionality.

Comments

@binarylogic
Contributor

Like some of our log sources (syslog), our metrics sources are just ingesting data over a protocol and parsing it into our internal metrics data model. Coupling these parsers with their sources means we cannot reuse the parsers with other sources, as demonstrated in #3984.

This, again, fits into our larger discussion of decoupling codecs from sources in a way that makes it easy to compose various combinations. Macros (#3791) would then make it easy to redefine our current sources in terms of lower-level ones; for example, the statsd source could just wrap the socket source with a statsd codec.
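To make the idea concrete, here is a minimal, hypothetical sketch of what a decoupled metrics codec could look like. The `MetricParser` trait, `Metric` struct, and `StatsdParser` names are illustrative only and are not Vector's actual types; the point is that the parser consumes a protocol frame and knows nothing about the transport that produced it.

```rust
// Hypothetical sketch only; these are not Vector's real types.

/// A minimal internal metric representation.
#[derive(Debug)]
struct Metric {
    name: String,
    value: f64,
    kind: MetricKind,
}

#[derive(Debug)]
enum MetricKind {
    Counter,
    Gauge,
}

/// The codec: turns a protocol frame into a metric, independent of the
/// transport (UDP socket, TCP socket, file, ...) that produced the frame.
trait MetricParser {
    fn parse(&self, frame: &str) -> Result<Metric, String>;
}

/// A toy statsd parser covering only `name:value|c` and `name:value|g`.
struct StatsdParser;

impl MetricParser for StatsdParser {
    fn parse(&self, frame: &str) -> Result<Metric, String> {
        let (name, rest) = frame.split_once(':').ok_or("missing ':'")?;
        let (value, kind) = rest.split_once('|').ok_or("missing '|'")?;
        let value: f64 = value.parse().map_err(|_| "bad value".to_string())?;
        let kind = match kind {
            "c" => MetricKind::Counter,
            "g" => MetricKind::Gauge,
            other => return Err(format!("unsupported type: {other}")),
        };
        Ok(Metric { name: name.to_string(), value, kind })
    }
}

fn main() {
    // Any frame-producing source (the socket source, for example) could be
    // composed with any `MetricParser` implementation.
    let parser = StatsdParser;
    println!("{:?}", parser.parse("requests:1|c"));
}
```

With something in this shape, the statsd source described above reduces to the socket source plus a statsd implementation of the codec trait.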

binarylogic added the type: enhancement, domain: codecs, and domain: metrics labels on Oct 2, 2020
jszwedko added a commit that referenced this issue Nov 10, 2020
See RFC: https://github.com/timberio/vector/blob/master/rfcs/2020-09-29-4155-aws-s3-source.md
Fixes #1017 

This is the initial implementation of an `aws_s3` source that relies on AWS SQS bucket notifications to learn of new S3 objects to consume. See the RFC for a discussion of other approaches and why this one was chosen. The source has an enum `strategy` configuration option so that additional approaches (like SNS or long polling) can be supported later.

The basic flow is:

User setup:
* bucket is created
* queue is created
* bucket is configured to notify the queue for ObjectCreated events
* vector is configured with the `aws_s3` source, pointing at the queue, whereupon it processes the ObjectCreated events and reads each S3 object (see the notification-parsing sketch after this list).
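
For illustration, here is a hedged sketch (not Vector's code) of parsing the ObjectCreated notification that SQS delivers. The struct names are hypothetical; the JSON field names follow AWS's documented S3 event notification format, and only the fields needed to locate the object are modeled. It assumes `serde` (with the `derive` feature) and `serde_json`.

```rust
use serde::Deserialize;

// Only the subset of the S3 event notification that this sketch needs.
#[derive(Debug, Deserialize)]
struct S3Event {
    #[serde(rename = "Records")]
    records: Vec<S3EventRecord>,
}

#[derive(Debug, Deserialize)]
struct S3EventRecord {
    #[serde(rename = "eventName")]
    event_name: String,
    s3: S3Entity,
}

#[derive(Debug, Deserialize)]
struct S3Entity {
    bucket: Bucket,
    object: Object,
}

#[derive(Debug, Deserialize)]
struct Bucket {
    name: String,
}

#[derive(Debug, Deserialize)]
struct Object {
    key: String,
}

fn main() -> Result<(), serde_json::Error> {
    // A trimmed-down example of the notification body an SQS message carries.
    let body = r#"{
        "Records": [{
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": { "name": "example-bucket" },
                "object": { "key": "logs/part-0000.gz" }
            }
        }]
    }"#;

    let event: S3Event = serde_json::from_str(body)?;
    for record in event
        .records
        .iter()
        .filter(|r| r.event_name.starts_with("ObjectCreated"))
    {
        println!("fetch s3://{}/{}", record.s3.bucket.name, record.s3.object.key);
    }
    Ok(())
}
```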

Example configuration:

```toml
[sources.s3]
  type = "aws_s3"

  region = "us-east-1"
  strategy = "sqs"
  compression = "gzip"

  sqs.queue_name = "jesse-test-flow-logs"
  sqs.delete_message = false

[sinks.console]
  type = "console"
  inputs = ["s3"]
  encoding.codec = "json"
```

The commits can be viewed in order, but the resulting diff probably isn't too bad either (it's mostly just `sources/aws_s3`). It may be worth looking at the added cue documentation first.

The source also behaves very much like the `file` source in that it emits one event per line, and it supports the same multiline configuration that the `file` source does. **Note** there is a rough edge here: the `multiline` config supports a `timeout_ms` option that isn't really applicable to this source but is applied just the same.
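
As a rough illustration of the per-line behavior combined with `compression = "gzip"`, here is a self-contained sketch (again, not Vector's implementation) that decompresses a gzip body and emits one event per line. It assumes the `flate2` crate; the in-memory encoding step exists only to make the example runnable.

```rust
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use std::io::{BufRead, BufReader, Write};

fn main() -> std::io::Result<()> {
    // Build a small gzip-compressed "object body" in memory so the example is
    // self-contained; in the real source this would be the fetched S3 object.
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(b"line one\nline two\nline three\n")?;
    let compressed = encoder.finish()?;

    // Decompress and split into per-line events.
    let reader = BufReader::new(GzDecoder::new(&compressed[..]));
    for line in reader.lines() {
        let event = line?;
        println!("event: {event}");
    }
    Ok(())
}
```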

Future work:
1. Additional codec support (like `application/ndjson`). For now, this acts very much like the `file` source. This could be folded into the general work around codecs in #4278.
2. Additional compression formats (Snappy, LZ4, Zip). This was requested in the original issue. I started with just the formats that the `async_compression` crate we are using supports out of the box.
3. (potential) Multi-region support. Currently we only support reading from a queue and a bucket in the same region. I expect this will cover most cases, since AWS requires the bucket to publish notifications to a queue in the same region. One could forward messages from a queue in one region to another, but this seems unlikely. I'd prefer to wait and see if anyone asks for multi-region support, especially given that fluentbit and filebeat have the same restriction.
4. Concurrent processing. Right now one message is processed at a time, which leads to predictable behavior, but we may see some performance improvement by processing multiple objects at once. The benefit should be vetted, though; the process may be limited by incoming network bandwidth anyway.
5. Refresh the message visibility timeout. Right now the visibility timeout is set once, when the message is retrieved, but we could refresh it while still processing a message as it nears expiry, to avoid another vector instance picking the message up (a sketch of this idea follows the list). This would give users the best of both worlds: short visibility timeouts to quickly reprocess messages when a `vector` instance falls over, while still avoiding concurrent processing of large objects whose processing time exceeds the timeout.
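
To sketch item 5: the `VisibilityExtender` trait below is a hypothetical stand-in for whatever SQS client ends up being used (the real call would be SQS's ChangeMessageVisibility action), and the example assumes `tokio` with its timer and macro features enabled. It refreshes the timeout at half its duration until the processing future completes.

```rust
use std::time::Duration;

// Hypothetical stand-in for an SQS client; not a real SDK API.
trait VisibilityExtender {
    fn change_message_visibility(&self, receipt_handle: &str, timeout_secs: u64);
}

/// Keep extending the message's visibility timeout while `work` is still
/// running, so another instance doesn't pick the message up mid-processing.
async fn with_visibility_heartbeat<C, F, T>(
    client: &C,
    receipt_handle: &str,
    timeout_secs: u64,
    work: F,
) -> T
where
    C: VisibilityExtender,
    F: std::future::Future<Output = T>,
{
    tokio::pin!(work);
    // Refresh at half the timeout so each extension lands before expiry.
    let mut interval = tokio::time::interval(Duration::from_secs((timeout_secs / 2).max(1)));
    interval.tick().await; // the first tick completes immediately
    loop {
        tokio::select! {
            result = &mut work => return result,
            _ = interval.tick() => client.change_message_visibility(receipt_handle, timeout_secs),
        }
    }
}

// A logging stub so the sketch runs without AWS access.
struct LoggingClient;

impl VisibilityExtender for LoggingClient {
    fn change_message_visibility(&self, receipt_handle: &str, timeout_secs: u64) {
        println!("extend visibility of {receipt_handle} by {timeout_secs}s");
    }
}

#[tokio::main]
async fn main() {
    let result = with_visibility_heartbeat(&LoggingClient, "receipt-handle", 4, async {
        // Stand-in for downloading and processing a large S3 object.
        tokio::time::sleep(Duration::from_secs(10)).await;
        "object processed"
    })
    .await;
    println!("{result}");
}
```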

I'll create issues for 2 and 5. I think the others can be left until we observe their necessity.
mengesb pushed a commit to jacobbraaten/vector that referenced this issue Dec 9, 2020
@jszwedko
Member

jszwedko commented Aug 4, 2022

Closing since we have the scaffolding in place for this. We can create new codecs as needed.

jszwedko closed this as completed on Aug 4, 2022