Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Ensure cutover times of staged metadatas from mapping rule matches ar…
Browse files Browse the repository at this point in the history
…e non-decreasing
  • Loading branch information
xichen2020 committed Jul 2, 2018
1 parent 0d43540 commit 92eb72c
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 6 deletions.
31 changes: 28 additions & 3 deletions rules/active_ruleset.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (as *activeRuleSet) ForwardMatch(
)
for nextIdx < len(as.cutoverTimesAsc) && nextCutoverNanos < toNanos {
nextMatchRes := as.forwardMatchAt(id, nextCutoverNanos)
forExistingID = append(forExistingID, nextMatchRes.forExistingID)
forExistingID = mergeResultsForExistingID(forExistingID, nextMatchRes.forExistingID, nextCutoverNanos)
forNewRollupIDs = mergeResultsForNewRollupIDs(forNewRollupIDs, nextMatchRes.forNewRollupIDs, nextCutoverNanos)
nextIdx++
nextCutoverNanos = as.cutoverNanosAt(nextIdx)
Expand Down Expand Up @@ -160,11 +160,11 @@ func (as *activeRuleSet) ReverseMatch(
}

if currForExistingID, found := as.reverseMappingsFor(id, name, tags, isRollupID, fromNanos, mt, at); found {
forExistingID = append(forExistingID, currForExistingID)
forExistingID = mergeResultsForExistingID(forExistingID, currForExistingID, fromNanos)
}
for nextIdx < len(as.cutoverTimesAsc) && nextCutoverNanos < toNanos {
if nextForExistingID, found := as.reverseMappingsFor(id, name, tags, isRollupID, nextCutoverNanos, mt, at); found {
forExistingID = append(forExistingID, nextForExistingID)
forExistingID = mergeResultsForExistingID(forExistingID, nextForExistingID, nextCutoverNanos)
}
nextIdx++
nextCutoverNanos = as.cutoverNanosAt(nextIdx)
Expand Down Expand Up @@ -618,6 +618,31 @@ func filteredPipelinesWithAggregationType(
return pipelines[:cur]
}

// mergeResultsForExistingID merges the next staged metadata into the current list of staged
// metadatas while ensuring the cutover times of the staged metadatas are non-decreasing. This
// is needed because the cutover times of staged metadata results produced by mapping rule matching
// may not always be in ascending order. For example, if at time T0 a metric matches against a
// mapping rule, and the filter of such rule changed at T1 such that the metric no longer matches
// the rule, this would indicate the staged metadata at T0 would have a cutover time of T0,
// whereas the staged metadata at T1 would have a cutover time of 0 (due to no rule match),
// in which case we need to set the cutover time of the staged metadata at T1 to T1 to ensure
// the mononicity of cutover times.
func mergeResultsForExistingID(
currMetadatas metadata.StagedMetadatas,
nextMetadata metadata.StagedMetadata,
nextCutoverNanos int64,
) metadata.StagedMetadatas {
if len(currMetadatas) == 0 {
return metadata.StagedMetadatas{nextMetadata}
}
currCutoverNanos := currMetadatas[len(currMetadatas)-1].CutoverNanos
if currCutoverNanos > nextMetadata.CutoverNanos {
nextMetadata.CutoverNanos = nextCutoverNanos
}
currMetadatas = append(currMetadatas, nextMetadata)
return currMetadatas
}

// mergeResultsForNewRollupIDs merges the current list of staged metadatas for new rollup IDs
// with the list of staged metadatas for new rollup IDs at the next rule cutover time, assuming
// that both the current metadatas list and the next metadatas list are sorted by rollup IDs
Expand Down
100 changes: 97 additions & 3 deletions rules/active_ruleset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestActiveRuleSetCutoverTimesWithMappingRules(t *testing.T) {
nil,
aggregation.NewTypesOptions(),
)
expectedCutovers := []int64{10000, 15000, 20000, 22000, 24000, 30000, 34000, 35000, 100000}
expectedCutovers := []int64{5000, 8000, 10000, 15000, 20000, 22000, 24000, 30000, 34000, 35000, 100000}
require.Equal(t, expectedCutovers, as.cutoverTimesAsc)
}

Expand Down Expand Up @@ -87,7 +87,7 @@ func TestActiveRuleSetCutoverTimesWithMappingRulesAndRollupRules(t *testing.T) {
nil,
aggregation.NewTypesOptions(),
)
expectedCutovers := []int64{10000, 15000, 20000, 22000, 24000, 30000, 34000, 35000, 38000, 90000, 100000, 120000}
expectedCutovers := []int64{5000, 8000, 10000, 15000, 20000, 22000, 24000, 30000, 34000, 35000, 38000, 90000, 100000, 120000}
require.Equal(t, expectedCutovers, as.cutoverTimesAsc)
}

