Add aggregation capability to signalfx exporter translations
We need a simple aggregation capability to derive the machine_cpu_cores metric. For now, only the "count" aggregation method is needed.
dmitryax committed Jul 24, 2020
1 parent 9764b90 commit fc3459a
Showing 5 changed files with 432 additions and 30 deletions.
2 changes: 1 addition & 1 deletion exporter/signalfxexporter/factory_test.go
@@ -176,7 +176,7 @@ func TestCreateMetricsExporterWithDefaultTranslaitonRules(t *testing.T) {

// Validate that default translation rules are loaded
// Expected values have to be updated once the default config is changed
assert.Equal(t, 7, len(config.TranslationRules))
assert.Equal(t, 9, len(config.TranslationRules))
assert.Equal(t, translation.ActionRenameDimensionKeys, config.TranslationRules[0].Action)
assert.Equal(t, 15, len(config.TranslationRules[0].Mapping))
}
12 changes: 12 additions & 0 deletions exporter/signalfxexporter/translation/constants.go
@@ -116,5 +116,17 @@ translation_rules:
    cpu.wait: int
    cpu.softirq: int
    cpu.nice: int
- action: copy_metrics
  mapping:
    cpu.idle: machine_cpu_cores
- action: aggregate_metric
  metric_name: machine_cpu_cores
  aggregation_method: count
  dimensions:
    - host
    - kubernetes_node
    - kubernetes_cluster
`
)
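For illustration, the two rules added above can also be built in code and run through the validator this commit introduces. A minimal in-package sketch; exampleDefaultAggregationRules is a hypothetical helper, not part of the commit:

// Hypothetical in-package sketch: the two new default rules as Rule literals,
// checked with validateTranslationRules from translator.go. These mirror the
// default config, which the factory test above asserts loads cleanly.
func exampleDefaultAggregationRules() error {
    rules := []Rule{
        {
            Action:  ActionCopyMetrics,
            Mapping: map[string]string{"cpu.idle": "machine_cpu_cores"},
        },
        {
            Action:            ActionAggregateMetric,
            MetricName:        "machine_cpu_cores",
            AggregationMethod: AggregationMethodCount,
            Dimensions:        []string{"host", "kubernetes_node", "kubernetes_cluster"},
        },
    }
    return validateTranslationRules(rules)
}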
37 changes: 22 additions & 15 deletions exporter/signalfxexporter/translation/metricdata_to_signalfxv2.go
@@ -81,23 +81,17 @@ func MetricDataToSignalFxV2(
metricTranslator *MetricTranslator,
md consumerdata.MetricsData,
) (sfxDataPoints []*sfxpb.DataPoint, numDroppedTimeSeries int) {
sfxDataPoints, numDroppedTimeSeries = metricDataToSfxDataPoints(logger, md)
if metricTranslator != nil {
sfxDataPoints = metricTranslator.TranslateDataPoints(logger, sfxDataPoints)
}
sfxDataPoints, numDroppedTimeSeries = metricDataToSfxDataPoints(logger, metricTranslator, md)

sanitizeDataPointDimensions(sfxDataPoints)
return
}

func metricDataToSfxDataPoints(
logger *zap.Logger,
metricTranslator *MetricTranslator,
md consumerdata.MetricsData,
) (sfxDataPoints []*sfxpb.DataPoint, numDroppedTimeSeries int) {

// The final number of data points is likely larger than len(md.Metrics)
// but at least that is expected.
sfxDataPoints = make([]*sfxpb.DataPoint, 0, len(md.Metrics))

var err error

// Labels from Node and Resource.
@@ -120,6 +114,13 @@ func metricDataToSfxDataPoints(
continue
}

if sfxDataPoints == nil {
// Assume all metrics have a roughly similar number of timeseries
sfxDataPoints = make([]*sfxpb.DataPoint, 0, len(md.Metrics)*len(metric.Timeseries))
}

metricDataPoints := make([]*sfxpb.DataPoint, 0, len(metric.Timeseries))

// Build the fixed parts for this metric from the descriptor.
descriptor := metric.MetricDescriptor
metricName := descriptor.Name
@@ -157,15 +158,15 @@
switch pv := dp.Value.(type) {
case *metricspb.Point_Int64Value:
sfxDataPoint.Value = sfxpb.Datum{IntValue: &pv.Int64Value}
sfxDataPoints = append(sfxDataPoints, sfxDataPoint)
metricDataPoints = append(metricDataPoints, sfxDataPoint)

case *metricspb.Point_DoubleValue:
sfxDataPoint.Value = sfxpb.Datum{DoubleValue: &pv.DoubleValue}
sfxDataPoints = append(sfxDataPoints, sfxDataPoint)
metricDataPoints = append(metricDataPoints, sfxDataPoint)

case *metricspb.Point_DistributionValue:
sfxDataPoints, err = appendDistributionValues(
sfxDataPoints,
metricDataPoints, err = appendDistributionValues(
metricDataPoints,
sfxDataPoint,
pv.DistributionValue)
if err != nil {
Expand All @@ -176,8 +177,8 @@ func metricDataToSfxDataPoints(
zap.String("metric", sfxDataPoint.Metric))
}
case *metricspb.Point_SummaryValue:
sfxDataPoints, err = appendSummaryValues(
sfxDataPoints,
metricDataPoints, err = appendSummaryValues(
metricDataPoints,
sfxDataPoint,
pv.SummaryValue)
if err != nil {
Expand All @@ -196,6 +197,12 @@ func metricDataToSfxDataPoints(

}
}

if metricTranslator != nil {
metricDataPoints = metricTranslator.TranslateDataPoints(logger, metricDataPoints)
}

sfxDataPoints = append(sfxDataPoints, metricDataPoints...)
}

return sfxDataPoints, numDroppedTimeSeries
198 changes: 184 additions & 14 deletions exporter/signalfxexporter/translation/translator.go
@@ -65,6 +65,27 @@ const (
// k8s.pod.network.io{direction="receive"} -> pod_network_receive_bytes_total{}
// k8s.pod.network.io{direction="transmit"} -> pod_network_transmit_bytes_total{}
ActionSplitMetric Action = "split_metric"

// ActionAggregateMetric aggregates metrics by the dimensions provided in tr.Dimensions.
// It takes datapoints with the name tr.MetricName and aggregates them into a smaller set, keeping the same name.
// It drops all dimensions other than those that match tr.Dimensions.
// If a datapoint doesn't have all the dimensions in tr.Dimensions, the datapoint will be dropped.
// tr.AggregationMethod is used to specify a method to aggregate the values.
// For example, having the following translation rule:
// - action: aggregate_metric
// metric_name: machine_cpu_cores
// aggregation_method: count
// dimensions:
// - host
// The following translations will be performed:
// Original datapoints:
// machine_cpu_cores{cpu="cpu1",host="host1"} 0.22
// machine_cpu_cores{cpu="cpu2",host="host1"} 0.11
// machine_cpu_cores{cpu="cpu1",host="host2"} 0.33
// Transformed datapoints:
// machine_cpu_cores{host="host1"} 2
// machine_cpu_cores{host="host2"} 1
ActionAggregateMetric Action = "aggregate_metric"
)

// MetricValueType is the enum to capture valid metric value types that can be converted
@@ -77,6 +98,14 @@ const (
MetricValueTypeDouble MetricValueType = "double"
)

// AggregationMethod is the enum used to capture the aggregation method
type AggregationMethod string

const (
// AggregationMethodCount represents the count aggregation method
AggregationMethodCount AggregationMethod = "count"
)

type Rule struct {
// Action specifies the translation action to be applied on metrics.
// This is a required field.
@@ -105,6 +134,14 @@ type Rule struct {
// TypesMapping represents metric_name/metric_type key/value pairs,
// used by ActionConvertValues.
TypesMapping map[string]MetricValueType `mapstructure:"types_mapping"`

// AggregationMethod specifies the method used by the "aggregate_metric" translation rule
AggregationMethod AggregationMethod `mapstructure:"aggregation_method"`

// Dimensions is used by the "aggregate_metric" translation rule to specify
// the dimension keys to aggregate the metric across.
// Datapoints that don't have all the dimensions will be dropped.
Dimensions []string `mapstructure:"dimensions"`
}

type MetricTranslator struct {
@@ -178,6 +215,16 @@ func validateTranslationRules(rules []Rule) error {
return fmt.Errorf("invalid value type %q set for metric %q in \"types_mapping\"", v, k)
}
}
case ActionAggregateMetric:
if tr.MetricName == "" || tr.AggregationMethod == "" || len(tr.Dimensions) == 0 {
return fmt.Errorf("fields \"metric_name\", \"dimensions\", and \"aggregation_method\" "+
"are required for %q translation rule", tr.Action)
}
if tr.AggregationMethod != "count" {
return fmt.Errorf("invalid \"aggregation_method\": %q provided for %q translation rule",
tr.AggregationMethod, tr.Action)
}

default:
return fmt.Errorf("unknown \"action\" value: %q", tr.Action)
}
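For illustration, a minimal in-package sketch (hypothetical helper, not part of the commit) of a rule the new check rejects:

// Only "count" is supported for now, so this rule fails validation with:
// invalid "aggregation_method": "avg" provided for "aggregate_metric" translation rule
func exampleInvalidAggregationRule() error {
    return validateTranslationRules([]Rule{{
        Action:            ActionAggregateMetric,
        MetricName:        "machine_cpu_cores",
        AggregationMethod: AggregationMethod("avg"),
        Dimensions:        []string{"host"},
    }})
}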
@@ -197,63 +244,99 @@ func createDimensionsMap(rules []Rule) map[string]string {
return nil
}

// TranslateDataPoints transforms datapoints to a format compatible with the SignalFx backend.
// sfxDataPoints represents one metric converted to SignalFx protobuf datapoints.
func (mp *MetricTranslator) TranslateDataPoints(logger *zap.Logger, sfxDataPoints []*sfxpb.DataPoint) []*sfxpb.DataPoint {
processedDataPoints := sfxDataPoints

for _, tr := range mp.rules {
newDataPoints := []*sfxpb.DataPoint{}

for _, dp := range processedDataPoints {
switch tr.Action {
case ActionRenameDimensionKeys:
switch tr.Action {
case ActionRenameDimensionKeys:
for _, dp := range processedDataPoints {
for _, d := range dp.Dimensions {
if newKey, ok := tr.Mapping[d.Key]; ok {
d.Key = newKey
}
}
case ActionRenameMetrics:
}
case ActionRenameMetrics:
for _, dp := range processedDataPoints {
if newKey, ok := tr.Mapping[dp.Metric]; ok {
dp.Metric = newKey
}
case ActionMultiplyInt:
}
case ActionMultiplyInt:
for _, dp := range processedDataPoints {
if multiplier, ok := tr.ScaleFactorsInt[dp.Metric]; ok {
v := dp.GetValue().IntValue
if v != nil {
*v = *v * multiplier
}
}
case ActionDivideInt:
}
case ActionDivideInt:
for _, dp := range processedDataPoints {
if divisor, ok := tr.ScaleFactorsInt[dp.Metric]; ok {
v := dp.GetValue().IntValue
if v != nil {
*v = *v / divisor
}
}
case ActionMultiplyFloat:
}
case ActionMultiplyFloat:
for _, dp := range processedDataPoints {
if multiplier, ok := tr.ScaleFactorsFloat[dp.Metric]; ok {
v := dp.GetValue().DoubleValue
if v != nil {
*v = *v * multiplier
}
}
case ActionCopyMetrics:
}
case ActionCopyMetrics:
newDataPoints := []*sfxpb.DataPoint{}
for _, dp := range processedDataPoints {
if newMetric, ok := tr.Mapping[dp.Metric]; ok {
newDataPoint := proto.Clone(dp).(*sfxpb.DataPoint)
newDataPoint.Metric = newMetric
newDataPoints = append(newDataPoints, newDataPoint)
}
case ActionSplitMetric:
}
processedDataPoints = append(processedDataPoints, newDataPoints...)
case ActionSplitMetric:
for _, dp := range processedDataPoints {
if tr.MetricName == dp.Metric {
splitMetric(dp, tr.DimensionKey, tr.Mapping)
}
case ActionConvertValues:
}
case ActionConvertValues:
for _, dp := range processedDataPoints {
if newType, ok := tr.TypesMapping[dp.Metric]; ok {
convertMetricValue(logger, dp, newType)
}
}
case ActionAggregateMetric:
// NOTE: Based on the usage of TranslateDataPoints we can assume that the datapoints batch []*sfxpb.DataPoint
// represents only one metric and all the datapoints can be aggregated together.
var dpsToAggregate []*sfxpb.DataPoint
var otherDps []*sfxpb.DataPoint
for i, dp := range processedDataPoints {
if dp.Metric == tr.MetricName {
if dpsToAggregate == nil {
dpsToAggregate = make([]*sfxpb.DataPoint, 0, len(processedDataPoints)-i)
}
dpsToAggregate = append(dpsToAggregate, dp)
} else {
if otherDps == nil {
otherDps = make([]*sfxpb.DataPoint, 0, len(processedDataPoints)-i)
}
// This slice can contain additional datapoints from a different metric,
// for example, ones copied in an earlier translation step
otherDps = append(otherDps, dp)
}
}
aggregatedDps := aggregateDatapoints(logger, dpsToAggregate, tr.Dimensions, tr.AggregationMethod)
processedDataPoints = append(otherDps, aggregatedDps...)
}

processedDataPoints = append(processedDataPoints, newDataPoints...)
}

return processedDataPoints
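For illustration, a minimal sketch of the new action end to end, mirroring the doc-comment example above. It assumes the translation package context (the MetricTranslator literal fills only the unexported rules field); exampleAggregateTranslation and the newDP closure are hypothetical, not part of the commit:

// Hypothetical in-package sketch of the aggregate_metric flow.
func exampleAggregateTranslation(logger *zap.Logger) []*sfxpb.DataPoint {
    mt := MetricTranslator{rules: []Rule{{
        Action:            ActionAggregateMetric,
        MetricName:        "machine_cpu_cores",
        AggregationMethod: AggregationMethodCount,
        Dimensions:        []string{"host"},
    }}}
    gauge := sfxpb.MetricType_GAUGE
    // newDP is a hypothetical helper building one datapoint per (host, cpu) pair.
    newDP := func(host, cpu string, v float64) *sfxpb.DataPoint {
        return &sfxpb.DataPoint{
            Metric:     "machine_cpu_cores",
            MetricType: &gauge,
            Value:      sfxpb.Datum{DoubleValue: &v},
            Dimensions: []*sfxpb.Dimension{
                {Key: "host", Value: host},
                {Key: "cpu", Value: cpu},
            },
        }
    }
    dps := []*sfxpb.DataPoint{
        newDP("host1", "cpu1", 0.22),
        newDP("host1", "cpu2", 0.11),
        newDP("host2", "cpu1", 0.33),
    }
    // Expected: machine_cpu_cores{host="host1"} 2, machine_cpu_cores{host="host2"} 1
    return mt.TranslateDataPoints(logger, dps)
}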
@@ -266,6 +349,93 @@ func (mp *MetricTranslator) TranslateDimension(orig string) string {
return orig
}

// aggregateDatapoints aggregates datapoints assuming that they have
// the same Timestamp, MetricType, Metric and Source fields.
func aggregateDatapoints(
logger *zap.Logger,
dps []*sfxpb.DataPoint,
dimensionsKeys []string,
aggregation AggregationMethod,
) []*sfxpb.DataPoint {
if len(dps) == 0 {
return nil
}

// group datapoints by dimension values
dimValuesToDps := make(map[string][]*sfxpb.DataPoint, len(dps))
for i, dp := range dps {
aggregationKey, err := getAggregationKey(dp.Dimensions, dimensionsKeys)
if err != nil {
logger.Debug("datapoint is dropped", zap.String("metric", dp.Metric), zap.Error(err))
continue
}
if _, ok := dimValuesToDps[aggregationKey]; !ok {
// set slice capacity to the possible maximum = len(dps)-i to avoid reallocations
dimValuesToDps[aggregationKey] = make([]*sfxpb.DataPoint, 0, len(dps)-i)
}
dimValuesToDps[aggregationKey] = append(dimValuesToDps[aggregationKey], dp)
}

// Get aggregated results
result := make([]*sfxpb.DataPoint, 0, len(dimValuesToDps))
for _, dps := range dimValuesToDps {
dp := proto.Clone(dps[0]).(*sfxpb.DataPoint)
dp.Dimensions = filterDimensions(dp.Dimensions, dimensionsKeys)
switch aggregation {
case AggregationMethodCount:
gauge := sfxpb.MetricType_GAUGE
dp.MetricType = &gauge
value := int64(len(dps))
dp.Value = sfxpb.Datum{
IntValue: &value,
}
}
result = append(result, dp)
}

return result
}
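The switch above currently handles only AggregationMethodCount. A hypothetical sketch, not part of this commit, of how another method such as a sum over int values could be computed for one group of datapoints (it would also need a new AggregationMethod constant and a validation update):

// Hypothetical helper, not in this commit: sum the int64 values of one group.
func sumInt64Values(dps []*sfxpb.DataPoint) sfxpb.Datum {
    var total int64
    for _, p := range dps {
        if v := p.Value.IntValue; v != nil {
            total += *v
        }
    }
    return sfxpb.Datum{IntValue: &total}
}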

// getAggregationKey composes an aggregation key based on the provided dimensions.
// If all the dimensions are found, the function returns an aggregation key.
// If any dimension is not found, the function returns an error.
func getAggregationKey(dimensions []*sfxpb.Dimension, dimensionsKeys []string) (string, error) {
const aggregationKeyDelimiter = "//"
var aggregationKey string
for _, dk := range dimensionsKeys {
var dimensionFound bool
for _, d := range dimensions {
if d.Key == dk {
// compose an aggregation key with "//" as delimiter
aggregationKey += d.Value + aggregationKeyDelimiter
dimensionFound = true
continue
}
}
if !dimensionFound {
return "", fmt.Errorf("dimension to aggregate by is not found: %q", dk)
}
}
return aggregationKey, nil
}
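For illustration, a minimal in-package sketch (hypothetical helper) showing how the key is composed:

// With keys [host, kubernetes_node], the dimensions below compose the key
// "host1//node1//" (the trailing delimiter is expected); the cpu dimension is ignored.
func exampleAggregationKey() (string, error) {
    dims := []*sfxpb.Dimension{
        {Key: "cpu", Value: "cpu1"},
        {Key: "host", Value: "host1"},
        {Key: "kubernetes_node", Value: "node1"},
    }
    return getAggregationKey(dims, []string{"host", "kubernetes_node"})
}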

// filterDimensions returns the list of dimensions filtered by dimensionsKeys
func filterDimensions(dimensions []*sfxpb.Dimension, dimensionsKeys []string) []*sfxpb.Dimension {
if len(dimensions) == 0 || len(dimensionsKeys) == 0 {
return nil
}
result := make([]*sfxpb.Dimension, 0, len(dimensionsKeys))
for _, dk := range dimensionsKeys {
for _, d := range dimensions {
if d.Key == dk {
result = append(result, d)
continue
}
}
}
return result
}
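And a matching in-package sketch (hypothetical helper) for the filter:

// Keeps only the "host" dimension; "cpu" is dropped from the result.
func exampleFilterDimensions() []*sfxpb.Dimension {
    dims := []*sfxpb.Dimension{
        {Key: "cpu", Value: "cpu1"},
        {Key: "host", Value: "host1"},
    }
    return filterDimensions(dims, []string{"host"})
}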

// splitMetric renames a metric with "dimension key" == dimensionKey to mapping["dimension value"];
// the datapoint is not changed if no dimension with that key and a value present in mapping is found.
func splitMetric(dp *sfxpb.DataPoint, dimensionKey string, mapping map[string]string) {