New aws_s3 source #1017

Closed
ticon-mg opened this issue Oct 11, 2019 · 17 comments · Fixed by #4779
Assignees
jszwedko

Labels
  • have: should - We should have this feature, but it is not required. It is medium priority.
  • needs: rfc - Needs an RFC before work can begin.
  • provider: aws - Anything `aws` service provider related.
  • type: feature - A value-adding code addition that introduces new functionality.

Comments

@ticon-mg

Needs Source: AWS S3

@binarylogic
Contributor

@ticon-mg, to clarify, you're referring to source archives? You can download those from the GitHub repo and releases in the interim.

@binarylogic added the `domain: operations` and `type: enhancement` labels on Oct 11, 2019
@ticon-mg
Author

Hi!
I am talking about S3 as a source of log files for Vector to process.

@binarylogic
Contributor

Ah, that makes more sense. Thanks!

@binarylogic added the `type: new feature` label and removed the `domain: operations` and `type: enhancement` labels on Oct 11, 2019
@binarylogic changed the title from "Source: AWS S3" to "New aws_s3 source" on Oct 11, 2019
@zcapper

zcapper commented Feb 26, 2020

+1

Use cases
Lots of AWS services write activity and security logs to S3 (CloudTrail, ELB access logs, VPC flow logs, AWS Config events, etc.). An AWS S3 source would allow S3 to act as a data pipeline here, with Vector as the data router receiving and routing new data.

Implementation

  • Similar to Splunk's S3 connector, I'd suggest using S3 event notifications written to an SQS queue.
  • The source would only need to know the SQS queue ARN/endpoint (see the sketch after this list).
  • Since the queue messages contain the bucket name and object key, one source config could potentially read from multiple buckets.
  • Users can control what's sent to SQS/Vector through S3 notification filter rules.
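
For illustration, a minimal sketch of what the user-facing side of this approach might look like; the `aws_s3` type and its options are hypothetical at this point in the discussion, and the queue URL is made up:

```toml
# Hypothetical sketch of an SQS-notification-driven S3 source.
# Object filtering would be done with S3 notification filter rules, not in Vector.
[sources.s3_notifications]
  type = "aws_s3"
  region = "us-east-1"
  # The only S3-specific knowledge the source needs is which queue to poll.
  sqs.queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/s3-object-created"
```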

@nhlushak

nhlushak commented Mar 4, 2020

@zcapper
But do S3 event notifications allow processing of data that is already stored in a bucket?
I think polling is a more flexible solution, like the one Logstash offers.

@zcapper

zcapper commented Mar 5, 2020

@NikitaGl
You're right that the event notification approach wouldn't allow pre-existing data to be backfilled. (I do wonder how Logstash tracks state so it doesn't fetch files twice, e.g. after a restart).

I'm fond of the event notifications approach because it's stateless (in Vector) and could be scaled horizontally to work with very large buckets and inventories.

FWIW Splunk seems to have both a generic and an event-notification-based S3 connector.

@binarylogic
Contributor

I think an interesting approach is to treat S3 like a file system. There are utilities that do exactly this; I'm not advocating them as a solution for Vector, but rather as a mental frame to think through.

@rverma-jm

rverma-jm commented Apr 22, 2020

The right approach, and what fluentd does, is to use an S3+SQS setup: read the object key from the SQS message and then fetch the corresponding data. Fluentd has a similar plugin: https://github.com/fluent/fluent-plugin-s3. The issue with a generic polling plugin is that detecting new files requires listing all files; as the number of files grows, the time needed to discover new files increases, as does the API cost of continuous listing. SQS would be far cheaper.

@szibis
Contributor

szibis commented Apr 28, 2020

It would be great to have an S3 source with the same features as the S3 sink, so that the pair can act as a failover path for delivering data reliably.
For example, if we have a distributed Vector configuration with Kafka, we would have the ability to build failover through S3 if something goes wrong with Kafka.
If we have metrics from internal sinks exposed, we could use conditions to detect problems with the Kafka sink and also start sending data to S3, either via the SQS-exposed info or directly from the Vector infrastructure agents.
In the end, our consumer Vector infrastructure would just consume from both Kafka and S3: if Kafka stops working but S3 keeps receiving data, the flow still works (possibly with some delay on the S3 path).
For bigger systems at scale this would be a very nice addition, without building any custom solutions.
We need this S3 source and some improvements to the S3 sink. Also, please correct me if we will have an option to act on internal metrics from Vector sinks, as described above, once they are fully announced.
Even for the S3 source alone there are real-life examples, such as Cloudflare delivering logs to multiple object stores (https://developers.cloudflare.com/logs/logpush/); with fluent-plugin-s3 you can pick those logs up, so this feature would at least be a nice addition.
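
As a rough sketch of the consumer side of this idea, assuming the requested `aws_s3` source exists alongside Vector's existing `kafka` source; the topic, queue, and sink names below are made up:

```toml
# Sketch only: consume the same stream from Kafka (primary path) and from the
# S3 failover bucket (via SQS notifications), feeding both into one sink.
[sources.from_kafka]
  type = "kafka"
  bootstrap_servers = "kafka-1:9092"
  group_id = "vector-consumers"
  topics = ["logs"]

[sources.from_s3_failover]
  type = "aws_s3"                       # the source requested in this issue
  region = "us-east-1"
  sqs.queue_name = "failover-logs"      # hypothetical option/name

[sinks.downstream]
  type = "console"                      # stand-in for the real downstream sink
  inputs = ["from_kafka", "from_s3_failover"]
  encoding.codec = "json"
```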

@rverma-jm

rverma-jm commented Apr 29, 2020

I like the idea.

In our context we are not using Kafka; instead we use Firehose delivery to convert the records to ORC and deliver them to S3. But even with fluentd, I found that when we get hit by Firehose throttling, the worker keeps taking data from the queue and our buffer keeps growing. This causes significant data loss, and we are forced to run Firehose over-provisioned. The downside is that we get small chunks in S3.
It would be cool if fluentd stopped taking new data from the queue entirely in that situation. That would solve the buffering problem completely and prevent data loss in the first place.

Also, in our setup we use fluentd's copy output to push data to Firehose, plus a grep filter so that we push only write data to Elasticsearch. We could probably use a more robust mechanism downstream to handle destination-based buffer sizing.

Something like

s3/sqs ---> vector-input --> 10MB buffer  ---> firehose
                         --> 200MB buffer ---> elasticsearch
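
A rough sketch of that destination-based buffering using Vector's per-sink buffer options, assuming the `aws_s3` source requested in this issue; the buffer sizes mirror the diagram above, and the queue, stream, and endpoint values are illustrative (option names may differ across Vector versions):

```toml
# Sketch: one S3/SQS-fed input fanned out to two sinks, each with its own
# buffer, so a slow or throttled destination is bounded independently.
[sources.vector_input]
  type = "aws_s3"                        # requested source, not yet implemented
  region = "us-east-1"
  sqs.queue_name = "logs"                # hypothetical option/name

[sinks.firehose]
  type = "aws_kinesis_firehose"
  inputs = ["vector_input"]
  stream_name = "my-delivery-stream"     # illustrative
  encoding.codec = "json"
  buffer.type = "disk"
  buffer.max_size = 10485760             # ~10 MB
  buffer.when_full = "block"             # apply backpressure instead of dropping

[sinks.elasticsearch]
  type = "elasticsearch"
  inputs = ["vector_input"]
  endpoint = "http://elasticsearch:9200" # illustrative
  buffer.type = "disk"
  buffer.max_size = 209715200            # ~200 MB
  buffer.when_full = "block"
```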

PS: Firehose ingestion is way faster than ES ingestion, and Firehose does its own buffering, so we could easily do vector-input -> firehose -> elasticsearch, but why not save some money if you can :)

@binarylogic added the `needs: approval` and `needs: requirements` labels on Apr 29, 2020
@binarylogic added the `type: feature` label and removed the `type: new feature` label on Jun 16, 2020
@binarylogic added the `have: should` and `provider: aws` labels on Aug 7, 2020
@rrichardson
Contributor

rrichardson commented Sep 17, 2020

I am interested in attempting to build this source plugin. As the label says, it needs requirements. What would be a good minimal set of functionality/options for this source?

As input config options, I guess it's a sort of union of the File and HTTP sources:

```toml
[sources.my_source_id]
  type = "aws_s3" # required
  bucket = "s3://my-bucket" # required - (I'm assuming we want to use the S3 protocol, and not generic http through the s3 http server)
  ignore_older = 86400 # optional, no default, seconds
  include = ["/var/log/nginx/*.log"] # required
  start_at_beginning = false # optional, default
  polling_interval_ms = 30000
  encoding = "json"
  folder_template = "AWSLogs/<my account id>/elasticloadbalancing/us-west-2/%Y/%m/%D/*.log.gz"
```

Edit

Note: I, personally, am more interested in a polling model than in using SQS for events.
With judicious use of the folder template, as well as a sane polling interval, I think we could limit the number of API calls per day. The plugin could be enhanced later with optional SQS subscriptions.

Thoughts?

@binarylogic added the `needs: rfc` label and removed the `needs: approval` and `needs: requirements` labels on Sep 17, 2020
@jszwedko
Member

jszwedko commented Sep 17, 2020

Hi @rrichardson,

Thanks for your interest in contributing this! We've had a number of people ask for it.

Typically, for a new source like this where there are some decisions to be made, we like to do an RFC first to allow us to discuss and make those decisions before implementation; the goal being to reduce the amount of rework that might be needed if the discussion happened as part of the pull request. An example of this is the recently added apache_metrics source.

Would you be open to writing up a short RFC based on the template? I'm happy to focus on giving you support in the form of feedback and suggestions in the RFC and the PR. I think what you posted is a good start. I'd suggest thinking about:

  • How we would model the configuration in a way that makes it easy to add the strategy of relying on SQS for S3 bucket notifications (as this is also a strategy that people have asked for). If you feel up to it, it would be helpful to lay out a configuration model that covers both strategies (polling and SQS) and, for now, implement only the polling one that you want, so that we can avoid backwards-incompatible configuration changes later (see the sketch after this list).
  • How we would keep track of which objects have already been ingested by Vector when polling. The file source checkpointing might be useful prior art for this.
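
For illustration only, one way a configuration model could leave room for both approaches behind a strategy enum; the option names are a sketch rather than an agreed design, and the polling options in particular are hypothetical:

```toml
# Sketch: one source type, two mutually exclusive strategies selected by an enum.
[sources.s3_via_sqs]
  type = "aws_s3"
  region = "us-east-1"
  strategy = "sqs"                      # notification-driven
  sqs.queue_name = "s3-object-created"

[sources.s3_via_polling]
  type = "aws_s3"
  region = "us-east-1"
  strategy = "polling"                  # hypothetical strategy
  polling.bucket = "my-bucket"          # hypothetical options
  polling.prefix = "AWSLogs/"
  polling.interval_secs = 30
```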

Thanks again! Let me know if I can offer any additional guidance.

@jszwedko
Member

@szibis Thanks for your notes too. We do have a new issue for "dead letter" behavior: #1772

@rrichardson
Contributor

@jszwedko - Thanks. I will put together an RFC. I am trying to wrap up a couple projects first, but I have this scheduled in my next sprint. :)

@jamtur01
Contributor

Some notes from another user:

One of the challenges with collection from S3 is that polling/scanning a bucket does not scale well; AWS only allows you to list the contents against a prefix (max 1000 objects at a time) and it's possible to see scanning times extending to hours, or even multiple days.

Three documented options exist for collection from S3 without needing to re-scan the bucket:

  1. SQS for pull-based notifications of new objects,
  2. SNS for push-based notifications, and
  3. configuring S3 to write details of new object creation into DynamoDB, then polling DynamoDB for details of objects created since X.

SQS is probably the best approach for typical collectors; it relies on a client to connect to the SQS and retrieve details of new events. SNS is probably the best approach for SaaS-based collection; SNS can proactively notify a publicly accessible endpoint that a new object exists. DynamoDB is probably the best approach where more specific details need to be kept about events, or a permanent record maintained of what assets are available in a large S3 bucket (archive use case?).

  • Should work with programmatic AWS access (key + secret)
  • Should be possible to set bucket name, prefix, and filename match pattern; examples:
      Prefix: AWSLogs/041856772135/CloudTrail/* (all CloudTrail logs)
      Filename: *CloudTrail*.json.gz
  • Should support plain, gzip, zip, and Snappy/Zippy compression (the formats supported by Kinesis output to S3).
  • Should have a configurable polling interval for SQS
  • Should have logging/metrics for the number of objects found and processed
  • Provide support for reading/processing CloudTrail and CloudWatch logs. CloudWatch logs delivered via Kinesis are presented as multiple non-line-separated JSON arrays (no newlines in the file; example here; note the two “{"messageType"” blocks). These should be split into their individual events (i.e. the logEvents array) AND the top-level metadata added to each event (owner, logGroup, logStream, subscriptionFilters). CloudTrail logs have a similar format: they have arrays of “Records” requiring similar processing to CloudWatch-via-Kinesis.

@jszwedko self-assigned this on Sep 25, 2020
@jamtur01 added this to the 2020-09-28 - Derezzed milestone on Sep 25, 2020
@jszwedko
Member

jszwedko commented Sep 28, 2020

Hey @rrichardson,

I'm actually going to be starting on an implementation of this source using SQS for bucket notifications this week. If you were also planning on working on this soon, I'd love to collaborate on an RFC and make sure I don't step on your toes with implementation. We have a discord (https://discord.gg/sfFzZ6) that we could use for more real-time communication.

Otherwise, I can ping you when the RFC is up (hopefully tomorrow) and you can leave any feedback. I'll plan to leave a hole in the vector configuration spec for the polling strategy that you can fill in later.

@jszwedko
Member

RFC: #4197. I proposed a configuration that specifies the SQS implementation as a "strategy", to leave room for specifying a polling strategy as well. cc/ @rrichardson

jszwedko added a commit that referenced this issue Nov 10, 2020
See RFC: https://github.com/timberio/vector/blob/master/rfcs/2020-09-29-4155-aws-s3-source.md
Fixes #1017 

This is the initial implementation of an `aws_s3` source that relies on AWS SQS bucket notifications to inform it of new S3 objects to consume. See the RFC for discussion of other approaches and why this one was chosen. The source does have an enum `strategy` configuration to allow additional approaches (like SNS or long polling) to be supported.

The basic flow is:

User setup:
* bucket is created
* queue is created
* bucket is configured to notify the queue for ObjectCreated events
* vector is configured with the `aws_s3` source using the queue configuration, whereupon it will process the ObjectCreated events to read each S3 object.

Example configuration:

```toml
[sources.s3]
  type = "aws_s3"

  region = "us-east-1"
  strategy = "sqs"
  compression = "gzip"

  sqs.queue_name = "jesse-test-flow-logs"
  sqs.delete_message = false

[sinks.console]
  type = "console"
  inputs = ["s3"]
  encoding.codec = "json"
```

The commits can be viewed in order, but the resulting diff probably isn't too bad either (it's mostly just `sources/aws_s3`). It may be worth looking at the added cue documentation first.

The source also behaves very much like the `file` source in that it emits one event per line, and it supports the same multiline configuration that the `file` source supports. **Note** there is a rough edge here: the `multiline` config supports a `timeout_ms` option that isn't really applicable here but is applied just the same.

Future work:
1. Additional codec support (like `application/ndjson`). For now, this acts very much like the `file` source. This could be looped into the general work around codecs  #4278
2. Additional compression formats (Snappy, LZ4, Zip). This was requested in the original issue. I started with just supporting the formats that were supported out-of-the-box by the `async_compression` crate we are using.
3. (potential) multi-region support. Currently we only support reading from a queue and a bucket in the same region. I expect this will cover most cases since AWS requires the bucket to publish notifications to a queue in the same region. One could forward messages from a queue in one region to another, but this seems unlikely. I'd prefer to wait and see if anyone asks for multi-region support; especially given that fluentbit and filebeat have the same restriction.
4. Concurrent processing. Right now one message is processed at a time, which leads to predictable behavior, but we may see some performance improvement from processing multiple objects at once. The benefit should be vetted, though; the process may be limited by incoming network bandwidth anyway.
5. Refresh the message visibility timeout. Right now, the visibility timeout is set once, when the message is retrieved, but we could refresh this timeout if we are still processing a message when it gets close to the end of the timeout to avoid another vector instance picking it up. This would let users have the best of both worlds: short visibility timeouts to quickly reprocess messages when a `vector` instance falls over, but also avoiding concurrent processing of messages for large objects where the processing time exceeds the visibility timeout.

I'll create issues for 2 and 5. I think the others can be left until we observe their necessity.
mengesb pushed a commit to jacobbraaten/vector that referenced this issue Dec 9, 2020