-
Notifications
You must be signed in to change notification settings - Fork 458
/
Copy pathactive_ruleset.go
829 lines (776 loc) · 29.1 KB
/
active_ruleset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package rules
import (
"bytes"
"fmt"
"sort"
"github.com/m3db/m3/src/metrics/aggregation"
"github.com/m3db/m3/src/metrics/filters"
"github.com/m3db/m3/src/metrics/metadata"
"github.com/m3db/m3/src/metrics/metric"
metricID "github.com/m3db/m3/src/metrics/metric/id"
mpipeline "github.com/m3db/m3/src/metrics/pipeline"
"github.com/m3db/m3/src/metrics/pipeline/applied"
xerrors "github.com/m3db/m3x/errors"
)
// Matcher matches metrics against rules to determine applicable policies.
type Matcher interface {
	// ForwardMatch matches the applicable policies for a metric id between [fromNanos, toNanos).
	ForwardMatch(id []byte, fromNanos, toNanos int64) MatchResult

	// ReverseMatch reverse matches the applicable policies for a metric id between
	// [fromNanos, toNanos), taking the metric type and aggregation type of the
	// given id into account.
	ReverseMatch(
		id []byte,
		fromNanos, toNanos int64,
		mt metric.Type,
		at aggregation.Type,
		isMultiAggregationTypesAllowed bool,
		aggTypesOpts aggregation.TypesOptions,
	) MatchResult
}
// activeRuleSet holds the mapping and rollup rules evaluated during matching,
// along with the precomputed ascending list of unique rule cutover times.
type activeRuleSet struct {
	version         int                       // ruleset version reported in match results
	mappingRules    []*mappingRule            // mapping rules matched against metric IDs
	rollupRules     []*rollupRule             // rollup rules matched against metric IDs
	cutoverTimesAsc []int64                   // unique rule cutover times, ascending
	tagsFilterOpts  filters.TagsFilterOptions // provides NameAndTagsFn / SortedTagIteratorFn for ID parsing
	newRollupIDFn   metricID.NewIDFn          // generates new rollup metric IDs from name + tag pairs
	isRollupIDFn    metricID.MatchIDFn        // reports whether a name/tags pair is a rollup ID
}
// newActiveRuleSet constructs an activeRuleSet from the given mapping and
// rollup rules, precomputing the sorted list of distinct rule cutover times
// that drives time-range matching.
func newActiveRuleSet(
	version int,
	mappingRules []*mappingRule,
	rollupRules []*rollupRule,
	tagsFilterOpts filters.TagsFilterOptions,
	newRollupIDFn metricID.NewIDFn,
	isRollupIDFn metricID.MatchIDFn,
) *activeRuleSet {
	// Gather the set of distinct cutover times across all rule snapshots.
	cutoverSet := make(map[int64]struct{})
	for _, mr := range mappingRules {
		for _, s := range mr.snapshots {
			cutoverSet[s.cutoverNanos] = struct{}{}
		}
	}
	for _, rr := range rollupRules {
		for _, s := range rr.snapshots {
			cutoverSet[s.cutoverNanos] = struct{}{}
		}
	}

	// Flatten into an ascending slice so matching can binary-search it.
	cutovers := make([]int64, 0, len(cutoverSet))
	for nanos := range cutoverSet {
		cutovers = append(cutovers, nanos)
	}
	sort.Sort(int64Asc(cutovers))

	return &activeRuleSet{
		version:         version,
		mappingRules:    mappingRules,
		rollupRules:     rollupRules,
		cutoverTimesAsc: cutovers,
		tagsFilterOpts:  tagsFilterOpts,
		newRollupIDFn:   newRollupIDFn,
		isRollupIDFn:    isRollupIDFn,
	}
}
// The forward matching logic goes like this:
//
// Imagine you have the list of rules in the ruleset lined up vertically. Each rule may have one
// or more snapshots, each of which represents a change to that rule (e.g., filter change, policy
// change, etc.). These snapshots are naturally non-overlapping in time since only one snapshot
// can be active at a given point in time. As a result, if we use the x axis to represent time,
// then for each rule, a snapshot is active for some amount of time. IOW, if you pick a time and
// draw a vertical line across the set of rules, the snapshots of different rules that intersect
// with the vertical line are the active rule snapshots for the ruleset.
//
// Now you have a list of times you need to perform rule matching at. Each matching time
// corresponds to a cutover time of a rule in the ruleset, because that's when matching the metric
// ID against this rule may lead to a different metadata including different storage policies and
// new rollup IDs to be generated or existing rollup IDs to stop being generated. The final match
// result is a collection of such metadata sorted by time in ascending order.
//
// NB(xichen): can further consolidate consecutive staged metadata to deduplicate.
// ForwardMatch matches the applicable policies for a metric id between
// [fromNanos, toNanos), evaluating the ruleset at fromNanos and at every rule
// cutover time inside the range, then merging the per-time results.
func (as *activeRuleSet) ForwardMatch(
	id []byte,
	fromNanos, toNanos int64,
) MatchResult {
	initialMatch := as.forwardMatchAt(id, fromNanos)
	forExistingID := metadata.StagedMetadatas{initialMatch.forExistingID}
	forNewRollupIDs := initialMatch.forNewRollupIDs

	idx := as.nextCutoverIdx(fromNanos)
	cutoverNanos := as.cutoverNanosAt(idx)
	for idx < len(as.cutoverTimesAsc) && cutoverNanos < toNanos {
		res := as.forwardMatchAt(id, cutoverNanos)
		forExistingID = mergeResultsForExistingID(forExistingID, res.forExistingID, cutoverNanos)
		forNewRollupIDs = mergeResultsForNewRollupIDs(forNewRollupIDs, res.forNewRollupIDs, cutoverNanos)
		idx++
		cutoverNanos = as.cutoverNanosAt(idx)
	}

	// The result expires when the beginning of the match time range reaches the
	// first cutover time after `fromNanos`, or the end of the match time range
	// reaches the first cutover time after `toNanos` among all active rules,
	// because the metric may then match a different set of rules.
	return NewMatchResult(as.version, cutoverNanos, forExistingID, forNewRollupIDs)
}
// ReverseMatch reverse matches the applicable policies for a metric id between
// [fromNanos, toNanos), evaluating at fromNanos and at every rule cutover time
// in range. Unlike ForwardMatch, it never produces new rollup IDs.
func (as *activeRuleSet) ReverseMatch(
	id []byte,
	fromNanos, toNanos int64,
	mt metric.Type,
	at aggregation.Type,
	isMultiAggregationTypesAllowed bool,
	aggTypesOpts aggregation.TypesOptions,
) MatchResult {
	var (
		nextIdx          = as.nextCutoverIdx(fromNanos)
		nextCutoverNanos = as.cutoverNanosAt(nextIdx)
		forExistingID    metadata.StagedMetadatas
		isRollupID       bool
	)
	// Determine whether the ID is a rollup metric ID.
	// NOTE(review): if NameAndTagsFn fails, name and tags remain nil and the ID
	// is treated as a non-rollup ID below — presumably a deliberate best-effort
	// fallback rather than an error; confirm with callers.
	name, tags, err := as.tagsFilterOpts.NameAndTagsFn(id)
	if err == nil {
		isRollupID = as.isRollupIDFn(name, tags)
	}
	// Match at the start of the range, then at each subsequent cutover time.
	if currForExistingID, found := as.reverseMappingsFor(id, name, tags, isRollupID, fromNanos, mt, at, isMultiAggregationTypesAllowed, aggTypesOpts); found {
		forExistingID = mergeResultsForExistingID(forExistingID, currForExistingID, fromNanos)
	}
	for nextIdx < len(as.cutoverTimesAsc) && nextCutoverNanos < toNanos {
		if nextForExistingID, found := as.reverseMappingsFor(id, name, tags, isRollupID, nextCutoverNanos, mt, at, isMultiAggregationTypesAllowed, aggTypesOpts); found {
			forExistingID = mergeResultsForExistingID(forExistingID, nextForExistingID, nextCutoverNanos)
		}
		nextIdx++
		nextCutoverNanos = as.cutoverNanosAt(nextIdx)
	}
	return NewMatchResult(as.version, nextCutoverNanos, forExistingID, nil)
}
// forwardMatchAt computes the match result for a metric id at a single point
// in time, combining mapping-rule and rollup-rule matches.
//
// NB(xichen): can further consolidate pipelines with the same aggregation ID
// and same applied pipeline but different storage policies to reduce amount of
// data that needed to be stored in memory and sent across the wire.
func (as *activeRuleSet) forwardMatchAt(
	id []byte,
	timeNanos int64,
) forwardMatchResult {
	mappingResults := as.mappingsForNonRollupID(id, timeNanos)
	rollupResults := as.rollupResultsFor(id, timeNanos)

	// Metadata for the incoming ID merges mapping and rollup matches.
	existing := mappingResults.forExistingID.
		merge(rollupResults.forExistingID).
		unique().
		toStagedMetadata()

	// Each new rollup ID carries its own single-entry staged metadata list.
	newIDs := make([]IDWithMetadatas, 0, len(rollupResults.forNewRollupIDs))
	for _, matched := range rollupResults.forNewRollupIDs {
		staged := matched.matchResults.unique().toStagedMetadata()
		newIDs = append(newIDs, IDWithMetadatas{
			ID:        matched.id,
			Metadatas: metadata.StagedMetadatas{staged},
		})
	}
	sort.Sort(IDWithMetadatasByIDAsc(newIDs))

	return forwardMatchResult{
		forExistingID:   existing,
		forNewRollupIDs: newIDs,
	}
}
// mappingsForNonRollupID matches the given non-rollup metric ID against all
// mapping rules active at timeNanos and returns the resulting pipelines for
// the existing ID, falling back to the default pipelines when nothing matches.
func (as *activeRuleSet) mappingsForNonRollupID(
	id []byte,
	timeNanos int64,
) mappingResults {
	var (
		latestCutover int64
		matched       []metadata.PipelineMetadata
	)
	for _, rule := range as.mappingRules {
		snapshot := rule.activeSnapshot(timeNanos)
		if snapshot == nil || !snapshot.filter.Matches(id) {
			continue
		}
		// Track the latest cutover time among all matching mapping rules so the
		// result reflects the correct time of rule change.
		if latestCutover < snapshot.cutoverNanos {
			latestCutover = snapshot.cutoverNanos
		}
		// A tombstoned snapshot still contributes its cutover time (it marks a
		// rule change) but its policies are no longer in effect.
		if snapshot.tombstoned {
			continue
		}
		matched = append(matched, metadata.PipelineMetadata{
			AggregationID:   snapshot.aggregationID,
			StoragePolicies: snapshot.storagePolicies.Clone(),
			DropPolicy:      snapshot.dropPolicy,
		})
	}

	matched, _ = metadata.PipelineMetadatas(matched).ApplyOrRemoveDropPolicies()

	// NB: The pipeline list should never be empty as the resulting pipelines
	// are used to determine how the *existing* ID is aggregated and retained.
	// If no rule matches, the default pipeline list is used.
	if len(matched) == 0 {
		matched = metadata.DefaultPipelineMetadatas.Clone()
	}
	return mappingResults{
		forExistingID: ruleMatchResults{cutoverNanos: latestCutover, pipelines: matched},
	}
}
// rollupResultsFor matches the given metric ID against all rollup rules active
// at timeNanos and converts the matching rollup targets into rollup results.
func (as *activeRuleSet) rollupResultsFor(id []byte, timeNanos int64) rollupResults {
	var (
		latestCutover int64
		targets       []rollupTarget
	)
	for _, rule := range as.rollupRules {
		snapshot := rule.activeSnapshot(timeNanos)
		if snapshot == nil || !snapshot.filter.Matches(id) {
			continue
		}
		// Track the latest cutover time among all matching rollup rules so the
		// result reflects the correct time of rule change.
		if latestCutover < snapshot.cutoverNanos {
			latestCutover = snapshot.cutoverNanos
		}
		// A tombstoned snapshot contributes its cutover time (a rule change)
		// but its rollup targets are no longer in effect.
		if snapshot.tombstoned {
			continue
		}
		for _, target := range snapshot.targets {
			targets = append(targets, target.clone())
		}
	}
	// NB: could log the matching error here if needed.
	res, _ := as.toRollupResults(id, latestCutover, targets)
	return res
}
// toRollupResults applies the rollup operation in each rollup pipeline contained
// in the rollup targets against the matching ID to determine the resulting new
// rollup ID. It additionally distinguishes rollup pipelines whose first operation
// is a rollup operation from those that aren't, since the former are applied
// against new rollup IDs (generated by the rollup operation) while the latter
// are applied against the original metric ID.
// nolint: unparam
func (as *activeRuleSet) toRollupResults(
	id []byte,
	cutoverNanos int64,
	targets []rollupTarget,
) (rollupResults, error) {
	if len(targets) == 0 {
		return rollupResults{}, nil
	}

	// If we cannot extract tags from the id, this is likely an invalid
	// metric and we bail early.
	_, sortedTagPairBytes, err := as.tagsFilterOpts.NameAndTagsFn(id)
	if err != nil {
		return rollupResults{}, err
	}

	var (
		multiErr           = xerrors.NewMultiError()
		pipelines          = make([]metadata.PipelineMetadata, 0, len(targets))
		newRollupIDResults = make([]idWithMatchResults, 0, len(targets))
		tagPairs           []metricID.TagPair // scratch buffer reused across targets
	)
	for _, target := range targets {
		pipeline := target.Pipeline
		// A rollup target should always have a non-empty pipeline but
		// just being defensive here.
		if pipeline.IsEmpty() {
			err = fmt.Errorf("target %v has empty pipeline", target)
			multiErr = multiErr.Add(err)
			continue
		}
		var (
			aggregationID aggregation.ID
			rollupID      []byte // non-nil only when the first op is a rollup op
			numSteps      = pipeline.Len()
			firstOp       = pipeline.At(0)
			toApply       mpipeline.Pipeline // remaining steps to apply to the ID
		)
		// The first operation determines the aggregation ID and whether the
		// result applies to the original ID or a newly generated rollup ID.
		switch firstOp.Type {
		case mpipeline.AggregationOpType:
			aggregationID, err = aggregation.CompressTypes(firstOp.Aggregation.Type)
			if err != nil {
				err = fmt.Errorf("target %v operation 0 aggregation type compression error: %v", target, err)
				multiErr = multiErr.Add(err)
				continue
			}
			toApply = pipeline.SubPipeline(1, numSteps)
		case mpipeline.TransformationOpType:
			aggregationID = aggregation.DefaultID
			toApply = pipeline
		case mpipeline.RollupOpType:
			// Reset the scratch buffer before matchRollupTarget appends to it.
			tagPairs = tagPairs[:0]
			var matched bool
			rollupID, matched = as.matchRollupTarget(
				sortedTagPairBytes,
				firstOp.Rollup.NewName,
				firstOp.Rollup.Tags,
				tagPairs,
				matchRollupTargetOptions{generateRollupID: true},
			)
			if !matched {
				// The incoming metric ID did not match the rollup target.
				continue
			}
			aggregationID = firstOp.Rollup.AggregationID
			toApply = pipeline.SubPipeline(1, numSteps)
		default:
			err = fmt.Errorf("target %v operation 0 has unknown type: %v", target, firstOp.Type)
			multiErr = multiErr.Add(err)
			continue
		}
		// Resolve any remaining rollup ops in the pipeline against the ID,
		// reusing the scratch buffer again (safe: the rollup ID above was
		// already generated by newRollupIDFn before this reset).
		tagPairs = tagPairs[:0]
		applied, err := as.applyIDToPipeline(sortedTagPairBytes, toApply, tagPairs)
		if err != nil {
			err = fmt.Errorf("failed to apply id %s to pipeline %v: %v", id, toApply, err)
			multiErr = multiErr.Add(err)
			continue
		}
		newPipeline := metadata.PipelineMetadata{
			AggregationID:   aggregationID,
			StoragePolicies: target.StoragePolicies,
			Pipeline:        applied,
		}
		if rollupID == nil {
			// The applied pipeline applies to the incoming ID.
			pipelines = append(pipelines, newPipeline)
		} else {
			// The applied pipeline applies to a new rollup ID.
			matchResults := ruleMatchResults{
				cutoverNanos: cutoverNanos,
				pipelines:    []metadata.PipelineMetadata{newPipeline},
			}
			newRollupIDResult := idWithMatchResults{id: rollupID, matchResults: matchResults}
			newRollupIDResults = append(newRollupIDResults, newRollupIDResult)
		}
	}
	return rollupResults{
		forExistingID:   ruleMatchResults{cutoverNanos: cutoverNanos, pipelines: pipelines},
		forNewRollupIDs: newRollupIDResults,
	}, multiErr.FinalError()
}
// matchRollupTarget matches an incoming metric ID against a rollup target, and
// returns the new rollup ID if the metric ID contains the full list of rollup
// tags, and nil otherwise. The ID's sorted tag pairs and the rollup tag list
// are walked in lockstep (the walk assumes rollupTags is sorted ascending —
// the res > 0 early bail relies on it; TODO confirm with callers).
func (as *activeRuleSet) matchRollupTarget(
	sortedTagPairBytes []byte,
	newName []byte,
	rollupTags [][]byte,
	tagPairs []metricID.TagPair, // buffer for reuse to generate rollup ID across calls
	opts matchRollupTargetOptions,
) ([]byte, bool) {
	var (
		sortedTagIter = as.tagsFilterOpts.SortedTagIteratorFn(sortedTagPairBytes)
		hasMoreTags   = sortedTagIter.Next()
		currTagIdx    = 0
	)
	for hasMoreTags && currTagIdx < len(rollupTags) {
		tagName, tagVal := sortedTagIter.Current()
		res := bytes.Compare(tagName, rollupTags[currTagIdx])
		if res == 0 {
			// The ID's tag is one of the rollup tags; capture its value when
			// generating a rollup ID.
			if opts.generateRollupID {
				tagPairs = append(tagPairs, metricID.TagPair{Name: tagName, Value: tagVal})
			}
			currTagIdx++
			hasMoreTags = sortedTagIter.Next()
			continue
		}
		// If one of the target tags is not found in the ID, this is considered
		// a non-match so bail immediately.
		if res > 0 {
			break
		}
		// The ID's tag sorts before the current rollup tag; skip it.
		hasMoreTags = sortedTagIter.Next()
	}
	sortedTagIter.Close()
	// If not all the target tags are found, this is considered a no match.
	if currTagIdx < len(rollupTags) {
		return nil, false
	}
	if !opts.generateRollupID {
		return nil, true
	}
	return as.newRollupIDFn(newName, tagPairs), true
}
// applyIDToPipeline applies the given metric ID (as its sorted tag pair bytes)
// to each step of the pipeline, resolving rollup operations into concrete
// rollup IDs, and returns the resulting applied pipeline. It returns an error
// if the ID does not contain all rollup tags of a rollup op, or if a pipeline
// op has an unexpected type.
func (as *activeRuleSet) applyIDToPipeline(
	sortedTagPairBytes []byte,
	pipeline mpipeline.Pipeline,
	tagPairs []metricID.TagPair, // buffer for reuse across calls
) (applied.Pipeline, error) {
	operations := make([]applied.OpUnion, 0, pipeline.Len())
	for i := 0; i < pipeline.Len(); i++ {
		pipelineOp := pipeline.At(i)
		var opUnion applied.OpUnion
		switch pipelineOp.Type {
		case mpipeline.TransformationOpType:
			opUnion = applied.OpUnion{
				Type:           mpipeline.TransformationOpType,
				Transformation: pipelineOp.Transformation,
			}
		case mpipeline.RollupOpType:
			rollupOp := pipelineOp.Rollup
			// NB: the redundant `var matched bool` the original declared here
			// is removed — `:=` declares it alongside rollupID in this scope.
			rollupID, matched := as.matchRollupTarget(
				sortedTagPairBytes,
				rollupOp.NewName,
				rollupOp.Tags,
				tagPairs,
				matchRollupTargetOptions{generateRollupID: true},
			)
			if !matched {
				err := fmt.Errorf("existing tag pairs %s do not contain all rollup tags %s", sortedTagPairBytes, rollupOp.Tags)
				return applied.Pipeline{}, err
			}
			opUnion = applied.OpUnion{
				Type:   mpipeline.RollupOpType,
				Rollup: applied.RollupOp{ID: rollupID, AggregationID: rollupOp.AggregationID},
			}
		default:
			return applied.Pipeline{}, fmt.Errorf("unexpected pipeline op type: %v", pipelineOp.Type)
		}
		operations = append(operations, opUnion)
	}
	return applied.NewPipeline(operations), nil
}
// reverseMappingsFor dispatches reverse matching to the rollup-ID or
// non-rollup-ID path depending on whether the incoming ID is a rollup
// metric ID.
func (as *activeRuleSet) reverseMappingsFor(
	id, name, tags []byte,
	isRollupID bool,
	timeNanos int64,
	mt metric.Type,
	at aggregation.Type,
	isMultiAggregationTypesAllowed bool,
	aggTypesOpts aggregation.TypesOptions,
) (metadata.StagedMetadata, bool) {
	if isRollupID {
		return as.reverseMappingsForRollupID(name, tags, timeNanos, mt, at, isMultiAggregationTypesAllowed, aggTypesOpts)
	}
	return as.reverseMappingsForNonRollupID(id, timeNanos, mt, at, aggTypesOpts)
}
// reverseMappingsForNonRollupID returns the staged metadata for the given
// non-rollup ID at the given time, and true if a non-empty list of pipelines
// survives aggregation-type filtering, or false otherwise.
func (as *activeRuleSet) reverseMappingsForNonRollupID(
	id []byte,
	timeNanos int64,
	mt metric.Type,
	at aggregation.Type,
	aggTypesOpts aggregation.TypesOptions,
) (metadata.StagedMetadata, bool) {
	res := as.mappingsForNonRollupID(id, timeNanos).forExistingID

	// Always filter pipelines with aggregation types because for non-rollup IDs
	// it is possible that none of the rules match based on the aggregation
	// types, in which case we fall back to the default staged metadata.
	remaining := filteredPipelinesWithAggregationType(res.pipelines, mt, at, aggTypesOpts)
	if len(remaining) == 0 {
		return metadata.DefaultStagedMetadata, false
	}

	return metadata.StagedMetadata{
		CutoverNanos: res.cutoverNanos,
		Tombstoned:   false,
		Metadata:     metadata.Metadata{Pipelines: remaining},
	}, true
}
// NB(xichen): in order to determine the applicable policies for a rollup metric, we need to
// match the id against rollup rules to determine which rollup rules are applicable, under the
// assumption that no two rollup targets in the same namespace may have the same rollup metric
// name and the list of rollup tags. Otherwise, a rollup metric could potentially match more
// than one rollup rule with different policies even though only one of the matched rules was
// used to produce the given rollup metric id due to its tag filters, thereby causing the wrong
// staged policies to be returned. This also implies at any given time, at most one rollup target
// may match the given rollup id.
// Since we may have rollup pipelines with different aggregation types defined for a rollup rule,
// and each aggregation type would generate a new id, when doing reverse mapping we not only need
// to match the rollup tags, but also need to check the aggregation type against
// each rollup pipeline to see if the aggregation type was actually contained in the pipeline.
func (as *activeRuleSet) reverseMappingsForRollupID(
	name, sortedTagPairBytes []byte,
	timeNanos int64,
	mt metric.Type,
	at aggregation.Type,
	isMultiAggregationTypesAllowed bool,
	aggTypesOpts aggregation.TypesOptions,
) (metadata.StagedMetadata, bool) {
	for _, rollupRule := range as.rollupRules {
		snapshot := rollupRule.activeSnapshot(timeNanos)
		if snapshot == nil || snapshot.tombstoned {
			continue
		}
		for _, target := range snapshot.targets {
			for i := 0; i < target.Pipeline.Len(); i++ {
				pipelineOp := target.Pipeline.At(i)
				// Only rollup operations can have produced this rollup ID.
				if pipelineOp.Type != mpipeline.RollupOpType {
					continue
				}
				rollupOp := pipelineOp.Rollup
				// The rollup op must produce this metric's name...
				if !bytes.Equal(rollupOp.NewName, name) {
					continue
				}
				// ...and the ID must contain all of the op's rollup tags.
				if _, matched := as.matchRollupTarget(
					sortedTagPairBytes,
					rollupOp.NewName,
					rollupOp.Tags,
					nil,
					matchRollupTargetOptions{generateRollupID: false},
				); !matched {
					continue
				}
				// NB: the list of pipeline steps is not important and thus not computed and returned.
				pipeline := metadata.PipelineMetadata{
					AggregationID:   rollupOp.AggregationID,
					StoragePolicies: target.StoragePolicies.Clone(),
				}
				// Only further filter the pipelines with aggregation types if the given metric type
				// supports multiple aggregation types. This is because if a metric type only supports
				// a single aggregation type, this is the only pipeline that could possibly produce this
				// rollup metric and as such is chosen. The aggregation type passed in is not used because
				// it may not be accurate, since it may not be possible to infer the actual aggregation
				// type only from the metric ID.
				filteredPipelines := []metadata.PipelineMetadata{pipeline}
				if isMultiAggregationTypesAllowed {
					filteredPipelines = filteredPipelinesWithAggregationType(filteredPipelines, mt, at, aggTypesOpts)
				}
				if len(filteredPipelines) == 0 {
					return metadata.DefaultStagedMetadata, false
				}
				return metadata.StagedMetadata{
					CutoverNanos: snapshot.cutoverNanos,
					Tombstoned:   false,
					Metadata:     metadata.Metadata{Pipelines: filteredPipelines},
				}, true
			}
		}
	}
	return metadata.DefaultStagedMetadata, false
}
// nextCutoverIdx returns the index of the first cutover time strictly after t,
// via binary search over the ascending cutover time list.
// NB(xichen): not using sort.Search to avoid a lambda capture.
func (as *activeRuleSet) nextCutoverIdx(t int64) int {
	lo, hi := 0, len(as.cutoverTimesAsc)
	for lo < hi {
		mid := lo + (hi-lo)/2
		if as.cutoverTimesAsc[mid] > t {
			hi = mid
		} else {
			lo = mid + 1
		}
	}
	return lo
}
// cutoverNanosAt returns the cutover time at the given index, or timeNanosMax
// when the index is past the end of the cutover time list.
func (as *activeRuleSet) cutoverNanosAt(idx int) int64 {
	if idx >= len(as.cutoverTimesAsc) {
		return timeNanosMax
	}
	return as.cutoverTimesAsc[idx]
}
// filteredPipelinesWithAggregationType filters the input pipelines in place,
// keeping only those containing the given aggregation type, and returns the
// retained prefix.
func filteredPipelinesWithAggregationType(
	pipelines []metadata.PipelineMetadata,
	mt metric.Type,
	at aggregation.Type,
	opts aggregation.TypesOptions,
) []metadata.PipelineMetadata {
	kept := 0
	for i := range pipelines {
		// A default aggregation ID defers to the per-metric-type defaults;
		// otherwise the explicit aggregation ID is checked directly.
		var matches bool
		if aggID := pipelines[i].AggregationID; aggID.IsDefault() {
			matches = opts.IsContainedInDefaultAggregationTypes(at, mt)
		} else {
			matches = aggID.Contains(at)
		}
		if !matches {
			continue
		}
		if kept != i {
			pipelines[kept] = pipelines[i]
		}
		kept++
	}
	return pipelines[:kept]
}
// mergeResultsForExistingID appends the next staged metadata to the current
// list while keeping the staged metadata cutover times non-decreasing. This is
// needed because the cutover times of staged metadata produced by mapping rule
// matching may not always be in ascending order: e.g., if a metric matches a
// mapping rule at T0 but the rule's filter changes at T1 so the metric no
// longer matches, the staged metadata at T0 has cutover time T0 while the one
// at T1 has cutover time 0 (no rule match); the latter is bumped to T1 to
// preserve monotonicity.
func mergeResultsForExistingID(
	currMetadatas metadata.StagedMetadatas,
	nextMetadata metadata.StagedMetadata,
	nextCutoverNanos int64,
) metadata.StagedMetadatas {
	if len(currMetadatas) == 0 {
		return metadata.StagedMetadatas{nextMetadata}
	}
	lastCutover := currMetadatas[len(currMetadatas)-1].CutoverNanos
	if lastCutover > nextMetadata.CutoverNanos {
		nextMetadata.CutoverNanos = nextCutoverNanos
	}
	return append(currMetadatas, nextMetadata)
}
// mergeResultsForNewRollupIDs merges the current list of staged metadatas for new rollup IDs
// with the list of staged metadatas for new rollup IDs at the next rule cutover time, assuming
// that both the current metadatas list and the next metadatas list are sorted by rollup IDs
// in ascending order.
// NB: each item in the `nextResults` array has a single staged metadata in the `metadatas` array
// as the staged metadata for the associated rollup ID at the next cutover time.
func mergeResultsForNewRollupIDs(
	currResults []IDWithMetadatas,
	nextResults []IDWithMetadatas,
	nextCutoverNanos int64,
) []IDWithMetadatas {
	var (
		currLen, nextLen = len(currResults), len(nextResults)
		currIdx, nextIdx int
	)
	// Two-pointer merge over both ID-sorted lists.
	for currIdx < currLen || nextIdx < nextLen {
		var compareResult int
		if currIdx >= currLen {
			// Current list exhausted; act as if the next ID compares larger.
			compareResult = 1
		} else if nextIdx >= nextLen {
			// Next list exhausted; act as if the current ID compares smaller.
			compareResult = -1
		} else {
			compareResult = bytes.Compare(currResults[currIdx].ID, nextResults[nextIdx].ID)
		}
		// If the current result and the next result have the same ID, we append the next metadata
		// to the end of the metadata list.
		if compareResult == 0 {
			currResults[currIdx].Metadatas = append(currResults[currIdx].Metadatas, nextResults[nextIdx].Metadatas[0])
			currIdx++
			nextIdx++
			continue
		}
		// If the current ID is smaller, it means the current rollup ID is tombstoned at the next
		// cutover time.
		if compareResult < 0 {
			tombstonedMetadata := metadata.StagedMetadata{CutoverNanos: nextCutoverNanos, Tombstoned: true}
			currResults[currIdx].Metadatas = append(currResults[currIdx].Metadatas, tombstonedMetadata)
			currIdx++
			continue
		}
		// Otherwise the current ID is larger, meaning a new ID is added at the next cutover time.
		currResults = append(currResults, nextResults[nextIdx])
		nextIdx++
	}
	// New IDs were appended at the end; re-sort to restore ascending ID order.
	sort.Sort(IDWithMetadatasByIDAsc(currResults))
	return currResults
}
// int64Asc implements sort.Interface to sort int64 values in ascending order.
type int64Asc []int64

