New aws_kinesis_firehose source to read forwarded AWS CloudWatch Logs #3566

Closed
jszwedko opened this issue Aug 25, 2020 · 4 comments · Fixed by #4101
Labels
  • domain: logs - Anything related to Vector's log events
  • domain: sources - Anything related to Vector's sources
  • provider: aws - Anything `aws` service provider related
  • type: feature - A value-adding code addition that introduces new functionality

Comments

@jszwedko
Member

jszwedko commented Aug 25, 2020

This issue represents the introduction of a new source that can be used as the HTTP endpoint for an AWS Kinesis Firehose destination.

This source will specifically be designed to read forwarded AWS CloudWatch Logs first, but could be expanded to other Firehose sources in the future.

I recommend a spike into this to discover and answer open questions and, possibly, an RFC if there appear to be more decisions to be made.

Other work that will probably need to happen as part of this:

  • Creating a guide on vector.dev for setting this up
  • Streamlining the AWS configuration via cloudformation, terraform module, or something else
@jszwedko added the domain: sources, provider: aws, type: feature, and domain: logs labels Aug 25, 2020
@andreyvital
Copy link

This will be pretty relevant now! Check this out: https://aws.amazon.com/about-aws/whats-new/2020/08/cloudfront-realtimelogs/

You can also easily deliver these logs to a generic HTTP endpoint using Amazon Kinesis Data Firehose.

I'd love to use vector to ingest such data...

🤩

@jszwedko
Member Author

Jotting some notes here as I explore this.

I set up the pipeline to publish CloudWatch Logs -> Firehose -> HTTP endpoint (largely following https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html).

Here is an example request for forwarded CloudWatch Logs (including headers): https://gist.github.com/jszwedko/46bf9338c737cd3b1ac5f5cd39c48daa

The data part of each record is base64-encoded gzipped data. Decoding the first one in that request looks like (trimmed):

{
  "messageType": "DATA_MESSAGE",
  "owner": "071959437513",
  "logGroup": "/jesse/test",
  "logStream": "test",
  "subscriptionFilters": [
    "Destination"
  ],
  "logEvents": [
    {
      "id": "35683658089614582423604394983260738922885519999578275840",
      "timestamp": 1600110569039,
      "message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
    },
    {
      "id": "35683658089659183914001456229543810359430816722590236673",
      "timestamp": 1600110569041,
      "message": "{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}"
    },
...
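
For anyone wanting to inspect these payloads themselves, here is a minimal Rust sketch of that decoding step (assuming the `base64` and `flate2` crates; the function name is just illustrative, not part of Vector):

```rust
use std::io::Read;

use flate2::read::GzDecoder;

/// Decode a Firehose record's `data` field: base64 -> gzip -> JSON text.
/// A sketch for local inspection, not Vector's actual implementation.
fn decode_record_data(data: &str) -> Result<String, Box<dyn std::error::Error>> {
    let compressed = base64::decode(data)?; // strip the base64 layer
    let mut decoder = GzDecoder::new(&compressed[..]); // then gunzip
    let mut json = String::new();
    decoder.read_to_string(&mut json)?;
    Ok(json)
}
```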

Curiously, there doesn't appear to be any sort of enum in the records or headers indicating that the incoming events are from CloudWatch Logs. It seems like we'll simply need to rely on schema matching (perhaps by just looking for a logEvents key).

Firehose also pings with control messages on a regular basis. The content of the records looks like:

{"messageType":"CONTROL_MESSAGE","owner":"CloudwatchLogs","logGroup":"","logStream":"","subscriptionFilters":[],"logEvents":[{"id":"","timestamp":1600110003794,"message":"CWL CONTROL MESSAGE: Checking health of destination Firehose."}]}

@andreyvital

@jszwedko have you tested the CloudFront integration? Curious to see what you think...

Curiously there doesn't appear to be any sort of enum in the records or headers to determine that the incoming events are from CloudWatch Logs.

Yeah, that's odd. I'm not 100% up on how Vector is internally architected, but I guess you'd need it to skip invalid entries & account for the input model?

@jszwedko
Member Author

I haven't tried the CloudFront integration just yet. This initial pass will be focused on CloudWatch Logs, but it should be easy to add support for additional services like CloudFront once the pattern is set.

Yeah, that's odd. I'm not 100% up on how Vector is internally architected, but I guess you'd need it to skip invalid entries & account for the input model?

Yeah, my realization was that Kinesis Firehose is similar to Kinesis in that it is just passing arbitrary bytes around. It's up to the consumer and producer to coordinate what those bytes mean. In this case, we'll need to rely on the user configuring vector to indicate that messages being passed via Firehose are from an AWS CloudWatch Logs subscription so that we can parse them as such.

jszwedko added a commit that referenced this issue Oct 9, 2020
…ransform (#4101)

This PR represents the initial implementation of the Firehose source.

It also introduces an `aws_cloudwatch_logs_subscription_parser` transform to handle the transformation of CloudWatch Log events. I opted not to split this off into a separate PR because I think its existence is relevant to the overall discussion of this source implementation, as a goal of this work is to facilitate getting CloudWatch Logs into Vector.

Closes #3566

I tried to keep the commits relatively atomic (aside from the first one), so it may be useful to review them in order. Otherwise I'd recommend reviewing the source first and then the transform.

This source ingests arbitrary data forwarded by AWS Kinesis Firehose via their [HTTP forwarder](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http).

Some design decisions:

I opted to implement the Firehose source using `warp` directly rather than using the `HttpSource` trait primarily because of Firehose's requirement of a [specific HTTP response structure](https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html). I modeled it pretty closely after warp's [todo example](https://github.com/seanmonstar/warp/blob/master/examples/todos.rs) which I think resulted in pretty good modeling.
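
To make that response requirement concrete, here's a standalone warp sketch that echoes the request id back in the documented response shape (assumed crates: `warp`, `tokio`, `serde`; this is a hedged toy, not the PR's actual handler):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

use serde::Serialize;
use warp::Filter;

/// The response body Firehose expects from an HTTP endpoint:
/// the request id echoed back, plus a timestamp in epoch milliseconds.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct FirehoseResponse {
    request_id: String,
    timestamp: u128,
}

#[tokio::main]
async fn main() {
    let route = warp::post()
        // Firehose sends its request id in this header.
        .and(warp::header::<String>("x-amz-firehose-request-id"))
        .map(|request_id: String| {
            let timestamp = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("clock before epoch")
                .as_millis();
            warp::reply::json(&FirehoseResponse { request_id, timestamp })
        });
    warp::serve(route).run(([127, 0, 0, 1], 9000)).await;
}
```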

I was initially going to make parsing of CloudWatch Log subscription events part of the Firehose source (implementing it as a `codec`), but, after starting in on this, it became clear to me that Firehose is capable of forwarding arbitrary payloads, which makes it more like the `kafka` source. Up until now, any decoding and parsing in `vector` has been delegated downstream to transforms so, instead, I introduced an `aws_cloudwatch_logs_subscription_parser` that is able to parse out the [JSON structure of subscription events](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#LambdaFunctionExample). An advantage of this approach is that this transform could be used in other circumstances where `vector` is ingesting CloudWatch Logs subscription events.
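
For reference, the subscription event JSON shown earlier maps onto serde structs roughly like the following (field names follow the documented payload; the struct names are mine):

```rust
use serde::Deserialize;

/// Shape of a CloudWatch Logs subscription event as documented by AWS.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SubscriptionEvent {
    message_type: String, // "DATA_MESSAGE" or "CONTROL_MESSAGE"
    owner: String,
    log_group: String,
    log_stream: String,
    subscription_filters: Vec<String>,
    log_events: Vec<SubscriptionLogEvent>,
}

#[derive(Debug, Deserialize)]
struct SubscriptionLogEvent {
    id: String,
    timestamp: i64, // epoch milliseconds
    message: String,
}
```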

Specifically for the scenario where a user wants to ingest CloudWatch Logs via Firehose, the steps would look like:

1. Deploy `vector` somewhere using the added source. Note the caveat that the endpoint must be exposed over HTTPS. The configuration would look something like:

```toml
[sources.firehose]
  # General
  type = "aws_kinesis_firehose"
  address = "127.0.0.1:9000"
  access_key = "secret"

[transforms.cloudwatch]
  type = "aws_cloudwatch_logs_subscription_parser"
  inputs = ["firehose"]

[transforms.json]
  type = "json_parser"
  inputs = ["cloudwatch"]

[sinks.console]
  type = "console"
  inputs = ["json"]
  encoding.codec = "json"
```

I think this is good fodder for thinking about what configuration macros in vector might look like.
mengesb pushed a commit to jacobbraaten/vector that referenced this issue Dec 9, 2020
…ransform (vectordotdev#4101)