This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Ensure the pipelines are non-empty for existing IDs
xichen2020 committed Jun 28, 2018
1 parent 89fe26f commit c0c0a65
Showing 4 changed files with 87 additions and 8 deletions.
40 changes: 37 additions & 3 deletions metadata/metadata.go
@@ -33,7 +33,7 @@ var (
DefaultPipelineMetadata PipelineMetadata

// DefaultPipelineMetadatas is a default list of pipeline metadatas.
DefaultPipelineMetadatas = []PipelineMetadata{DefaultPipelineMetadata}
DefaultPipelineMetadatas = PipelineMetadatas{DefaultPipelineMetadata}

// DefaultMetadata is a default metadata.
DefaultMetadata = Metadata{Pipelines: DefaultPipelineMetadatas}
@@ -71,6 +71,15 @@ func (m PipelineMetadata) IsDefault() bool {
m.Pipeline.IsEmpty()
}

// Clone clones the pipeline metadata.
func (m PipelineMetadata) Clone() PipelineMetadata {
return PipelineMetadata{
AggregationID: m.AggregationID,
StoragePolicies: m.StoragePolicies.Clone(),
Pipeline: m.Pipeline.Clone(),
}
}

// ToProto converts the pipeline metadata to a protobuf message in place.
func (m PipelineMetadata) ToProto(pb *metricpb.PipelineMetadata) error {
if err := m.AggregationID.ToProto(&pb.AggregationId); err != nil {
@@ -115,9 +124,34 @@ func (m *PipelineMetadata) FromProto(pb metricpb.PipelineMetadata) error {
return nil
}

// PipelineMetadatas is a list of pipeline metadatas.
type PipelineMetadatas []PipelineMetadata

// Equal returns true if two pipeline metadatas are considered equal.
func (metadatas PipelineMetadatas) Equal(other PipelineMetadatas) bool {
if len(metadatas) != len(other) {
return false
}
for i := 0; i < len(metadatas); i++ {
if !metadatas[i].Equal(other[i]) {
return false
}
}
return true
}

// Clone clones the list of pipeline metadatas.
func (metadatas PipelineMetadatas) Clone() PipelineMetadatas {
cloned := make(PipelineMetadatas, 0, len(metadatas))
for i := 0; i < len(metadatas); i++ {
cloned = append(cloned, metadatas[i].Clone())
}
return cloned
}

// Metadata represents the metadata associated with a metric.
type Metadata struct {
Pipelines []PipelineMetadata `json:"pipelines"`
Pipelines PipelineMetadatas `json:"pipelines"`
}

// IsDefault returns whether this is the default metadata.
@@ -147,7 +181,7 @@ func (m *Metadata) FromProto(pb metricpb.Metadata) error {
if cap(m.Pipelines) >= numPipelines {
m.Pipelines = m.Pipelines[:numPipelines]
} else {
m.Pipelines = make([]PipelineMetadata, numPipelines)
m.Pipelines = make(PipelineMetadatas, numPipelines)
}
for i := 0; i < numPipelines; i++ {
if err := m.Pipelines[i].FromProto(pb.Pipelines[i]); err != nil {
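As an aside (not part of the commit), the reason Clone performs a deep copy is that DefaultPipelineMetadatas is a shared package-level value; handing it out by reference would let callers mutate the default. The sketch below is illustrative only and uses simplified stand-in types rather than the real m3metrics ones, to contrast a plain struct copy with the deep copy.

```go
package main

import "fmt"

// StoragePolicy is a simplified stand-in for the real policy type.
type StoragePolicy string

// PipelineMetadata is a simplified stand-in for the struct extended in this
// commit; only the slice-valued field matters for the copy semantics shown.
type PipelineMetadata struct {
	StoragePolicies []StoragePolicy
}

// Clone returns a deep copy, so mutating the result cannot affect the receiver.
func (m PipelineMetadata) Clone() PipelineMetadata {
	policies := make([]StoragePolicy, len(m.StoragePolicies))
	copy(policies, m.StoragePolicies)
	return PipelineMetadata{StoragePolicies: policies}
}

func main() {
	original := PipelineMetadata{StoragePolicies: []StoragePolicy{"10s:2d"}}

	shallow := original      // struct copy: the slice header still aliases the same backing array
	deep := original.Clone() // independent backing array

	shallow.StoragePolicies[0] = "1h:30d"
	fmt.Println(original.StoragePolicies[0]) // 1h:30d — the shallow copy leaked the mutation

	deep.StoragePolicies[0] = "1m:40d"
	fmt.Println(original.StoragePolicies[0]) // still 1h:30d — the deep copy did not
}
```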
28 changes: 28 additions & 0 deletions metadata/metadata_test.go
@@ -798,6 +798,18 @@ func TestForwardMetadataFromProtoBadMetadataProto(t *testing.T) {
require.Error(t, res.FromProto(testBadForwardMetadataProto))
}

func TestPipelineMetadataClone(t *testing.T) {
cloned1 := testLargePipelineMetadata.Clone()
cloned2 := testLargePipelineMetadata.Clone()
require.True(t, cloned1.Equal(testLargePipelineMetadata))
require.True(t, cloned2.Equal(testLargePipelineMetadata))

// Assert that modifying the clone does not mutate the original pipeline metadata.
cloned1.StoragePolicies[0] = policy.MustParseStoragePolicy("1h:1h")
require.False(t, cloned1.Equal(testLargePipelineMetadata))
require.True(t, cloned2.Equal(testLargePipelineMetadata))
}

func TestPipelineMetadataToProto(t *testing.T) {
inputs := []struct {
sequence []PipelineMetadata
@@ -905,6 +917,22 @@ func TestPipelineMetadataFromProtoBadMetadataProto(t *testing.T) {
require.Error(t, res.FromProto(testBadPipelineMetadataProto))
}

func TestPipelineMetadatasClone(t *testing.T) {
input := PipelineMetadatas{
testSmallPipelineMetadata,
testLargePipelineMetadata,
}
cloned1 := input.Clone()
cloned2 := input.Clone()
require.True(t, cloned1.Equal(input))
require.True(t, cloned2.Equal(input))

// Assert that modifying the clone does not mutate the original pipeline metadata.
cloned1[0].StoragePolicies[0] = policy.MustParseStoragePolicy("1h:1h")
require.False(t, cloned1.Equal(input))
require.True(t, cloned2.Equal(input))
}

func TestStagedMetadatasToProto(t *testing.T) {
inputs := []struct {
sequence []StagedMetadatas
8 changes: 7 additions & 1 deletion rules/active_ruleset.go
@@ -233,6 +233,12 @@ func (as *activeRuleSet) mappingsForNonRollupID(
}
pipelines = append(pipelines, pipeline)
}
// NB: The pipeline list should never be empty as the resulting pipelines are
// used to determine how the *existing* ID is aggregated and retained. If no
// rules match, the default pipeline list is used.
if len(pipelines) == 0 {
pipelines = metadata.DefaultPipelineMetadatas.Clone()
}
return mappingResults{
forExistingID: ruleMatchResults{cutoverNanos: cutoverNanos, pipelines: pipelines},
}
@@ -489,7 +495,7 @@ func (as *activeRuleSet) reverseMappingsForNonRollupID(
at aggregation.Type,
) (metadata.StagedMetadata, bool) {
mappingRes := as.mappingsForNonRollupID(id, timeNanos).forExistingID
filteredPipelines := filteredPipelinesWithAggregationType(mappingRes.resolvedPipelines(), mt, at, as.aggTypeOpts)
filteredPipelines := filteredPipelinesWithAggregationType(mappingRes.pipelines, mt, at, as.aggTypeOpts)
if len(filteredPipelines) == 0 {
return metadata.DefaultStagedMetadata, false
}
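To make the intent of the fallback above concrete, here is a minimal, self-contained sketch of the pattern (not part of the commit; it uses simplified stand-ins for metadata.PipelineMetadata and metadata.DefaultPipelineMetadatas rather than the real types): when no mapping rule matches, a clone of the default pipeline list is returned, so the result is never empty and the shared default cannot be mutated by callers.

```go
package main

import "fmt"

// pipeline is a simplified stand-in for metadata.PipelineMetadata.
type pipeline struct{ name string }

// defaultPipelines plays the role of metadata.DefaultPipelineMetadatas: a
// package-level default shared by every caller.
var defaultPipelines = []pipeline{{name: "default"}}

// clonePipelines mirrors the Clone helper added in this commit so that callers
// falling back to the default get their own copy, not the shared slice.
func clonePipelines(in []pipeline) []pipeline {
	out := make([]pipeline, len(in))
	copy(out, in)
	return out
}

// pipelinesForExistingID sketches the fallback added to mappingsForNonRollupID:
// when no mapping rule matched, return a non-empty default so the existing ID
// still carries aggregation and retention information.
func pipelinesForExistingID(matched []pipeline) []pipeline {
	if len(matched) == 0 {
		return clonePipelines(defaultPipelines)
	}
	return matched
}

func main() {
	got := pipelinesForExistingID(nil)
	got[0].name = "mutated by caller"
	fmt.Println(defaultPipelines[0].name) // still "default": the clone protects the shared value
}
```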
19 changes: 15 additions & 4 deletions rules/active_ruleset_test.go
@@ -41,6 +41,9 @@ import (
)

var (
testStagedMetadatasCmptOpts = []cmp.Option{
cmpopts.EquateEmpty(),
}
testIDWithMetadatasCmpOpts = []cmp.Option{
cmpopts.EquateEmpty(),
}
@@ -470,7 +473,7 @@ func TestActiveRuleSetForwardMatchWithMappingRules(t *testing.T) {
for _, input := range inputs {
res := as.ForwardMatch(b(input.id), input.matchFrom, input.matchTo)
require.Equal(t, input.expireAtNanos, res.expireAtNanos)
require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0))
require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...))
require.Equal(t, 0, res.NumNewRollupIDs())
}
}
@@ -488,6 +491,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
StoragePolicies: policy.StoragePolicies{
@@ -655,6 +659,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
StoragePolicies: policy.StoragePolicies{
@@ -680,6 +685,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
StoragePolicies: policy.StoragePolicies{
@@ -727,6 +733,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
StoragePolicies: policy.StoragePolicies{
@@ -774,6 +781,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.DefaultID,
StoragePolicies: policy.StoragePolicies{
@@ -825,6 +833,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.DefaultID,
StoragePolicies: policy.StoragePolicies{
@@ -876,6 +885,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.DefaultID,
StoragePolicies: policy.StoragePolicies{
@@ -905,6 +915,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.DefaultID,
StoragePolicies: policy.StoragePolicies{
@@ -1231,7 +1242,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
for _, input := range inputs {
res := as.ForwardMatch(b(input.id), input.matchFrom, input.matchTo)
require.Equal(t, input.expireAtNanos, res.expireAtNanos)
require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0))
require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...))
require.Equal(t, len(input.forNewRollupIDsResult), res.NumNewRollupIDs())
for i := 0; i < len(input.forNewRollupIDsResult); i++ {
rollup := res.ForNewRollupIDsAt(i, 0)
@@ -2270,7 +2281,7 @@ func TestActiveRuleSetForwardMatchWithMappingRulesAndRollupRules(t *testing.T) {
for _, input := range inputs {
res := as.ForwardMatch(b(input.id), input.matchFrom, input.matchTo)
require.Equal(t, input.expireAtNanos, res.expireAtNanos)
require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0))
require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...))
require.Equal(t, len(input.forNewRollupIDsResult), res.NumNewRollupIDs())
for i := 0; i < len(input.forNewRollupIDsResult); i++ {
rollup := res.ForNewRollupIDsAt(i, 0)
@@ -2691,7 +2702,7 @@ func TestActiveRuleSetReverseMatchWithMappingRulesForNonRollupID(t *testing.T) {
for _, input := range inputs {
res := as.ReverseMatch(b(input.id), input.matchFrom, input.matchTo, input.metricType, input.aggregationType)
require.Equal(t, input.expireAtNanos, res.expireAtNanos)
require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0))
require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...))
}
}

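For reference, the tests switch from require.Equal to cmp.Equal with the new cmpopts.EquateEmpty option, presumably because the expected fixtures and the matched results can represent an empty pipeline list as nil in one place and as an allocated empty slice in another. A small standalone illustration of that option (not part of the commit; it uses a placeholder struct rather than the real metadata types):

```go
package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
)

// metadata is a placeholder for the real structs compared in the tests.
type metadata struct {
	Pipelines []string
}

func main() {
	expected := metadata{Pipelines: nil}      // fixture written without an explicit slice
	actual := metadata{Pipelines: []string{}} // result from code that always allocates

	fmt.Println(cmp.Equal(expected, actual))                        // false: nil and empty slices differ by default
	fmt.Println(cmp.Equal(expected, actual, cmpopts.EquateEmpty())) // true: EquateEmpty treats them as equal
}
```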
