Skip to content

Commit

Permalink
Kinesis Firehose Output support
Browse files Browse the repository at this point in the history
  • Loading branch information
ahma committed Jul 19, 2020
1 parent ad05e64 commit e0ef331
Show file tree
Hide file tree
Showing 6 changed files with 336 additions and 2 deletions.
3 changes: 2 additions & 1 deletion docs/plugins/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ For more information please click on the plugin name
| **[Google Cloud Storage](outputs/gcs/)** | outputs | Store logs in Google Cloud Storage | GA | [0.4.0](https://github.com/banzaicloud/fluent-plugin-gcs) |
| **[Http](outputs/http/)** | outputs | Sends logs to HTTP/HTTPS endpoints. | GA | [more info](https://docs.fluentd.org/output/http) |
| **[Kafka](outputs/kafka/)** | outputs | Send your logs to Kafka | GA | [0.13.0](https://github.com/fluent/fluent-plugin-kafka/releases/tag/v0.13.0) |
| **[Amazon Kinesis](outputs/kinesis_stream/)** | outputs | Fluent plugin for Amazon Kinesis | GA | [3.2.2](https://github.com/awslabs/aws-fluent-plugin-kinesis/releases/tag/v3.2.2) |
| **[Amazon Kinesis Firehose](outputs/kinesis_firehose/)** | outputs | Fluent plugin for Amazon Kinesis | Testing | [3.2.2](https://github.com/awslabs/aws-fluent-plugin-kinesis/releases/tag/v3.2.2) |
| **[Amazon Kinesis Stream](outputs/kinesis_stream/)** | outputs | Fluent plugin for Amazon Kinesis | GA | [3.2.2](https://github.com/awslabs/aws-fluent-plugin-kinesis/releases/tag/v3.2.2) |
| **[LogZ](outputs/logz/)** | outputs | Store logs in LogZ.io | GA | [0.0.20](https://github.com/logzio/fluent-plugin-logzio/releases/tag/v0.0.20) |
| **[Grafana Loki](outputs/loki/)** | outputs | Transfer logs to Loki | GA | [1.2.13](https://github.com/grafana/loki/tree/master/fluentd/fluent-plugin-grafana-loki) |
| **[NewRelic Logs](outputs/newrelic/)** | outputs | Send logs to New Relic Logs | GA | [1.1.8](https://github.com/newrelic/newrelic-fluentd-output) |
Expand Down
49 changes: 49 additions & 0 deletions docs/plugins/outputs/kinesis_firehose.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
---
title: Amazon Kinesis
weight: 200
---

# Kinesis Firehose output plugin for Fluentd
## Overview
More info at https://github.com/awslabs/aws-fluent-plugin-kinesis#configuration-kinesis_firehose

#### Example output configurations
```
spec:
kinesisFirehose:
delivery_stream_name: example-stream-name
region: us-east-1
format:
type: json
```

## Configuration
### KinesisStream
#### Send your logs to a Kinesis Stream

| Variable Name | Type | Required | Default | Description |
|---|---|---|---|---|
| delivery_stream_name | string | Yes | - | Name of the delivery stream to put data.<br> |
| append_new_line | *bool | No | - | If it is enabled, the plugin adds new line character (\n) to each serialized record.<br>Before appending \n, plugin calls chomp and removes separator from the end of each record as chomp_record is true. Therefore, you don't need to enable chomp_record option when you use kinesis_firehose output with default configuration (append_new_line is true). If you want to set append_new_line false, you can choose chomp_record false (default) or true (compatible format with plugin v2). (Default:true)<br> |
| aws_key_id | *secret.Secret | No | - | AWS access key id. This parameter is required when your agent is not running on EC2 instance with an IAM Role.<br> |
| aws_sec_key | *secret.Secret | No | - | AWS secret key. This parameter is required when your agent is not running on EC2 instance with an IAM Role.<br> |
| aws_ses_token | *secret.Secret | No | - | AWS session token. This parameter is optional, but can be provided if using MFA or temporary credentials when your agent is not running on EC2 instance with an IAM Role.<br> |
| aws_iam_retries | int | No | - | The number of attempts to make (with exponential backoff) when loading instance profile credentials from the EC2 metadata service using an IAM role. Defaults to 5 retries.<br> |
| assume_role_credentials | *KinesisStreamAssumeRoleCredentials | No | - | Typically, you can use AssumeRole for cross-account access or federation.<br> |
| region | string | No | - | AWS region of your stream. It should be in form like us-east-1, us-west-2. Default nil, which means try to find from environment variable AWS_REGION.<br> |
| retries_on_batch_request | int | No | - | The plugin will put multiple records to Amazon Kinesis Data Streams in batches using PutRecords. A set of records in a batch may fail for reasons documented in the Kinesis Service API Reference for PutRecords. Failed records will be retried retries_on_batch_request times<br> |
| reset_backoff_if_success | bool | No | - | Boolean, default true. If enabled, when after retrying, the next retrying checks the number of succeeded records on the former batch request and reset exponential backoff if there is any success. Because batch request could be composed by requests across shards, simple exponential backoff for the batch request wouldn't work some cases.<br> |
| batch_request_max_count | int | No | - | Integer, default 500. The number of max count of making batch request from record chunk. It can't exceed the default value because it's API limit.<br> |
| batch_request_max_size | int | No | - | Integer. The number of max size of making batch request from record chunk. It can't exceed the default value because it's API limit.<br> |
| format | *Format | No | - | [Format](../format/)<br> |
| buffer | *Buffer | No | - | [Buffer](../buffer/)<br> |
### Assume Role Credentials
#### assume_role_credentials

| Variable Name | Type | Required | Default | Description |
|---|---|---|---|---|
| role_arn | string | Yes | - | The Amazon Resource Name (ARN) of the role to assume<br> |
| role_session_name | string | Yes | - | An identifier for the assumed role session<br> |
| policy | string | No | - | An IAM policy in JSON format<br> |
| duration_seconds | string | No | - | The duration, in seconds, of the role session (900-3600)<br> |
| external_id | string | No | - | A unique identifier that is used by third parties when assuming roles in their customers' accounts.<br> |
153 changes: 153 additions & 0 deletions pkg/sdk/model/output/kinesis_firehose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright © 2019 Banzai Cloud
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package output

import (
"github.com/banzaicloud/logging-operator/pkg/sdk/model/types"
"github.com/banzaicloud/operator-tools/pkg/secret"
)

// +name:"Amazon Kinesis"
// +weight:"200"
type _hugoKinesisFirehose interface{}

// +docName:"Kinesis Firehose output plugin for Fluentd"
// More info at https://github.com/awslabs/aws-fluent-plugin-kinesis#configuration-kinesis_firehose
//
// #### Example output configurations
// ```
// spec:
// kinesisFirehose:
// delivery_stream_name: example-stream-name
// region: us-east-1
// format:
// type: json
// ```
type _docKinesisFirehose interface{}

// +name:"Amazon Kinesis Firehose"
// +url:"https://github.com/awslabs/aws-fluent-plugin-kinesis/releases/tag/v3.2.2"
// +version:"3.2.2"
// +description:"Fluent plugin for Amazon Kinesis"
// +status:"Testing"
type _metaKinesisFirehose interface{}

// +kubebuilder:object:generate=true
// +docName:"KinesisStream"
// Send your logs to a Kinesis Stream
type KinesisFirehoseOutputConfig struct {
// Name of the delivery stream to put data.
DeliveryStreamName string `json:"delivery_stream_name"`

// If it is enabled, the plugin adds new line character (\n) to each serialized record.
//Before appending \n, plugin calls chomp and removes separator from the end of each record as chomp_record is true. Therefore, you don't need to enable chomp_record option when you use kinesis_firehose output with default configuration (append_new_line is true). If you want to set append_new_line false, you can choose chomp_record false (default) or true (compatible format with plugin v2). (Default:true)
AppendNewLine *bool `json:"append_new_line,omitempty"`

// AWS access key id. This parameter is required when your agent is not running on EC2 instance with an IAM Role.
AWSKeyId *secret.Secret `json:"aws_key_id,omitempty"`

// AWS secret key. This parameter is required when your agent is not running on EC2 instance with an IAM Role.
AWSSECKey *secret.Secret `json:"aws_sec_key,omitempty"`

// AWS session token. This parameter is optional, but can be provided if using MFA or temporary credentials when your agent is not running on EC2 instance with an IAM Role.
AWSSESToken *secret.Secret `json:"aws_ses_token,omitempty"`

// The number of attempts to make (with exponential backoff) when loading instance profile credentials from the EC2 metadata service using an IAM role. Defaults to 5 retries.
AWSIAMRetries int `json:"aws_iam_retries,omitempty"`

// Typically, you can use AssumeRole for cross-account access or federation.
AssumeRoleCredentials *KinesisStreamAssumeRoleCredentials `json:"assume_role_credentials,omitempty"`

// AWS region of your stream. It should be in form like us-east-1, us-west-2. Default nil, which means try to find from environment variable AWS_REGION.
Region string `json:"region,omitempty"`

// The plugin will put multiple records to Amazon Kinesis Data Streams in batches using PutRecords. A set of records in a batch may fail for reasons documented in the Kinesis Service API Reference for PutRecords. Failed records will be retried retries_on_batch_request times
RetriesOnBatchRequest int `json:"retries_on_batch_request,omitempty"`

// Boolean, default true. If enabled, when after retrying, the next retrying checks the number of succeeded records on the former batch request and reset exponential backoff if there is any success. Because batch request could be composed by requests across shards, simple exponential backoff for the batch request wouldn't work some cases.
ResetBackoffIfSuccess bool `json:"reset_backoff_if_success,omitempty"`

// Integer, default 500. The number of max count of making batch request from record chunk. It can't exceed the default value because it's API limit.
BatchRequestMaxCount int `json:"batch_request_max_count,omitempty"`

// Integer. The number of max size of making batch request from record chunk. It can't exceed the default value because it's API limit.
BatchRequestMaxSize int `json:"batch_request_max_size,omitempty"`

// +docLink:"Format,../format/"
Format *Format `json:"format,omitempty"`
// +docLink:"Buffer,../buffer/"
Buffer *Buffer `json:"buffer,omitempty"`
}

// +kubebuilder:object:generate=true
// +docName:"Assume Role Credentials"
// assume_role_credentials
type KinesisFirehoseAssumeRoleCredentials struct {
// The Amazon Resource Name (ARN) of the role to assume
RoleArn string `json:"role_arn"`
// An identifier for the assumed role session
RoleSessionName string `json:"role_session_name"`
// An IAM policy in JSON format
Policy string `json:"policy,omitempty"`
// The duration, in seconds, of the role session (900-3600)
DurationSeconds string `json:"duration_seconds,omitempty"`
// A unique identifier that is used by third parties when assuming roles in their customers' accounts.
ExternalId string `json:"external_id,omitempty"`
}

func (o *KinesisFirehoseAssumeRoleCredentials) ToDirective(secretLoader secret.SecretLoader, id string) (types.Directive, error) {
return types.NewFlatDirective(types.PluginMeta{
Directive: "assume_role_credentials",
}, o, secretLoader)
}

func (e *KinesisFirehoseOutputConfig) ToDirective(secretLoader secret.SecretLoader, id string) (types.Directive, error) {
pluginType := "kinesis_firehose"
kinesis := &types.OutputPlugin{
PluginMeta: types.PluginMeta{
Type: pluginType,
Directive: "match",
Tag: "**",
Id: id,
},
}
if params, err := types.NewStructToStringMapper(secretLoader).StringsMap(e); err != nil {
return nil, err
} else {
kinesis.Params = params
}
if e.AssumeRoleCredentials != nil {
if assumeRoleCredentials, err := e.AssumeRoleCredentials.ToDirective(secretLoader, id); err != nil {
return nil, err
} else {
kinesis.SubDirectives = append(kinesis.SubDirectives, assumeRoleCredentials)
}
}
if e.Buffer != nil {
if buffer, err := e.Buffer.ToDirective(secretLoader, id); err != nil {
return nil, err
} else {
kinesis.SubDirectives = append(kinesis.SubDirectives, buffer)
}
}
if e.Format != nil {
if format, err := e.Format.ToDirective(secretLoader, ""); err != nil {
return nil, err
} else {
kinesis.SubDirectives = append(kinesis.SubDirectives, format)
}
}
return kinesis, nil
}
66 changes: 66 additions & 0 deletions pkg/sdk/model/output/kinesis_firehose_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright © 2019 Banzai Cloud
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package output_test

import (
"testing"

"github.com/banzaicloud/logging-operator/pkg/sdk/model/output"
"github.com/banzaicloud/logging-operator/pkg/sdk/model/render"
"github.com/ghodss/yaml"
)

func TestKinesisFirehose(t *testing.T) {
CONFIG := []byte(`
delivery_stream_name: test
region: us-east-1
format:
type: json
assume_role_credentials:
role_arn: arn:aws:iam::1111/IAM_ROLE_NAME
role_session_name: logging-operator
buffer:
timekey: 1m
timekey_wait: 30s
timekey_use_utc: true
`)
expected := `
<match **>
@type kinesis_firehose
@id test
delivery_stream_name test
region us-east-1
<assume_role_credentials>
role_arn arn:aws:iam::1111/IAM_ROLE_NAME
role_session_name logging-operator
</assume_role_credentials>
<buffer tag,time>
@type file
path /buffers/test.*.buffer
retry_forever true
timekey 1m
timekey_use_utc true
timekey_wait 30s
</buffer>
<format>
@type json
</format>
</match>
`
kinesis := &output.KinesisFirehoseOutputConfig{}
yaml.Unmarshal(CONFIG, kinesis)
test := render.NewOutputPluginTest(t, kinesis)
test.DiffResult(expected)
}
2 changes: 1 addition & 1 deletion pkg/sdk/model/output/kinesis_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type _hugoKinesisStream interface{}
// ```
type _docKinesisStream interface{}

// +name:"Amazon Kinesis"
// +name:"Amazon Kinesis Stream"
// +url:"https://github.com/awslabs/aws-fluent-plugin-kinesis/releases/tag/v3.2.2"
// +version:"3.2.2"
// +description:"Fluent plugin for Amazon Kinesis"
Expand Down
65 changes: 65 additions & 0 deletions pkg/sdk/model/output/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit e0ef331

Please sign in to comment.