Skip to content

Commit

Permalink
[processor/tailsampling] refactor config parsing to use one shared im…
Browse files Browse the repository at this point in the history
…plementation
  • Loading branch information
tim-oster committed Jun 23, 2022
1 parent 1c6ebfe commit c5ee68f
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 188 deletions.
29 changes: 4 additions & 25 deletions processor/tailsamplingprocessor/and_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,16 @@
package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"

import (
"fmt"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
)

func getNewAndPolicy(logger *zap.Logger, config AndCfg) (sampling.PolicyEvaluator, error) {
func getNewAndPolicy(logger *zap.Logger, config *AndCfg) (sampling.PolicyEvaluator, error) {
var subPolicyEvaluators []sampling.PolicyEvaluator
for i := range config.SubPolicyCfg {
policyCfg := config.SubPolicyCfg[i]
policy, _ := getAndSubPolicyEvaluator(logger, &policyCfg)
policyCfg := &config.SubPolicyCfg[i]
policy, _ := getAndSubPolicyEvaluator(logger, policyCfg)
subPolicyEvaluators = append(subPolicyEvaluators, policy)
}
return sampling.NewAnd(logger, subPolicyEvaluators), nil
Expand All @@ -35,26 +33,7 @@ func getNewAndPolicy(logger *zap.Logger, config AndCfg) (sampling.PolicyEvaluato
// Return instance of and sub-policy
func getAndSubPolicyEvaluator(logger *zap.Logger, cfg *AndSubPolicyCfg) (sampling.PolicyEvaluator, error) {
switch cfg.Type {
case AlwaysSample:
return sampling.NewAlwaysSample(logger), nil
case NumericAttribute:
nafCfg := cfg.NumericAttributeCfg
return sampling.NewNumericAttributeFilter(logger, nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil
case StringAttribute:
safCfg := cfg.StringAttributeCfg
return sampling.NewStringAttributeFilter(logger, safCfg.Key, safCfg.Values, safCfg.EnabledRegexMatching, safCfg.CacheMaxSize, safCfg.InvertMatch), nil
case RateLimiting:
rlfCfg := cfg.RateLimitingCfg
return sampling.NewRateLimiting(logger, rlfCfg.SpansPerSecond), nil
case StatusCode:
return sampling.NewStatusCodeFilter(logger, cfg.StatusCodeCfg.StatusCodes)
case Probabilistic:
pfCfg := cfg.ProbabilisticCfg
return sampling.NewProbabilisticSampler(logger, pfCfg.HashSalt, pfCfg.SamplingPercentage), nil
case TraceState:
tsfCfg := cfg.TraceStateCfg
return sampling.NewTraceStateFilter(logger, tsfCfg.Key, tsfCfg.Values), nil
default:
return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type)
return getSharedPolicyEvaluator(logger, &cfg.sharedPolicyCfg)
}
}
42 changes: 7 additions & 35 deletions processor/tailsamplingprocessor/composite_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@
package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"

import (
"fmt"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
)

func getNewCompositePolicy(logger *zap.Logger, config CompositeCfg) (sampling.PolicyEvaluator, error) {
func getNewCompositePolicy(logger *zap.Logger, config *CompositeCfg) (sampling.PolicyEvaluator, error) {
var subPolicyEvalParams []sampling.SubPolicyEvalParams
rateAllocationsMap := getRateAllocationMap(config)
for i := range config.SubPolicyCfg {
policyCfg := config.SubPolicyCfg[i]
policy, _ := getSubPolicyEvaluator(logger, &policyCfg)
policyCfg := &config.SubPolicyCfg[i]
policy, _ := getCompositeSubPolicyEvaluator(logger, policyCfg)

evalParams := sampling.SubPolicyEvalParams{
Evaluator: policy,
Expand All @@ -39,7 +37,7 @@ func getNewCompositePolicy(logger *zap.Logger, config CompositeCfg) (sampling.Po
}

// Apply rate allocations to the sub-policies
func getRateAllocationMap(config CompositeCfg) map[string]float64 {
func getRateAllocationMap(config *CompositeCfg) map[string]float64 {
rateAllocationsMap := make(map[string]float64)
maxTotalSPS := float64(config.MaxTotalSpansPerSecond)
// Default SPS determined by equally diving number of sub policies
Expand All @@ -55,37 +53,11 @@ func getRateAllocationMap(config CompositeCfg) map[string]float64 {
}

// Return instance of composite sub-policy
func getSubPolicyEvaluator(logger *zap.Logger, cfg *SubPolicyCfg) (sampling.PolicyEvaluator, error) {
func getCompositeSubPolicyEvaluator(logger *zap.Logger, cfg *CompositeSubPolicyCfg) (sampling.PolicyEvaluator, error) {
switch cfg.Type {
case AlwaysSample:
return sampling.NewAlwaysSample(logger), nil
case Latency:
lfCfg := cfg.LatencyCfg
return sampling.NewLatency(logger, lfCfg.ThresholdMs), nil
case NumericAttribute:
nafCfg := cfg.NumericAttributeCfg
return sampling.NewNumericAttributeFilter(logger, nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil
case Probabilistic:
pfCfg := cfg.ProbabilisticCfg
return sampling.NewProbabilisticSampler(logger, pfCfg.HashSalt, pfCfg.SamplingPercentage), nil
case StatusCode:
scCfg := cfg.StatusCodeCfg
return sampling.NewStatusCodeFilter(logger, scCfg.StatusCodes)
case StringAttribute:
safCfg := cfg.StringAttributeCfg
return sampling.NewStringAttributeFilter(logger, safCfg.Key, safCfg.Values, safCfg.EnabledRegexMatching, safCfg.CacheMaxSize, safCfg.InvertMatch), nil
case RateLimiting:
rlfCfg := cfg.RateLimitingCfg
return sampling.NewRateLimiting(logger, rlfCfg.SpansPerSecond), nil
case And:
return getNewAndPolicy(logger, cfg.AndCfg)
case SpanCount:
scCfg := cfg.SpanCountCfg
return sampling.NewSpanCount(logger, scCfg.MinSpans), nil
case TraceState:
tsfCfg := cfg.TraceStateCfg
return sampling.NewTraceStateFilter(logger, tsfCfg.Key, tsfCfg.Values), nil
return getNewAndPolicy(logger, &cfg.AndCfg)
default:
return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type)
return getSharedPolicyEvaluator(logger, &cfg.sharedPolicyCfg)
}
}
68 changes: 43 additions & 25 deletions processor/tailsamplingprocessor/composite_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,48 +31,66 @@ func TestCompositeHelper(t *testing.T) {
ExpectedNewTracesPerSec: 10,
PolicyCfgs: []PolicyCfg{
{
Name: "composite-policy-1",
Type: Composite,
sharedPolicyCfg: sharedPolicyCfg{
Name: "composite-policy-1",
Type: Composite,
},
CompositeCfg: CompositeCfg{
MaxTotalSpansPerSecond: 1000,
PolicyOrder: []string{"test-composite-policy-1", "test-composite-policy-2", "test-composite-policy-3", "test-composite-policy-4", "test-composite-policy-5", "test-composite-policy-6", "test-composite-policy-7", "test-composite-policy-8"},
SubPolicyCfg: []SubPolicyCfg{
SubPolicyCfg: []CompositeSubPolicyCfg{
{
Name: "test-composite-policy-1",
Type: NumericAttribute,
NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100},
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-composite-policy-1",
Type: NumericAttribute,
NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100},
},
},
{
Name: "test-composite-policy-2",
Type: StringAttribute,
StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}},
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-composite-policy-2",
Type: StringAttribute,
StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}},
},
},
{
Name: "test-composite-policy-3",
Type: RateLimiting,
RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 10},
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-composite-policy-3",
Type: RateLimiting,
RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 10},
},
},
{
Name: "test-composite-policy-4",
Type: Probabilistic,
ProbabilisticCfg: ProbabilisticCfg{HashSalt: "salt", SamplingPercentage: 10},
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-composite-policy-4",
Type: Probabilistic,
ProbabilisticCfg: ProbabilisticCfg{HashSalt: "salt", SamplingPercentage: 10},
},
},
{
Name: "test-composite-policy-5",
Type: Latency,
LatencyCfg: LatencyCfg{ThresholdMs: 10},
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-composite-policy-5",
Type: Latency,
LatencyCfg: LatencyCfg{ThresholdMs: 10},
},
},
{
Name: "test-composite-policy-6",
Type: StatusCode,
StatusCodeCfg: StatusCodeCfg{StatusCodes: []string{"200", "404"}},
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-composite-policy-6",
Type: StatusCode,
StatusCodeCfg: StatusCodeCfg{StatusCodes: []string{"200", "404"}},
},
},
{
Name: "test-composite-policy-7",
Type: AlwaysSample,
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-composite-policy-7",
Type: AlwaysSample,
},
},
{
Name: "test-composite-policy-8",
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-composite-policy-8",
},
},
},
RateAllocation: []RateAllocationCfg{
Expand All @@ -90,7 +108,7 @@ func TestCompositeHelper(t *testing.T) {
},
}
rlfCfg := cfg.PolicyCfgs[0].CompositeCfg
composite, e := getNewCompositePolicy(zap.NewNop(), rlfCfg)
composite, e := getNewCompositePolicy(zap.NewNop(), &rlfCfg)
require.NotNil(t, composite)
require.NotNil(t, cfg.ProcessorSettings)
require.Equal(t, 10*time.Second, cfg.DecisionWait)
Expand Down
73 changes: 22 additions & 51 deletions processor/tailsamplingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@ const (
Composite PolicyType = "composite"
// And allows defining a And policy, combining the other policies in one
And PolicyType = "and"
// Span Count sample traces that are have more spans per Trace than a given threshold.
// SpanCount sample traces that are have more spans per Trace than a given threshold.
SpanCount PolicyType = "span_count"
// TraceState sample traces with specified values by the given key
TraceState PolicyType = "trace_state"
)

