From c0c0a65efa5554d72057dd2f53da82e10086282c Mon Sep 17 00:00:00 2001 From: Xi Chen Date: Thu, 28 Jun 2018 15:36:19 -0400 Subject: [PATCH] Ensure the pipelines are non-empty for existing IDs --- metadata/metadata.go | 40 +++++++++++++++++++++++++++++++++--- metadata/metadata_test.go | 28 +++++++++++++++++++++++++ rules/active_ruleset.go | 8 +++++++- rules/active_ruleset_test.go | 19 +++++++++++++---- 4 files changed, 87 insertions(+), 8 deletions(-) diff --git a/metadata/metadata.go b/metadata/metadata.go index 59a01dd..b56a7e2 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -33,7 +33,7 @@ var ( DefaultPipelineMetadata PipelineMetadata // DefaultPipelineMetadatas is a default list of pipeline metadatas. - DefaultPipelineMetadatas = []PipelineMetadata{DefaultPipelineMetadata} + DefaultPipelineMetadatas = PipelineMetadatas{DefaultPipelineMetadata} // DefaultMetadata is a default metadata. DefaultMetadata = Metadata{Pipelines: DefaultPipelineMetadatas} @@ -71,6 +71,15 @@ func (m PipelineMetadata) IsDefault() bool { m.Pipeline.IsEmpty() } +// Clone clones the pipeline metadata. +func (m PipelineMetadata) Clone() PipelineMetadata { + return PipelineMetadata{ + AggregationID: m.AggregationID, + StoragePolicies: m.StoragePolicies.Clone(), + Pipeline: m.Pipeline.Clone(), + } +} + // ToProto converts the pipeline metadata to a protobuf message in place. func (m PipelineMetadata) ToProto(pb *metricpb.PipelineMetadata) error { if err := m.AggregationID.ToProto(&pb.AggregationId); err != nil { @@ -115,9 +124,34 @@ func (m *PipelineMetadata) FromProto(pb metricpb.PipelineMetadata) error { return nil } +// PipelineMetadatas is a list of pipeline metadatas. +type PipelineMetadatas []PipelineMetadata + +// Equal returns true if two pipeline metadatas are considered equal. 
+func (metadatas PipelineMetadatas) Equal(other PipelineMetadatas) bool { + if len(metadatas) != len(other) { + return false + } + for i := 0; i < len(metadatas); i++ { + if !metadatas[i].Equal(other[i]) { + return false + } + } + return true +} + +// Clone clones the list of pipeline metadatas. +func (metadatas PipelineMetadatas) Clone() PipelineMetadatas { + cloned := make(PipelineMetadatas, 0, len(metadatas)) + for i := 0; i < len(metadatas); i++ { + cloned = append(cloned, metadatas[i].Clone()) + } + return cloned +} + // Metadata represents the metadata associated with a metric. type Metadata struct { - Pipelines []PipelineMetadata `json:"pipelines"` + Pipelines PipelineMetadatas `json:"pipelines"` } // IsDefault returns whether this is the default metadata. @@ -147,7 +181,7 @@ func (m *Metadata) FromProto(pb metricpb.Metadata) error { if cap(m.Pipelines) >= numPipelines { m.Pipelines = m.Pipelines[:numPipelines] } else { - m.Pipelines = make([]PipelineMetadata, numPipelines) + m.Pipelines = make(PipelineMetadatas, numPipelines) } for i := 0; i < numPipelines; i++ { if err := m.Pipelines[i].FromProto(pb.Pipelines[i]); err != nil { diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go index 4873bef..10550e2 100644 --- a/metadata/metadata_test.go +++ b/metadata/metadata_test.go @@ -798,6 +798,18 @@ func TestForwardMetadataFromProtoBadMetadataProto(t *testing.T) { require.Error(t, res.FromProto(testBadForwardMetadataProto)) } +func TestPipelineMetadataClone(t *testing.T) { + cloned1 := testLargePipelineMetadata.Clone() + cloned2 := testLargePipelineMetadata.Clone() + require.True(t, cloned1.Equal(testLargePipelineMetadata)) + require.True(t, cloned2.Equal(testLargePipelineMetadata)) + + // Assert that modifying the clone does not mutate the original pipeline metadata. 
+ cloned1.StoragePolicies[0] = policy.MustParseStoragePolicy("1h:1h") + require.False(t, cloned1.Equal(testLargePipelineMetadata)) + require.True(t, cloned2.Equal(testLargePipelineMetadata)) +} + func TestPipelineMetadataToProto(t *testing.T) { inputs := []struct { sequence []PipelineMetadata @@ -905,6 +917,22 @@ func TestPipelineMetadataFromProtoBadMetadataProto(t *testing.T) { require.Error(t, res.FromProto(testBadPipelineMetadataProto)) } +func TestPipelineMetadatasClone(t *testing.T) { + input := PipelineMetadatas{ + testSmallPipelineMetadata, + testLargePipelineMetadata, + } + cloned1 := input.Clone() + cloned2 := input.Clone() + require.True(t, cloned1.Equal(input)) + require.True(t, cloned2.Equal(input)) + + // Assert that modifying the clone does not mutate the original pipeline metadata. + cloned1[0].StoragePolicies[0] = policy.MustParseStoragePolicy("1h:1h") + require.False(t, cloned1.Equal(input)) + require.True(t, cloned2.Equal(input)) +} + func TestStagedMetadatasToProto(t *testing.T) { inputs := []struct { sequence []StagedMetadatas diff --git a/rules/active_ruleset.go b/rules/active_ruleset.go index 515bed5..877a6f6 100644 --- a/rules/active_ruleset.go +++ b/rules/active_ruleset.go @@ -233,6 +233,12 @@ func (as *activeRuleSet) mappingsForNonRollupID( } pipelines = append(pipelines, pipeline) } + // NB: The pipeline list should never be empty as the resulting pipelines are + // used to determine how the *existing* ID is aggregated and retained. If there + // is no rule match, the default pipeline list is used. 
+ if len(pipelines) == 0 { + pipelines = metadata.DefaultPipelineMetadatas.Clone() + } return mappingResults{ forExistingID: ruleMatchResults{cutoverNanos: cutoverNanos, pipelines: pipelines}, } @@ -489,7 +495,7 @@ func (as *activeRuleSet) reverseMappingsForNonRollupID( at aggregation.Type, ) (metadata.StagedMetadata, bool) { mappingRes := as.mappingsForNonRollupID(id, timeNanos).forExistingID - filteredPipelines := filteredPipelinesWithAggregationType(mappingRes.resolvedPipelines(), mt, at, as.aggTypeOpts) + filteredPipelines := filteredPipelinesWithAggregationType(mappingRes.pipelines, mt, at, as.aggTypeOpts) if len(filteredPipelines) == 0 { return metadata.DefaultStagedMetadata, false } diff --git a/rules/active_ruleset_test.go b/rules/active_ruleset_test.go index a652260..ee468c8 100644 --- a/rules/active_ruleset_test.go +++ b/rules/active_ruleset_test.go @@ -41,6 +41,9 @@ import ( ) var ( + testStagedMetadatasCmptOpts = []cmp.Option{ + cmpopts.EquateEmpty(), + } testIDWithMetadatasCmpOpts = []cmp.Option{ cmpopts.EquateEmpty(), } @@ -470,7 +473,7 @@ func TestActiveRuleSetForwardMatchWithMappingRules(t *testing.T) { for _, input := range inputs { res := as.ForwardMatch(b(input.id), input.matchFrom, input.matchTo) require.Equal(t, input.expireAtNanos, res.expireAtNanos) - require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0)) + require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...)) require.Equal(t, 0, res.NumNewRollupIDs()) } } @@ -488,6 +491,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) { Tombstoned: false, Metadata: metadata.Metadata{ Pipelines: []metadata.PipelineMetadata{ + metadata.DefaultPipelineMetadata, { AggregationID: aggregation.MustCompressTypes(aggregation.Sum), StoragePolicies: policy.StoragePolicies{ @@ -655,6 +659,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) { Tombstoned: false, Metadata: metadata.Metadata{ Pipelines: 
[]metadata.PipelineMetadata{ + metadata.DefaultPipelineMetadata, { AggregationID: aggregation.MustCompressTypes(aggregation.Sum), StoragePolicies: policy.StoragePolicies{ @@ -680,6 +685,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) { Tombstoned: false, Metadata: metadata.Metadata{ Pipelines: []metadata.PipelineMetadata{ + metadata.DefaultPipelineMetadata, { AggregationID: aggregation.MustCompressTypes(aggregation.Sum), StoragePolicies: policy.StoragePolicies{ @@ -727,6 +733,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) { Tombstoned: false, Metadata: metadata.Metadata{ Pipelines: []metadata.PipelineMetadata{ + metadata.DefaultPipelineMetadata, { AggregationID: aggregation.MustCompressTypes(aggregation.Sum), StoragePolicies: policy.StoragePolicies{ @@ -774,6 +781,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) { Tombstoned: false, Metadata: metadata.Metadata{ Pipelines: []metadata.PipelineMetadata{ + metadata.DefaultPipelineMetadata, { AggregationID: aggregation.DefaultID, StoragePolicies: policy.StoragePolicies{ @@ -825,6 +833,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) { Tombstoned: false, Metadata: metadata.Metadata{ Pipelines: []metadata.PipelineMetadata{ + metadata.DefaultPipelineMetadata, { AggregationID: aggregation.DefaultID, StoragePolicies: policy.StoragePolicies{ @@ -876,6 +885,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) { Tombstoned: false, Metadata: metadata.Metadata{ Pipelines: []metadata.PipelineMetadata{ + metadata.DefaultPipelineMetadata, { AggregationID: aggregation.DefaultID, StoragePolicies: policy.StoragePolicies{ @@ -905,6 +915,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) { Tombstoned: false, Metadata: metadata.Metadata{ Pipelines: []metadata.PipelineMetadata{ + metadata.DefaultPipelineMetadata, { AggregationID: aggregation.DefaultID, StoragePolicies: policy.StoragePolicies{ @@ -1231,7 +1242,7 @@ func 
TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) { for _, input := range inputs { res := as.ForwardMatch(b(input.id), input.matchFrom, input.matchTo) require.Equal(t, input.expireAtNanos, res.expireAtNanos) - require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0)) + require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...)) require.Equal(t, len(input.forNewRollupIDsResult), res.NumNewRollupIDs()) for i := 0; i < len(input.forNewRollupIDsResult); i++ { rollup := res.ForNewRollupIDsAt(i, 0) @@ -2270,7 +2281,7 @@ func TestActiveRuleSetForwardMatchWithMappingRulesAndRollupRules(t *testing.T) { for _, input := range inputs { res := as.ForwardMatch(b(input.id), input.matchFrom, input.matchTo) require.Equal(t, input.expireAtNanos, res.expireAtNanos) - require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0)) + require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...)) require.Equal(t, len(input.forNewRollupIDsResult), res.NumNewRollupIDs()) for i := 0; i < len(input.forNewRollupIDsResult); i++ { rollup := res.ForNewRollupIDsAt(i, 0) @@ -2691,7 +2702,7 @@ func TestActiveRuleSetReverseMatchWithMappingRulesForNonRollupID(t *testing.T) { for _, input := range inputs { res := as.ReverseMatch(b(input.id), input.matchFrom, input.matchTo, input.metricType, input.aggregationType) require.Equal(t, input.expireAtNanos, res.expireAtNanos) - require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0)) + require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...)) } }