Decompress option on kinesis-consumer #8891

Merged
merged 11 commits on Mar 18, 2021
9 changes: 9 additions & 0 deletions plugins/inputs/kinesis_consumer/README.md
@@ -54,6 +54,15 @@ and creates metrics using one of the supported [input data formats][].
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"

##
## Whether or not to gzip-decompress the data read from Kinesis.
## If you are processing a CloudWatch Logs Kinesis stream, set this to true:
## AWS gzip-compresses CloudWatch log data before it is sent to Kinesis. (AWS
## also base64-encodes the compressed bytes before pushing them to the stream;
## that base64 decoding is done automatically by the Go SDK as the data is
## read from Kinesis.)
##
# decompress = false
Contributor

Does it make sense to allow specifying the compression algorithm, in order to be more flexible? What if they decide to use snappy or something similar in the future?


Code looks good, but it might be beneficial to let the user specify the decompression/compression algorithm (i.e. "gzip" for now) to be future-proof. What do you think?

Sounds sensible. How does something like the following sound:

decompress = "gzip"

snappy could then be requested like this:

decompress = "snappy"

I can look at adding support for snappy at the same time (that would be an extra dependency, I think).

Contributor

Sounds exactly like what I had in mind. :-) You don't need to add snappy now, but I like to be prepared. :-) So only having "gzip" or "" is fine for me; if we hit other compression types we can add them later. But if you are already aware of a use case, you can of course add it in one go.

Contributor

Oh, and please use internal.choice for validating the allowed values... ;-)
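For context, a minimal sketch of what validating the new option against a fixed set of values could look like with telegraf's internal/choice helper. The choice.Check signature, the function name, and the option values below are assumptions for illustration, not code from this PR; being an internal package, it is only importable from inside the telegraf module.

```go
package kinesis_consumer

import (
	"github.com/influxdata/telegraf/internal/choice"
)

// validDecompressWith rejects anything other than the values discussed in
// this thread; the empty string is treated the same as "none".
// (Hypothetical helper, assuming choice.Check(value, allowed) returns an
// error for values outside the allowed set.)
func validDecompressWith(value string) error {
	return choice.Check(value, []string{"", "none", "gzip", "zlib"})
}
```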

Contributor Author

Let me know how you feel about this change to decompress_with. I've added zlib as an option but left out snappy for now (snappy would need an extra dependency, whereas zlib is in the Go standard library); I could always send a follow-up PR for snappy. I added a test covering the gzip/zlib/none options.

Happy to make changes. Thanks for the help.
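For context, a minimal sketch, using only the Go standard library, of how a decompress_with option dispatching between gzip, zlib and none might be wired up. The function name and error text are illustrative and this is not the PR's actual code.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"compress/zlib"
	"fmt"
	"io/ioutil"
)

// decode returns the record bytes, decompressed according to decompressWith.
func decode(decompressWith string, data []byte) ([]byte, error) {
	switch decompressWith {
	case "gzip":
		r, err := gzip.NewReader(bytes.NewReader(data))
		if err != nil {
			return nil, err
		}
		defer r.Close()
		return ioutil.ReadAll(r)
	case "zlib":
		r, err := zlib.NewReader(bytes.NewReader(data))
		if err != nil {
			return nil, err
		}
		defer r.Close()
		return ioutil.ReadAll(r)
	case "", "none":
		// Identity: hand the bytes back untouched.
		return data, nil
	default:
		return nil, fmt.Errorf("unknown decompress_with value %q", decompressWith)
	}
}

func main() {
	out, err := decode("none", []byte("hello"))
	fmt.Println(string(out), err) // prints: hello <nil>
}
```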

Contributor Author

I've tried doing a !retry-failed, but it did not seem to make CircleCI any happier. Unfortunately I can't see the output of the CircleCI tests; would you be able to send me the details? Locally I'm unable to reproduce any failures related to this PR. (The permissions CircleCI asks for are very broad; it wants access to all of my repos, private and public.)


## Optional
## Configuration for a dynamodb checkpoint
[inputs.kinesis_consumer.checkpoint_dynamodb]
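For context, a standalone sketch of the encoding chain the README comment describes: the producer side gzips and then base64-encodes the CloudWatch Logs payload, the Go SDK undoes the base64 step for the consumer, and the plugin only has to gunzip. The payload and helper below are made up for illustration and are not part of this PR.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"fmt"
	"io/ioutil"
)

func main() {
	logData := []byte(`{"messageType":"DATA_MESSAGE","logEvents":[]}`)

	// What AWS does on the producer side: gzip, then base64.
	var gz bytes.Buffer
	w := gzip.NewWriter(&gz)
	w.Write(logData)
	w.Close()
	onTheWire := base64.StdEncoding.EncodeToString(gz.Bytes())

	// What the Go SDK does for the consumer: base64-decode the record data.
	record, err := base64.StdEncoding.DecodeString(onTheWire)
	if err != nil {
		panic(err)
	}

	// What this plugin's decompress option then does: gunzip.
	r, err := gzip.NewReader(bytes.NewReader(record))
	if err != nil {
		panic(err)
	}
	plain, err := ioutil.ReadAll(r)
	if err != nil {
		panic(err)
	}
	r.Close()
	fmt.Println(string(plain))
}
```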
38 changes: 37 additions & 1 deletion plugins/inputs/kinesis_consumer/kinesis_consumer.go
@@ -1,8 +1,11 @@
package kinesis_consumer

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io/ioutil"
"math/big"
"strings"
"sync"
@@ -38,6 +41,7 @@ type (
		ShardIteratorType      string    `toml:"shard_iterator_type"`
		DynamoDB               *DynamoDB `toml:"checkpoint_dynamodb"`
		MaxUndeliveredMessages int       `toml:"max_undelivered_messages"`
		Decompress             bool      `toml:"decompress"`

		Log telegraf.Logger

@@ -118,6 +122,15 @@ var sampleConfig = `
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"

##
## Whether or not to gzip-decompress the data read from Kinesis.
## If you are processing a CloudWatch Logs Kinesis stream, set this to true:
## AWS gzip-compresses CloudWatch log data before it is sent to Kinesis. (AWS
## also base64-encodes the compressed bytes before pushing them to the stream;
## that base64 decoding is done automatically by the Go SDK as the data is
## read from Kinesis.)
##
# decompress = false

## Optional
## Configuration for a dynamodb checkpoint
[inputs.kinesis_consumer.checkpoint_dynamodb]
@@ -238,8 +251,30 @@ func (k *KinesisConsumer) Start(ac telegraf.Accumulator) error {
	return nil
}

// decompress gunzips a record payload.
func (k *KinesisConsumer) decompress(data []byte) ([]byte, error) {
	zipData, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer zipData.Close()
	return ioutil.ReadAll(zipData)
}

// getRecordData returns the record payload, gunzipping it first when the
// decompress option is enabled.
func (k *KinesisConsumer) getRecordData(r *consumer.Record) ([]byte, error) {
	var err error
	dataToParse := r.Data
	if k.Decompress {
		dataToParse, err = k.decompress(dataToParse)
	}
	return dataToParse, err
}

func (k *KinesisConsumer) onMessage(acc telegraf.TrackingAccumulator, r *consumer.Record) error {
-	metrics, err := k.parser.Parse(r.Data)
+	data, err := k.getRecordData(r)
+	if err != nil {
+		return err
+	}
+	metrics, err := k.parser.Parse(data)
	if err != nil {
		return err
	}
@@ -347,6 +382,7 @@ func init() {
			ShardIteratorType:      "TRIM_HORIZON",
			MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
			lastSeqNum:             maxSeq,
			Decompress:             false,
		}
	})
}
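For context, a minimal round-trip sketch in the spirit of the test mentioned in the review thread (not the PR's actual test file), exercising the gzip path of the decompress helper added above. It assumes the testify require package that telegraf's tests commonly use, and would live alongside the plugin in a hypothetical kinesis_consumer_test.go.

```go
package kinesis_consumer

import (
	"bytes"
	"compress/gzip"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestDecompressGzipRoundTrip(t *testing.T) {
	want := []byte(`cpu,host=a usage=1 1616000000000000000`)

	// Compress the payload the way CloudWatch Logs would before it reaches Kinesis.
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	_, err := w.Write(want)
	require.NoError(t, err)
	require.NoError(t, w.Close())

	// Feed the compressed bytes through the plugin's gzip helper.
	k := &KinesisConsumer{Decompress: true}
	got, err := k.decompress(buf.Bytes())
	require.NoError(t, err)
	require.Equal(t, want, got)
}
```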