Commit
Add aggregation capability to signalfx exporter translations
We need a simple aggregation capability to get the machine_cpu_cores metric. For now, only the "count" aggregation method is needed.
dmitryax committed Jul 24, 2020
1 parent 2820d97 commit a7155e9
Showing 5 changed files with 418 additions and 30 deletions.
exporter/signalfxexporter/factory_test.go (2 changes: 1 addition & 1 deletion)
@@ -176,7 +176,7 @@ func TestCreateMetricsExporterWithDefaultTranslaitonRules(t *testing.T) {

// Validate that default translation rules are loaded
// Expected values have to be updated once the default config changes
assert.Equal(t, 7, len(config.TranslationRules))
assert.Equal(t, 9, len(config.TranslationRules))
assert.Equal(t, translation.ActionRenameDimensionKeys, config.TranslationRules[0].Action)
assert.Equal(t, 15, len(config.TranslationRules[0].Mapping))
}
exporter/signalfxexporter/translation/constants.go (9 changes: 9 additions & 0 deletions)
@@ -116,5 +116,14 @@ translation_rules:
cpu.wait: int
cpu.softirq: int
cpu.nice: int
- action: copy_metrics
mapping:
cpu.idle: machine_cpu_cores
- action: aggregate_metric
metric_name: machine_cpu_cores
aggregation_method: count
dimension_key: host
`
)
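
Taken together, the two new default rules derive machine_cpu_cores: copy_metrics first duplicates every cpu.idle datapoint under the new name, and aggregate_metric then counts those copies per host, yielding the number of CPU cores per host. A minimal sketch of the transformation (dimension values are illustrative, not taken from this commit):

cpu.idle{cpu="0",host="host1"}  -- copy_metrics -->  machine_cpu_cores{cpu="0",host="host1"}
cpu.idle{cpu="1",host="host1"}  -- copy_metrics -->  machine_cpu_cores{cpu="1",host="host1"}
machine_cpu_cores{cpu=...,host="host1"} x2  -- aggregate_metric (count by host) -->  machine_cpu_cores{host="host1"} 2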
exporter/signalfxexporter/translation/metricdata_to_signalfxv2.go (37 changes: 22 additions & 15 deletions)
@@ -81,23 +81,17 @@ func MetricDataToSignalFxV2(
metricTranslator *MetricTranslator,
md consumerdata.MetricsData,
) (sfxDataPoints []*sfxpb.DataPoint, numDroppedTimeSeries int) {
sfxDataPoints, numDroppedTimeSeries = metricDataToSfxDataPoints(logger, md)
if metricTranslator != nil {
sfxDataPoints = metricTranslator.TranslateDataPoints(logger, sfxDataPoints)
}
sfxDataPoints, numDroppedTimeSeries = metricDataToSfxDataPoints(logger, metricTranslator, md)

sanitizeDataPointDimensions(sfxDataPoints)
return
}

func metricDataToSfxDataPoints(
logger *zap.Logger,
metricTranslator *MetricTranslator,
md consumerdata.MetricsData,
) (sfxDataPoints []*sfxpb.DataPoint, numDroppedTimeSeries int) {

// The final number of data points is likely larger than len(md.Metrics)
// but at least that is expected.
sfxDataPoints = make([]*sfxpb.DataPoint, 0, len(md.Metrics))

var err error

// Labels from Node and Resource.
@@ -120,6 +114,13 @@ func metricDataToSfxDataPoints(
continue
}

if sfxDataPoints == nil {
// Suppose all metrics have a roughly similar number of timeseries
sfxDataPoints = make([]*sfxpb.DataPoint, 0, len(md.Metrics)*len(metric.Timeseries))
}

metricDataPoints := make([]*sfxpb.DataPoint, 0, len(metric.Timeseries))

// Build the fixed parts for this metric from the descriptor.
descriptor := metric.MetricDescriptor
metricName := descriptor.Name
@@ -157,15 +158,15 @@ func metricDataToSfxDataPoints(
switch pv := dp.Value.(type) {
case *metricspb.Point_Int64Value:
sfxDataPoint.Value = sfxpb.Datum{IntValue: &pv.Int64Value}
sfxDataPoints = append(sfxDataPoints, sfxDataPoint)
metricDataPoints = append(metricDataPoints, sfxDataPoint)

case *metricspb.Point_DoubleValue:
sfxDataPoint.Value = sfxpb.Datum{DoubleValue: &pv.DoubleValue}
sfxDataPoints = append(sfxDataPoints, sfxDataPoint)
metricDataPoints = append(metricDataPoints, sfxDataPoint)

case *metricspb.Point_DistributionValue:
sfxDataPoints, err = appendDistributionValues(
sfxDataPoints,
metricDataPoints, err = appendDistributionValues(
metricDataPoints,
sfxDataPoint,
pv.DistributionValue)
if err != nil {
@@ -176,8 +177,8 @@ func metricDataToSfxDataPoints(
zap.String("metric", sfxDataPoint.Metric))
}
case *metricspb.Point_SummaryValue:
sfxDataPoints, err = appendSummaryValues(
sfxDataPoints,
metricDataPoints, err = appendSummaryValues(
metricDataPoints,
sfxDataPoint,
pv.SummaryValue)
if err != nil {
@@ -196,6 +197,12 @@ func metricDataToSfxDataPoints(

}
}

if metricTranslator != nil {
metricDataPoints = metricTranslator.TranslateDataPoints(logger, metricDataPoints)
}

sfxDataPoints = append(sfxDataPoints, metricDataPoints...)
}

return sfxDataPoints, numDroppedTimeSeries
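
Note that the refactor above moves translation from a single pass over the whole batch into the per-metric loop: each metric's timeseries are converted and translated as one group before being appended to the output. This is what allows the new aggregate_metric action (below) to assume that every batch it receives belongs to a single metric.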
exporter/signalfxexporter/translation/translator.go (187 changes: 173 additions & 14 deletions)
@@ -65,6 +65,28 @@ const (
// k8s.pod.network.io{direction="receive"} -> pod_network_receive_bytes_total{}
// k8s.pod.network.io{direction="transmit"} -> pod_network_transmit_bytes_total{}
ActionSplitMetric Action = "split_metric"

// ActionAggregateMetric aggregates metrics by one dimension provided in tr.DimensionKey.
// It takes datapoints with name tr.MetricName and aggregates them to a smaller set keeping the same name.
// Dimensions that have different values across the aggregated datapoints are dropped;
// dimensions with the same values across the aggregated datapoints are kept.
// tr.DimensionKey is always kept because it is the dimension the datapoints are aggregated by.
// If a datapoint doesn't have a dimension with tr.DimensionKey, it will be dropped.
// tr.AggregationMethod is used to specify a method to aggregate the values.
// For example, having the following translation rule:
// - action: aggregate_metric
// metric_name: machine_cpu_cores
// aggregation_method: count
// dimension_key: host
// The following translations will be performed:
// Original datapoints:
// machine_cpu_cores{cpu="cpu1",host="host1"} 0.22
// machine_cpu_cores{cpu="cpu2",host="host1"} 0.11
// machine_cpu_cores{cpu="cpu1",host="host2"} 0.33
// Transformed datapoints:
// machine_cpu_cores{host="host1"} 2
// machine_cpu_cores{host="host2"} 1
ActionAggregateMetric Action = "aggregate_metric"
)

// MetricValueType is the enum to capture valid metric value types that can be converted
@@ -77,6 +99,14 @@ const (
MetricValueTypeDouble MetricValueType = "double"
)

// AggregationMethod is the enum used to capture the aggregation method
type AggregationMethod string

const (
// AggregationMethodCount represents count aggregation method
AggregationMethodCount AggregationMethod = "count"
)

type Rule struct {
// Action specifies the translation action to be applied on metrics.
// This is a required field.
@@ -105,6 +135,8 @@ type Rule struct {
// TypesMapping represents metric_name/metric_type key/value pairs,
// used by ActionConvertValues.
TypesMapping map[string]MetricValueType `mapstructure:"types_mapping"`

// AggregationMethod specifies the method used to aggregate values,
// used by ActionAggregateMetric.
AggregationMethod AggregationMethod `mapstructure:"aggregation_method"`
}

type MetricTranslator struct {
@@ -178,6 +210,16 @@ func validateTranslationRules(rules []Rule) error {
return fmt.Errorf("invalid value type %q set for metric %q in \"types_mapping\"", v, k)
}
}
case ActionAggregateMetric:
if tr.MetricName == "" || tr.DimensionKey == "" || tr.AggregationMethod == "" {
return fmt.Errorf("fields \"metric_name\", \"dimension_key\", and \"aggregation_method\" "+
"are required for %q translation rule", tr.Action)
}
if tr.AggregationMethod != "count" {
return fmt.Errorf("invalid \"aggregation_method\": %q provided for %q translation rule",
tr.AggregationMethod, tr.Action)
}

default:
return fmt.Errorf("unknown \"action\" value: %q", tr.Action)
}
@@ -197,63 +239,99 @@ func createDimensionsMap(rules []Rule) map[string]string {
return nil
}

// TranslateDataPoints transforms datapoints to a format compatible with the SignalFx backend.
// sfxDataPoints represents one metric converted to SignalFx protobuf datapoints.
func (mp *MetricTranslator) TranslateDataPoints(logger *zap.Logger, sfxDataPoints []*sfxpb.DataPoint) []*sfxpb.DataPoint {
processedDataPoints := sfxDataPoints

for _, tr := range mp.rules {
newDataPoints := []*sfxpb.DataPoint{}

for _, dp := range processedDataPoints {
switch tr.Action {
case ActionRenameDimensionKeys:
switch tr.Action {
case ActionRenameDimensionKeys:
for _, dp := range processedDataPoints {
for _, d := range dp.Dimensions {
if newKey, ok := tr.Mapping[d.Key]; ok {
d.Key = newKey
}
}
case ActionRenameMetrics:
}
case ActionRenameMetrics:
for _, dp := range processedDataPoints {
if newKey, ok := tr.Mapping[dp.Metric]; ok {
dp.Metric = newKey
}
case ActionMultiplyInt:
}
case ActionMultiplyInt:
for _, dp := range processedDataPoints {
if multiplier, ok := tr.ScaleFactorsInt[dp.Metric]; ok {
v := dp.GetValue().IntValue
if v != nil {
*v = *v * multiplier
}
}
case ActionDivideInt:
}
case ActionDivideInt:
for _, dp := range processedDataPoints {
if divisor, ok := tr.ScaleFactorsInt[dp.Metric]; ok {
v := dp.GetValue().IntValue
if v != nil {
*v = *v / divisor
}
}
case ActionMultiplyFloat:
}
case ActionMultiplyFloat:
for _, dp := range processedDataPoints {
if multiplier, ok := tr.ScaleFactorsFloat[dp.Metric]; ok {
v := dp.GetValue().DoubleValue
if v != nil {
*v = *v * multiplier
}
}
case ActionCopyMetrics:
}
case ActionCopyMetrics:
newDataPoints := []*sfxpb.DataPoint{}
for _, dp := range processedDataPoints {
if newMetric, ok := tr.Mapping[dp.Metric]; ok {
newDataPoint := proto.Clone(dp).(*sfxpb.DataPoint)
newDataPoint.Metric = newMetric
newDataPoints = append(newDataPoints, newDataPoint)
}
case ActionSplitMetric:
}
processedDataPoints = append(processedDataPoints, newDataPoints...)
case ActionSplitMetric:
for _, dp := range processedDataPoints {
if tr.MetricName == dp.Metric {
splitMetric(dp, tr.DimensionKey, tr.Mapping)
}
case ActionConvertValues:
}
case ActionConvertValues:
for _, dp := range processedDataPoints {
if newType, ok := tr.TypesMapping[dp.Metric]; ok {
convertMetricValue(logger, dp, newType)
}
}
case ActionAggregateMetric:
// NOTE: Based on the usage of TranslateDataPoints we can assume that the datapoints batch []*sfxpb.DataPoint
// represents only one metric and all the datapoints can be aggregated together.
var dpsToAggregate []*sfxpb.DataPoint
var otherDps []*sfxpb.DataPoint
for i, dp := range processedDataPoints {
if dp.Metric == tr.MetricName {
if dpsToAggregate == nil {
dpsToAggregate = make([]*sfxpb.DataPoint, 0, len(processedDataPoints)-i)
}
dpsToAggregate = append(dpsToAggregate, dp)
} else {
if otherDps == nil {
otherDps = make([]*sfxpb.DataPoint, 0, len(processedDataPoints)-i)
}
// This slice can contain additional datapoints from a different metric,
// for example, ones copied in an earlier translation step
otherDps = append(otherDps, dp)
}
}
aggregatedDps := aggregateDatapoints(logger, dpsToAggregate, tr.DimensionKey, tr.AggregationMethod)
processedDataPoints = append(otherDps, aggregatedDps...)
}

processedDataPoints = append(processedDataPoints, newDataPoints...)
}

return processedDataPoints
@@ -266,6 +344,87 @@ func (mp *MetricTranslator) TranslateDimension(orig string) string {
return orig
}

// aggregateDatapoints aggregates datapoints assuming that they have
// the same Timestamp, MetricType, Metric and Source fields.
func aggregateDatapoints(
logger *zap.Logger,
dps []*sfxpb.DataPoint,
dimensionKey string,
aggregation AggregationMethod,
) []*sfxpb.DataPoint {
if len(dps) == 0 {
return nil
}

// group datapoints by dimension values
dimValToDps := make(map[string][]*sfxpb.DataPoint, len(dps))
for i, dp := range dps {
var dimensionFound bool
for _, d := range dp.Dimensions {
if d.Key == dimensionKey {
if _, ok := dimValToDps[d.Value]; !ok {
// set slice capacity to the possible maximum = len(dps)-i to avoid reallocations
dimValToDps[d.Value] = make([]*sfxpb.DataPoint, 0, len(dps)-i)
}
dimValToDps[d.Value] = append(dimValToDps[d.Value], dp)
dimensionFound = true
}
}
if !dimensionFound {
logger.Debug("dimension to aggregate by is not found, datapoint is dropped",
zap.String("metric", dp.Metric), zap.String("dimension", dimensionKey))
}
}

// Get aggregated results
result := make([]*sfxpb.DataPoint, 0, len(dimValToDps))
for _, dps := range dimValToDps {
dp := proto.Clone(dps[0]).(*sfxpb.DataPoint)
dp.Dimensions = getCommonDimensions(dps)
switch aggregation {
case AggregationMethodCount:
gauge := sfxpb.MetricType_GAUGE
dp.MetricType = &gauge
value := int64(len(dps))
dp.Value = sfxpb.Datum{
IntValue: &value,
}
}
result = append(result, dp)
}

return result
}

// getCommonDimensions returns the list of dimensions that are the same across the provided datapoints
func getCommonDimensions(dps []*sfxpb.DataPoint) []*sfxpb.Dimension {
if len(dps) == 0 {
return nil
}

commonDimensions := make([]*sfxpb.Dimension, len(dps[0].Dimensions))
copy(commonDimensions, dps[0].Dimensions)

for _, dp := range dps[1:] {
dimensions := commonDimensions
commonDimensions = make([]*sfxpb.Dimension, 0, len(dimensions))
for _, cd := range dimensions {
var match bool
for _, d := range dp.Dimensions {
if d.Key == cd.Key && d.Value == cd.Value {
match = true
break
}
}
if match {
commonDimensions = append(commonDimensions, cd)
}
}
}

return commonDimensions
}

// splitMetric renames a metric based on the value of the dimension with key dimensionKey,
// using mapping["dimension value"] as the new name; the datapoint is left unchanged
// if no dimension with dimensionKey and a value present in mapping is found.
func splitMetric(dp *sfxpb.DataPoint, dimensionKey string, mapping map[string]string) {
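
For illustration, a minimal sketch of how the new rule drives the translator. The NewMetricTranslator constructor and the import paths are assumed (they do not appear in this diff); only the Rule fields, the action constants, and TranslateDataPoints are taken from the change itself:

package main

import (
	"log"

	sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/translation"
)

func main() {
	// One aggregation rule: count machine_cpu_cores datapoints per host.
	rules := []translation.Rule{{
		Action:            translation.ActionAggregateMetric,
		MetricName:        "machine_cpu_cores",
		AggregationMethod: translation.AggregationMethodCount,
		DimensionKey:      "host",
	}}
	mt, err := translation.NewMetricTranslator(rules) // constructor assumed, not shown in this diff
	if err != nil {
		log.Fatal(err) // invalid rules are rejected by validateTranslationRules
	}
	var dps []*sfxpb.DataPoint // one metric's datapoints, e.g. one machine_cpu_cores point per core
	out := mt.TranslateDataPoints(zap.NewNop(), dps)
	_ = out // one GAUGE point per host; its value is the number of input points with that host value
}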
