Support StatisticValues in cloudwatch output plugin (influxdata#4364)
david7482 authored and rgitzel committed Oct 17, 2018
1 parent ef15d7c commit 889b009
Showing 3 changed files with 284 additions and 53 deletions.
10 changes: 10 additions & 0 deletions plugins/outputs/cloudwatch/README.md
@@ -36,3 +36,13 @@ Examples include but are not limited to:
### namespace

The namespace used for AWS CloudWatch metrics.

### write_statistics

If you have a large amount of metrics, you should consider sending statistic
values instead of raw metrics, which can both improve performance and reduce
AWS API cost. If this flag is enabled, the plugin parses the required
[CloudWatch statistic fields](https://docs.aws.amazon.com/sdk-for-go/api/service/cloudwatch/#StatisticSet)
(count, min, max, and sum) and sends them to CloudWatch. You can use the
`basicstats` aggregator to calculate those fields. If not all statistic fields
are available, all fields are still sent as raw metrics.
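To make the behaviour concrete, here is a minimal, hedged sketch (not part of this commit) that feeds a metric carrying the `basicstats`-style `_min`/`_max`/`_sum`/`_count` fields through the exported `BuildMetricDatum` helper; the `metric.New` constructor and import paths are assumed from the Telegraf codebase of this era:

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
)

func main() {
	// Fields as the basicstats aggregator would emit them for a "usage" field.
	m, err := metric.New(
		"cpu",
		map[string]string{"host": "web-01"},
		map[string]interface{}{
			"usage_min":   1.2,
			"usage_max":   9.7,
			"usage_sum":   33.4,
			"usage_count": 5.0,
		},
		time.Now(),
	)
	if err != nil {
		panic(err)
	}

	// With write_statistics enabled, the four suffixed fields are folded into
	// a single "cpu_usage" datum carrying a StatisticSet (min, max, sum, count).
	for _, d := range cloudwatch.BuildMetricDatum(true, m) {
		fmt.Println(d.String())
	}
}
```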
289 changes: 238 additions & 51 deletions plugins/outputs/cloudwatch/cloudwatch.go
@@ -28,6 +28,128 @@ type CloudWatch struct {

Namespace string `toml:"namespace"` // CloudWatch Metrics Namespace
svc *cloudwatch.CloudWatch

WriteStatistics bool `toml:"write_statistics"`
}

type statisticType int

const (
statisticTypeNone statisticType = iota
statisticTypeMax
statisticTypeMin
statisticTypeSum
statisticTypeCount
)

type cloudwatchField interface {
addValue(sType statisticType, value float64)
buildDatum() []*cloudwatch.MetricDatum
}

type statisticField struct {
metricName string
fieldName string
tags map[string]string
values map[statisticType]float64
timestamp time.Time
}

func (f *statisticField) addValue(sType statisticType, value float64) {
if sType != statisticTypeNone {
f.values[sType] = value
}
}

func (f *statisticField) buildDatum() []*cloudwatch.MetricDatum {

var datums []*cloudwatch.MetricDatum

if f.hasAllFields() {
// If we have all required fields, we build the datum with StatisticValues
min, _ := f.values[statisticTypeMin]
max, _ := f.values[statisticTypeMax]
sum, _ := f.values[statisticTypeSum]
count, _ := f.values[statisticTypeCount]

datum := &cloudwatch.MetricDatum{
MetricName: aws.String(strings.Join([]string{f.metricName, f.fieldName}, "_")),
Dimensions: BuildDimensions(f.tags),
Timestamp: aws.Time(f.timestamp),
StatisticValues: &cloudwatch.StatisticSet{
Minimum: aws.Float64(min),
Maximum: aws.Float64(max),
Sum: aws.Float64(sum),
SampleCount: aws.Float64(count),
},
}

datums = append(datums, datum)

} else {
// If we don't have all required fields, we build each field as an independent datum
for sType, value := range f.values {
datum := &cloudwatch.MetricDatum{
Value: aws.Float64(value),
Dimensions: BuildDimensions(f.tags),
Timestamp: aws.Time(f.timestamp),
}

switch sType {
case statisticTypeMin:
datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "min"}, "_"))
case statisticTypeMax:
datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "max"}, "_"))
case statisticTypeSum:
datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "sum"}, "_"))
case statisticTypeCount:
datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "count"}, "_"))
default:
// should not be here
continue
}

datums = append(datums, datum)
}
}

return datums
}

func (f *statisticField) hasAllFields() bool {

_, hasMin := f.values[statisticTypeMin]
_, hasMax := f.values[statisticTypeMax]
_, hasSum := f.values[statisticTypeSum]
_, hasCount := f.values[statisticTypeCount]

return hasMin && hasMax && hasSum && hasCount
}

type valueField struct {
metricName string
fieldName string
tags map[string]string
value float64
timestamp time.Time
}

func (f *valueField) addValue(sType statisticType, value float64) {
if sType == statisticTypeNone {
f.value = value
}
}

func (f *valueField) buildDatum() []*cloudwatch.MetricDatum {

return []*cloudwatch.MetricDatum{
{
MetricName: aws.String(strings.Join([]string{f.metricName, f.fieldName}, "_")),
Value: aws.Float64(f.value),
Dimensions: BuildDimensions(f.tags),
Timestamp: aws.Time(f.timestamp),
},
}
}

var sampleConfig = `
@@ -57,6 +179,14 @@ var sampleConfig = `
## Namespace for the CloudWatch MetricDatums
namespace = "InfluxData/Telegraf"
## If you have a large amount of metrics, you should consider sending statistic
## values instead of raw metrics, which can both improve performance and reduce
## AWS API cost. If this flag is enabled, the plugin parses the required CloudWatch
## statistic fields (count, min, max, and sum) and sends them to CloudWatch.
## You can use the basicstats aggregator to calculate those fields. If not all
## statistic fields are available, all fields are still sent as raw metrics.
# write_statistics = false
`

func (c *CloudWatch) SampleConfig() string {
@@ -104,7 +234,7 @@ func (c *CloudWatch) Write(metrics []telegraf.Metric) error {

var datums []*cloudwatch.MetricDatum
for _, m := range metrics {
d := BuildMetricDatum(m)
d := BuildMetricDatum(c.WriteStatistics, m)
datums = append(datums, d...)
}

@@ -159,67 +289,58 @@ func PartitionDatums(size int, datums []*cloudwatch.MetricDatum) [][]*cloudwatch
return partitions
}

// Make a MetricDatum for each field in a Point. Only fields with values that can be
// converted to float64 are supported. Non-supported fields are skipped.
func BuildMetricDatum(point telegraf.Metric) []*cloudwatch.MetricDatum {
datums := make([]*cloudwatch.MetricDatum, len(point.Fields()))
i := 0
// Make a MetricDatum from a telegraf.Metric. It checks whether all required fields of
// cloudwatch.StatisticSet are available. If so, it builds the MetricDatum using StatisticValues.
// Otherwise, each field is still built as an independent datum.
func BuildMetricDatum(buildStatistic bool, point telegraf.Metric) []*cloudwatch.MetricDatum {

var value float64
fields := make(map[string]cloudwatchField)

for k, v := range point.Fields() {
switch t := v.(type) {
case int:
value = float64(t)
case int32:
value = float64(t)
case int64:
value = float64(t)
case uint64:
value = float64(t)
case float64:
value = t
case bool:
if t {
value = 1
} else {
value = 0
}
case time.Time:
value = float64(t.Unix())
default:
// Skip unsupported type.
datums = datums[:len(datums)-1]
continue
}

// Do CloudWatch boundary checking
// Constraints at: http://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
if math.IsNaN(value) {
datums = datums[:len(datums)-1]
continue
}
if math.IsInf(value, 0) {
datums = datums[:len(datums)-1]
val, ok := convert(v)
if !ok {
// Only fields with values that can be converted to float64 (and that fall within the CloudWatch bounds) are supported.
// Non-supported fields are skipped.
continue
}
if value > 0 && value < float64(8.515920e-109) {
datums = datums[:len(datums)-1]
continue
}
if value > float64(1.174271e+108) {
datums = datums[:len(datums)-1]

sType, fieldName := getStatisticType(k)

// If statistics are not enabled, or this field is not a statistic type, just take the current field as a value field.
if !buildStatistic || sType == statisticTypeNone {
fields[k] = &valueField{
metricName: point.Name(),
fieldName: k,
tags: point.Tags(),
timestamp: point.Time(),
value: val,
}
continue
}

datums[i] = &cloudwatch.MetricDatum{
MetricName: aws.String(strings.Join([]string{point.Name(), k}, "_")),
Value: aws.Float64(value),
Dimensions: BuildDimensions(point.Tags()),
Timestamp: aws.Time(point.Time()),
// Otherwise, this is a statistic field.
if _, ok := fields[fieldName]; !ok {
// First time we see this field: create its statisticField
fields[fieldName] = &statisticField{
metricName: point.Name(),
fieldName: fieldName,
tags: point.Tags(),
timestamp: point.Time(),
values: map[statisticType]float64{
sType: val,
},
}
} else {
// Add new statistic value to this field
fields[fieldName].addValue(sType, val)
}
}

i += 1
var datums []*cloudwatch.MetricDatum
for _, f := range fields {
d := f.buildDatum()
datums = append(datums, d...)
}

return datums
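
To illustrate the fallback branch described in the comment above `BuildMetricDatum`, here is another hedged sketch under the same assumptions: when only some of the statistic fields are present, `hasAllFields()` is false and each field is emitted as its own raw datum.

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
)

func main() {
	// Only two of the four statistic fields are present, so a StatisticSet
	// cannot be built for the "usage" field.
	m, err := metric.New(
		"cpu",
		map[string]string{"host": "web-01"},
		map[string]interface{}{
			"usage_min": 1.2,
			"usage_max": 9.7,
		},
		time.Now(),
	)
	if err != nil {
		panic(err)
	}

	// Expect two independent datums, "cpu_usage_min" and "cpu_usage_max",
	// each carrying Value rather than StatisticValues.
	for _, d := range cloudwatch.BuildMetricDatum(true, m) {
		fmt.Println(*d.MetricName, *d.Value)
	}
}
```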
@@ -268,6 +389,72 @@ func BuildDimensions(mTags map[string]string) []*cloudwatch.Dimension {
return dimensions
}

func getStatisticType(name string) (sType statisticType, fieldName string) {
switch {
case strings.HasSuffix(name, "_max"):
sType = statisticTypeMax
fieldName = strings.TrimSuffix(name, "_max")
case strings.HasSuffix(name, "_min"):
sType = statisticTypeMin
fieldName = strings.TrimSuffix(name, "_min")
case strings.HasSuffix(name, "_sum"):
sType = statisticTypeSum
fieldName = strings.TrimSuffix(name, "_sum")
case strings.HasSuffix(name, "_count"):
sType = statisticTypeCount
fieldName = strings.TrimSuffix(name, "_count")
default:
sType = statisticTypeNone
fieldName = name
}
return
}

func convert(v interface{}) (value float64, ok bool) {

ok = true

switch t := v.(type) {
case int:
value = float64(t)
case int32:
value = float64(t)
case int64:
value = float64(t)
case uint64:
value = float64(t)
case float64:
value = t
case bool:
if t {
value = 1
} else {
value = 0
}
case time.Time:
value = float64(t.Unix())
default:
// Skip unsupported type.
ok = false
return
}

// Do CloudWatch boundary checking
// Constraints at: http://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
switch {
case math.IsNaN(value):
return 0, false
case math.IsInf(value, 0):
return 0, false
case value > 0 && value < float64(8.515920e-109):
return 0, false
case value > float64(1.174271e+108):
return 0, false
}

return
}
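
A hedged, test-style sketch of the two unexported helpers above; it would have to live inside the `cloudwatch` package, and it only restates behaviour visible in the diff (suffix parsing in `getStatisticType`, and the NaN/infinity/range checks in `convert`):

```go
package cloudwatch

import (
	"math"
	"testing"
)

func TestStatisticHelpersSketch(t *testing.T) {
	// "_max" is recognised as a statistic suffix and stripped from the field name...
	if sType, field := getStatisticType("usage_max"); sType != statisticTypeMax || field != "usage" {
		t.Fatalf("unexpected parse: %v %q", sType, field)
	}
	// ...while a plain field name falls through as statisticTypeNone.
	if sType, field := getStatisticType("usage"); sType != statisticTypeNone || field != "usage" {
		t.Fatalf("unexpected parse: %v %q", sType, field)
	}

	// convert rejects NaN and values outside CloudWatch's documented range.
	if _, ok := convert(math.NaN()); ok {
		t.Fatal("NaN should be rejected")
	}
	if _, ok := convert(2e108); ok {
		t.Fatal("values above the CloudWatch maximum should be rejected")
	}
	// Booleans are accepted and mapped to 0/1.
	if v, ok := convert(true); !ok || v != 1 {
		t.Fatalf("expected true to convert to 1, got %v (ok=%v)", v, ok)
	}
}
```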

func init() {
outputs.Add("cloudwatch", func() telegraf.Output {
return &CloudWatch{}