Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Merge branch 'master' into r/drop-mapping-rule
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Skillington committed Jun 29, 2018
2 parents 145f75e + 0d43540 commit e591aef
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 26 deletions.
40 changes: 37 additions & 3 deletions metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (
DefaultPipelineMetadata PipelineMetadata

// DefaultPipelineMetadatas is a default list of pipeline metadatas.
DefaultPipelineMetadatas = []PipelineMetadata{DefaultPipelineMetadata}
DefaultPipelineMetadatas = PipelineMetadatas{DefaultPipelineMetadata}

// DefaultMetadata is a default metadata.
DefaultMetadata = Metadata{Pipelines: DefaultPipelineMetadatas}
Expand Down Expand Up @@ -91,6 +91,15 @@ func (m PipelineMetadata) IsDropPolicyApplied() bool {
!m.DropPolicy.IsDefault()
}

// Clone returns a deep copy of the pipeline metadata: the aggregation ID is
// copied by value, while the storage policies and pipeline are cloned so that
// mutating the copy does not affect the receiver.
func (m PipelineMetadata) Clone() PipelineMetadata {
	cloned := PipelineMetadata{AggregationID: m.AggregationID}
	cloned.StoragePolicies = m.StoragePolicies.Clone()
	cloned.Pipeline = m.Pipeline.Clone()
	return cloned
}

// ToProto converts the pipeline metadata to a protobuf message in place.
func (m PipelineMetadata) ToProto(pb *metricpb.PipelineMetadata) error {
if err := m.AggregationID.ToProto(&pb.AggregationId); err != nil {
Expand Down Expand Up @@ -137,9 +146,34 @@ func (m *PipelineMetadata) FromProto(pb metricpb.PipelineMetadata) error {
return nil
}

// PipelineMetadatas is a list of pipeline metadatas, with helpers for
// element-wise equality comparison and deep cloning.
type PipelineMetadatas []PipelineMetadata

// Equal returns true if two pipeline metadata lists are considered equal:
// same length, and each element equal to the element at the same position
// in the other list.
func (metadatas PipelineMetadatas) Equal(other PipelineMetadatas) bool {
	if len(metadatas) != len(other) {
		return false
	}
	for i, m := range metadatas {
		if !m.Equal(other[i]) {
			return false
		}
	}
	return true
}

// Clone returns a deep copy of the list of pipeline metadatas: every element
// is cloned individually so mutating the result cannot affect the receiver.
func (metadatas PipelineMetadatas) Clone() PipelineMetadatas {
	cloned := make(PipelineMetadatas, len(metadatas))
	for i := range metadatas {
		cloned[i] = metadatas[i].Clone()
	}
	return cloned
}

// Metadata represents the metadata associated with a metric.
//
// NOTE(review): this diff hunk appears to have lost its +/- markers — it
// shows both the pre-change field (typed []PipelineMetadata) and the
// post-change field (typed PipelineMetadatas); presumably only the
// PipelineMetadatas form exists after the merge — confirm against the
// repository.
type Metadata struct {
	Pipelines []PipelineMetadata `json:"pipelines"`
	Pipelines PipelineMetadatas `json:"pipelines"`
}

// IsDefault returns whether this is the default metadata.
Expand Down Expand Up @@ -175,7 +209,7 @@ func (m *Metadata) FromProto(pb metricpb.Metadata) error {
if cap(m.Pipelines) >= numPipelines {
m.Pipelines = m.Pipelines[:numPipelines]
} else {
m.Pipelines = make([]PipelineMetadata, numPipelines)
m.Pipelines = make(PipelineMetadatas, numPipelines)
}
for i := 0; i < numPipelines; i++ {
if err := m.Pipelines[i].FromProto(pb.Pipelines[i]); err != nil {
Expand Down
28 changes: 28 additions & 0 deletions metadata/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,18 @@ func TestForwardMetadataFromProtoBadMetadataProto(t *testing.T) {
require.Error(t, res.FromProto(testBadForwardMetadataProto))
}

// TestPipelineMetadataClone verifies that Clone produces an equal copy and
// that the copy is deep: mutating one clone's storage policies must not be
// visible through the original metadata or a sibling clone.
func TestPipelineMetadataClone(t *testing.T) {
	firstClone := testLargePipelineMetadata.Clone()
	secondClone := testLargePipelineMetadata.Clone()
	require.True(t, firstClone.Equal(testLargePipelineMetadata))
	require.True(t, secondClone.Equal(testLargePipelineMetadata))

	// Mutating one clone must leave both the original and the other clone
	// untouched.
	firstClone.StoragePolicies[0] = policy.MustParseStoragePolicy("1h:1h")
	require.False(t, firstClone.Equal(testLargePipelineMetadata))
	require.True(t, secondClone.Equal(testLargePipelineMetadata))
}

func TestPipelineMetadataToProto(t *testing.T) {
inputs := []struct {
sequence []PipelineMetadata
Expand Down Expand Up @@ -917,6 +929,22 @@ func TestPipelineMetadataFromProtoBadMetadataProto(t *testing.T) {
require.Error(t, res.FromProto(testBadPipelineMetadataProto))
}

// TestPipelineMetadatasClone verifies that cloning a list of pipeline
// metadatas is deep: mutating an element of one clone must not be visible
// through the original list or a sibling clone.
func TestPipelineMetadatasClone(t *testing.T) {
	original := PipelineMetadatas{
		testSmallPipelineMetadata,
		testLargePipelineMetadata,
	}
	firstClone := original.Clone()
	secondClone := original.Clone()
	require.True(t, firstClone.Equal(original))
	require.True(t, secondClone.Equal(original))

	// Mutating one clone must leave both the original list and the other
	// clone untouched.
	firstClone[0].StoragePolicies[0] = policy.MustParseStoragePolicy("1h:1h")
	require.False(t, firstClone.Equal(original))
	require.True(t, secondClone.Equal(original))
}

func TestStagedMetadatasToProto(t *testing.T) {
inputs := []struct {
sequence []StagedMetadatas
Expand Down
8 changes: 7 additions & 1 deletion rules/active_ruleset.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ func (as *activeRuleSet) mappingsForNonRollupID(
}
pipelines = append(pipelines, pipeline)
}
// NB: The pipeline list should never be empty as the resulting pipelines are
// used to determine how the *existing* ID is aggregated and retained. If there
are no matching rules, the default pipeline list is used.
if len(pipelines) == 0 {
pipelines = metadata.DefaultPipelineMetadatas.Clone()
}
return mappingResults{
forExistingID: ruleMatchResults{cutoverNanos: cutoverNanos, pipelines: pipelines},
}
Expand Down Expand Up @@ -491,7 +497,7 @@ func (as *activeRuleSet) reverseMappingsForNonRollupID(
at aggregation.Type,
) (metadata.StagedMetadata, bool) {
mappingRes := as.mappingsForNonRollupID(id, timeNanos).forExistingID
filteredPipelines := filteredPipelinesWithAggregationType(mappingRes.resolvedPipelines(), mt, at, as.aggTypeOpts)
filteredPipelines := filteredPipelinesWithAggregationType(mappingRes.pipelines, mt, at, as.aggTypeOpts)
if len(filteredPipelines) == 0 {
return metadata.DefaultStagedMetadata, false
}
Expand Down
29 changes: 21 additions & 8 deletions rules/active_ruleset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
testStagedMetadatasCmptOpts = []cmp.Option{
cmpopts.EquateEmpty(),
}
testIDWithMetadatasCmpOpts = []cmp.Option{
cmpopts.EquateEmpty(),
}
Expand Down Expand Up @@ -521,11 +523,12 @@ func TestActiveRuleSetForwardMatchWithMappingRules(t *testing.T) {
nil,
aggregation.NewTypesOptions(),
)

for i, input := range inputs {
t.Run(fmt.Sprintf("input %d", i), func(t *testing.T) {
res := as.ForwardMatch(b(input.id), input.matchFrom, input.matchTo)
require.Equal(t, input.expireAtNanos, res.expireAtNanos)
require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0))
require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...))
require.Equal(t, 0, res.NumNewRollupIDs())
})
}
Expand All @@ -544,6 +547,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
StoragePolicies: policy.StoragePolicies{
Expand Down Expand Up @@ -711,6 +715,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
StoragePolicies: policy.StoragePolicies{
Expand All @@ -736,6 +741,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
StoragePolicies: policy.StoragePolicies{
Expand Down Expand Up @@ -783,6 +789,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
StoragePolicies: policy.StoragePolicies{
Expand Down Expand Up @@ -830,6 +837,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.DefaultID,
StoragePolicies: policy.StoragePolicies{
Expand Down Expand Up @@ -881,6 +889,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.DefaultID,
StoragePolicies: policy.StoragePolicies{
Expand Down Expand Up @@ -932,6 +941,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.DefaultID,
StoragePolicies: policy.StoragePolicies{
Expand Down Expand Up @@ -961,6 +971,7 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
metadata.DefaultPipelineMetadata,
{
AggregationID: aggregation.DefaultID,
StoragePolicies: policy.StoragePolicies{
Expand Down Expand Up @@ -1284,11 +1295,12 @@ func TestActiveRuleSetForwardMatchWithRollupRules(t *testing.T) {
nil,
aggregation.NewTypesOptions(),
)

for i, input := range inputs {
t.Run(fmt.Sprintf("input %d", i), func(t *testing.T) {
res := as.ForwardMatch(b(input.id), input.matchFrom, input.matchTo)
require.Equal(t, input.expireAtNanos, res.expireAtNanos)
require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0))
require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...))
require.Equal(t, len(input.forNewRollupIDsResult), res.NumNewRollupIDs())
for i := 0; i < len(input.forNewRollupIDsResult); i++ {
rollup := res.ForNewRollupIDsAt(i, 0)
Expand Down Expand Up @@ -2381,15 +2393,16 @@ func TestActiveRuleSetForwardMatchWithMappingRulesAndRollupRules(t *testing.T) {
nil,
aggregation.NewTypesOptions(),
)

for i, input := range inputs {
t.Run(fmt.Sprintf("input %d", i), func(t *testing.T) {
res := as.ForwardMatch(b(input.id), input.matchFrom, input.matchTo)
assert.Equal(t, input.expireAtNanos, res.expireAtNanos)
assert.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0))
require.Equal(t, len(input.forNewRollupIDsResult), res.NumNewRollupIDs(), "failed num rollup ids expected=%d, actual=%d", len(input.forNewRollupIDsResult), res.NumNewRollupIDs())
require.Equal(t, input.expireAtNanos, res.expireAtNanos)
require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...))
require.Equal(t, len(input.forNewRollupIDsResult), res.NumNewRollupIDs())
for i := 0; i < len(input.forNewRollupIDsResult); i++ {
rollup := res.ForNewRollupIDsAt(i, 0)
assert.True(t, cmp.Equal(input.forNewRollupIDsResult[i], rollup, testIDWithMetadatasCmpOpts...))
require.True(t, cmp.Equal(input.forNewRollupIDsResult[i], rollup, testIDWithMetadatasCmpOpts...))
}
})
}
Expand Down Expand Up @@ -2807,7 +2820,7 @@ func TestActiveRuleSetReverseMatchWithMappingRulesForNonRollupID(t *testing.T) {
for _, input := range inputs {
res := as.ReverseMatch(b(input.id), input.matchFrom, input.matchTo, input.metricType, input.aggregationType)
require.Equal(t, input.expireAtNanos, res.expireAtNanos)
require.Equal(t, input.forExistingIDResult, res.ForExistingIDAt(0))
require.True(t, cmp.Equal(input.forExistingIDResult, res.ForExistingIDAt(0), testStagedMetadatasCmptOpts...))
}
}

Expand Down
40 changes: 29 additions & 11 deletions rules/validator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,28 +59,46 @@ func (c Configuration) NewValidator(
if err != nil {
return nil, err
}
opts := c.newValidatorOptions(nsValidator)
return NewValidator(opts), nil
}

// newValidatorOptions builds validator Options from this configuration:
// the supplied namespace validator, required rollup tags, metric-types
// function, invalid-character sets, the default allowed storage policies and
// aggregation types, per-metric-type overrides, and optional pipeline limits.
//
// NOTE(review): this diff hunk appears to have lost its +/- markers — it
// contains both the removed pre-change lines (the unguarded chained setters
// and `return NewValidator(opts), nil`) and the added post-change lines (the
// nil-guarded setters that dereference the now-pointer config fields, and
// `return opts`). Confirm against the repository which form survived the
// merge.
func (c Configuration) newValidatorOptions(
	nsValidator namespace.Validator,
) Options {
	// Options that are always set, regardless of configuration.
	opts := NewOptions().
		SetNamespaceValidator(nsValidator).
		SetRequiredRollupTags(c.RequiredRollupTags).
		SetMetricTypesFn(c.MetricTypes.NewMetricTypesFn()).
		// NOTE(review): the next three chained setters look like the removed
		// pre-change form; the nil-guarded ifs below are their replacement.
		SetDefaultAllowedStoragePolicies(c.Policies.DefaultAllowed.StoragePolicies).
		SetDefaultAllowedFirstLevelAggregationTypes(c.Policies.DefaultAllowed.FirstLevelAggregationTypes).
		SetDefaultAllowedNonFirstLevelAggregationTypes(c.Policies.DefaultAllowed.NonFirstLevelAggregationTypes).
		SetTagNameInvalidChars(toRunes(c.TagNameInvalidChars)).
		SetMetricNameInvalidChars(toRunes(c.MetricNameInvalidChars))
	// Only override the built-in defaults when the corresponding config
	// field was explicitly provided (non-nil pointer).
	if c.Policies.DefaultAllowed.StoragePolicies != nil {
		opts = opts.SetDefaultAllowedStoragePolicies(*c.Policies.DefaultAllowed.StoragePolicies)
	}
	if c.Policies.DefaultAllowed.FirstLevelAggregationTypes != nil {
		opts = opts.SetDefaultAllowedFirstLevelAggregationTypes(*c.Policies.DefaultAllowed.FirstLevelAggregationTypes)
	}
	if c.Policies.DefaultAllowed.NonFirstLevelAggregationTypes != nil {
		opts = opts.SetDefaultAllowedNonFirstLevelAggregationTypes(*c.Policies.DefaultAllowed.NonFirstLevelAggregationTypes)
	}
	// Per-metric-type overrides of the allowed policies/aggregation types.
	for _, override := range c.Policies.Overrides {
		// NOTE(review): as above, the chained unguarded setters appear to be
		// the removed pre-change form; the nil-guarded ifs are the
		// replacement.
		opts = opts.
			SetAllowedStoragePoliciesFor(override.Type, override.Allowed.StoragePolicies).
			SetAllowedFirstLevelAggregationTypesFor(override.Type, override.Allowed.FirstLevelAggregationTypes).
			SetAllowedNonFirstLevelAggregationTypesFor(override.Type, override.Allowed.NonFirstLevelAggregationTypes)
		if override.Allowed.StoragePolicies != nil {
			opts = opts.SetAllowedStoragePoliciesFor(override.Type, *override.Allowed.StoragePolicies)
		}
		if override.Allowed.FirstLevelAggregationTypes != nil {
			opts = opts.SetAllowedFirstLevelAggregationTypesFor(override.Type, *override.Allowed.FirstLevelAggregationTypes)
		}
		if override.Allowed.NonFirstLevelAggregationTypes != nil {
			opts = opts.SetAllowedNonFirstLevelAggregationTypesFor(override.Type, *override.Allowed.NonFirstLevelAggregationTypes)
		}
	}
	// Optional pipeline limits, applied only when configured.
	if c.MaxTransformationDerivativeOrder != nil {
		opts = opts.SetMaxTransformationDerivativeOrder(*c.MaxTransformationDerivativeOrder)
	}
	if c.MaxRollupLevels != nil {
		opts = opts.SetMaxRollupLevels(*c.MaxRollupLevels)
	}
	// NOTE(review): `return NewValidator(opts), nil` looks like the removed
	// return of the old combined constructor; `return opts` is the new one.
	return NewValidator(opts), nil
	return opts
}

type namespaceValidatorConfiguration struct {
Expand Down Expand Up @@ -152,9 +170,9 @@ type policiesOverrideConfiguration struct {

// policiesConfiguration is the configuration for storage policies and aggregation types.
//
// NOTE(review): this diff hunk appears to show both the pre-change fields
// (value slices) and the post-change fields (pointers to slices); after the
// merge presumably only the pointer forms remain, so that a nil pointer
// distinguishes "not configured" (keep the option defaults, per the nil
// checks in newValidatorOptions) from an explicitly empty list.
type policiesConfiguration struct {
	StoragePolicies []policy.StoragePolicy `yaml:"storagePolicies"`
	FirstLevelAggregationTypes []aggregation.Type `yaml:"firstLevelAggregationTypes"`
	NonFirstLevelAggregationTypes []aggregation.Type `yaml:"nonFirstLevelAggregationTypes"`
	StoragePolicies *[]policy.StoragePolicy `yaml:"storagePolicies"`
	FirstLevelAggregationTypes *[]aggregation.Type `yaml:"firstLevelAggregationTypes"`
	NonFirstLevelAggregationTypes *[]aggregation.Type `yaml:"nonFirstLevelAggregationTypes"`
}

func toRunes(s string) []rune {
Expand Down
Loading

0 comments on commit e591aef

Please sign in to comment.