diff --git a/policy/aggregation_type.go b/policy/aggregation_type.go index b8c4476..eb3508d 100644 --- a/policy/aggregation_type.go +++ b/policy/aggregation_type.go @@ -438,12 +438,16 @@ func (id AggregationID) Merge(other AggregationID) (AggregationID, bool) { return id, merged } +// AggregationTypes returns the aggregation types defined by the id. +func (id AggregationID) AggregationTypes() (AggregationTypes, error) { + return NewAggregationIDDecompressor().Decompress(id) +} + // String for debugging. func (id AggregationID) String() string { - aggTypes, err := NewAggregationIDDecompressor().Decompress(id) + aggTypes, err := id.AggregationTypes() if err != nil { return fmt.Sprintf("[invalid AggregationID: %v]", err) } - return aggTypes.String() } diff --git a/rules/validator.go b/rules/validator.go index 41494e9..77504e0 100644 --- a/rules/validator.go +++ b/rules/validator.go @@ -97,7 +97,7 @@ func (v *validator) validateMappingRules(rules MappingRules) error { // Validate that the policies are valid. t := v.opts.MetricTypeFn()(view.Filters) for _, p := range view.Policies { - if err := v.validatePolicy(p, t); err != nil { + if err := v.validatePolicy(t, p); err != nil { return err } } @@ -134,7 +134,7 @@ func (v *validator) validateRollupRules(rules RollupRules) error { t := v.opts.MetricTypeFn()(view.Filters) for _, target := range view.Targets { for _, p := range target.Policies { - if err := v.validatePolicy(p, t); err != nil { + if err := v.validatePolicy(t, p); err != nil { return err } } @@ -153,22 +153,24 @@ func (v *validator) validateFilters(f map[string]string) error { return nil } -func (v *validator) validatePolicy(p policy.Policy, t metric.Type) error { - // Validate policy resolution. - resolution := p.Resolution().Window - if minResolution := v.opts.MinPolicyResolutionFor(t); resolution < minResolution { - return fmt.Errorf("policy resolution %v is smaller than the minimum resolution supported (%v) for metric type %v", resolution, minResolution, t) - } - if maxResolution := v.opts.MaxPolicyResolutionFor(t); resolution > maxResolution { - return fmt.Errorf("policy resolution %v is larger than the maximum resolution supported (%v) for metric type %v", resolution, maxResolution, t) +func (v *validator) validatePolicy(t metric.Type, p policy.Policy) error { + // Validate storage policy. + if !v.opts.IsAllowedStoragePolicyFor(t, p.StoragePolicy) { + return fmt.Errorf("storage policy %v is not allowed for metric type %v", p.StoragePolicy, t) } // Validate aggregation function. if isDefaultAggFn := p.AggregationID.IsDefault(); isDefaultAggFn { return nil } - if customAggFnEnabled := v.opts.CustomAggregationFunctionEnabledFor(t); !customAggFnEnabled { - return fmt.Errorf("policy %v contains unsupported custom aggregation function %v for metric type %v", p, p.AggregationID, t) + aggTypes, err := p.AggregationID.AggregationTypes() + if err != nil { + return err + } + for _, aggType := range aggTypes { + if !v.opts.IsAllowedCustomAggregationTypeFor(t, aggType) { + return fmt.Errorf("aggregation type %v is not allowed for metric type %v", aggType, t) + } } return nil diff --git a/rules/validator_options.go b/rules/validator_options.go index 81553da..4cdc7f9 100644 --- a/rules/validator_options.go +++ b/rules/validator_options.go @@ -21,15 +21,8 @@ package rules import ( - "time" - "github.com/m3db/m3metrics/metric" -) - -const ( - defaultDefaultCustomAggFunctionsEnabled = false - defaultDefaultMinPolicyResolution = time.Second - defaultDefaultMaxPolicyResolution = time.Hour + "github.com/m3db/m3metrics/policy" ) // MetricTypeFn determines the metric type based on a set of tag based filters. @@ -37,25 +30,19 @@ type MetricTypeFn func(tagFilters map[string]string) metric.Type // ValidatorOptions provide a set of options for the validator. type ValidatorOptions interface { - // SetDefaultCustomAggregationFunctionEnabled sets whether custom aggregation functions - // are enabled by default. - SetDefaultCustomAggregationFunctionEnabled(value bool) ValidatorOptions + // SetDefaultAllowedStoragePolicies sets the default list of allowed storage policies. + SetDefaultAllowedStoragePolicies(value []policy.StoragePolicy) ValidatorOptions - // DefaultCustomAggregationFunctionEnabled returns whether custom aggregation functions - // are enabled by default. - DefaultCustomAggregationFunctionEnabled() bool + // SetDefaultAllowedCustomAggregationTypes sets the default list of allowed custom + // aggregation types. + SetDefaultAllowedCustomAggregationTypes(value policy.AggregationTypes) ValidatorOptions - // SetDefaultMinPolicyResolution sets the default minimum policy resolution. - SetDefaultMinPolicyResolution(value time.Duration) ValidatorOptions + // SetAllowedStoragePoliciesFor sets the list of allowed storage policies for a given metric type. + SetAllowedStoragePoliciesFor(t metric.Type, policies []policy.StoragePolicy) ValidatorOptions - // DefaultMinPolicyResolution returns the default minimum policy resolution. - DefaultMinPolicyResolution() time.Duration - - // SetDefaultMaxPolicyResolution sets the default maximum policy resolution. - SetDefaultMaxPolicyResolution(value time.Duration) ValidatorOptions - - // DefaultMaxPolicyResolution returns the default maximum policy resolution. - DefaultMaxPolicyResolution() time.Duration + // SetAllowedCustomAggregationTypesFor sets the list of allowed custom aggregation + // types for a given metric type. + SetAllowedCustomAggregationTypesFor(t metric.Type, aggTypes policy.AggregationTypes) ValidatorOptions // SetMetricTypeFn sets the metric type function. SetMetricTypeFn(value MetricTypeFn) ValidatorOptions @@ -63,76 +50,56 @@ type ValidatorOptions interface { // MetricTypeFn returns the metric type function. MetricTypeFn() MetricTypeFn - // SetCustomAggregationFunctionEnabledFor sets whether custom aggregation functions - // are enabled for a given metric type. - SetCustomAggregationFunctionEnabledFor(t metric.Type, value bool) ValidatorOptions - - // CustomAggregationFunctionEnabledFor returns whether custom aggregation functions - // are enabled for a given metric type. - CustomAggregationFunctionEnabledFor(t metric.Type) bool + // IsAllowedStoragePolicyFor determines whether a given storage policy is allowed for the + // given metric type. + IsAllowedStoragePolicyFor(t metric.Type, p policy.StoragePolicy) bool - // SetMinPolicyResolutionFor sets the minimum policy resolution for a given metric type. - SetMinPolicyResolutionFor(t metric.Type, value time.Duration) ValidatorOptions - - // MinPolicyResolutionFor returns the minimum policy resolution for a given metric type. - MinPolicyResolutionFor(t metric.Type) time.Duration - - // SetMaxPolicyResolutionFor sets the maximum policy resolution for a given metric type. - SetMaxPolicyResolutionFor(t metric.Type, value time.Duration) ValidatorOptions - - // MaxPolicyResolutionFor returns the maximum policy resolution for a given metric type. - MaxPolicyResolutionFor(t metric.Type) time.Duration + // IsAllowedCustomAggregationTypeFor determines whether a given aggregation type is allowed for + // the given metric type. + IsAllowedCustomAggregationTypeFor(t metric.Type, aggType policy.AggregationType) bool } type validationMetadata struct { - customAggFunctionsEnabled bool - minPolicyResolution time.Duration - maxPolicyResolution time.Duration + allowedStoragePolicies map[policy.StoragePolicy]struct{} + allowedCustomAggTypes map[policy.AggregationType]struct{} } type validatorOptions struct { - defaultCustomAggFunctionsEnabled bool - defaultMinPolicyResolution time.Duration - defaultMaxPolicyResolution time.Duration - metricTypeFn MetricTypeFn - metadatasByType map[metric.Type]validationMetadata + defaultAllowedStoragePolicies map[policy.StoragePolicy]struct{} + defaultAllowedCustomAggregationTypes map[policy.AggregationType]struct{} + metricTypeFn MetricTypeFn + metadatasByType map[metric.Type]validationMetadata } // NewValidatorOptions create a new set of validator options. func NewValidatorOptions() ValidatorOptions { return &validatorOptions{ - defaultCustomAggFunctionsEnabled: defaultDefaultCustomAggFunctionsEnabled, - defaultMinPolicyResolution: defaultDefaultMinPolicyResolution, - defaultMaxPolicyResolution: defaultDefaultMaxPolicyResolution, - metadatasByType: make(map[metric.Type]validationMetadata), + metadatasByType: make(map[metric.Type]validationMetadata), } } -func (o *validatorOptions) SetDefaultCustomAggregationFunctionEnabled(value bool) ValidatorOptions { - o.defaultCustomAggFunctionsEnabled = value +func (o *validatorOptions) SetDefaultAllowedStoragePolicies(value []policy.StoragePolicy) ValidatorOptions { + o.defaultAllowedStoragePolicies = toStoragePolicySet(value) return o } -func (o *validatorOptions) DefaultCustomAggregationFunctionEnabled() bool { - return o.defaultCustomAggFunctionsEnabled -} - -func (o *validatorOptions) SetDefaultMinPolicyResolution(value time.Duration) ValidatorOptions { - o.defaultMinPolicyResolution = value +func (o *validatorOptions) SetDefaultAllowedCustomAggregationTypes(value policy.AggregationTypes) ValidatorOptions { + o.defaultAllowedCustomAggregationTypes = toAggregationTypeSet(value) return o } -func (o *validatorOptions) DefaultMinPolicyResolution() time.Duration { - return o.defaultMinPolicyResolution -} - -func (o *validatorOptions) SetDefaultMaxPolicyResolution(value time.Duration) ValidatorOptions { - o.defaultMaxPolicyResolution = value +func (o *validatorOptions) SetAllowedStoragePoliciesFor(t metric.Type, policies []policy.StoragePolicy) ValidatorOptions { + metadata := o.findOrCreateMetadata(t) + metadata.allowedStoragePolicies = toStoragePolicySet(policies) + o.metadatasByType[t] = metadata return o } -func (o *validatorOptions) DefaultMaxPolicyResolution() time.Duration { - return o.defaultMaxPolicyResolution +func (o *validatorOptions) SetAllowedCustomAggregationTypesFor(t metric.Type, aggTypes policy.AggregationTypes) ValidatorOptions { + metadata := o.findOrCreateMetadata(t) + metadata.allowedCustomAggTypes = toAggregationTypeSet(aggTypes) + o.metadatasByType[t] = metadata + return o } func (o *validatorOptions) SetMetricTypeFn(value MetricTypeFn) ValidatorOptions { @@ -144,46 +111,22 @@ func (o *validatorOptions) MetricTypeFn() MetricTypeFn { return o.metricTypeFn } -func (o *validatorOptions) SetCustomAggregationFunctionEnabledFor(t metric.Type, value bool) ValidatorOptions { - metadata := o.findOrCreateMetadata(t) - metadata.customAggFunctionsEnabled = value - o.metadatasByType[t] = metadata - return o -} - -func (o *validatorOptions) CustomAggregationFunctionEnabledFor(t metric.Type) bool { +func (o *validatorOptions) IsAllowedStoragePolicyFor(t metric.Type, p policy.StoragePolicy) bool { if metadata, exists := o.metadatasByType[t]; exists { - return metadata.customAggFunctionsEnabled + _, found := metadata.allowedStoragePolicies[p] + return found } - return o.defaultCustomAggFunctionsEnabled -} - -func (o *validatorOptions) SetMinPolicyResolutionFor(t metric.Type, value time.Duration) ValidatorOptions { - metadata := o.findOrCreateMetadata(t) - metadata.minPolicyResolution = value - o.metadatasByType[t] = metadata - return o + _, found := o.defaultAllowedStoragePolicies[p] + return found } -func (o *validatorOptions) MinPolicyResolutionFor(t metric.Type) time.Duration { +func (o *validatorOptions) IsAllowedCustomAggregationTypeFor(t metric.Type, aggType policy.AggregationType) bool { if metadata, exists := o.metadatasByType[t]; exists { - return metadata.minPolicyResolution + _, found := metadata.allowedCustomAggTypes[aggType] + return found } - return o.defaultMinPolicyResolution -} - -func (o *validatorOptions) SetMaxPolicyResolutionFor(t metric.Type, value time.Duration) ValidatorOptions { - metadata := o.findOrCreateMetadata(t) - metadata.maxPolicyResolution = value - o.metadatasByType[t] = metadata - return o -} - -func (o *validatorOptions) MaxPolicyResolutionFor(t metric.Type) time.Duration { - if metadata, exists := o.metadatasByType[t]; exists { - return metadata.maxPolicyResolution - } - return o.defaultMaxPolicyResolution + _, found := o.defaultAllowedCustomAggregationTypes[aggType] + return found } func (o *validatorOptions) findOrCreateMetadata(t metric.Type) validationMetadata { @@ -191,8 +134,23 @@ func (o *validatorOptions) findOrCreateMetadata(t metric.Type) validationMetadat return metadata } return validationMetadata{ - customAggFunctionsEnabled: o.defaultCustomAggFunctionsEnabled, - minPolicyResolution: o.defaultMinPolicyResolution, - maxPolicyResolution: o.defaultMaxPolicyResolution, + allowedStoragePolicies: o.defaultAllowedStoragePolicies, + allowedCustomAggTypes: o.defaultAllowedCustomAggregationTypes, + } +} + +func toStoragePolicySet(policies []policy.StoragePolicy) map[policy.StoragePolicy]struct{} { + m := make(map[policy.StoragePolicy]struct{}, len(policies)) + for _, p := range policies { + m[p] = struct{}{} + } + return m +} + +func toAggregationTypeSet(aggTypes policy.AggregationTypes) map[policy.AggregationType]struct{} { + m := make(map[policy.AggregationType]struct{}, len(aggTypes)) + for _, t := range aggTypes { + m[t] = struct{}{} } + return m } diff --git a/rules/validator_test.go b/rules/validator_test.go index bb3370c..c1176a0 100644 --- a/rules/validator_test.go +++ b/rules/validator_test.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3metrics/generated/proto/schema" "github.com/m3db/m3metrics/metric" + "github.com/m3db/m3metrics/policy" "github.com/stretchr/testify/require" ) @@ -65,26 +66,30 @@ func TestValidatorValidateMappingRuleInvalidFilter(t *testing.T) { require.Error(t, rs.Validate(validator)) } -func TestValidatorValidateMappingRulePolicyResolution(t *testing.T) { +func TestValidatorValidateMappingRulePolicy(t *testing.T) { + testStoragePolicies := []policy.StoragePolicy{ + policy.MustParseStoragePolicy("10s:1d"), + } ruleSet := testRuleSetWithMappingRules(t, testPolicyResolutionMappingRulesConfig()) + inputs := []struct { opts ValidatorOptions expectErr bool }{ { - // The resolution of the policy is within bounds. + // By default policy is allowed. opts: testValidatorOptions(), - expectErr: false, + expectErr: true, }, { - // The resolution of the policy is smaller than minimum. - opts: testValidatorOptions().SetMinPolicyResolutionFor(metric.TimerType, time.Minute), - expectErr: true, + // Policy is allowed through the default list of policies. + opts: testValidatorOptions().SetDefaultAllowedStoragePolicies(testStoragePolicies), + expectErr: false, }, { - // The resolution of the policy is larger than maximum. - opts: testValidatorOptions().SetMaxPolicyResolutionFor(metric.TimerType, time.Second), - expectErr: true, + // Policy is allowed through the list of policies allowed for timers. + opts: testValidatorOptions().SetAllowedStoragePoliciesFor(metric.TimerType, testStoragePolicies), + expectErr: false, }, } @@ -98,25 +103,26 @@ func TestValidatorValidateMappingRulePolicyResolution(t *testing.T) { } } -func TestValidatorValidateMappingRuleCustomAggregationFunction(t *testing.T) { - ruleSet := testRuleSetWithMappingRules(t, testCustomAggregationFunctionMappingRulesConfig()) +func TestValidatorValidateMappingRuleCustomAggregationTypes(t *testing.T) { + testAggregationTypes := []policy.AggregationType{policy.Count, policy.Max} + ruleSet := testRuleSetWithMappingRules(t, testCustomAggregationTypeMappingRulesConfig()) inputs := []struct { opts ValidatorOptions expectErr bool }{ { - // The resolution of the policy is within bounds. + // By default no custom aggregation type is allowed. opts: testValidatorOptions(), expectErr: true, }, { - // The resolution of the policy is smaller than minimum. - opts: testValidatorOptions().SetCustomAggregationFunctionEnabledFor(metric.TimerType, false), - expectErr: true, + // Aggregation type is allowed through the default list of custom aggregation types. + opts: testValidatorOptions().SetDefaultAllowedCustomAggregationTypes(testAggregationTypes), + expectErr: false, }, { - // The resolution of the policy is larger than maximum. - opts: testValidatorOptions().SetCustomAggregationFunctionEnabledFor(metric.TimerType, true), + // Aggregation type is allowed through the list of custom aggregation types for timers. + opts: testValidatorOptions().SetAllowedCustomAggregationTypesFor(metric.TimerType, testAggregationTypes), expectErr: false, }, } @@ -159,26 +165,30 @@ func TestValidatorValidateRollupRuleInvalidFilter(t *testing.T) { require.Error(t, rs.Validate(validator)) } -func TestValidatorValidateRollupRulePolicyResolution(t *testing.T) { +func TestValidatorValidateRollupRulePolicy(t *testing.T) { + testStoragePolicies := []policy.StoragePolicy{ + policy.MustParseStoragePolicy("10s:1d"), + } ruleSet := testRuleSetWithRollupRules(t, testPolicyResolutionRollupRulesConfig()) + inputs := []struct { opts ValidatorOptions expectErr bool }{ { - // The resolution of the policy is within bounds. + // By default policy is allowed. opts: testValidatorOptions(), - expectErr: false, + expectErr: true, }, { - // The resolution of the policy is smaller than minimum. - opts: testValidatorOptions().SetMinPolicyResolutionFor(metric.TimerType, time.Minute), - expectErr: true, + // Policy is allowed through the default list of policies. + opts: testValidatorOptions().SetDefaultAllowedStoragePolicies(testStoragePolicies), + expectErr: false, }, { - // The resolution of the policy is larger than maximum. - opts: testValidatorOptions().SetMaxPolicyResolutionFor(metric.TimerType, time.Second), - expectErr: true, + // Policy is allowed through the list of policies allowed for timers. + opts: testValidatorOptions().SetAllowedStoragePoliciesFor(metric.TimerType, testStoragePolicies), + expectErr: false, }, } @@ -192,25 +202,26 @@ func TestValidatorValidateRollupRulePolicyResolution(t *testing.T) { } } -func TestValidatorValidateRollupRuleCustomAggregationFunction(t *testing.T) { - ruleSet := testRuleSetWithRollupRules(t, testCustomAggregationFunctionRollupRulesConfig()) +func TestValidatorValidateRollupRuleCustomAggregationTypes(t *testing.T) { + testAggregationTypes := []policy.AggregationType{policy.Count, policy.Max} + ruleSet := testRuleSetWithRollupRules(t, testCustomAggregationTypeRollupRulesConfig()) inputs := []struct { opts ValidatorOptions expectErr bool }{ { - // The resolution of the policy is within bounds. + // By default no custom aggregation type is allowed. opts: testValidatorOptions(), expectErr: true, }, { - // The resolution of the policy is smaller than minimum. - opts: testValidatorOptions().SetCustomAggregationFunctionEnabledFor(metric.TimerType, false), - expectErr: true, + // Aggregation type is allowed through the default list of custom aggregation types. + opts: testValidatorOptions().SetDefaultAllowedCustomAggregationTypes(testAggregationTypes), + expectErr: false, }, { - // The resolution of the policy is larger than maximum. - opts: testValidatorOptions().SetCustomAggregationFunctionEnabledFor(metric.TimerType, true), + // Aggregation type is allowed through the list of custom aggregation types for timers. + opts: testValidatorOptions().SetAllowedCustomAggregationTypesFor(metric.TimerType, testAggregationTypes), expectErr: false, }, } @@ -301,7 +312,7 @@ func testPolicyResolutionMappingRulesConfig() []*schema.MappingRule { } } -func testCustomAggregationFunctionMappingRulesConfig() []*schema.MappingRule { +func testCustomAggregationTypeMappingRulesConfig() []*schema.MappingRule { return []*schema.MappingRule{ &schema.MappingRule{ Uuid: "mappingRule1", @@ -320,7 +331,7 @@ func testCustomAggregationFunctionMappingRulesConfig() []*schema.MappingRule { Precision: int64(time.Second), }, Retention: &schema.Retention{ - Period: int64(24 * time.Hour), + Period: int64(6 * time.Hour), }, }, AggregationTypes: []schema.AggregationType{ @@ -417,7 +428,7 @@ func testPolicyResolutionRollupRulesConfig() []*schema.RollupRule { } } -func testCustomAggregationFunctionRollupRulesConfig() []*schema.RollupRule { +func testCustomAggregationTypeRollupRulesConfig() []*schema.RollupRule { return []*schema.RollupRule{ &schema.RollupRule{ Uuid: "rollupRule1", @@ -440,12 +451,12 @@ func testCustomAggregationFunctionRollupRulesConfig() []*schema.RollupRule { Precision: int64(time.Second), }, Retention: &schema.Retention{ - Period: int64(24 * time.Hour), + Period: int64(6 * time.Hour), }, }, AggregationTypes: []schema.AggregationType{ schema.AggregationType_COUNT, - schema.AggregationType_LAST, + schema.AggregationType_MAX, }, }, }, @@ -472,10 +483,12 @@ func testRuleSetWithRollupRules(t *testing.T, rrs []*schema.RollupRule) RuleSet } func testValidatorOptions() ValidatorOptions { + testStoragePolicies := []policy.StoragePolicy{ + policy.MustParseStoragePolicy("10s:6h"), + } return NewValidatorOptions(). - SetDefaultCustomAggregationFunctionEnabled(false). - SetDefaultMinPolicyResolution(time.Second). - SetDefaultMaxPolicyResolution(time.Hour). + SetDefaultAllowedStoragePolicies(testStoragePolicies). + SetDefaultAllowedCustomAggregationTypes(nil). SetMetricTypeFn(testMetricTypeFn()) }