From 89fe26f9e2347a0983616bb9c4df8ae7c13a403b Mon Sep 17 00:00:00 2001 From: xichen2020 Date: Wed, 27 Jun 2018 17:14:33 -0400 Subject: [PATCH 1/2] Make override fields optional (#190) --- rules/validator/config.go | 40 ++++++--- rules/validator/config_test.go | 159 ++++++++++++++++++++++++++++++++- 2 files changed, 185 insertions(+), 14 deletions(-) diff --git a/rules/validator/config.go b/rules/validator/config.go index 6bae7b9..af4d980 100644 --- a/rules/validator/config.go +++ b/rules/validator/config.go @@ -59,20 +59,38 @@ func (c Configuration) NewValidator( if err != nil { return nil, err } + opts := c.newValidatorOptions(nsValidator) + return NewValidator(opts), nil +} + +func (c Configuration) newValidatorOptions( + nsValidator namespace.Validator, +) Options { opts := NewOptions(). SetNamespaceValidator(nsValidator). SetRequiredRollupTags(c.RequiredRollupTags). SetMetricTypesFn(c.MetricTypes.NewMetricTypesFn()). - SetDefaultAllowedStoragePolicies(c.Policies.DefaultAllowed.StoragePolicies). - SetDefaultAllowedFirstLevelAggregationTypes(c.Policies.DefaultAllowed.FirstLevelAggregationTypes). - SetDefaultAllowedNonFirstLevelAggregationTypes(c.Policies.DefaultAllowed.NonFirstLevelAggregationTypes). SetTagNameInvalidChars(toRunes(c.TagNameInvalidChars)). SetMetricNameInvalidChars(toRunes(c.MetricNameInvalidChars)) + if c.Policies.DefaultAllowed.StoragePolicies != nil { + opts = opts.SetDefaultAllowedStoragePolicies(*c.Policies.DefaultAllowed.StoragePolicies) + } + if c.Policies.DefaultAllowed.FirstLevelAggregationTypes != nil { + opts = opts.SetDefaultAllowedFirstLevelAggregationTypes(*c.Policies.DefaultAllowed.FirstLevelAggregationTypes) + } + if c.Policies.DefaultAllowed.NonFirstLevelAggregationTypes != nil { + opts = opts.SetDefaultAllowedNonFirstLevelAggregationTypes(*c.Policies.DefaultAllowed.NonFirstLevelAggregationTypes) + } for _, override := range c.Policies.Overrides { - opts = opts. - SetAllowedStoragePoliciesFor(override.Type, override.Allowed.StoragePolicies). - SetAllowedFirstLevelAggregationTypesFor(override.Type, override.Allowed.FirstLevelAggregationTypes). - SetAllowedNonFirstLevelAggregationTypesFor(override.Type, override.Allowed.NonFirstLevelAggregationTypes) + if override.Allowed.StoragePolicies != nil { + opts = opts.SetAllowedStoragePoliciesFor(override.Type, *override.Allowed.StoragePolicies) + } + if override.Allowed.FirstLevelAggregationTypes != nil { + opts = opts.SetAllowedFirstLevelAggregationTypesFor(override.Type, *override.Allowed.FirstLevelAggregationTypes) + } + if override.Allowed.NonFirstLevelAggregationTypes != nil { + opts = opts.SetAllowedNonFirstLevelAggregationTypesFor(override.Type, *override.Allowed.NonFirstLevelAggregationTypes) + } } if c.MaxTransformationDerivativeOrder != nil { opts = opts.SetMaxTransformationDerivativeOrder(*c.MaxTransformationDerivativeOrder) @@ -80,7 +98,7 @@ func (c Configuration) NewValidator( if c.MaxRollupLevels != nil { opts = opts.SetMaxRollupLevels(*c.MaxRollupLevels) } - return NewValidator(opts), nil + return opts } type namespaceValidatorConfiguration struct { @@ -152,9 +170,9 @@ type policiesOverrideConfiguration struct { // policiesConfiguration is the configuration for storage policies and aggregation types. 
type policiesConfiguration struct { - StoragePolicies []policy.StoragePolicy `yaml:"storagePolicies"` - FirstLevelAggregationTypes []aggregation.Type `yaml:"firstLevelAggregationTypes"` - NonFirstLevelAggregationTypes []aggregation.Type `yaml:"nonFirstLevelAggregationTypes"` + StoragePolicies *[]policy.StoragePolicy `yaml:"storagePolicies"` + FirstLevelAggregationTypes *[]aggregation.Type `yaml:"firstLevelAggregationTypes"` + NonFirstLevelAggregationTypes *[]aggregation.Type `yaml:"nonFirstLevelAggregationTypes"` } func toRunes(s string) []rune { diff --git a/rules/validator/config_test.go b/rules/validator/config_test.go index c9e7899..0a5acf5 100644 --- a/rules/validator/config_test.go +++ b/rules/validator/config_test.go @@ -26,8 +26,10 @@ import ( "github.com/m3db/m3cluster/client" "github.com/m3db/m3cluster/kv" "github.com/m3db/m3cluster/kv/mem" + "github.com/m3db/m3metrics/aggregation" "github.com/m3db/m3metrics/filters" "github.com/m3db/m3metrics/metric" + "github.com/m3db/m3metrics/policy" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" @@ -82,10 +84,161 @@ kv: require.NoError(t, err) } -func TestNamespaceValidatorConfigurationStatic(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() +func TestNewValidator(t *testing.T) { + cfgStr := ` +namespace: + static: + validationResult: valid +requiredRollupTags: + - tag1 + - tag2 +maxTransformationDerivativeOrder: 2 +maxRollupLevels: 1 +metricTypes: + typeTag: type + allowed: + - counter + - timer + - gauge +policies: + defaultAllowed: + storagePolicies: + - 10s:2d + - 1m:40d + nonFirstLevelAggregationTypes: + - Sum + - Last + overrides: + - type: counter + allowed: + firstLevelAggregationTypes: + - Sum + - type: timer + allowed: + storagePolicies: + - 10s:2d + firstLevelAggregationTypes: + - P50 + - P9999 + - type: gauge + allowed: + firstLevelAggregationTypes: + - Last +` + var cfg Configuration + require.NoError(t, yaml.Unmarshal([]byte(cfgStr), &cfg)) + opts := cfg.newValidatorOptions(nil) + + inputs := []struct { + metricType metric.Type + allowedStoragePolicies policy.StoragePolicies + disallowedStoragePolicies policy.StoragePolicies + allowedFirstLevelAggTypes aggregation.Types + disallowedFirstLevelAggTypes aggregation.Types + allowedNonFirstLevelAggTypes aggregation.Types + disallowedNonFirstLevelAggTypes aggregation.Types + }{ + { + metricType: metric.CounterType, + allowedStoragePolicies: policy.StoragePolicies{ + policy.MustParseStoragePolicy("10s:2d"), + policy.MustParseStoragePolicy("1m:40d"), + }, + disallowedStoragePolicies: policy.StoragePolicies{ + policy.MustParseStoragePolicy("1m:2d"), + policy.MustParseStoragePolicy("10s:40d"), + }, + allowedFirstLevelAggTypes: aggregation.Types{ + aggregation.Sum, + }, + disallowedFirstLevelAggTypes: aggregation.Types{ + aggregation.Last, + }, + allowedNonFirstLevelAggTypes: aggregation.Types{ + aggregation.Sum, + aggregation.Last, + }, + disallowedNonFirstLevelAggTypes: aggregation.Types{ + aggregation.Min, + aggregation.P99, + }, + }, + { + metricType: metric.TimerType, + allowedStoragePolicies: policy.StoragePolicies{ + policy.MustParseStoragePolicy("10s:2d"), + }, + disallowedStoragePolicies: policy.StoragePolicies{ + policy.MustParseStoragePolicy("1m:2d"), + policy.MustParseStoragePolicy("1m:40d"), + }, + allowedFirstLevelAggTypes: aggregation.Types{ + aggregation.P50, + aggregation.P9999, + }, + disallowedFirstLevelAggTypes: aggregation.Types{ + aggregation.Last, + }, + allowedNonFirstLevelAggTypes: aggregation.Types{ + aggregation.Sum, 
+ aggregation.Last, + }, + disallowedNonFirstLevelAggTypes: aggregation.Types{ + aggregation.Min, + aggregation.P99, + }, + }, + { + metricType: metric.GaugeType, + allowedStoragePolicies: policy.StoragePolicies{ + policy.MustParseStoragePolicy("10s:2d"), + policy.MustParseStoragePolicy("1m:40d"), + }, + disallowedStoragePolicies: policy.StoragePolicies{ + policy.MustParseStoragePolicy("1m:2d"), + policy.MustParseStoragePolicy("10s:40d"), + }, + allowedFirstLevelAggTypes: aggregation.Types{ + aggregation.Last, + }, + disallowedFirstLevelAggTypes: aggregation.Types{ + aggregation.Sum, + }, + allowedNonFirstLevelAggTypes: aggregation.Types{ + aggregation.Sum, + aggregation.Last, + }, + disallowedNonFirstLevelAggTypes: aggregation.Types{ + aggregation.Min, + aggregation.P99, + }, + }, + } + + for _, input := range inputs { + for _, storagePolicy := range input.allowedStoragePolicies { + require.True(t, opts.IsAllowedStoragePolicyFor(input.metricType, storagePolicy)) + } + for _, storagePolicy := range input.disallowedStoragePolicies { + require.False(t, opts.IsAllowedStoragePolicyFor(input.metricType, storagePolicy)) + } + for _, aggregationType := range input.allowedFirstLevelAggTypes { + require.True(t, opts.IsAllowedFirstLevelAggregationTypeFor(input.metricType, aggregationType)) + } + for _, aggregationType := range input.disallowedFirstLevelAggTypes { + require.False(t, opts.IsAllowedFirstLevelAggregationTypeFor(input.metricType, aggregationType)) + } + for _, aggregationType := range input.allowedNonFirstLevelAggTypes { + require.True(t, opts.IsAllowedNonFirstLevelAggregationTypeFor(input.metricType, aggregationType)) + } + for _, aggregationType := range input.disallowedNonFirstLevelAggTypes { + require.False(t, opts.IsAllowedNonFirstLevelAggregationTypeFor(input.metricType, aggregationType)) + } + } +} + +func TestNamespaceValidatorConfigurationStatic(t *testing.T) { cfgStr := ` static: validationResult: valid From 0d435401c5888b4dd16350e8604a51fc14630198 Mon Sep 17 00:00:00 2001 From: xichen2020 Date: Fri, 29 Jun 2018 08:42:18 -0400 Subject: [PATCH 2/2] Ensure the pipelines are non-empty for existing IDs (#192) --- metadata/metadata.go | 40 +++++++++++++++++++++++++++++++++--- metadata/metadata_test.go | 28 +++++++++++++++++++++++++ rules/active_ruleset.go | 8 +++++++- rules/active_ruleset_test.go | 19 +++++++++++++---- 4 files changed, 87 insertions(+), 8 deletions(-) diff --git a/metadata/metadata.go b/metadata/metadata.go index 59a01dd..5930c48 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -33,7 +33,7 @@ var ( DefaultPipelineMetadata PipelineMetadata // DefaultPipelineMetadatas is a default list of pipeline metadatas. - DefaultPipelineMetadatas = []PipelineMetadata{DefaultPipelineMetadata} + DefaultPipelineMetadatas = PipelineMetadatas{DefaultPipelineMetadata} // DefaultMetadata is a default metadata. DefaultMetadata = Metadata{Pipelines: DefaultPipelineMetadatas} @@ -71,6 +71,15 @@ func (m PipelineMetadata) IsDefault() bool { m.Pipeline.IsEmpty() } +// Clone clones the pipeline metadata. +func (m PipelineMetadata) Clone() PipelineMetadata { + return PipelineMetadata{ + AggregationID: m.AggregationID, + StoragePolicies: m.StoragePolicies.Clone(), + Pipeline: m.Pipeline.Clone(), + } +} + // ToProto converts the pipeline metadata to a protobuf message in place. 
 func (m PipelineMetadata) ToProto(pb *metricpb.PipelineMetadata) error {
 	if err := m.AggregationID.ToProto(&pb.AggregationId); err != nil {
@@ -115,9 +124,34 @@ func (m *PipelineMetadata) FromProto(pb metricpb.PipelineMetadata) error {
 	return nil
 }
 
+// PipelineMetadatas is a list of pipeline metadatas.
+type PipelineMetadatas []PipelineMetadata
+
+// Equal returns true if two pipeline metadatas are considered equal.
+func (metadatas PipelineMetadatas) Equal(other PipelineMetadatas) bool {
+	if len(metadatas) != len(other) {
+		return false
+	}
+	for i := 0; i < len(metadatas); i++ {
+		if !metadatas[i].Equal(other[i]) {
+			return false
+		}
+	}
+	return true
+}
+
+// Clone clones the list of pipeline metadatas.
+func (metadatas PipelineMetadatas) Clone() PipelineMetadatas {
+	cloned := make(PipelineMetadatas, 0, len(metadatas))
+	for i := 0; i < len(metadatas); i++ {
+		cloned = append(cloned, metadatas[i].Clone())
+	}
+	return cloned
+}
+
 // Metadata represents the metadata associated with a metric.
 type Metadata struct {
-	Pipelines []PipelineMetadata `json:"pipelines"`
+	Pipelines PipelineMetadatas `json:"pipelines"`
 }
 
 // IsDefault returns whether this is the default metadata.
@@ -147,7 +181,7 @@ func (m *Metadata) FromProto(pb metricpb.Metadata) error {
 	if cap(m.Pipelines) >= numPipelines {
 		m.Pipelines = m.Pipelines[:numPipelines]
 	} else {
-		m.Pipelines = make([]PipelineMetadata, numPipelines)
+		m.Pipelines = make(PipelineMetadatas, numPipelines)
 	}
 	for i := 0; i < numPipelines; i++ {
 		if err := m.Pipelines[i].FromProto(pb.Pipelines[i]); err != nil {
diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go
index 4873bef..10550e2 100644
--- a/metadata/metadata_test.go
+++ b/metadata/metadata_test.go
@@ -798,6 +798,18 @@ func TestForwardMetadataFromProtoBadMetadataProto(t *testing.T) {
 	require.Error(t, res.FromProto(testBadForwardMetadataProto))
 }
 
+func TestPipelineMetadataClone(t *testing.T) {
+	cloned1 := testLargePipelineMetadata.Clone()
+	cloned2 := testLargePipelineMetadata.Clone()
+	require.True(t, cloned1.Equal(testLargePipelineMetadata))
+	require.True(t, cloned2.Equal(testLargePipelineMetadata))
+
+	// Assert that modifying the clone does not mutate the original pipeline metadata.
+	cloned1.StoragePolicies[0] = policy.MustParseStoragePolicy("1h:1h")
+	require.False(t, cloned1.Equal(testLargePipelineMetadata))
+	require.True(t, cloned2.Equal(testLargePipelineMetadata))
+}
+
 func TestPipelineMetadataToProto(t *testing.T) {
 	inputs := []struct {
 		sequence []PipelineMetadata
@@ -905,6 +917,22 @@ func TestPipelineMetadataFromProtoBadMetadataProto(t *testing.T) {
 	require.Error(t, res.FromProto(testBadPipelineMetadataProto))
 }
 
+func TestPipelineMetadatasClone(t *testing.T) {
+	input := PipelineMetadatas{
+		testSmallPipelineMetadata,
+		testLargePipelineMetadata,
+	}
+	cloned1 := input.Clone()
+	cloned2 := input.Clone()
+	require.True(t, cloned1.Equal(input))
+	require.True(t, cloned2.Equal(input))
+
+	// Assert that modifying the clone does not mutate the original list of pipeline metadatas.
+	cloned1[0].StoragePolicies[0] = policy.MustParseStoragePolicy("1h:1h")
+	require.False(t, cloned1.Equal(input))
+	require.True(t, cloned2.Equal(input))
+}
+
 func TestStagedMetadatasToProto(t *testing.T) {
 	inputs := []struct {
 		sequence []StagedMetadatas
diff --git a/rules/active_ruleset.go b/rules/active_ruleset.go
index 515bed5..877a6f6 100644
--- a/rules/active_ruleset.go
+++ b/rules/active_ruleset.go
@@ -233,6 +233,12 @@ func (as *activeRuleSet) mappingsForNonRollupID(
 		}
 		pipelines = append(pipelines, pipeline)
 	}
+	// NB: The pipeline list should never be empty as the resulting pipelines are
+	// used to determine how the *existing* ID is aggregated and retained. If no
+	// rules match, the default pipeline list is used.
+	if len(pipelines) == 0 {
+		pipelines = metadata.DefaultPipelineMetadatas.Clone()
+	}
 	return mappingResults{
 		forExistingID: ruleMatchResults{cutoverNanos: cutoverNanos, pipelines: pipelines},
 	}
@@ -489,7 +495,7 @@ func (as *activeRuleSet) reverseMappingsForNonRollupID(
 	at aggregation.Type,
 ) (metadata.StagedMetadata, bool) {
 	mappingRes := as.mappingsForNonRollupID(id, timeNanos).forExistingID
-	filteredPipelines := filteredPipelinesWithAggregationType(mappingRes.resolvedPipelines(), mt, at, as.aggTypeOpts)
+	filteredPipelines := filteredPipelinesWithAggregationType(mappingRes.pipelines, mt, at, as.aggTypeOpts)
 	if len(filteredPipelines) == 0 {
 		return metadata.DefaultStagedMetadata, false
 	}
diff --git a/rules/active_ruleset_test.go b/rules/active_ruleset_test.go
index a652260..ee468c8 100644
--- a/rules/active_ruleset_test.go
+++ b/rules/active_ruleset_test.go
@@ -41,6 +41,9 @@ import (
 )
 
 var (
+	testStagedMetadatasCmptOpts = []cmp.Option{
+		cmpopts.EquateEmpty(),
+	}
 	testIDWithMetadatasCmpOpts = []cmp.Option{
 		cmpopts.EquateEmpty(),
 	}
@@ -470,7 +473,7 @@ func TestActiveRuleSetForwardMatchWithMappingRules(t *testing.T) {
 	for _, input := range inputs {
 		res := as.ForwardMatch(b(input.id), input.matchFrom, input.matchTo)
 		require.Equal(t, input.expireAtNanos, res.expireAtNanos)
-		require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0))
+		require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...))
 		require.Equal(t, 0, res.NumNewRollupIDs())
 	}
 }
@@ -488,6 +491,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
 				Tombstoned: false,
 				Metadata: metadata.Metadata{
 					Pipelines: []metadata.PipelineMetadata{
+						metadata.DefaultPipelineMetadata,
 						{
 							AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
 							StoragePolicies: policy.StoragePolicies{
@@ -655,6 +659,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
 				Tombstoned: false,
 				Metadata: metadata.Metadata{
 					Pipelines: []metadata.PipelineMetadata{
+						metadata.DefaultPipelineMetadata,
 						{
 							AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
 							StoragePolicies: policy.StoragePolicies{
@@ -680,6 +685,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
 				Tombstoned: false,
 				Metadata: metadata.Metadata{
 					Pipelines: []metadata.PipelineMetadata{
+						metadata.DefaultPipelineMetadata,
 						{
 							AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
 							StoragePolicies: policy.StoragePolicies{
@@ -727,6 +733,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
 				Tombstoned: false,
 				Metadata: metadata.Metadata{
 					Pipelines: []metadata.PipelineMetadata{
+						metadata.DefaultPipelineMetadata,
 						{
 							AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
 							StoragePolicies: policy.StoragePolicies{
@@ -774,6 +781,7 @@ func
TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) { Tombstoned: false, Metadata: metadata.Metadata{ Pipelines: []metadata.PipelineMetadata{ + metadata.DefaultPipelineMetadata, { AggregationID: aggregation.DefaultID, StoragePolicies: policy.StoragePolicies{ @@ -825,6 +833,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) { Tombstoned: false, Metadata: metadata.Metadata{ Pipelines: []metadata.PipelineMetadata{ + metadata.DefaultPipelineMetadata, { AggregationID: aggregation.DefaultID, StoragePolicies: policy.StoragePolicies{ @@ -876,6 +885,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) { Tombstoned: false, Metadata: metadata.Metadata{ Pipelines: []metadata.PipelineMetadata{ + metadata.DefaultPipelineMetadata, { AggregationID: aggregation.DefaultID, StoragePolicies: policy.StoragePolicies{ @@ -905,6 +915,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) { Tombstoned: false, Metadata: metadata.Metadata{ Pipelines: []metadata.PipelineMetadata{ + metadata.DefaultPipelineMetadata, { AggregationID: aggregation.DefaultID, StoragePolicies: policy.StoragePolicies{ @@ -1231,7 +1242,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) { for _, input := range inputs { res := as.ForwardMatch(b(input.id), input.matchFrom, input.matchTo) require.Equal(t, input.expireAtNanos, res.expireAtNanos) - require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0)) + require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...)) require.Equal(t, len(input.forNewRollupIDsResult), res.NumNewRollupIDs()) for i := 0; i < len(input.forNewRollupIDsResult); i++ { rollup := res.ForNewRollupIDsAt(i, 0) @@ -2270,7 +2281,7 @@ func TestActiveRuleSetForwardMatchWithMappingRulesAndRollupRules(t *testing.T) { for _, input := range inputs { res := as.ForwardMatch(b(input.id), input.matchFrom, input.matchTo) require.Equal(t, input.expireAtNanos, res.expireAtNanos) - require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0)) + require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...)) require.Equal(t, len(input.forNewRollupIDsResult), res.NumNewRollupIDs()) for i := 0; i < len(input.forNewRollupIDsResult); i++ { rollup := res.ForNewRollupIDsAt(i, 0) @@ -2691,7 +2702,7 @@ func TestActiveRuleSetReverseMatchWithMappingRulesForNonRollupID(t *testing.T) { for _, input := range inputs { res := as.ReverseMatch(b(input.id), input.matchFrom, input.matchTo, input.metricType, input.aggregationType) require.Equal(t, input.expireAtNanos, res.expireAtNanos) - require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0)) + require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...)) } }
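
A note on the pattern in PATCH 1/2: with a plain slice field, yaml.Unmarshal cannot tell a key the operator omitted apart from one explicitly set to an empty list, so an unset override would clobber the validator defaults with nil. With a pointer field such as *[]policy.StoragePolicy, the pointer stays nil when the key is absent and the defaults are kept; this is the nil check guarding each Set...For call in newValidatorOptions. Below is a minimal, runnable sketch of the same pattern under illustrative names -- the config type, the allowedTypes key, and the resolve helper are not part of m3metrics:

package main

import (
	"fmt"

	yaml "gopkg.in/yaml.v2"
)

// config distinguishes an omitted key (nil pointer) from a key explicitly
// set to an empty list (non-nil pointer to an empty slice).
type config struct {
	AllowedTypes *[]string `yaml:"allowedTypes"`
}

// resolve keeps the defaults when the key was omitted, but honors an
// explicit value even when that value is empty.
func resolve(c config, defaults []string) []string {
	if c.AllowedTypes != nil {
		return *c.AllowedTypes
	}
	return defaults
}

func main() {
	defaults := []string{"counter", "timer", "gauge"}

	var omitted config
	if err := yaml.Unmarshal([]byte(`{}`), &omitted); err != nil {
		panic(err)
	}
	fmt.Println(resolve(omitted, defaults)) // [counter timer gauge]

	var explicit config
	if err := yaml.Unmarshal([]byte(`allowedTypes: []`), &explicit); err != nil {
		panic(err)
	}
	fmt.Println(resolve(explicit, defaults)) // []
}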
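
A note on the fallback in PATCH 2/2: mappingsForNonRollupID returns metadata.DefaultPipelineMetadatas.Clone() rather than the package-level default itself because Go slices alias their backing arrays, so handing out the shared default would let any in-place write by a caller corrupt every later match result. The sketch below shows the aliasing hazard the deep Clone avoids -- the pipeline type and clone method here are illustrative stand-ins, not the m3metrics types:

package main

import "fmt"

// pipeline stands in for metadata.PipelineMetadata for illustration only.
type pipeline struct {
	policies []string
}

// clone deep-copies the slice so the result shares no backing array with
// the original.
func (p pipeline) clone() pipeline {
	cloned := make([]string, len(p.policies))
	copy(cloned, p.policies)
	return pipeline{policies: cloned}
}

var defaultPipeline = pipeline{policies: []string{"10s:2d"}}

func main() {
	// Hazard: a struct copy still shares the slice's backing array, so this
	// write silently mutates the package-level default.
	aliased := defaultPipeline
	aliased.policies[0] = "1m:40d"
	fmt.Println(defaultPipeline.policies[0]) // 1m:40d -- default corrupted

	// Restore, then take a deep clone: writes no longer reach the default.
	defaultPipeline.policies[0] = "10s:2d"
	safe := defaultPipeline.clone()
	safe.policies[0] = "1m:40d"
	fmt.Println(defaultPipeline.policies[0]) // 10s:2d -- default intact
}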
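
A note on the test changes in PATCH 2/2: the assertions move from require.Equal to cmp.Equal with the testStagedMetadatasCmptOpts options (cmpopts.EquateEmpty()) because the new fallback path can produce an empty-but-non-nil pipeline slice where a fixture holds nil, and require.Equal's reflect.DeepEqual-style comparison treats those as different. A minimal demonstration of what EquateEmpty changes:

package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
)

func main() {
	var nilPolicies []string    // e.g. a fixture that never set the field
	emptyPolicies := []string{} // e.g. a value built with make(..., 0)

	fmt.Println(cmp.Equal(nilPolicies, emptyPolicies))                        // false
	fmt.Println(cmp.Equal(nilPolicies, emptyPolicies, cmpopts.EquateEmpty())) // true
}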