Skip to content

Commit

Permalink
Change datapoint test
Browse files Browse the repository at this point in the history
  • Loading branch information
khanhntd committed Dec 27, 2022
1 parent 8372c05 commit ce3f710
Show file tree
Hide file tree
Showing 14 changed files with 553 additions and 940 deletions.
16 changes: 16 additions & 0 deletions .chloggen/support-export-quantiles-for-summary-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awsemfexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support exporting quantiles for summary metrics

# One or more tracking issues related to the change
issues: [17265]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 2 additions & 0 deletions exporter/awsemfexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ The following exporter configuration parameters are supported.
| `dimension_rollup_option` | DimensionRollupOption is the option for metrics dimension rollup. Three options are available: `NoDimensionRollup`, `SingleDimensionRollupOnly` and `ZeroAndSingleDimensionRollup` |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)|
| `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config option - `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` |
| `output_destination` | "output_destination" is an option to specify the EMFExporter output. Currently, two options are available. "cloudwatch" or "stdout" | `cloudwatch` |
| `detailed_metrics` | "detailed_metrics" is the option for preserving the OTEL metric values instead of basic metrics (e.g. instead of exporting a quantile as a statistical value, the EMF exporter will preserve the quantile's population). | `false` |
| `parse_json_encoded_attr_values` | List of attribute keys whose corresponding values are JSON-encoded strings and will be converted to JSON structures in emf logs. For example, the attribute string value "{\\"x\\":5,\\"y\\":6}" will be converted to a json object: ```{"x": 5, "y": 6}``` | [ ] |
| [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] |
| [`metric_descriptors`](#metric_descriptor) | List of rules for inserting or updating metric descriptors. | [ ]|


### metric_declaration
A metric_declaration section characterizes a rule to be used to set dimensions for exported metrics, filtered by the incoming metrics' labels and metric names.

Expand Down
7 changes: 5 additions & 2 deletions exporter/awsemfexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collec
import (
"errors"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -79,6 +78,10 @@ type Config struct {
// If enabled, all the resource attributes will be converted to metric labels by default.
ResourceToTelemetrySettings resourcetotelemetry.Settings `mapstructure:"resource_to_telemetry_conversion"`

// DetailedMetrics is the option for preserving the OTEL metric values instead of basic metrics
// (e.g. rather than exporting a quantile as a statistical value, the EMF exporter will preserve the quantile's population).
DetailedMetrics bool `mapstructure:"detailed_metrics"`

// logger is the Logger used for writing error/warning logs
logger *zap.Logger
}
Expand Down
94 changes: 49 additions & 45 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

/*
datapoint.go - calculate delta values or keep the value for each OTEL type (e.g calculate delta for Counter)
*/
package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter"

import (
"fmt"
"log"
"strconv"
"time"

aws "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/metrics"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

// Suffixes appended to a summary metric's name when its aggregate count and
// sum are emitted as individual datapoints (e.g. "latency_count", "latency_sum").
const (
summaryCountSuffix = "_count"
summarySumSuffix = "_sum"
)

var (
Expand All @@ -45,18 +53,6 @@ func calculateSummaryDelta(prev *aws.MetricValue, val interface{}, timestampMs t
return summaryMetricEntry{summaryDelta, countDelta}, true
}

func mergeLabels(m deltaMetricMetadata, labels map[string]string) map[string]string {
result := map[string]string{
"namespace": m.namespace,
"logGroup": m.logGroup,
"logStream": m.logStream,
}
for k, v := range labels {
result[k] = v
}
return result
}

// dataPoint represents a processed metric data point
type dataPoint struct {
name string
Expand All @@ -71,11 +67,12 @@ type dataPoint struct {
// - pmetric.SummaryDataPointSlice
type dataPointSlice interface {
Len() int
// At gets the adjusted datapoint from the DataPointSlice at i-th index.
// CalculateDeltaDatapoints calculates the delta datapoint from the DataPointSlice at i-th index
// for some type (Counter, Summary)
// dataPoint: the adjusted data point
// retained: indicates whether the data point is valid for further process
// NOTE: It is an expensive call as it calculates the metric value.
CalculateDeltaDatapoints(i int, instrumentationLibraryName string) (dataPoint []dataPoint, retained bool)
CalculateDeltaDatapoints(i int, instrumentationLibraryName string, detailedMetrics bool) (dataPoint []dataPoint, retained bool)
}

// deltaMetricMetadata contains the metadata required to perform rate/delta calculation
Expand All @@ -87,6 +84,18 @@ type deltaMetricMetadata struct {
logStream string
}

// mergeLabels builds a single label map that combines the delta-metadata
// identity fields (namespace, logGroup, logStream) with the metric's own
// labels. When a metric label shares a key with one of the metadata fields,
// the metric label wins, since it is applied last.
func mergeLabels(m deltaMetricMetadata, labels map[string]string) map[string]string {
	merged := make(map[string]string, len(labels)+3)
	merged["namespace"] = m.namespace
	merged["logGroup"] = m.logGroup
	merged["logStream"] = m.logStream
	for key, value := range labels {
		merged[key] = value
	}
	return merged
}

// numberDataPointSlice is a wrapper for pmetric.NumberDataPointSlice
type numberDataPointSlice struct {
deltaMetricMetadata
Expand All @@ -95,6 +104,7 @@ type numberDataPointSlice struct {

// histogramDataPointSlice is a wrapper for pmetric.HistogramDataPointSlice
type histogramDataPointSlice struct {
// Todo:(khanhntd) Calculate delta value for count and sum value with histogram
deltaMetricMetadata
pmetric.HistogramDataPointSlice
}
Expand All @@ -111,7 +121,7 @@ type summaryMetricEntry struct {
}

// At retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationLibraryName string) ([]dataPoint, bool) {
func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationLibraryName string, detailedMetrics bool) ([]dataPoint, bool) {
metric := dps.NumberDataPointSlice.At(i)
labels := createLabels(metric.Attributes(), instrumentationLibraryName)
timestampMs := unixNanoToMilliseconds(metric.Timestamp())
Expand Down Expand Up @@ -146,7 +156,7 @@ func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationL
}

// At retrieves the HistogramDataPoint at the given index.
func (dps histogramDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationLibraryName string) ([]dataPoint, bool) {
func (dps histogramDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationLibraryName string, detailedMetrics bool) ([]dataPoint, bool) {
metric := dps.HistogramDataPointSlice.At(i)
labels := createLabels(metric.Attributes(), instrumentationLibraryName)
timestamp := unixNanoToMilliseconds(metric.Timestamp())
Expand All @@ -165,7 +175,7 @@ func (dps histogramDataPointSlice) CalculateDeltaDatapoints(i int, instrumentati
}

// At retrieves the SummaryDataPoint at the given index.
func (dps summaryDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationLibraryName string) ([]dataPoint, bool) {
func (dps summaryDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationLibraryName string, detailedMetrics bool) ([]dataPoint, bool) {
metric := dps.SummaryDataPointSlice.At(i)
labels := createLabels(metric.Attributes(), instrumentationLibraryName)
timestampMs := unixNanoToMilliseconds(metric.Timestamp())
Expand All @@ -185,35 +195,29 @@ func (dps summaryDataPointSlice) CalculateDeltaDatapoints(i int, instrumentation
sum = summaryMetricDelta.sum
count = summaryMetricDelta.count
}
debugFlags := false
if debugFlags {
metricVal := &cWMetricStats{
Count: count,
Sum: sum,
}
if quantileValues := metric.QuantileValues(); quantileValues.Len() > 0 {
metricVal.Min = quantileValues.At(0).Value()
metricVal.Max = quantileValues.At(quantileValues.Len() - 1).Value()
}
datapoints = append(datapoints, dataPoint{name: dps.metricName, value: metricVal, labels: labels, timestampMs: timestampMs})
} else {

if detailedMetrics {
// Instead of sending metrics as a Statistical Set (contains min,max, count, sum), the emfexporter will enrich the
// values by sending each quantile values as a datapoint (from quantile 0 ... 1)
values := metric.QuantileValues()
datapoints = append(datapoints, dataPoint{name: fmt.Sprint(dps.metricName, "_count"), value: count, labels: labels, timestampMs: timestampMs})
datapoints = append(datapoints, dataPoint{name: fmt.Sprint(dps.metricName, "_sum"), value: sum, labels: labels, timestampMs: timestampMs})
log.Printf("quantile count %v", dataPoint{name: fmt.Sprint(dps.metricName, "_count"), value: count, labels: labels, timestampMs: timestampMs})
log.Printf("quantile _sum %v", dataPoint{name: fmt.Sprint(dps.metricName, "_sum"), value: sum, labels: labels, timestampMs: timestampMs})
log.Printf("labels %v", labels)
datapoints = append(datapoints, dataPoint{name: fmt.Sprint(dps.metricName, summarySumSuffix), value: sum, labels: labels, timestampMs: timestampMs})
datapoints = append(datapoints, dataPoint{name: fmt.Sprint(dps.metricName, summaryCountSuffix), value: count, labels: labels, timestampMs: timestampMs})

for i := 0; i < values.Len(); i++ {
cLabels := maps.Clone(labels)
quantile := values.At(i)
log.Printf("quantile value that exists %v", strconv.FormatFloat(quantile.Quantile(), 'g', -1, 64))
labels["quantile"] = strconv.FormatFloat(quantile.Quantile(), 'g', -1, 64)
log.Printf("quantile value that exists %v", labels["quantile"])
datapoints = append(datapoints, dataPoint{name: dps.metricName, value: quantile.Value(), labels: labels, timestampMs: timestampMs})
cLabels["quantile"] = strconv.FormatFloat(quantile.Quantile(), 'g', -1, 64)
datapoints = append(datapoints, dataPoint{name: dps.metricName, value: quantile.Value(), labels: cLabels, timestampMs: timestampMs})

}
} else {
metricVal := &cWMetricStats{Count: count, Sum: sum}
if quantileValues := metric.QuantileValues(); quantileValues.Len() > 0 {
metricVal.Min = quantileValues.At(0).Value()
metricVal.Max = quantileValues.At(quantileValues.Len() - 1).Value()
}
datapoints = append(datapoints, dataPoint{name: dps.metricName, value: metricVal, labels: labels, timestampMs: timestampMs})
}
log.Printf("labels %v", datapoints)

return datapoints, retained
}
Expand All @@ -238,11 +242,11 @@ func createLabels(attributes pcommon.Map, instrLibName string) map[string]string
// getDataPoints retrieves data points from OT Metric.
func getDataPoints(pmd pmetric.Metric, metadata cWMetricMetadata, logger *zap.Logger) dataPointSlice {
metricMetadata := deltaMetricMetadata{
false,
pmd.Name(),
metadata.namespace,
metadata.logGroup,
metadata.logStream,
adjustToDelta: false,
metricName: pmd.Name(),
namespace: metadata.namespace,
logGroup: metadata.logGroup,
logStream: metadata.logStream,
}

var dps dataPointSlice = nil
Expand Down
Loading

0 comments on commit ce3f710

Please sign in to comment.