Add aggregation capability to signalfx exporter translations (#501)
We need a simple aggregation capability to get the machine_cpu_cores metric. For now only the "count" aggregation method is needed.
dmitryax committed Jul 24, 2020
1 parent 4fe8114 commit c8eea78
Showing 5 changed files with 432 additions and 30 deletions.
exporter/signalfxexporter/factory_test.go (1 addition, 1 deletion)

@@ -176,7 +176,7 @@ func TestCreateMetricsExporterWithDefaultTranslaitonRules(t *testing.T) {

    // Validate that default translation rules are loaded
    // Expected values have to be updated once the default config changes
-   assert.Equal(t, 7, len(config.TranslationRules))
+   assert.Equal(t, 9, len(config.TranslationRules))
    assert.Equal(t, translation.ActionRenameDimensionKeys, config.TranslationRules[0].Action)
    assert.Equal(t, 15, len(config.TranslationRules[0].Mapping))
}
exporter/signalfxexporter/translation/constants.go (12 additions, 0 deletions)

@@ -116,5 +116,17 @@ translation_rules:
    cpu.wait: int
    cpu.softirq: int
    cpu.nice: int
+- action: copy_metrics
+  mapping:
+    cpu.idle: machine_cpu_cores
+- action: aggregate_metric
+  metric_name: machine_cpu_cores
+  aggregation_method: count
+  dimensions:
+  - host
+  - kubernetes_node
+  - kubernetes_cluster
`
)
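These two default rules work as a pair: copy_metrics first clones every cpu.idle datapoint (one per CPU) under the name machine_cpu_cores, and aggregate_metric then collapses the clones with the count method, so the resulting value is the number of cores reported per host, node, and cluster. A minimal stand-alone sketch of that count step (dataPoint here is a local stand-in for the exporter's sfxpb.DataPoint, not the real type):

package main

import (
    "fmt"
    "strings"
)

// dataPoint is a local stand-in for sfxpb.DataPoint, reduced to the fields
// the count aggregation needs.
type dataPoint struct {
    Metric string
    Dims   map[string]string
    Value  float64
}

// countByDims mirrors the "count" aggregation added in this commit: it groups
// datapoints by the values of the given dimension keys and emits one datapoint
// per group whose value is the group size. Datapoints missing any of the keys
// are dropped.
func countByDims(dps []dataPoint, keys []string) []dataPoint {
    groups := make(map[string][]dataPoint)
    for _, dp := range dps {
        parts := make([]string, 0, len(keys))
        for _, k := range keys {
            v, ok := dp.Dims[k]
            if !ok {
                parts = nil // missing dimension: drop the datapoint
                break
            }
            parts = append(parts, v)
        }
        if parts == nil {
            continue
        }
        key := strings.Join(parts, "//")
        groups[key] = append(groups[key], dp)
    }
    result := make([]dataPoint, 0, len(groups))
    for _, group := range groups {
        dims := make(map[string]string, len(keys))
        for _, k := range keys {
            dims[k] = group[0].Dims[k] // keep only the aggregation dimensions
        }
        result = append(result, dataPoint{Metric: group[0].Metric, Dims: dims, Value: float64(len(group))})
    }
    return result
}

func main() {
    cores := countByDims([]dataPoint{
        {Metric: "machine_cpu_cores", Dims: map[string]string{"cpu": "cpu1", "host": "host1"}, Value: 0.22},
        {Metric: "machine_cpu_cores", Dims: map[string]string{"cpu": "cpu2", "host": "host1"}, Value: 0.11},
        {Metric: "machine_cpu_cores", Dims: map[string]string{"cpu": "cpu1", "host": "host2"}, Value: 0.33},
    }, []string{"host"})
    fmt.Println(cores) // host1 -> 2 cores, host2 -> 1 core
}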
exporter/signalfxexporter/translation/metricdata_to_signalfxv2.go (22 additions, 15 deletions)

@@ -81,23 +81,17 @@ func MetricDataToSignalFxV2(
    metricTranslator *MetricTranslator,
    md consumerdata.MetricsData,
) (sfxDataPoints []*sfxpb.DataPoint, numDroppedTimeSeries int) {
-   sfxDataPoints, numDroppedTimeSeries = metricDataToSfxDataPoints(logger, md)
-   if metricTranslator != nil {
-       sfxDataPoints = metricTranslator.TranslateDataPoints(logger, sfxDataPoints)
-   }
+   sfxDataPoints, numDroppedTimeSeries = metricDataToSfxDataPoints(logger, metricTranslator, md)

    sanitizeDataPointDimensions(sfxDataPoints)
    return
}

func metricDataToSfxDataPoints(
    logger *zap.Logger,
+   metricTranslator *MetricTranslator,
    md consumerdata.MetricsData,
) (sfxDataPoints []*sfxpb.DataPoint, numDroppedTimeSeries int) {
-
-   // The final number of data points is likely larger than len(md.Metrics)
-   // but at least that is expected.
-   sfxDataPoints = make([]*sfxpb.DataPoint, 0, len(md.Metrics))
-
    var err error

    // Labels from Node and Resource.
@@ -120,6 +114,13 @@ func metricDataToSfxDataPoints(
            continue
        }

+       if sfxDataPoints == nil {
+           // Assume all metrics have a roughly similar number of timeseries
+           sfxDataPoints = make([]*sfxpb.DataPoint, 0, len(md.Metrics)*len(metric.Timeseries))
+       }
+
+       metricDataPoints := make([]*sfxpb.DataPoint, 0, len(metric.Timeseries))
+
        // Build the fixed parts for this metric from the descriptor.
        descriptor := metric.MetricDescriptor
        metricName := descriptor.Name
@@ -157,15 +158,15 @@
            switch pv := dp.Value.(type) {
            case *metricspb.Point_Int64Value:
                sfxDataPoint.Value = sfxpb.Datum{IntValue: &pv.Int64Value}
-               sfxDataPoints = append(sfxDataPoints, sfxDataPoint)
+               metricDataPoints = append(metricDataPoints, sfxDataPoint)

            case *metricspb.Point_DoubleValue:
                sfxDataPoint.Value = sfxpb.Datum{DoubleValue: &pv.DoubleValue}
-               sfxDataPoints = append(sfxDataPoints, sfxDataPoint)
+               metricDataPoints = append(metricDataPoints, sfxDataPoint)

            case *metricspb.Point_DistributionValue:
-               sfxDataPoints, err = appendDistributionValues(
-                   sfxDataPoints,
+               metricDataPoints, err = appendDistributionValues(
+                   metricDataPoints,
                    sfxDataPoint,
                    pv.DistributionValue)
                if err != nil {
@@ -176,8 +177,8 @@
                        zap.String("metric", sfxDataPoint.Metric))
                }
            case *metricspb.Point_SummaryValue:
-               sfxDataPoints, err = appendSummaryValues(
-                   sfxDataPoints,
+               metricDataPoints, err = appendSummaryValues(
+                   metricDataPoints,
                    sfxDataPoint,
                    pv.SummaryValue)
                if err != nil {
@@ -196,6 +197,12 @@

            }
        }
+
+       if metricTranslator != nil {
+           metricDataPoints = metricTranslator.TranslateDataPoints(logger, metricDataPoints)
+       }
+
+       sfxDataPoints = append(sfxDataPoints, metricDataPoints...)
    }

    return sfxDataPoints, numDroppedTimeSeries
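Note the structural change in this file: conversion and translation used to be two sequential passes over the whole batch; now each metric's datapoints are translated as their own batch before being appended to the output. That per-metric batching is what allows the aggregate_metric rule to assume a single-metric batch (see the NOTE in translator.go below). A toy sketch of the new control flow, using stand-in types rather than the exporter's API:

package main

import "fmt"

type dataPoint struct{ Metric string }

// translate is a stand-in for MetricTranslator.TranslateDataPoints: because it
// is now called once per metric, every batch it receives is homogeneous.
func translate(batch []dataPoint) []dataPoint {
    fmt.Printf("translating %d datapoints of %s\n", len(batch), batch[0].Metric)
    return batch
}

func main() {
    metrics := [][]dataPoint{
        {{Metric: "cpu.idle"}, {Metric: "cpu.idle"}},
        {{Metric: "memory.used"}},
    }
    var out []dataPoint
    for _, batch := range metrics {
        // Per-metric translation, as in this commit; previously translate
        // would have been called once on the concatenation of all batches.
        out = append(out, translate(batch)...)
    }
    fmt.Println(len(out), "datapoints total")
}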
exporter/signalfxexporter/translation/translator.go (184 additions, 14 deletions)

@@ -65,6 +65,27 @@ const (
// k8s.pod.network.io{direction="receive"} -> pod_network_receive_bytes_total{}
// k8s.pod.network.io{direction="transmit"} -> pod_network_transmit_bytes_total{}
ActionSplitMetric Action = "split_metric"

// ActionAggregateMetric aggregates metrics by dimensions provided in tr.Dimensions.
// It takes datapoints with name tr.MetricName and aggregates them to a smaller set keeping the same name.
// It drops all other dimensions other then those that match tr.Dimensions.
// If a datapoint doesn't have a dimension matchin tr.Dimensions, the datapoint will be dropped.
// tr.AggregationMethod is used to specify a method to aggregate the values.
// For example, having the following translation rule:
// - action: aggregate_metric
// metric_name: machine_cpu_cores
// aggregation_method: count
// dimensions:
// - host
// The following translations will be performed:
// Original datapoints:
// machine_cpu_cores{cpu="cpu1",host="host1"} 0.22
// machine_cpu_cores{cpu="cpu2",host="host1"} 0.11
// machine_cpu_cores{cpu="cpu1",host="host2"} 0.33
// Transformed datapoints:
// machine_cpu_cores{host="host1"} 2
// machine_cpu_cores{host="host2"} 1
ActionAggregateMetric Action = "aggregate_metric"
)

// MetricValueType is the enum to capture valid metric value types that can be converted
@@ -77,6 +98,14 @@ const (
    MetricValueTypeDouble MetricValueType = "double"
)

+// AggregationMethod is the enum used to capture the aggregation method
+type AggregationMethod string
+
+const (
+   // AggregationMethodCount represents the count aggregation method
+   AggregationMethodCount AggregationMethod = "count"
+)

type Rule struct {
    // Action specifies the translation action to be applied on metrics.
    // This is a required field.
@@ -105,6 +134,14 @@ type Rule struct {
    // TypesMapping represents metric_name/metric_type key/value pairs,
    // used by ActionConvertValues.
    TypesMapping map[string]MetricValueType `mapstructure:"types_mapping"`
+
+   // AggregationMethod specifies the method used by the "aggregate_metric" translation rule.
+   AggregationMethod AggregationMethod `mapstructure:"aggregation_method"`
+
+   // Dimensions is used by the "aggregate_metric" translation rule to specify the dimension keys
+   // that the metric will be aggregated across.
+   // Datapoints that don't have all of these dimensions will be dropped.
+   Dimensions []string `mapstructure:"dimensions"`
}

type MetricTranslator struct {
@@ -178,6 +215,16 @@ func validateTranslationRules(rules []Rule) error {
                    return fmt.Errorf("invalid value type %q set for metric %q in \"types_mapping\"", v, k)
                }
            }
+       case ActionAggregateMetric:
+           if tr.MetricName == "" || tr.AggregationMethod == "" || len(tr.Dimensions) == 0 {
+               return fmt.Errorf("fields \"metric_name\", \"dimensions\", and \"aggregation_method\" "+
+                   "are required for %q translation rule", tr.Action)
+           }
+           if tr.AggregationMethod != "count" {
+               return fmt.Errorf("invalid \"aggregation_method\": %q provided for %q translation rule",
+                   tr.AggregationMethod, tr.Action)
+           }
+
        default:
            return fmt.Errorf("unknown \"action\" value: %q", tr.Action)
        }
@@ -197,63 +244,99 @@ func createDimensionsMap(rules []Rule) map[string]string {
    return nil
}

+// TranslateDataPoints transforms datapoints to a format compatible with the SignalFx backend.
+// sfxDataPoints represents one metric converted to SignalFx protobuf datapoints.
func (mp *MetricTranslator) TranslateDataPoints(logger *zap.Logger, sfxDataPoints []*sfxpb.DataPoint) []*sfxpb.DataPoint {
    processedDataPoints := sfxDataPoints

    for _, tr := range mp.rules {
-       newDataPoints := []*sfxpb.DataPoint{}
-
-       for _, dp := range processedDataPoints {
-           switch tr.Action {
-           case ActionRenameDimensionKeys:
+       switch tr.Action {
+       case ActionRenameDimensionKeys:
+           for _, dp := range processedDataPoints {
                for _, d := range dp.Dimensions {
                    if newKey, ok := tr.Mapping[d.Key]; ok {
                        d.Key = newKey
                    }
                }
-           case ActionRenameMetrics:
+           }
+       case ActionRenameMetrics:
+           for _, dp := range processedDataPoints {
                if newKey, ok := tr.Mapping[dp.Metric]; ok {
                    dp.Metric = newKey
                }
-           case ActionMultiplyInt:
+           }
+       case ActionMultiplyInt:
+           for _, dp := range processedDataPoints {
                if multiplier, ok := tr.ScaleFactorsInt[dp.Metric]; ok {
                    v := dp.GetValue().IntValue
                    if v != nil {
                        *v = *v * multiplier
                    }
                }
-           case ActionDivideInt:
+           }
+       case ActionDivideInt:
+           for _, dp := range processedDataPoints {
                if divisor, ok := tr.ScaleFactorsInt[dp.Metric]; ok {
                    v := dp.GetValue().IntValue
                    if v != nil {
                        *v = *v / divisor
                    }
                }
-           case ActionMultiplyFloat:
+           }
+       case ActionMultiplyFloat:
+           for _, dp := range processedDataPoints {
                if multiplier, ok := tr.ScaleFactorsFloat[dp.Metric]; ok {
                    v := dp.GetValue().DoubleValue
                    if v != nil {
                        *v = *v * multiplier
                    }
                }
-           case ActionCopyMetrics:
+           }
+       case ActionCopyMetrics:
+           newDataPoints := []*sfxpb.DataPoint{}
+           for _, dp := range processedDataPoints {
                if newMetric, ok := tr.Mapping[dp.Metric]; ok {
                    newDataPoint := proto.Clone(dp).(*sfxpb.DataPoint)
                    newDataPoint.Metric = newMetric
                    newDataPoints = append(newDataPoints, newDataPoint)
                }
-           case ActionSplitMetric:
+           }
+           processedDataPoints = append(processedDataPoints, newDataPoints...)
+       case ActionSplitMetric:
+           for _, dp := range processedDataPoints {
                if tr.MetricName == dp.Metric {
                    splitMetric(dp, tr.DimensionKey, tr.Mapping)
                }
-           case ActionConvertValues:
+           }
+       case ActionConvertValues:
+           for _, dp := range processedDataPoints {
                if newType, ok := tr.TypesMapping[dp.Metric]; ok {
                    convertMetricValue(logger, dp, newType)
                }
+           }
+       case ActionAggregateMetric:
+           // NOTE: Based on the usage of TranslateDataPoints we can assume that the datapoints batch []*sfxpb.DataPoint
+           // represents only one metric and all the datapoints can be aggregated together.
+           var dpsToAggregate []*sfxpb.DataPoint
+           var otherDps []*sfxpb.DataPoint
+           for i, dp := range processedDataPoints {
+               if dp.Metric == tr.MetricName {
+                   if dpsToAggregate == nil {
+                       dpsToAggregate = make([]*sfxpb.DataPoint, 0, len(processedDataPoints)-i)
+                   }
+                   dpsToAggregate = append(dpsToAggregate, dp)
+               } else {
+                   if otherDps == nil {
+                       otherDps = make([]*sfxpb.DataPoint, 0, len(processedDataPoints)-i)
+                   }
+                   // This slice can contain additional datapoints from a different metric,
+                   // e.g. ones copied in a previous translation step.
+                   otherDps = append(otherDps, dp)
+               }
+           }
+           aggregatedDps := aggregateDatapoints(logger, dpsToAggregate, tr.Dimensions, tr.AggregationMethod)
+           processedDataPoints = append(otherDps, aggregatedDps...)
        }
-
-       processedDataPoints = append(processedDataPoints, newDataPoints...)
    }

    return processedDataPoints
@@ -266,6 +349,93 @@ func (mp *MetricTranslator) TranslateDimension(orig string) string {
    return orig
}

+// aggregateDatapoints aggregates datapoints assuming that they have
+// the same Timestamp, MetricType, Metric and Source fields.
+func aggregateDatapoints(
+   logger *zap.Logger,
+   dps []*sfxpb.DataPoint,
+   dimensionsKeys []string,
+   aggregation AggregationMethod,
+) []*sfxpb.DataPoint {
+   if len(dps) == 0 {
+       return nil
+   }
+
+   // Group datapoints by dimension values.
+   dimValuesToDps := make(map[string][]*sfxpb.DataPoint, len(dps))
+   for i, dp := range dps {
+       aggregationKey, err := getAggregationKey(dp.Dimensions, dimensionsKeys)
+       if err != nil {
+           logger.Debug("datapoint is dropped", zap.String("metric", dp.Metric), zap.Error(err))
+           continue
+       }
+       if _, ok := dimValuesToDps[aggregationKey]; !ok {
+           // Set the slice capacity to the possible maximum, len(dps)-i, to avoid reallocations.
+           dimValuesToDps[aggregationKey] = make([]*sfxpb.DataPoint, 0, len(dps)-i)
+       }
+       dimValuesToDps[aggregationKey] = append(dimValuesToDps[aggregationKey], dp)
+   }
+
+   // Get aggregated results.
+   result := make([]*sfxpb.DataPoint, 0, len(dimValuesToDps))
+   for _, dps := range dimValuesToDps {
+       dp := proto.Clone(dps[0]).(*sfxpb.DataPoint)
+       dp.Dimensions = filterDimensions(dp.Dimensions, dimensionsKeys)
+       switch aggregation {
+       case AggregationMethodCount:
+           gauge := sfxpb.MetricType_GAUGE
+           dp.MetricType = &gauge
+           value := int64(len(dps))
+           dp.Value = sfxpb.Datum{
+               IntValue: &value,
+           }
+       }
+       result = append(result, dp)
+   }
+
+   return result
+}

+// getAggregationKey composes an aggregation key based on the provided dimensions.
+// If all the dimensions are found, the function returns the aggregation key.
+// If any dimension is not found, the function returns an error.
+func getAggregationKey(dimensions []*sfxpb.Dimension, dimensionsKeys []string) (string, error) {
+   const aggregationKeyDelimiter = "//"
+   var aggregationKey string
+   for _, dk := range dimensionsKeys {
+       var dimensionFound bool
+       for _, d := range dimensions {
+           if d.Key == dk {
+               // Compose the aggregation key with "//" as the delimiter.
+               aggregationKey += d.Value + aggregationKeyDelimiter
+               dimensionFound = true
+               continue
+           }
+       }
+       if !dimensionFound {
+           return "", fmt.Errorf("dimension to aggregate by is not found: %q", dk)
+       }
+   }
+   return aggregationKey, nil
+}
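For illustration, composing the key for the dimensions {cpu: cpu1, host: host1, kubernetes_node: node1} aggregated by host and kubernetes_node yields "host1//node1//". A stand-alone sketch of the same loop (dimension here is a local stand-in for sfxpb.Dimension):

package main

import "fmt"

// dimension is a local stand-in for sfxpb.Dimension.
type dimension struct{ Key, Value string }

func main() {
    dims := []dimension{{"cpu", "cpu1"}, {"host", "host1"}, {"kubernetes_node", "node1"}}
    keys := []string{"host", "kubernetes_node"}

    // Same loop as getAggregationKey above: concatenate the values of the
    // requested dimension keys, delimited by "//".
    aggregationKey := ""
    for _, dk := range keys {
        for _, d := range dims {
            if d.Key == dk {
                aggregationKey += d.Value + "//"
            }
        }
    }
    fmt.Println(aggregationKey) // Output: host1//node1//
}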

+// filterDimensions returns the list of dimensions filtered by dimensionsKeys.
+func filterDimensions(dimensions []*sfxpb.Dimension, dimensionsKeys []string) []*sfxpb.Dimension {
+   if len(dimensions) == 0 || len(dimensionsKeys) == 0 {
+       return nil
+   }
+   result := make([]*sfxpb.Dimension, 0, len(dimensionsKeys))
+   for _, dk := range dimensionsKeys {
+       for _, d := range dimensions {
+           if d.Key == dk {
+               result = append(result, d)
+               continue
+           }
+       }
+   }
+   return result
+}

// splitMetric renames a metric with "dimension key" == dimensionKey to mapping["dimension value"].
// The datapoint is left unchanged if no dimension with key dimensionKey and a value present in mapping is found.
func splitMetric(dp *sfxpb.DataPoint, dimensionKey string, mapping map[string]string) {
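To exercise the new rule end to end, a test along these lines could live in the translation package. It is a sketch resting on two assumptions that this diff does not show: that NewMetricTranslator(rules) is the package's constructor, and that sfxpb aliases the SignalFx protobuf package imported by translator.go.

package translation

import (
    "testing"

    sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf" // assumed import path
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "go.uber.org/zap"
)

func TestAggregateMetricCount(t *testing.T) {
    // NewMetricTranslator is assumed from the surrounding package.
    mt, err := NewMetricTranslator([]Rule{{
        Action:            ActionAggregateMetric,
        MetricName:        "machine_cpu_cores",
        AggregationMethod: AggregationMethodCount,
        Dimensions:        []string{"host"},
    }})
    require.NoError(t, err)

    dp := func(host, cpu string, v float64) *sfxpb.DataPoint {
        return &sfxpb.DataPoint{
            Metric: "machine_cpu_cores",
            Value:  sfxpb.Datum{DoubleValue: &v},
            Dimensions: []*sfxpb.Dimension{
                {Key: "host", Value: host},
                {Key: "cpu", Value: cpu},
            },
        }
    }

    // One batch == one metric, matching the per-metric translation now done
    // in metricdata_to_signalfxv2.go.
    out := mt.TranslateDataPoints(zap.NewNop(), []*sfxpb.DataPoint{
        dp("host1", "cpu1", 0.22),
        dp("host1", "cpu2", 0.11),
        dp("host2", "cpu1", 0.33),
    })

    // Expect one datapoint per host: count 2 for host1 and count 1 for host2,
    // each keeping only the "host" dimension.
    assert.Equal(t, 2, len(out))
}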
