Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Finer-grained control on allowed policies and aggregation types
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Oct 10, 2017
1 parent 2950d4b commit cdf29d4
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 163 deletions.
8 changes: 6 additions & 2 deletions policy/aggregation_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,12 +438,16 @@ func (id AggregationID) Merge(other AggregationID) (AggregationID, bool) {
return id, merged
}

// AggregationTypes returns the aggregation types defined by the id.
func (id AggregationID) AggregationTypes() (AggregationTypes, error) {
return NewAggregationIDDecompressor().Decompress(id)
}

// String for debugging.
func (id AggregationID) String() string {
aggTypes, err := NewAggregationIDDecompressor().Decompress(id)
aggTypes, err := id.AggregationTypes()
if err != nil {
return fmt.Sprintf("[invalid AggregationID: %v]", err)
}

return aggTypes.String()
}
26 changes: 14 additions & 12 deletions rules/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (v *validator) validateMappingRules(rules MappingRules) error {
// Validate that the policies are valid.
t := v.opts.MetricTypeFn()(view.Filters)
for _, p := range view.Policies {
if err := v.validatePolicy(p, t); err != nil {
if err := v.validatePolicy(t, p); err != nil {
return err
}
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func (v *validator) validateRollupRules(rules RollupRules) error {
t := v.opts.MetricTypeFn()(view.Filters)
for _, target := range view.Targets {
for _, p := range target.Policies {
if err := v.validatePolicy(p, t); err != nil {
if err := v.validatePolicy(t, p); err != nil {
return err
}
}
Expand All @@ -153,22 +153,24 @@ func (v *validator) validateFilters(f map[string]string) error {
return nil
}

func (v *validator) validatePolicy(p policy.Policy, t metric.Type) error {
// Validate policy resolution.
resolution := p.Resolution().Window
if minResolution := v.opts.MinPolicyResolutionFor(t); resolution < minResolution {
return fmt.Errorf("policy resolution %v is smaller than the minimum resolution supported (%v) for metric type %v", resolution, minResolution, t)
}
if maxResolution := v.opts.MaxPolicyResolutionFor(t); resolution > maxResolution {
return fmt.Errorf("policy resolution %v is larger than the maximum resolution supported (%v) for metric type %v", resolution, maxResolution, t)
func (v *validator) validatePolicy(t metric.Type, p policy.Policy) error {
// Validate storage policy.
if !v.opts.IsAllowedStoragePolicyFor(t, p.StoragePolicy) {
return fmt.Errorf("storage policy %v is not allowed for metric type %v", p.StoragePolicy, t)
}

// Validate aggregation function.
if isDefaultAggFn := p.AggregationID.IsDefault(); isDefaultAggFn {
return nil
}
if customAggFnEnabled := v.opts.CustomAggregationFunctionEnabledFor(t); !customAggFnEnabled {
return fmt.Errorf("policy %v contains unsupported custom aggregation function %v for metric type %v", p, p.AggregationID, t)
aggTypes, err := p.AggregationID.AggregationTypes()
if err != nil {
return err
}
for _, aggType := range aggTypes {
if !v.opts.IsAllowedCustomAggregationTypeFor(t, aggType) {
return fmt.Errorf("aggregation type %v is not allowed for metric type %v", aggType, t)
}
}

return nil
Expand Down
172 changes: 65 additions & 107 deletions rules/validator_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,118 +21,85 @@
package rules

import (
"time"

"github.com/m3db/m3metrics/metric"
)

const (
defaultDefaultCustomAggFunctionsEnabled = false
defaultDefaultMinPolicyResolution = time.Second
defaultDefaultMaxPolicyResolution = time.Hour
"github.com/m3db/m3metrics/policy"
)

// MetricTypeFn determines the metric type based on a set of tag based filters.
type MetricTypeFn func(tagFilters map[string]string) metric.Type

// ValidatorOptions provide a set of options for the validator.
type ValidatorOptions interface {
// SetDefaultCustomAggregationFunctionEnabled sets whether custom aggregation functions
// are enabled by default.
SetDefaultCustomAggregationFunctionEnabled(value bool) ValidatorOptions
// SetDefaultAllowedStoragePolicies sets the default list of allowed storage policies.
SetDefaultAllowedStoragePolicies(value []policy.StoragePolicy) ValidatorOptions

// DefaultCustomAggregationFunctionEnabled returns whether custom aggregation functions
// are enabled by default.
DefaultCustomAggregationFunctionEnabled() bool
// SetDefaultAllowedCustomAggregationTypes sets the default list of allowed custom
// aggregation types.
SetDefaultAllowedCustomAggregationTypes(value policy.AggregationTypes) ValidatorOptions

// SetDefaultMinPolicyResolution sets the default minimum policy resolution.
SetDefaultMinPolicyResolution(value time.Duration) ValidatorOptions
// SetAllowedStoragePoliciesFor sets the list of allowed storage policies for a given metric type.
SetAllowedStoragePoliciesFor(t metric.Type, policies []policy.StoragePolicy) ValidatorOptions

// DefaultMinPolicyResolution returns the default minimum policy resolution.
DefaultMinPolicyResolution() time.Duration

// SetDefaultMaxPolicyResolution sets the default maximum policy resolution.
SetDefaultMaxPolicyResolution(value time.Duration) ValidatorOptions

// DefaultMaxPolicyResolution returns the default maximum policy resolution.
DefaultMaxPolicyResolution() time.Duration
// SetAllowedCustomAggregationTypesFor sets the list of allowed custom aggregation
// types for a given metric type.
SetAllowedCustomAggregationTypesFor(t metric.Type, aggTypes policy.AggregationTypes) ValidatorOptions

// SetMetricTypeFn sets the metric type function.
SetMetricTypeFn(value MetricTypeFn) ValidatorOptions

// MetricTypeFn returns the metric type function.
MetricTypeFn() MetricTypeFn

// SetCustomAggregationFunctionEnabledFor sets whether custom aggregation functions
// are enabled for a given metric type.
SetCustomAggregationFunctionEnabledFor(t metric.Type, value bool) ValidatorOptions

// CustomAggregationFunctionEnabledFor returns whether custom aggregation functions
// are enabled for a given metric type.
CustomAggregationFunctionEnabledFor(t metric.Type) bool
// IsAllowedStoragePolicyFor determines whether a given storage policy is allowed for the
// given metric type.
IsAllowedStoragePolicyFor(t metric.Type, p policy.StoragePolicy) bool

// SetMinPolicyResolutionFor sets the minimum policy resolution for a given metric type.
SetMinPolicyResolutionFor(t metric.Type, value time.Duration) ValidatorOptions

// MinPolicyResolutionFor returns the minimum policy resolution for a given metric type.
MinPolicyResolutionFor(t metric.Type) time.Duration

// SetMaxPolicyResolutionFor sets the maximum policy resolution for a given metric type.
SetMaxPolicyResolutionFor(t metric.Type, value time.Duration) ValidatorOptions

// MaxPolicyResolutionFor returns the maximum policy resolution for a given metric type.
MaxPolicyResolutionFor(t metric.Type) time.Duration
// IsAllowedCustomAggregationTypeFor determines whether a given aggregation type is allowed for
// the given metric type.
IsAllowedCustomAggregationTypeFor(t metric.Type, aggType policy.AggregationType) bool
}

type validationMetadata struct {
customAggFunctionsEnabled bool
minPolicyResolution time.Duration
maxPolicyResolution time.Duration
allowedStoragePolicies map[policy.StoragePolicy]struct{}
allowedCustomAggTypes map[policy.AggregationType]struct{}
}

type validatorOptions struct {
defaultCustomAggFunctionsEnabled bool
defaultMinPolicyResolution time.Duration
defaultMaxPolicyResolution time.Duration
metricTypeFn MetricTypeFn
metadatasByType map[metric.Type]validationMetadata
defaultAllowedStoragePolicies map[policy.StoragePolicy]struct{}
defaultAllowedCustomAggregationTypes map[policy.AggregationType]struct{}
metricTypeFn MetricTypeFn
metadatasByType map[metric.Type]validationMetadata
}

// NewValidatorOptions create a new set of validator options.
func NewValidatorOptions() ValidatorOptions {
return &validatorOptions{
defaultCustomAggFunctionsEnabled: defaultDefaultCustomAggFunctionsEnabled,
defaultMinPolicyResolution: defaultDefaultMinPolicyResolution,
defaultMaxPolicyResolution: defaultDefaultMaxPolicyResolution,
metadatasByType: make(map[metric.Type]validationMetadata),
metadatasByType: make(map[metric.Type]validationMetadata),
}
}

func (o *validatorOptions) SetDefaultCustomAggregationFunctionEnabled(value bool) ValidatorOptions {
o.defaultCustomAggFunctionsEnabled = value
func (o *validatorOptions) SetDefaultAllowedStoragePolicies(value []policy.StoragePolicy) ValidatorOptions {
o.defaultAllowedStoragePolicies = toStoragePolicySet(value)
return o
}

func (o *validatorOptions) DefaultCustomAggregationFunctionEnabled() bool {
return o.defaultCustomAggFunctionsEnabled
}

func (o *validatorOptions) SetDefaultMinPolicyResolution(value time.Duration) ValidatorOptions {
o.defaultMinPolicyResolution = value
func (o *validatorOptions) SetDefaultAllowedCustomAggregationTypes(value policy.AggregationTypes) ValidatorOptions {
o.defaultAllowedCustomAggregationTypes = toAggregationTypeSet(value)
return o
}

func (o *validatorOptions) DefaultMinPolicyResolution() time.Duration {
return o.defaultMinPolicyResolution
}

func (o *validatorOptions) SetDefaultMaxPolicyResolution(value time.Duration) ValidatorOptions {
o.defaultMaxPolicyResolution = value
func (o *validatorOptions) SetAllowedStoragePoliciesFor(t metric.Type, policies []policy.StoragePolicy) ValidatorOptions {
metadata := o.findOrCreateMetadata(t)
metadata.allowedStoragePolicies = toStoragePolicySet(policies)
o.metadatasByType[t] = metadata
return o
}

func (o *validatorOptions) DefaultMaxPolicyResolution() time.Duration {
return o.defaultMaxPolicyResolution
func (o *validatorOptions) SetAllowedCustomAggregationTypesFor(t metric.Type, aggTypes policy.AggregationTypes) ValidatorOptions {
metadata := o.findOrCreateMetadata(t)
metadata.allowedCustomAggTypes = toAggregationTypeSet(aggTypes)
o.metadatasByType[t] = metadata
return o
}

func (o *validatorOptions) SetMetricTypeFn(value MetricTypeFn) ValidatorOptions {
Expand All @@ -144,55 +111,46 @@ func (o *validatorOptions) MetricTypeFn() MetricTypeFn {
return o.metricTypeFn
}

func (o *validatorOptions) SetCustomAggregationFunctionEnabledFor(t metric.Type, value bool) ValidatorOptions {
metadata := o.findOrCreateMetadata(t)
metadata.customAggFunctionsEnabled = value
o.metadatasByType[t] = metadata
return o
}

func (o *validatorOptions) CustomAggregationFunctionEnabledFor(t metric.Type) bool {
func (o *validatorOptions) IsAllowedStoragePolicyFor(t metric.Type, p policy.StoragePolicy) bool {
if metadata, exists := o.metadatasByType[t]; exists {
return metadata.customAggFunctionsEnabled
_, found := metadata.allowedStoragePolicies[p]
return found
}
return o.defaultCustomAggFunctionsEnabled
}

func (o *validatorOptions) SetMinPolicyResolutionFor(t metric.Type, value time.Duration) ValidatorOptions {
metadata := o.findOrCreateMetadata(t)
metadata.minPolicyResolution = value
o.metadatasByType[t] = metadata
return o
_, found := o.defaultAllowedStoragePolicies[p]
return found
}

func (o *validatorOptions) MinPolicyResolutionFor(t metric.Type) time.Duration {
func (o *validatorOptions) IsAllowedCustomAggregationTypeFor(t metric.Type, aggType policy.AggregationType) bool {
if metadata, exists := o.metadatasByType[t]; exists {
return metadata.minPolicyResolution
_, found := metadata.allowedCustomAggTypes[aggType]
return found
}
return o.defaultMinPolicyResolution
}

func (o *validatorOptions) SetMaxPolicyResolutionFor(t metric.Type, value time.Duration) ValidatorOptions {
metadata := o.findOrCreateMetadata(t)
metadata.maxPolicyResolution = value
o.metadatasByType[t] = metadata
return o
}

func (o *validatorOptions) MaxPolicyResolutionFor(t metric.Type) time.Duration {
if metadata, exists := o.metadatasByType[t]; exists {
return metadata.maxPolicyResolution
}
return o.defaultMaxPolicyResolution
_, found := o.defaultAllowedCustomAggregationTypes[aggType]
return found
}

func (o *validatorOptions) findOrCreateMetadata(t metric.Type) validationMetadata {
if metadata, found := o.metadatasByType[t]; found {
return metadata
}
return validationMetadata{
customAggFunctionsEnabled: o.defaultCustomAggFunctionsEnabled,
minPolicyResolution: o.defaultMinPolicyResolution,
maxPolicyResolution: o.defaultMaxPolicyResolution,
allowedStoragePolicies: o.defaultAllowedStoragePolicies,
allowedCustomAggTypes: o.defaultAllowedCustomAggregationTypes,
}
}

func toStoragePolicySet(policies []policy.StoragePolicy) map[policy.StoragePolicy]struct{} {
m := make(map[policy.StoragePolicy]struct{}, len(policies))
for _, p := range policies {
m[p] = struct{}{}
}
return m
}

func toAggregationTypeSet(aggTypes policy.AggregationTypes) map[policy.AggregationType]struct{} {
m := make(map[policy.AggregationType]struct{}, len(aggTypes))
for _, t := range aggTypes {
m[t] = struct{}{}
}
return m
}
Loading

0 comments on commit cdf29d4

Please sign in to comment.