// SubPolicyCfg holds the common configuration to all policies under composite policy.
type SubPolicyCfg struct {
// sharedPolicyCfg holds the common configuration to all policies that are used in derivative policy configurations
// such as the and & composite policies.
type sharedPolicyCfg struct {
// Name given to the instance of the policy to make easy to identify it in metrics and logs.
Name string `mapstructure:"name"`
// Type of the policy this will be used to match the proper configuration of the policy.
Expand All @@ -68,35 +69,23 @@ type SubPolicyCfg struct {
StringAttributeCfg StringAttributeCfg `mapstructure:"string_attribute"`
// Configs for rate limiting filter sampling policy evaluator.
RateLimitingCfg RateLimitingCfg `mapstructure:"rate_limiting"`
// Configs for and policy evaluator.
AndCfg AndCfg `mapstructure:"and"`
// Configs for span counter filter sampling policy evaluator.
// Configs for span count filter sampling policy evaluator.
SpanCountCfg SpanCountCfg `mapstructure:"span_count"`
// Configs for trace_state policy evaluator.
// Configs for defining trace_state policy
TraceStateCfg TraceStateCfg `mapstructure:"trace_state"`
}

// CompositeSubPolicyCfg holds the common configuration to all policies under composite policy.
type CompositeSubPolicyCfg struct {
sharedPolicyCfg `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// Configs for and policy evaluator.
AndCfg AndCfg `mapstructure:"and"`
}

// AndSubPolicyCfg holds the common configuration to all policies under and policy.
type AndSubPolicyCfg struct {
// Name given to the instance of the policy to make easy to identify it in metrics and logs.
Name string `mapstructure:"name"`
// Type of the policy this will be used to match the proper configuration of the policy.
Type PolicyType `mapstructure:"type"`
// Configs for numeric attribute filter sampling policy evaluator.
NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric_attribute"`
// Configs for probabilistic sampling policy evaluator.
ProbabilisticCfg ProbabilisticCfg `mapstructure:"probabilistic"`
// Configs for string attribute filter sampling policy evaluator.
StringAttributeCfg StringAttributeCfg `mapstructure:"string_attribute"`
// Configs for rate limiting filter sampling policy evaluator.
RateLimitingCfg RateLimitingCfg `mapstructure:"rate_limiting"`
// Configs for latency filter sampling policy evaluator.
LatencyCfg LatencyCfg `mapstructure:"latency"`
// Configs for status code filter sampling policy evaluator.
StatusCodeCfg StatusCodeCfg `mapstructure:"status_code"`
// Configs for span counter filter sampling policy evaluator.
SpanCountCfg SpanCountCfg `mapstructure:"span_count"`
// Configs for trace_state filter sampling policy evaluator
TraceStateCfg TraceStateCfg `mapstructure:"trace_state"`
sharedPolicyCfg `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}

type TraceStateCfg struct {
Expand All @@ -113,10 +102,10 @@ type AndCfg struct {
// CompositeCfg holds the configurable settings to create a composite
// sampling policy evaluator.
type CompositeCfg struct {
MaxTotalSpansPerSecond int64 `mapstructure:"max_total_spans_per_second"`
PolicyOrder []string `mapstructure:"policy_order"`
SubPolicyCfg []SubPolicyCfg `mapstructure:"composite_sub_policy"`
RateAllocation []RateAllocationCfg `mapstructure:"rate_allocation"`
MaxTotalSpansPerSecond int64 `mapstructure:"max_total_spans_per_second"`
PolicyOrder []string `mapstructure:"policy_order"`
SubPolicyCfg []CompositeSubPolicyCfg `mapstructure:"composite_sub_policy"`
RateAllocation []RateAllocationCfg `mapstructure:"rate_allocation"`
}

// RateAllocationCfg used within composite policy
Expand All @@ -127,30 +116,12 @@ type RateAllocationCfg struct {

// PolicyCfg holds the common configuration to all policies.
type PolicyCfg struct {
// Name given to the instance of the policy to make easy to identify it in metrics and logs.
Name string `mapstructure:"name"`
// Type of the policy this will be used to match the proper configuration of the policy.
Type PolicyType `mapstructure:"type"`
// Configs for latency filter sampling policy evaluator.
LatencyCfg LatencyCfg `mapstructure:"latency"`
// Configs for numeric attribute filter sampling policy evaluator.
NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric_attribute"`
// Configs for probabilistic sampling policy evaluator.
ProbabilisticCfg ProbabilisticCfg `mapstructure:"probabilistic"`
// Configs for status code filter sampling policy evaluator.
StatusCodeCfg StatusCodeCfg `mapstructure:"status_code"`
// Configs for string attribute filter sampling policy evaluator.
StringAttributeCfg StringAttributeCfg `mapstructure:"string_attribute"`
// Configs for rate limiting filter sampling policy evaluator.
RateLimitingCfg RateLimitingCfg `mapstructure:"rate_limiting"`
sharedPolicyCfg `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// Configs for defining composite policy
CompositeCfg CompositeCfg `mapstructure:"composite"`
// Configs for defining and policy
AndCfg AndCfg `mapstructure:"and"`
// Configs for span count filter sampling policy evaluator.
SpanCountCfg SpanCountCfg `mapstructure:"span_count"`
// Configs for defining trace_state policy
TraceStateCfg TraceStateCfg `mapstructure:"trace_state"`
}

// LatencyCfg holds the configurable settings to create a latency filter sampling policy
Expand Down
Loading

0 comments on commit c5ee68f

Please sign in to comment.