Skip to content

Commit

Permalink
Add aggregation capability to signalfx exporter translations
Browse files Browse the repository at this point in the history
We need simple aggregation capability to get machine_cpu_cores metric. For now only "count" aggregation method is needed
  • Loading branch information
dmitryax committed Jul 23, 2020
1 parent 2f9ac2b commit db56d8c
Show file tree
Hide file tree
Showing 5 changed files with 404 additions and 25 deletions.
2 changes: 1 addition & 1 deletion exporter/signalfxexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestCreateMetricsExporterWithDefaultTranslaitonRules(t *testing.T) {

// Validate that default translation rules are loaded
// Expected values has to be updated once default config changed
assert.Equal(t, 7, len(config.TranslationRules))
assert.Equal(t, 9, len(config.TranslationRules))
assert.Equal(t, translation.ActionRenameDimensionKeys, config.TranslationRules[0].Action)
assert.Equal(t, 15, len(config.TranslationRules[0].Mapping))
}
Expand Down
9 changes: 9 additions & 0 deletions exporter/signalfxexporter/translation/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,14 @@ translation_rules:
cpu.wait: int
cpu.softirq: int
cpu.nice: int
- action: copy_metrics
mapping:
cpu.idle: machine_cpu_cores
- action: aggregate_metric
metric_name: machine_cpu_cores
aggregation_method: count
dimension_key: host
`
)
26 changes: 16 additions & 10 deletions exporter/signalfxexporter/translation/metricdata_to_signalfxv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,15 @@ func MetricDataToSignalFxV2(
metricTranslator *MetricTranslator,
md consumerdata.MetricsData,
) (sfxDataPoints []*sfxpb.DataPoint, numDroppedTimeSeries int) {
sfxDataPoints, numDroppedTimeSeries = metricDataToSfxDataPoints(logger, md)
if metricTranslator != nil {
sfxDataPoints = metricTranslator.TranslateDataPoints(logger, sfxDataPoints)
}
sfxDataPoints, numDroppedTimeSeries = metricDataToSfxDataPoints(logger, metricTranslator, md)

sanitizeDataPointDimensions(sfxDataPoints)
return
}

func metricDataToSfxDataPoints(
logger *zap.Logger,
metricTranslator *MetricTranslator,
md consumerdata.MetricsData,
) (sfxDataPoints []*sfxpb.DataPoint, numDroppedTimeSeries int) {

Expand Down Expand Up @@ -120,6 +119,8 @@ func metricDataToSfxDataPoints(
continue
}

metricDataPoints := make([]*sfxpb.DataPoint, 0, len(md.Metrics))

// Build the fixed parts for this metrics from the descriptor.
descriptor := metric.MetricDescriptor
metricName := descriptor.Name
Expand Down Expand Up @@ -157,15 +158,15 @@ func metricDataToSfxDataPoints(
switch pv := dp.Value.(type) {
case *metricspb.Point_Int64Value:
sfxDataPoint.Value = sfxpb.Datum{IntValue: &pv.Int64Value}
sfxDataPoints = append(sfxDataPoints, sfxDataPoint)
metricDataPoints = append(metricDataPoints, sfxDataPoint)

case *metricspb.Point_DoubleValue:
sfxDataPoint.Value = sfxpb.Datum{DoubleValue: &pv.DoubleValue}
sfxDataPoints = append(sfxDataPoints, sfxDataPoint)
metricDataPoints = append(metricDataPoints, sfxDataPoint)

case *metricspb.Point_DistributionValue:
sfxDataPoints, err = appendDistributionValues(
sfxDataPoints,
metricDataPoints, err = appendDistributionValues(
metricDataPoints,
sfxDataPoint,
pv.DistributionValue)
if err != nil {
Expand All @@ -176,8 +177,8 @@ func metricDataToSfxDataPoints(
zap.String("metric", sfxDataPoint.Metric))
}
case *metricspb.Point_SummaryValue:
sfxDataPoints, err = appendSummaryValues(
sfxDataPoints,
metricDataPoints, err = appendSummaryValues(
metricDataPoints,
sfxDataPoint,
pv.SummaryValue)
if err != nil {
Expand All @@ -196,6 +197,11 @@ func metricDataToSfxDataPoints(

}
}

if metricTranslator != nil {
metricDataPoints = metricTranslator.TranslateDataPoints(logger, metricDataPoints)
}
sfxDataPoints = append(sfxDataPoints, metricDataPoints...)
}

return sfxDataPoints, numDroppedTimeSeries
Expand Down
184 changes: 170 additions & 14 deletions exporter/signalfxexporter/translation/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,28 @@ const (
// k8s.pod.network.io{direction="receive"} -> pod_network_receive_bytes_total{}
// k8s.pod.network.io{direction="transmit"} -> pod_network_transmit_bytes_total{}
ActionSplitMetric Action = "split_metric"

// ActionAggregateMetric aggregates metric by one dimension provided in tr.DimensionKey.
// It takes datapoints with name tr.MetricName and aggregate them to a smaller set keeping the same name.
// It drops all other dimensions if they have different values across aggregated datapoints,
// dimensions with the same values across aggregated datapoints are kept.
// tr.DimensionKey is always kept because it is used to aggregate datapoints by.
// If datapoint doesn't have a dimension with tr.DimensionKey, it will be dropped.
// tr.AggregationMethod used to specify a method to aggregate the values.
// For example, having the following translation rule:
// - action: aggregate_metric
// metric_name: machine_cpu_cores
// aggregation_method: count
// dimension_key: host
// The following translations will be performed:
// Original datapoints:
// machine_cpu_cores{cpu="cpu1",host="host1"} 0.22
// machine_cpu_cores{cpu="cpu2",host="host1"} 0.11
// machine_cpu_cores{cpu="cpu1",host="host2"} 0.33
// Transformed datapoints:
// machine_cpu_cores{host="host1"} 2
// machine_cpu_cores{host="host2"} 1
ActionAggregateMetric Action = "aggregate_metric"
)

// MetricValueType is the enum to capture valid metric value types that can be converted
Expand All @@ -77,6 +99,14 @@ const (
MetricValueTypeDouble MetricValueType = "double"
)

// AggregationMethod is the enum used to capture aggregation method
type AggregationMethod string

const (
// AggregationMethodCount represents count aggregation method
AggregationMethodCount AggregationMethod = "count"
)

type Rule struct {
// Action specifies the translation action to be applied on metrics.
// This is a required field.
Expand Down Expand Up @@ -105,6 +135,8 @@ type Rule struct {
// TypesMapping is represents metric_name/metric_type key/value pairs,
// used by ActionConvertValues.
TypesMapping map[string]MetricValueType `mapstructure:"types_mapping"`

AggregationMethod AggregationMethod `mapstructure:"aggregation_method"`
}

type MetricTranslator struct {
Expand Down Expand Up @@ -178,6 +210,16 @@ func validateTranslationRules(rules []Rule) error {
return fmt.Errorf("invalid value type %q set for metric %q in \"types_mapping\"", v, k)
}
}
case ActionAggregateMetric:
if tr.MetricName == "" || tr.DimensionKey == "" || tr.AggregationMethod == "" {
return fmt.Errorf("fields \"metric_name\", \"dimension_key\", and \"aggregation_method\" "+
"are required for %q translation rule", tr.Action)
}
if tr.AggregationMethod != "count" {
return fmt.Errorf("invalid \"aggregation_method\": %q provided for %q translation rule",
tr.AggregationMethod, tr.Action)
}

default:
return fmt.Errorf("unknown \"action\" value: %q", tr.Action)
}
Expand All @@ -197,63 +239,97 @@ func createDimensionsMap(rules []Rule) map[string]string {
return nil
}

// TranslateDataPoints transforms datapoints to a format compatible with signalfx backend
// sfxDataPoints represents one metric converted to signalfx protobuf datapoints
func (mp *MetricTranslator) TranslateDataPoints(logger *zap.Logger, sfxDataPoints []*sfxpb.DataPoint) []*sfxpb.DataPoint {
processedDataPoints := sfxDataPoints

for _, tr := range mp.rules {
newDataPoints := []*sfxpb.DataPoint{}

for _, dp := range processedDataPoints {
switch tr.Action {
case ActionRenameDimensionKeys:
switch tr.Action {
case ActionRenameDimensionKeys:
for _, dp := range processedDataPoints {
for _, d := range dp.Dimensions {
if newKey, ok := tr.Mapping[d.Key]; ok {
d.Key = newKey
}
}
case ActionRenameMetrics:
}
case ActionRenameMetrics:
for _, dp := range processedDataPoints {
if newKey, ok := tr.Mapping[dp.Metric]; ok {
dp.Metric = newKey
}
case ActionMultiplyInt:
}
case ActionMultiplyInt:
for _, dp := range processedDataPoints {
if multiplier, ok := tr.ScaleFactorsInt[dp.Metric]; ok {
v := dp.GetValue().IntValue
if v != nil {
*v = *v * multiplier
}
}
case ActionDivideInt:
}
case ActionDivideInt:
for _, dp := range processedDataPoints {
if divisor, ok := tr.ScaleFactorsInt[dp.Metric]; ok {
v := dp.GetValue().IntValue
if v != nil {
*v = *v / divisor
}
}
case ActionMultiplyFloat:
}
case ActionMultiplyFloat:
for _, dp := range processedDataPoints {
if multiplier, ok := tr.ScaleFactorsFloat[dp.Metric]; ok {
v := dp.GetValue().DoubleValue
if v != nil {
*v = *v * multiplier
}
}
case ActionCopyMetrics:
}
case ActionCopyMetrics:
newDataPoints := []*sfxpb.DataPoint{}
for _, dp := range processedDataPoints {
if newMetric, ok := tr.Mapping[dp.Metric]; ok {
newDataPoint := proto.Clone(dp).(*sfxpb.DataPoint)
newDataPoint.Metric = newMetric
newDataPoints = append(newDataPoints, newDataPoint)
}
case ActionSplitMetric:
}
processedDataPoints = append(processedDataPoints, newDataPoints...)
case ActionSplitMetric:
for _, dp := range processedDataPoints {
if tr.MetricName == dp.Metric {
splitMetric(dp, tr.DimensionKey, tr.Mapping)
}
case ActionConvertValues:
}
case ActionConvertValues:
for _, dp := range processedDataPoints {
if newType, ok := tr.TypesMapping[dp.Metric]; ok {
convertMetricValue(logger, dp, newType)
}
}
}
case ActionAggregateMetric:
result := make([]*sfxpb.DataPoint, 0, len(processedDataPoints))

// NOTE: Based on the usage of TranslateDataPoints we can assume that the datapoints batch []*sfxpb.DataPoint
// represents only one metric and all the datapoints can be aggregated together.
var dpsToAggregate []*sfxpb.DataPoint
for i, dp := range processedDataPoints {
if dp.Metric == tr.MetricName {
if dpsToAggregate == nil {
dpsToAggregate = make([]*sfxpb.DataPoint, 0, len(processedDataPoints)-i)
}
dpsToAggregate = append(dpsToAggregate, dp)
} else {

processedDataPoints = append(processedDataPoints, newDataPoints...)
// Skip datapoints that don't need to be aggregated
result = append(result, dp)
}
}
aggregatedDps := aggregateDatapoints(logger, dpsToAggregate, tr.DimensionKey, tr.AggregationMethod)
processedDataPoints = append(result, aggregatedDps...)
}
}

return processedDataPoints
Expand All @@ -266,6 +342,86 @@ func (mp *MetricTranslator) TranslateDimension(orig string) string {
return orig
}

// aggregateDatapoints aggregates datapoints assuming that they have
// the same Timestamp, MetricType, Metric and Source fields.
func aggregateDatapoints(
logger *zap.Logger,
dps []*sfxpb.DataPoint,
dimensionKey string,
aggregation AggregationMethod,
) []*sfxpb.DataPoint {
if len(dps) == 0 {
return nil
}

// group datapoints by dimension values
dimValToDps := make(map[string][]*sfxpb.DataPoint, len(dps))
for i, dp := range dps {
var dimensionFound bool
for _, d := range dp.Dimensions {
if d.Key == dimensionKey {
if _, ok := dimValToDps[d.Value]; !ok {
dimValToDps[d.Value] = make([]*sfxpb.DataPoint, 0, len(dps)-i)
}
dimValToDps[d.Value] = append(dimValToDps[d.Value], dp)
dimensionFound = true
}
}
if !dimensionFound {
logger.Debug("dimension to aggregate by is not found, datapoint is dropped",
zap.String("metric", dp.Metric), zap.String("dimension", dimensionKey))
}
}

// Get aggregated results
result := make([]*sfxpb.DataPoint, 0, len(dimValToDps))
for _, dps := range dimValToDps {
dp := proto.Clone(dps[0]).(*sfxpb.DataPoint)
dp.Dimensions = getCommonDimensions(dps)
switch aggregation {
case AggregationMethodCount:
gauge := sfxpb.MetricType_GAUGE
dp.MetricType = &gauge
value := int64(len(dps))
dp.Value = sfxpb.Datum{
IntValue: &value,
}
}
result = append(result, dp)
}

return result
}

// getCommonDimensions returns list of dimension that are the same across provided datapoints
func getCommonDimensions(dps []*sfxpb.DataPoint) []*sfxpb.Dimension {
if len(dps) == 0 {
return nil
}

commonDimensions := make([]*sfxpb.Dimension, len(dps[0].Dimensions))
copy(commonDimensions, dps[0].Dimensions)

for _, dp := range dps[1:] {
dimensions := commonDimensions
commonDimensions = make([]*sfxpb.Dimension, 0, len(dimensions))
for _, cd := range dimensions {
var match bool
for _, d := range dp.Dimensions {
if d.Key == cd.Key && d.Value == cd.Value {
match = true
break
}
}
if match {
commonDimensions = append(commonDimensions, cd)
}
}
}

return commonDimensions
}

// splitMetric renames a metric with "dimension key" == dimensionKey to mapping["dimension value"],
// datapoint not changed if not dimension found equal to dimensionKey:mapping->key.
func splitMetric(dp *sfxpb.DataPoint, dimensionKey string, mapping map[string]string) {
Expand Down

0 comments on commit db56d8c

Please sign in to comment.