Expand Down Expand Up @@ -394,6 +394,34 @@ func TestActiveRuleSetForwardMatchWithMappingRules(t *testing.T) {
},
},
},
{
id: "mtagName1=mtagValue3",
matchFrom: 4000,
matchTo: 9000,
expireAtNanos: 10000,
forExistingIDResult: metadata.StagedMetadatas{
metadata.DefaultStagedMetadata,
metadata.StagedMetadata{
CutoverNanos: 5000,
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
{
AggregationID: aggregation.DefaultID,
StoragePolicies: policy.StoragePolicies{
policy.NewStoragePolicy(10*time.Second, xtime.Second, 24*time.Hour),
},
},
},
},
},
metadata.StagedMetadata{
CutoverNanos: 8000,
Tombstoned: false,
Metadata: metadata.DefaultMetadata,
},
},
},
{
id: "mtagName1=mtagValue2",
matchFrom: 10000,
Expand Down Expand Up @@ -2267,6 +2295,34 @@ func TestActiveRuleSetForwardMatchWithMappingRulesAndRollupRules(t *testing.T) {
},
},
},
{
id: "mtagName1=mtagValue3",
matchFrom: 4000,
matchTo: 9000,
expireAtNanos: 10000,
forExistingIDResult: metadata.StagedMetadatas{
metadata.DefaultStagedMetadata,
metadata.StagedMetadata{
CutoverNanos: 5000,
Tombstoned: false,
Metadata: metadata.Metadata{
Pipelines: []metadata.PipelineMetadata{
{
AggregationID: aggregation.DefaultID,
StoragePolicies: policy.StoragePolicies{
policy.NewStoragePolicy(10*time.Second, xtime.Second, 24*time.Hour),
},
},
},
},
},
metadata.StagedMetadata{
CutoverNanos: 8000,
Tombstoned: false,
Metadata: metadata.DefaultMetadata,
},
},
},
}

as := newActiveRuleSet(
Expand Down Expand Up @@ -2920,6 +2976,18 @@ func testMappingRules(t *testing.T) []*mappingRule {
testTagsFilterOptions(),
)
require.NoError(t, err)
filter3, err := filters.NewTagsFilter(
filters.TagFilterValueMap{"mtagName1": filters.FilterValue{Pattern: "mtagValue3"}},
filters.Conjunction,
testTagsFilterOptions(),
)
require.NoError(t, err)
filter4, err := filters.NewTagsFilter(
filters.TagFilterValueMap{"mtagName1": filters.FilterValue{Pattern: "mtagValue4"}},
filters.Conjunction,
testTagsFilterOptions(),
)
require.NoError(t, err)

mappingRule1 := &mappingRule{
uuid: "mappingRule1",
Expand Down Expand Up @@ -3068,7 +3136,33 @@ func testMappingRules(t *testing.T) []*mappingRule {
},
}

return []*mappingRule{mappingRule1, mappingRule2, mappingRule3, mappingRule4, mappingRule5}
mappingRule6 := &mappingRule{
uuid: "mappingRule6",
snapshots: []*mappingRuleSnapshot{
&mappingRuleSnapshot{
name: "mappingRule6.snapshot1",
tombstoned: false,
cutoverNanos: 5000,
filter: filter3,
aggregationID: aggregation.DefaultID,
storagePolicies: policy.StoragePolicies{
policy.NewStoragePolicy(10*time.Second, xtime.Second, 24*time.Hour),
},
},
&mappingRuleSnapshot{
name: "mappingRule6.snapshot2",
tombstoned: false,
cutoverNanos: 8000,
filter: filter4,
aggregationID: aggregation.DefaultID,
storagePolicies: policy.StoragePolicies{
policy.NewStoragePolicy(time.Minute, xtime.Minute, time.Hour),
},
},
},
}

return []*mappingRule{mappingRule1, mappingRule2, mappingRule3, mappingRule4, mappingRule5, mappingRule6}
}

func testRollupRules(t *testing.T) []*rollupRule {
Expand Down

0 comments on commit 92eb72c

Please sign in to comment.