Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decompress option on kinesis-consumer #8891

Merged
merged 11 commits into from Mar 18, 2021
9 changes: 9 additions & 0 deletions plugins/inputs/kinesis_consumer/README.md
Expand Up @@ -54,6 +54,15 @@ and creates metrics using one of the supported [input data formats][].
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"

##
## Whether or not to uncompress the data from kinesis
## If you are processing a cloudwatch logs kinesis stream then set this to "gzip"
## as AWS compresses cloudwatch log data before it is sent to kinesis (aws
## also base64 encodes the zip byte data before pushing to the stream. The base64 decoding
## is done automatically by the golang sdk, as data is read from kinesis)
##
# decompress = "none"

## Optional
## Configuration for a dynamodb checkpoint
[inputs.kinesis_consumer.checkpoint_dynamodb]
Expand Down
65 changes: 64 additions & 1 deletion plugins/inputs/kinesis_consumer/kinesis_consumer.go
@@ -1,8 +1,12 @@
package kinesis_consumer

import (
"bytes"
"compress/gzip"
"compress/zlib"
"context"
"fmt"
"io/ioutil"
"math/big"
"strings"
"sync"
Expand Down Expand Up @@ -38,6 +42,7 @@ type (
ShardIteratorType string `toml:"shard_iterator_type"`
DynamoDB *DynamoDB `toml:"checkpoint_dynamodb"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
DecompressionType string `toml:"decompress"`

Log telegraf.Logger

Expand All @@ -55,6 +60,8 @@ type (
recordsTex sync.Mutex
wg sync.WaitGroup

decompressionFunc decompress

lastSeqNum *big.Int
}

Expand All @@ -68,6 +75,8 @@ const (
defaultMaxUndeliveredMessages = 1000
)

type decompress func([]byte) ([]byte, error)

// this is the largest sequence number allowed - https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SequenceNumberRange.html
var maxSeq = strToBint(strings.Repeat("9", 129))

Expand Down Expand Up @@ -118,6 +127,15 @@ var sampleConfig = `
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"

##
## Whether or not to uncompress the data from kinesis
## If you are processing a cloudwatch logs kinesis stream then set this to "gzip"
## as AWS compresses cloudwatch log data before it is sent to kinesis (aws
## also base64 encodes the zip byte data before pushing to the stream. The base64 decoding
## is done automatically by the golang sdk, as data is read from kinesis)
##
# decompress = "none"

## Optional
## Configuration for a dynamodb checkpoint
[inputs.kinesis_consumer.checkpoint_dynamodb]
Expand Down Expand Up @@ -239,7 +257,11 @@ func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error {
}

func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consumer.Record) error {
metrics, err := k.parser.Parse(r.Data)
data, err := k.decompressionFunc(r.Data)
if err != nil {
return err
}
metrics, err := k.parser.Parse(data)
if err != nil {
return err
}
Expand Down Expand Up @@ -334,6 +356,46 @@ func (k *KinesisConsumer) Set(streamName, shardID, sequenceNumber string) error
return nil
}

func decompressGzip(data []byte) ([]byte, error) {
zipData, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
defer zipData.Close()
return ioutil.ReadAll(zipData)
}

func decompressZlib(data []byte) ([]byte, error) {
zlibData, err := zlib.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
defer zlibData.Close()
return ioutil.ReadAll(zlibData)
}

func decompressNoOp(data []byte) ([]byte, error) {
return data, nil
}

func (k *KinesisConsumer) configureDecompressionFunc() error {
switch k.DecompressionType {
case "gzip":
k.decompressionFunc = decompressGzip
case "zlib":
k.decompressionFunc = decompressZlib
case "none", "":
k.decompressionFunc = decompressNoOp
default:
return fmt.Errorf("unknown decompression %q", k.DecompressionType)
}
return nil
}

func (k *KinesisConsumer) Init() error {
return k.configureDecompressionFunc()
}

type noopCheckpoint struct{}

func (n noopCheckpoint) Set(string, string, string) error { return nil }
Expand All @@ -347,6 +409,7 @@ func init() {
ShardIteratorType: "TRIM_HORIZON",
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
lastSeqNum: maxSeq,
DecompressionType: "none",
}
})
}
140 changes: 140 additions & 0 deletions plugins/inputs/kinesis_consumer/kinesis_consumer_test.go
@@ -0,0 +1,140 @@
package kinesis_consumer

import (
"encoding/base64"
"github.com/aws/aws-sdk-go/aws"
consumer "github.com/harlow/kinesis-consumer"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/stretchr/testify/assert"
"testing"
)

type TestTrackingAccumulator struct {
telegraf.TrackingAccumulator
Metrics *[]telegraf.Metric
}