func (a int64Asc) Len() int           { return len(a) }
func (a int64Asc) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a int64Asc) Less(i, j int) bool { return a[i] < a[j] }
// matchRollupTargetOptions controls rollup target matching behavior.
type matchRollupTargetOptions struct {
	// generateRollupID determines whether a new rollup ID is generated on match.
	generateRollupID bool
}
// ruleMatchResults is the outcome of matching an ID against a set of rules:
// the latest cutover time among matching rules and the resulting pipelines.
type ruleMatchResults struct {
	cutoverNanos int64
	pipelines    []metadata.PipelineMetadata
}
// merge merges another rule match results into this one in place, taking the
// later of the two cutover times and concatenating the pipelines.
func (res *ruleMatchResults) merge(other ruleMatchResults) *ruleMatchResults {
	if other.cutoverNanos > res.cutoverNanos {
		res.cutoverNanos = other.cutoverNanos
	}
	res.pipelines = append(res.pipelines, other.pipelines...)
	return res
}
// unique de-duplicates the pipelines in place.
func (res *ruleMatchResults) unique() *ruleMatchResults {
	if len(res.pipelines) == 0 {
		return res
	}

	// First resolve if drop policies are in effect.
	var (
		evaluate        = metadata.PipelineMetadatas(res.pipelines)
		dropApplyResult metadata.ApplyOrRemoveDropPoliciesResult
	)
	res.pipelines, dropApplyResult = evaluate.ApplyOrRemoveDropPolicies()
	if dropApplyResult == metadata.AppliedEffectiveDropPolicyResult {
		return res
	}

	// Otherwise merge as per usual: maintain the unique prefix in [0, curr],
	// comparing each candidate against every pipeline already retained.
	curr := 0
	for i := 1; i < len(res.pipelines); i++ {
		foundDup := false
		for j := 0; j <= curr; j++ {
			if res.pipelines[j].Equal(res.pipelines[i]) {
				foundDup = true
				break
			}
		}
		if foundDup {
			continue
		}
		curr++
		res.pipelines[curr] = res.pipelines[i]
	}
	// Zero out the dropped tail entries so they no longer reference their
	// former contents (letting the garbage collector reclaim them).
	for i := curr + 1; i < len(res.pipelines); i++ {
		res.pipelines[i] = metadata.PipelineMetadata{}
	}
	res.pipelines = res.pipelines[:curr+1]
	return res
}
// toStagedMetadata converts the match results to a non-tombstoned staged
// metadata carrying the resolved pipelines.
func (res *ruleMatchResults) toStagedMetadata() metadata.StagedMetadata {
	staged := metadata.StagedMetadata{
		CutoverNanos: res.cutoverNanos,
		Tombstoned:   false,
	}
	staged.Metadata = metadata.Metadata{Pipelines: res.resolvedPipelines()}
	return staged
}
// resolvedPipelines returns the matched pipelines, falling back to the default
// pipelines when no rule matched.
func (res *ruleMatchResults) resolvedPipelines() []metadata.PipelineMetadata {
	if len(res.pipelines) == 0 {
		return metadata.DefaultPipelineMetadatas
	}
	return res.pipelines
}
// idWithMatchResults pairs a (rollup) metric ID with its rule match results.
type idWithMatchResults struct {
	id           []byte
	matchResults ruleMatchResults
}
// mappingResults contains the results of matching an ID against mapping rules.
type mappingResults struct {
	// forExistingID is the match result that should be applied against the
	// incoming metric ID the mapping rules were matched against.
	forExistingID ruleMatchResults
}
// rollupResults contains the results of matching an ID against rollup rules.
type rollupResults struct {
	// forExistingID is the match result that should be applied against the
	// incoming metric ID the rollup rules were matched against. This usually contains
	// the match result produced by rollup rules containing rollup pipelines whose first
	// pipeline operation is not a rollup operation.
	forExistingID ruleMatchResults

	// forNewRollupIDs is the match result that should be applied against new rollup
	// IDs generated during the rule matching process. This usually contains
	// the match result produced by rollup rules containing rollup pipelines whose first
	// pipeline operation is a rollup operation.
	forNewRollupIDs []idWithMatchResults
}
// forwardMatchResult contains the forward match result at a single point in
// time: the staged metadata for the existing ID and the staged metadatas for
// any newly generated rollup IDs.
type forwardMatchResult struct {
	forExistingID   metadata.StagedMetadata
	forNewRollupIDs []IDWithMetadatas
}