func (t TestTrackingAccumulator) AddTrackingMetricGroup(group []telegraf.Metric) telegraf.TrackingID {
*t.Metrics = append(*t.Metrics, group...)
return 1
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the tracking accumulator provided by testutil.


func TestKinesisConsumer_onMessage(t *testing.T) {
zlibBytpes, _ := base64.StdEncoding.DecodeString("eF5FjlFrgzAUhf9KuM+2aNB2zdsQ2xe3whQGW8qIeqdhaiSJK0P874u1Y4+Hc/jON0GHxoga858BgUF8fs5fzunHU5Jlj6cEPFDXHvXStGqsrsKWTapq44pW1SetxsF1a8qsRtGt0YyFKbUcrFT9UbYWtQH2frntkm/s7RInkNU6t9JpWNE5WBAFPo3CcHeg+9D703OziUOhCg6MQ/yakrspuZsyEjdYfsm+Jg2K1jZEfZLKQWUvFglylBobZXDLwSP8//EGpD4NNj7dUJpT6hQY3W33h/AhCt84zDBf5l/MDl08")
gzippedBytes, _ := base64.StdEncoding.DecodeString("H4sIAAFXNGAAA0WOUWuDMBSF/0q4z7Zo0HbN2xDbF7fCFAZbyoh6p2FqJIkrQ/zvi7Vjj4dz+M43QYfGiBrznwGBQXx+zl/O6cdTkmWPpwQ8UNce9dK0aqyuwpZNqmrjilbVJ63GwXVryqxG0a3RjIUptRysVP1Rtha1AfZ+ue2Sb+ztEieQ1Tq30mlY0TlYEAU+jcJwd6D70PvTc7OJQ6EKDoxD/JqSuym5mzISN1h+yb4mDYrWNkR9kspBZS8WCXKUGhtlcMvBI/z/8QakPg02Pt1QmlPqFBjdbfeH8CEK3zjMMF/mX0TaxZUpAQAA")
notZippedBytes := []byte(`{"messageType":"CONTROL_MESSAGE","owner":"CloudwatchLogs","logGroup":"","logStream":"",
"subscriptionFilters":[],"logEvents":[
{"id":"","timestamp":1510254469274,"message":"{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"},"},
{"id":"","timestamp":1510254469274,"message":"{\"bob\":\"CWL CONTROL MESSAGE: Checking health of destination Firehose.\", \"timestamp\":\"2021-02-22T22:15:26.794854Z\"}"}
]}`)
parser, _ := json.New(&json.Config{
MetricName: "json_test",
Query: "logEvents",
StringFields: []string{"message"},
})

type fields struct {
DecompressionType string
parser parsers.Parser
records map[telegraf.TrackingID]string
}
type args struct {
r *consumer.Record
}
type expected struct {
numberOfMetrics int
messageContains string
}
tests := []struct {
name string
fields fields
args args
wantErr bool
expected expected
}{
{
name: "test no compression",
fields: fields{
DecompressionType: "none",
parser: parser,
records: make(map[telegraf.TrackingID]string),
},
args: args{
r: &consumer.Record{Data: notZippedBytes, SequenceNumber: aws.String("anything")},
},
wantErr: false,
expected: expected{
messageContains: "bob",
numberOfMetrics: 2,
},
},
{
name: "test gzip compression",
fields: fields{
DecompressionType: "gzip",
parser: parser,
records: make(map[telegraf.TrackingID]string),
},
args: args{
r: &consumer.Record{Data: gzippedBytes, SequenceNumber: aws.String("anything")},
},
wantErr: false,
expected: expected{
messageContains: "bob",
numberOfMetrics: 1,
},
},
{
name: "test zlib compression",
fields: fields{
DecompressionType: "zlib",
parser: parser,
records: make(map[telegraf.TrackingID]string),
},
args: args{
r: &consumer.Record{Data: zlibBytpes, SequenceNumber: aws.String("anything")},
},
wantErr: false,
expected: expected{
messageContains: "bob",
numberOfMetrics: 1,
},
},
}

k := &KinesisConsumer{
DecompressionType: "notsupported",
}
err := k.Init()
assert.NotNil(t, err)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
k := &KinesisConsumer{
DecompressionType: tt.fields.DecompressionType,
parser: tt.fields.parser,
records: tt.fields.records,
}
err := k.Init()
assert.Nil(t, err)

var metrics []telegraf.Metric
if err := k.onMessage(TestTrackingAccumulator{Metrics: &metrics}, tt.args.r); (err != nil) != tt.wantErr {
t.Errorf("onMessage() error = %v, wantErr %v", err, tt.wantErr)
}

assert.Equal(t, tt.expected.numberOfMetrics, len(metrics))

for _, metric := range metrics {
if logEventMessage, ok := metric.Fields()["message"]; ok {
assert.Contains(t, logEventMessage.(string), tt.expected.messageContains)
} else {
t.Errorf("Expect logEvents to be present")
}
}
})
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tootedom I think the linter is complaining about this empty line. We need CI to pass before we can merge, including the linter.

Suggested change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many thanks for the pointer. I was failing miserably at installing revive locally (still not fathomed it).
Have removed the line, and pushed. Linter looks happy.

}