From 38323d3a02cab6c600f7bbf5e07159363838f8c8 Mon Sep 17 00:00:00 2001 From: Xi Chen Date: Tue, 2 May 2017 23:26:11 -0400 Subject: [PATCH] Provide APIs to match multiple policies within a time range and send multiple policies across network --- metric/aggregated/types.go | 14 +- metric/unaggregated/types.go | 18 +- policy/policy.go | 96 +++--- policy/policy_benchmark_test.go | 74 ++--- policy/policy_test.go | 38 +-- protocol/msgpack/aggregated_encoder.go | 4 +- protocol/msgpack/aggregated_encoder_test.go | 2 +- protocol/msgpack/aggregated_roundtrip_test.go | 22 +- protocol/msgpack/base_encoder.go | 24 +- protocol/msgpack/base_iterator.go | 33 +- protocol/msgpack/raw_metric.go | 39 ++- protocol/msgpack/raw_metric_test.go | 45 ++- protocol/msgpack/schema.go | 55 +-- protocol/msgpack/types.go | 37 +-- protocol/msgpack/unaggregated_encoder.go | 100 +++--- protocol/msgpack/unaggregated_encoder_test.go | 97 +++--- protocol/msgpack/unaggregated_iterator.go | 102 ++++-- .../msgpack/unaggregated_iterator_test.go | 235 +++++++------ .../msgpack/unaggregated_roundtrip_test.go | 237 ++++++------- protocol/msgpack/wire_format.md | 48 +-- rules/result.go | 59 ++-- rules/result_test.go | 60 ++-- rules/ruleset.go | 162 ++++++--- rules/ruleset_test.go | 314 +++++++++++------- 24 files changed, 1076 insertions(+), 839 deletions(-) diff --git a/metric/aggregated/types.go b/metric/aggregated/types.go index 2d5d7e6..62f67a6 100644 --- a/metric/aggregated/types.go +++ b/metric/aggregated/types.go @@ -31,8 +31,8 @@ import ( // Metric is a metric, which is essentially a named value at certain time. type Metric struct { metric.ID - Timestamp time.Time - Value float64 + TimeNs int64 + Value float64 } // String is the string representation of a metric. @@ -40,7 +40,7 @@ func (m Metric) String() string { return fmt.Sprintf( "{id:%s,timestamp:%s,value:%f}", m.ID.String(), - m.Timestamp.String(), + time.Unix(0, m.TimeNs).String(), m.Value, ) } @@ -48,8 +48,8 @@ func (m Metric) String() string { // ChunkedMetric is a metric with a chunked ID. type ChunkedMetric struct { metric.ChunkedID - Timestamp time.Time - Value float64 + TimeNs int64 + Value float64 } // RawMetric is a metric in its raw form (e.g., encoded bytes associated with @@ -58,8 +58,8 @@ type RawMetric interface { // ID is the metric identifier. ID() (metric.ID, error) - // Timestamp is the metric timestamp. - Timestamp() (time.Time, error) + // TimeNs is the metric timestamp in nanoseconds. + TimeNs() (int64, error) // Value is the metric value. Value() (float64, error) diff --git a/metric/unaggregated/types.go b/metric/unaggregated/types.go index 462800e..8e7c822 100644 --- a/metric/unaggregated/types.go +++ b/metric/unaggregated/types.go @@ -70,22 +70,22 @@ type Gauge struct { Value float64 } -// CounterWithPolicies is a counter with applicable policies. -type CounterWithPolicies struct { +// CounterWithPoliciesList is a counter with applicable policies list. +type CounterWithPoliciesList struct { Counter - policy.VersionedPolicies + policy.PoliciesList } -// BatchTimerWithPolicies is a batch timer with applicable policies. -type BatchTimerWithPolicies struct { +// BatchTimerWithPoliciesList is a batch timer with applicable policies list. +type BatchTimerWithPoliciesList struct { BatchTimer - policy.VersionedPolicies + policy.PoliciesList } -// GaugeWithPolicies is a gauge with applicable policies. -type GaugeWithPolicies struct { +// GaugeWithPoliciesList is a gauge with applicable policies list. 
+type GaugeWithPoliciesList struct { Gauge - policy.VersionedPolicies + policy.PoliciesList } // MetricUnion is a union of different types of metrics, only one of which is valid diff --git a/policy/policy.go b/policy/policy.go index 4e1d09b..d7a4c47 100644 --- a/policy/policy.go +++ b/policy/policy.go @@ -42,10 +42,11 @@ var ( // EmptyPolicy represents an empty policy. EmptyPolicy Policy - // EmptyVersionedPolicies represents an empty VersionPolicies. - EmptyVersionedPolicies VersionedPolicies + // EmptyStagedPolicies represents an empty staged policies. + EmptyStagedPolicies StagedPolicies - errNilPolicySchema = errors.New("nil policy schema") + // DefaultPoliciesList represents a default policies list. + DefaultPoliciesList = PoliciesList{EmptyStagedPolicies} // defaultPolicies are the default policies. // TODO(xichen): possibly make this dynamically configurable in the future. @@ -53,6 +54,8 @@ var ( NewPolicy(10*time.Second, xtime.Second, 2*24*time.Hour), NewPolicy(time.Minute, xtime.Minute, 30*24*time.Hour), } + + errNilPolicySchema = errors.New("nil policy schema") ) // Policy represents the resolution and retention period metric datapoints @@ -148,39 +151,60 @@ func (pr ByResolutionAsc) Less(i, j int) bool { return pr[i].Resolution().Precision < pr[i].Resolution().Precision } -// VersionedPolicies represent a list of policies at a specified version. -type VersionedPolicies struct { - // Version is the version of the policies. - Version int - +// StagedPolicies represent a list of policies at a specified version. +type StagedPolicies struct { // Cutover is when the policies take effect. - Cutover time.Time + CutoverNs int64 - // isDefault determines whether the policies are the default policies. - isDefault bool + // Tombstoned determines whether the associated (rollup) metric has been tombstoned. + Tombstoned bool // policies represent the list of policies. policies []Policy } -// IsDefault determines whether the policies are the default policies. -func (vp VersionedPolicies) IsDefault() bool { return vp.isDefault } +// NewStagedPolicies create a new staged policies. +func NewStagedPolicies(cutoverNs int64, tombstoned bool, policies []Policy) StagedPolicies { + return StagedPolicies{CutoverNs: cutoverNs, Tombstoned: tombstoned, policies: policies} +} + +// Reset resets the staged policies. +func (p *StagedPolicies) Reset() { *p = EmptyStagedPolicies } // Policies returns the policies. -func (vp VersionedPolicies) Policies() []Policy { - if vp.isDefault { +func (p StagedPolicies) Policies() []Policy { + if p.hasDefaultPolicies() { return defaultPolicies } - return vp.policies + return p.policies } -// String is the representation of versioned policies. -func (vp VersionedPolicies) String() string { +// SamePolicies returns whether two staged policies have the same policy list, +// assuming the policies are sorted in the same order. +func (p StagedPolicies) SamePolicies(other StagedPolicies) bool { + if p.hasDefaultPolicies() && other.hasDefaultPolicies() { + return true + } + currPolicies := p.Policies() + otherPolicies := other.Policies() + if len(currPolicies) != len(otherPolicies) { + return false + } + for i := 0; i < len(currPolicies); i++ { + if currPolicies[i] != otherPolicies[i] { + return false + } + } + return true +} + +// String is the representation of staged policies. 
+func (p StagedPolicies) String() string { var buf bytes.Buffer - buf.WriteString(fmt.Sprintf("{version:%d,cutover:%s,isDefault:%v,policies:[", vp.Version, vp.Cutover.String(), vp.isDefault)) - for i := range vp.policies { - buf.WriteString(vp.policies[i].String()) - if i < len(vp.policies)-1 { + buf.WriteString(fmt.Sprintf("{cutover:%s,tombstoned:%v,policies:[", time.Unix(0, p.CutoverNs).String(), p.Tombstoned)) + for i := range p.policies { + buf.WriteString(p.policies[i].String()) + if i < len(p.policies)-1 { buf.WriteString(",") } } @@ -188,26 +212,18 @@ func (vp VersionedPolicies) String() string { return buf.String() } -// Reset resets the versioned policies. -func (vp *VersionedPolicies) Reset() { - *vp = EmptyVersionedPolicies +func (p StagedPolicies) isEmpty() bool { + return p.CutoverNs == 0 && !p.Tombstoned && p.hasDefaultPolicies() } -// DefaultVersionedPolicies creates a new default versioned policies. -func DefaultVersionedPolicies(version int, cutover time.Time) VersionedPolicies { - return VersionedPolicies{ - Version: version, - Cutover: cutover, - isDefault: true, - } +func (p StagedPolicies) hasDefaultPolicies() bool { + return len(p.policies) == 0 } -// CustomVersionedPolicies creates a new custom versioned policies. -func CustomVersionedPolicies(version int, cutover time.Time, policies []Policy) VersionedPolicies { - return VersionedPolicies{ - Version: version, - Cutover: cutover, - isDefault: false, - policies: policies, - } +// PoliciesList is a list of staged policies. +type PoliciesList []StagedPolicies + +// IsDefault determines whether this is a default policies list. +func (l PoliciesList) IsDefault() bool { + return len(l) == 1 && l[0].isEmpty() } diff --git a/policy/policy_benchmark_test.go b/policy/policy_benchmark_test.go index 50879e1..1f6bf02 100644 --- a/policy/policy_benchmark_test.go +++ b/policy/policy_benchmark_test.go @@ -25,81 +25,77 @@ import ( "time" ) -func BenchmarkVersionedPoliciesAsStruct(b *testing.B) { - vp := CustomVersionedPolicies(InitPolicyVersion, time.Now(), defaultPolicies) +var ( + testNowNs = time.Now().UnixNano() +) + +func BenchmarkStagedPoliciesAsStruct(b *testing.B) { + sp := NewStagedPolicies(testNowNs, false, defaultPolicies) for n := 0; n < b.N; n++ { - validatePolicyByValue(b, vp) + validatePolicyByValue(b, sp) } } -func BenchmarkVersionedPoliciesAsPointer(b *testing.B) { - vp := CustomVersionedPolicies(InitPolicyVersion, time.Now(), defaultPolicies) +func BenchmarkStagedPoliciesAsPointer(b *testing.B) { + sp := NewStagedPolicies(testNowNs, false, defaultPolicies) for n := 0; n < b.N; n++ { - validatePolicyByPointer(b, &vp) + validatePolicyByPointer(b, &sp) } } -func BenchmarkVersionedPoliciesAsInterface(b *testing.B) { - vp := &testVersionedPolicies{version: InitPolicyVersion, cutover: time.Now(), policies: defaultPolicies} +func BenchmarkStagedPoliciesAsInterface(b *testing.B) { + sp := &testStagedPolicies{cutoverNs: testNowNs, policies: defaultPolicies} for n := 0; n < b.N; n++ { - validatePolicyByInterface(b, vp) + validatePolicyByInterface(b, sp) } } -func BenchmarkVersionedPoliciesAsStructExported(b *testing.B) { - vp := testVersionedPolicies{version: InitPolicyVersion, cutover: time.Now(), policies: defaultPolicies} +func BenchmarkStagedPoliciesAsStructExported(b *testing.B) { + sp := testStagedPolicies{cutoverNs: testNowNs, policies: defaultPolicies} for n := 0; n < b.N; n++ { - validatePolicyByStructExported(b, vp) + validatePolicyByStructExported(b, sp) } } -type testVersionedPoliciesInt interface { - Version() 
int +type testStagedPoliciesInt64 interface { + CutoverNs() int64 } -// VersionedPolicies represent a list of policies at a specified version. -type testVersionedPolicies struct { - // Version is the version of the policies. - version int - - // Cutover is when the policies take effect. - cutover time.Time - - // isDefault determines whether the policies are the default policies. - isDefault bool - - // policies represent the list of policies. - policies []Policy +// StagedPolicies represent a list of policies at a specified version. +type testStagedPolicies struct { + cutoverNs int64 + tombstoned bool + policies []Policy } -func (v testVersionedPolicies) ValVersion() int { - return v.version +func (v testStagedPolicies) ValCutoverNs() int64 { + return v.cutoverNs } -func (v *testVersionedPolicies) Version() int { - return v.version +func (v *testStagedPolicies) CutoverNs() int64 { + return v.cutoverNs } -func validatePolicyByValue(b *testing.B, vps VersionedPolicies) { - if vps.Version != InitPolicyVersion { +func validatePolicyByValue(b *testing.B, sp StagedPolicies) { + if sp.CutoverNs != testNowNs { b.FailNow() } } -func validatePolicyByPointer(b *testing.B, vps *VersionedPolicies) { - if vps.Version != InitPolicyVersion { +func validatePolicyByPointer(b *testing.B, sp *StagedPolicies) { + if sp.CutoverNs != testNowNs { b.FailNow() } } -func validatePolicyByInterface(b *testing.B, vps testVersionedPoliciesInt) { - if vps.Version() != InitPolicyVersion { +func validatePolicyByInterface(b *testing.B, sp testStagedPoliciesInt64) { + if sp.CutoverNs() != testNowNs { b.FailNow() } } -func validatePolicyByStructExported(b *testing.B, vps testVersionedPolicies) { - if vps.ValVersion() != InitPolicyVersion { +func validatePolicyByStructExported(b *testing.B, sp testStagedPolicies) { + if sp.ValCutoverNs() != testNowNs { b.FailNow() } } diff --git a/policy/policy_test.go b/policy/policy_test.go index 6d911d6..17c73fa 100644 --- a/policy/policy_test.go +++ b/policy/policy_test.go @@ -45,30 +45,20 @@ func TestPoliciesByResolutionAsc(t *testing.T) { require.Equal(t, expected, inputs) } -func TestDefaultVersionedPolicies(t *testing.T) { - var ( - version = 2 - cutover = time.Now() - ) - vp := DefaultVersionedPolicies(version, cutover) - require.Equal(t, version, vp.Version) - require.Equal(t, cutover, vp.Cutover) - require.True(t, vp.IsDefault()) - require.Equal(t, defaultPolicies, vp.Policies()) +func TestStagedPoliciesHasDefaultPolicies(t *testing.T) { + sp := NewStagedPolicies(testNowNs, true, nil) + require.Equal(t, testNowNs, sp.CutoverNs) + require.True(t, sp.hasDefaultPolicies()) + require.Equal(t, defaultPolicies, sp.Policies()) } -func TestCustomVersionedPolicies(t *testing.T) { - var ( - version = 2 - cutover = time.Now() - policies = []Policy{ - NewPolicy(10*time.Second, xtime.Second, 6*time.Hour), - NewPolicy(10*time.Second, xtime.Second, 2*time.Hour), - } - ) - vp := CustomVersionedPolicies(version, cutover, policies) - require.Equal(t, version, vp.Version) - require.Equal(t, cutover, vp.Cutover) - require.False(t, vp.IsDefault()) - require.Equal(t, policies, vp.Policies()) +func TestStagedPoliciesHasCustomPolicies(t *testing.T) { + policies := []Policy{ + NewPolicy(10*time.Second, xtime.Second, 6*time.Hour), + NewPolicy(10*time.Second, xtime.Second, 2*time.Hour), + } + sp := NewStagedPolicies(testNowNs, false, policies) + require.Equal(t, testNowNs, sp.CutoverNs) + require.False(t, sp.hasDefaultPolicies()) + require.Equal(t, policies, sp.Policies()) } diff --git 
a/protocol/msgpack/aggregated_encoder.go b/protocol/msgpack/aggregated_encoder.go index 63e57b4..d6500e3 100644 --- a/protocol/msgpack/aggregated_encoder.go +++ b/protocol/msgpack/aggregated_encoder.go @@ -103,7 +103,7 @@ func (enc *aggregatedEncoder) encodeMetricAsRaw(m aggregated.Metric) []byte { enc.buf.resetData() enc.encodeMetricProlog() enc.buf.encodeID(m.ID) - enc.buf.encodeTime(m.Timestamp) + enc.buf.encodeVarint(m.TimeNs) enc.buf.encodeFloat64(m.Value) return enc.buf.encoder().Bytes() } @@ -112,7 +112,7 @@ func (enc *aggregatedEncoder) encodeChunkedMetricAsRaw(m aggregated.ChunkedMetri enc.buf.resetData() enc.encodeMetricProlog() enc.buf.encodeChunkedID(m.ChunkedID) - enc.buf.encodeTime(m.Timestamp) + enc.buf.encodeVarint(m.TimeNs) enc.buf.encodeFloat64(m.Value) return enc.buf.encoder().Bytes() } diff --git a/protocol/msgpack/aggregated_encoder_test.go b/protocol/msgpack/aggregated_encoder_test.go index 25fb56e..96b4af1 100644 --- a/protocol/msgpack/aggregated_encoder_test.go +++ b/protocol/msgpack/aggregated_encoder_test.go @@ -73,7 +73,7 @@ func TestAggregatedEncodeMetric(t *testing.T) { int64(metricVersion), int(numFieldsForType(metricType)), []byte(testMetric.ID), - testMetric.Timestamp, + testMetric.TimeNs, testMetric.Value, } require.Equal(t, expected, *result) diff --git a/protocol/msgpack/aggregated_roundtrip_test.go b/protocol/msgpack/aggregated_roundtrip_test.go index e37afd7..b3bef59 100644 --- a/protocol/msgpack/aggregated_roundtrip_test.go +++ b/protocol/msgpack/aggregated_roundtrip_test.go @@ -37,9 +37,9 @@ import ( var ( testMetric = aggregated.Metric{ - ID: metric.ID("foo"), - Timestamp: time.Now(), - Value: 123.45, + ID: metric.ID("foo"), + TimeNs: time.Now().UnixNano(), + Value: 123.45, } testChunkedMetric = aggregated.ChunkedMetric{ ChunkedID: metric.ChunkedID{ @@ -47,13 +47,13 @@ var ( Data: []byte("bar"), Suffix: []byte(".baz"), }, - Timestamp: time.Now(), - Value: 123.45, + TimeNs: time.Now().UnixNano(), + Value: 123.45, } testMetric2 = aggregated.Metric{ - ID: metric.ID("bar"), - Timestamp: time.Now(), - Value: 678.90, + ID: metric.ID("bar"), + TimeNs: time.Now().UnixNano(), + Value: 678.90, } testPolicy = policy.NewPolicy(time.Second, xtime.Second, time.Hour) ) @@ -142,9 +142,9 @@ func validateAggregatedRoundtripWithEncoderAndIterator( id = append(id, inputMetric.ChunkedID.Suffix...) 
expected = append(expected, metricWithPolicy{ metric: aggregated.Metric{ - ID: id, - Timestamp: inputMetric.Timestamp, - Value: inputMetric.Value, + ID: id, + TimeNs: inputMetric.TimeNs, + Value: inputMetric.Value, }, policy: input.policy, }) diff --git a/protocol/msgpack/base_encoder.go b/protocol/msgpack/base_encoder.go index 036fbe8..dce5008 100644 --- a/protocol/msgpack/base_encoder.go +++ b/protocol/msgpack/base_encoder.go @@ -21,15 +21,13 @@ package msgpack import ( - "time" - "github.com/m3db/m3metrics/metric" "github.com/m3db/m3metrics/policy" ) type encodePolicyFn func(p policy.Policy) -type encodeTimeFn func(t time.Time) type encodeVarintFn func(value int64) +type encodeBoolFn func(value bool) type encodeFloat64Fn func(value float64) type encodeBytesFn func(value []byte) type encodeBytesLenFn func(value int) @@ -40,8 +38,8 @@ type baseEncoder struct { bufEncoder BufferedEncoder encodeErr error encodePolicyFn encodePolicyFn - encodeTimeFn encodeTimeFn encodeVarintFn encodeVarintFn + encodeBoolFn encodeBoolFn encodeFloat64Fn encodeFloat64Fn encodeBytesFn encodeBytesFn encodeBytesLenFn encodeBytesLenFn @@ -52,8 +50,8 @@ func newBaseEncoder(encoder BufferedEncoder) encoderBase { enc := &baseEncoder{bufEncoder: encoder} enc.encodePolicyFn = enc.encodePolicyInternal - enc.encodeTimeFn = enc.encodeTimeInternal enc.encodeVarintFn = enc.encodeVarintInternal + enc.encodeBoolFn = enc.encodeBoolInternal enc.encodeFloat64Fn = enc.encodeFloat64Internal enc.encodeBytesFn = enc.encodeBytesInternal enc.encodeBytesLenFn = enc.encodeBytesLenInternal @@ -70,8 +68,8 @@ func (enc *baseEncoder) encodeVersion(version int) { enc.encodeVarint( func (enc *baseEncoder) encodeObjectType(objType objectType) { enc.encodeVarint(int64(objType)) } func (enc *baseEncoder) encodeNumObjectFields(numFields int) { enc.encodeArrayLen(numFields) } func (enc *baseEncoder) encodeID(id metric.ID) { enc.encodeBytes([]byte(id)) } -func (enc *baseEncoder) encodeTime(t time.Time) { enc.encodeTimeFn(t) } func (enc *baseEncoder) encodeVarint(value int64) { enc.encodeVarintFn(value) } +func (enc *baseEncoder) encodeBool(value bool) { enc.encodeBoolFn(value) } func (enc *baseEncoder) encodeFloat64(value float64) { enc.encodeFloat64Fn(value) } func (enc *baseEncoder) encodeBytes(value []byte) { enc.encodeBytesFn(value) } func (enc *baseEncoder) encodeBytesLen(value int) { enc.encodeBytesLenFn(value) } @@ -132,13 +130,6 @@ func (enc *baseEncoder) encodeRetention(retention policy.Retention) { enc.encodeVarintFn(int64(retention)) } -func (enc *baseEncoder) encodeTimeInternal(value time.Time) { - if enc.encodeErr != nil { - return - } - enc.encodeErr = enc.bufEncoder.EncodeTime(value) -} - // NB(xichen): the underlying msgpack encoder implementation // always cast an integer value to an int64 and encodes integer // values as varints, regardless of the actual integer type. 
@@ -149,6 +140,13 @@ func (enc *baseEncoder) encodeVarintInternal(value int64) { enc.encodeErr = enc.bufEncoder.EncodeInt64(value) } +func (enc *baseEncoder) encodeBoolInternal(value bool) { + if enc.encodeErr != nil { + return + } + enc.encodeErr = enc.bufEncoder.EncodeBool(value) +} + func (enc *baseEncoder) encodeFloat64Internal(value float64) { if enc.encodeErr != nil { return diff --git a/protocol/msgpack/base_iterator.go b/protocol/msgpack/base_iterator.go index d470f9c..d497ded 100644 --- a/protocol/msgpack/base_iterator.go +++ b/protocol/msgpack/base_iterator.go @@ -85,7 +85,7 @@ func (it *baseIterator) decodePolicy() policy.Policy { func (it *baseIterator) decodeResolution() policy.Resolution { numActualFields := it.decodeNumObjectFields() resolutionType := it.decodeObjectType() - numExpectedFields, numActualFields, ok := it.checkNumFieldsForTypeWithActual( + numExpectedFields, ok := it.checkNumFieldsForTypeWithActual( resolutionType, numActualFields, ) @@ -127,7 +127,7 @@ func (it *baseIterator) decodeResolution() policy.Resolution { func (it *baseIterator) decodeRetention() policy.Retention { numActualFields := it.decodeNumObjectFields() retentionType := it.decodeObjectType() - numExpectedFields, numActualFields, ok := it.checkNumFieldsForTypeWithActual( + numExpectedFields, ok := it.checkNumFieldsForTypeWithActual( retentionType, numActualFields, ) @@ -174,15 +174,6 @@ func (it *baseIterator) decodeID() metric.ID { return metric.ID(it.decodeBytes()) } -func (it *baseIterator) decodeTime() time.Time { - if it.decodeErr != nil { - return time.Time{} - } - value, err := it.decoder.DecodeTime() - it.decodeErr = err - return value -} - // NB(xichen): the underlying msgpack decoder implementation // always decodes an int64 and looks at the actual decoded // value to determine the width of the integer (a.k.a. varint @@ -196,6 +187,15 @@ func (it *baseIterator) decodeVarint() int64 { return value } +func (it *baseIterator) decodeBool() bool { + if it.decodeErr != nil { + return false + } + value, err := it.decoder.DecodeBool() + it.decodeErr = err + return value +} + func (it *baseIterator) decodeFloat64() float64 { if it.decodeErr != nil { return 0.0 @@ -251,22 +251,23 @@ func (it *baseIterator) skip(numFields int) { func (it *baseIterator) checkNumFieldsForType(objType objectType) (int, int, bool) { numActualFields := it.decodeNumObjectFields() - return it.checkNumFieldsForTypeWithActual(objType, numActualFields) + numExpectedFields, ok := it.checkNumFieldsForTypeWithActual(objType, numActualFields) + return numExpectedFields, numActualFields, ok } func (it *baseIterator) checkNumFieldsForTypeWithActual( objType objectType, numActualFields int, -) (int, int, bool) { +) (int, bool) { if it.decodeErr != nil { - return 0, 0, false + return 0, false } numExpectedFields := numFieldsForType(objType) if numExpectedFields > numActualFields { it.decodeErr = fmt.Errorf("number of fields mismatch: expected %d actual %d", numExpectedFields, numActualFields) - return 0, 0, false + return 0, false } - return numExpectedFields, numActualFields, true + return numExpectedFields, true } // bufReader is a buffered reader. 
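Note (illustrative sketch, not part of the patch): the following minimal Go example shows how a caller might exercise the staged-policies API introduced in policy/policy.go above — NewStagedPolicies, SamePolicies, PoliciesList, and DefaultPoliciesList. Only identifiers that appear in this patch are used; everything else (package main, the printed expectations) is assumed for illustration.

package main

import (
	"fmt"
	"time"

	"github.com/m3db/m3metrics/policy"
)

func main() {
	now := time.Now().UnixNano()

	// A nil policy slice means the staged policies fall back to the
	// package-level default policies (see hasDefaultPolicies in policy.go).
	active := policy.NewStagedPolicies(now, false, nil)

	// A later, tombstoned stage marks the associated (rollup) metric as deleted.
	tombstoned := policy.NewStagedPolicies(now+int64(time.Hour), true, nil)

	list := policy.PoliciesList{active, tombstoned}

	// Both stages resolve to the default policies, so their policy lists match.
	fmt.Println(active.SamePolicies(tombstoned)) // true

	// A single empty stage is the canonical default policies list.
	fmt.Println(policy.DefaultPoliciesList.IsDefault()) // true
	fmt.Println(list.IsDefault())                       // false: two stages
}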
diff --git a/protocol/msgpack/raw_metric.go b/protocol/msgpack/raw_metric.go index f53faf4..1456f9f 100644 --- a/protocol/msgpack/raw_metric.go +++ b/protocol/msgpack/raw_metric.go @@ -24,7 +24,6 @@ import ( "bytes" "fmt" "io" - "time" "github.com/m3db/m3metrics/metric" "github.com/m3db/m3metrics/metric/aggregated" @@ -38,13 +37,13 @@ type readBytesFn func(start int, n int) []byte // rawMetric is a raw metric. type rawMetric struct { - data []byte // raw data containing encoded metric. - it iteratorBase // base iterator for lazily decoding metric fields. - metric aggregated.Metric // current metric. - idDecoded bool // whether id has been decoded. - timestampDecoded bool // whether timestamp has been decoded. - valueDecoded bool // whether value has been decoded. - readBytesFn readBytesFn // reading bytes function. + data []byte // raw data containing encoded metric. + it iteratorBase // base iterator for lazily decoding metric fields. + metric aggregated.Metric // current metric. + idDecoded bool // whether id has been decoded. + timeDecoded bool // whether time has been decoded. + valueDecoded bool // whether value has been decoded. + readBytesFn readBytesFn // reading bytes function. } // NewRawMetric creates a new raw metric. @@ -66,18 +65,18 @@ func (m *rawMetric) ID() (metric.ID, error) { return m.metric.ID, nil } -func (m *rawMetric) Timestamp() (time.Time, error) { +func (m *rawMetric) TimeNs() (int64, error) { m.decodeID() - m.decodeTimestamp() + m.decodeTime() if err := m.it.err(); err != nil { - return time.Time{}, err + return 0, err } - return m.metric.Timestamp, nil + return m.metric.TimeNs, nil } func (m *rawMetric) Value() (float64, error) { m.decodeID() - m.decodeTimestamp() + m.decodeTime() m.decodeValue() if err := m.it.err(); err != nil { return 0.0, err @@ -87,7 +86,7 @@ func (m *rawMetric) Value() (float64, error) { func (m *rawMetric) Metric() (aggregated.Metric, error) { m.decodeID() - m.decodeTimestamp() + m.decodeTime() m.decodeValue() if err := m.it.err(); err != nil { return emptyMetric, err @@ -102,7 +101,7 @@ func (m *rawMetric) Bytes() []byte { func (m *rawMetric) Reset(data []byte) { m.metric = emptyMetric m.idDecoded = false - m.timestampDecoded = false + m.timeDecoded = false m.valueDecoded = false m.data = data m.reader().Reset(data) @@ -148,16 +147,16 @@ func (m *rawMetric) decodeID() { m.idDecoded = true } -func (m *rawMetric) decodeTimestamp() { - if m.it.err() != nil || m.timestampDecoded { +func (m *rawMetric) decodeTime() { + if m.it.err() != nil || m.timeDecoded { return } - t := m.it.decodeTime() + timeNs := m.it.decodeVarint() if m.it.err() != nil { return } - m.metric.Timestamp = t - m.timestampDecoded = true + m.metric.TimeNs = timeNs + m.timeDecoded = true } func (m *rawMetric) decodeValue() { diff --git a/protocol/msgpack/raw_metric_test.go b/protocol/msgpack/raw_metric_test.go index 134dd45..587859a 100644 --- a/protocol/msgpack/raw_metric_test.go +++ b/protocol/msgpack/raw_metric_test.go @@ -25,7 +25,6 @@ import ( "errors" "io" "testing" - "time" "github.com/m3db/m3metrics/metric" "github.com/m3db/m3metrics/metric/aggregated" @@ -95,30 +94,30 @@ func TestRawMetricDecodeIDSuccess(t *testing.T) { func TestRawMetricDecodeTimestampExistingError(t *testing.T) { m := testRawMetric() m.it.setErr(errTestDecodeRawMetric) - _, err := m.Timestamp() + _, err := m.TimeNs() require.Equal(t, errTestDecodeRawMetric, err) } func TestRawMetricDecodeTimestampDecodeError(t *testing.T) { m := testRawMetric() - m.it.(*mockBaseIterator).decodeTimeFn = func() 
time.Time { + m.it.(*mockBaseIterator).decodeVarintFn = func() int64 { m.it.setErr(errTestDecodeRawMetric) - return time.Time{} + return 0 } - _, err := m.Timestamp() + _, err := m.TimeNs() require.Equal(t, errTestDecodeRawMetric, err) } func TestRawMetricDecodeTimestampSuccess(t *testing.T) { m := testRawMetric() - timestamp, err := m.Timestamp() + timeNs, err := m.TimeNs() require.NoError(t, err) - require.Equal(t, testMetric.Timestamp, timestamp) - require.True(t, m.timestampDecoded) + require.Equal(t, testMetric.TimeNs, timeNs) + require.True(t, m.timeDecoded) // Get timestamp again to make sure we don't re-decode the timestamp. require.NoError(t, err) - require.Equal(t, testMetric.Timestamp, timestamp) + require.Equal(t, testMetric.TimeNs, timeNs) } func TestRawMetricDecodeValueExistingError(t *testing.T) { @@ -163,7 +162,7 @@ func TestRawMetricDecodeMetricSuccess(t *testing.T) { require.NoError(t, err) require.Equal(t, testMetric, metric) require.True(t, m.idDecoded) - require.True(t, m.timestampDecoded) + require.True(t, m.timeDecoded) require.True(t, m.valueDecoded) // Get metric again to make sure we don't re-decode the metric. @@ -187,9 +186,9 @@ func TestRawMetricNilID(t *testing.T) { func TestRawMetricReset(t *testing.T) { metrics := []aggregated.Metric{ - {ID: metric.ID("foo"), Timestamp: testMetric.Timestamp, Value: 1.0}, - {ID: metric.ID("bar"), Timestamp: testMetric.Timestamp, Value: 2.3}, - {ID: metric.ID("baz"), Timestamp: testMetric.Timestamp, Value: 4234.234}, + {ID: metric.ID("foo"), TimeNs: testMetric.TimeNs, Value: 1.0}, + {ID: metric.ID("bar"), TimeNs: testMetric.TimeNs, Value: 2.3}, + {ID: metric.ID("baz"), TimeNs: testMetric.TimeNs, Value: 4234.234}, } rawMetric := NewRawMetric(nil, 16) for i := 0; i < len(metrics); i++ { @@ -202,9 +201,9 @@ func TestRawMetricReset(t *testing.T) { func TestRawMetricRoundtripStress(t *testing.T) { metrics := []aggregated.Metric{ - {ID: metric.ID("foo"), Timestamp: testMetric.Timestamp, Value: 1.0}, - {ID: metric.ID("bar"), Timestamp: testMetric.Timestamp, Value: 2.3}, - {ID: metric.ID("baz"), Timestamp: testMetric.Timestamp, Value: 4234.234}, + {ID: metric.ID("foo"), TimeNs: testMetric.TimeNs, Value: 1.0}, + {ID: metric.ID("bar"), TimeNs: testMetric.TimeNs, Value: 2.3}, + {ID: metric.ID("baz"), TimeNs: testMetric.TimeNs, Value: 4234.234}, } var ( inputs []aggregated.Metric @@ -224,7 +223,7 @@ func TestRawMetricRoundtripStress(t *testing.T) { type decodeVersionFn func() int type decodeBytesLenFn func() int -type decodeTimeFn func() time.Time +type decodeVarintFn func() int64 type decodeFloat64Fn func() float64 type mockBaseIterator struct { @@ -232,7 +231,7 @@ type mockBaseIterator struct { itErr error decodeVersionFn decodeVersionFn decodeBytesLenFn decodeBytesLenFn - decodeTimeFn decodeTimeFn + decodeVarintFn decodeVarintFn decodeFloat64Fn decodeFloat64Fn } @@ -245,8 +244,8 @@ func (it *mockBaseIterator) decodeVersion() int { return it.decodeVers func (it *mockBaseIterator) decodeObjectType() objectType { return unknownType } func (it *mockBaseIterator) decodeNumObjectFields() int { return 0 } func (it *mockBaseIterator) decodeID() metric.ID { return nil } -func (it *mockBaseIterator) decodeTime() time.Time { return it.decodeTimeFn() } -func (it *mockBaseIterator) decodeVarint() int64 { return 0 } +func (it *mockBaseIterator) decodeVarint() int64 { return it.decodeVarintFn() } +func (it *mockBaseIterator) decodeBool() bool { return false } func (it *mockBaseIterator) decodeFloat64() float64 { return it.decodeFloat64Fn() } func (it 
*mockBaseIterator) decodeBytes() []byte { return nil } func (it *mockBaseIterator) decodeBytesLen() int { return it.decodeBytesLenFn() } @@ -260,15 +259,15 @@ func (it *mockBaseIterator) checkNumFieldsForType(objType objectType) (int, int, func (it *mockBaseIterator) checkNumFieldsForTypeWithActual( objType objectType, numActualFields int, -) (int, int, bool) { - return 0, 0, true +) (int, bool) { + return 0, true } func testRawMetric() *rawMetric { mockIt := &mockBaseIterator{} mockIt.decodeVersionFn = func() int { return metricVersion } mockIt.decodeBytesLenFn = func() int { return len(testMetric.ID) } - mockIt.decodeTimeFn = func() time.Time { return testMetric.Timestamp } + mockIt.decodeVarintFn = func() int64 { return testMetric.TimeNs } mockIt.decodeFloat64Fn = func() float64 { return testMetric.Value } mockIt.bufReader = bytes.NewReader(testRawMetricData) diff --git a/protocol/msgpack/schema.go b/protocol/msgpack/schema.go index ab624b3..1c00dc2 100644 --- a/protocol/msgpack/schema.go +++ b/protocol/msgpack/schema.go @@ -40,9 +40,9 @@ const ( rootObjectType // Object types exposed to the encoder interface. - counterWithPoliciesType - batchTimerWithPoliciesType - gaugeWithPoliciesType + counterWithPoliciesListType + batchTimerWithPoliciesListType + gaugeWithPoliciesListType rawMetricWithPolicyType // Object types not exposed to the encoder interface. @@ -50,35 +50,37 @@ const ( batchTimerType gaugeType metricType + defaultPoliciesListType + customPoliciesListType + stagedPoliciesType policyType knownResolutionType unknownResolutionType knownRetentionType unknownRetentionType - defaultVersionedPoliciesType - customVersionedPoliciesType // Total number of object types. numObjectTypes = iota ) const ( - numRootObjectFields = 2 - numCounterWithPoliciesFields = 2 - numBatchTimerWithPoliciesFields = 2 - numGaugeWithPoliciesFields = 2 - numRawMetricWithPolicyFields = 2 - numCounterFields = 2 - numBatchTimerFields = 2 - numGaugeFields = 2 - numMetricFields = 3 - numPolicyFields = 2 - numKnownResolutionFields = 2 - numUnknownResolutionFields = 3 - numKnownRetentionFields = 2 - numUnknownRetentionFields = 2 - numDefaultVersionedPolicyFields = 3 - numCustomVersionedPolicyFields = 4 + numRootObjectFields = 2 + numCounterWithPoliciesListFields = 2 + numBatchTimerWithPoliciesListFields = 2 + numGaugeWithPoliciesListFields = 2 + numRawMetricWithPolicyFields = 2 + numCounterFields = 2 + numBatchTimerFields = 2 + numGaugeFields = 2 + numMetricFields = 3 + numDefaultStagedPoliciesListFields = 1 + numCustomStagedPoliciesListFields = 2 + numStagedPoliciesFields = 3 + numPolicyFields = 2 + numKnownResolutionFields = 2 + numUnknownResolutionFields = 3 + numKnownRetentionFields = 2 + numUnknownRetentionFields = 2 ) // NB(xichen): use a slice instead of a map to avoid lookup overhead. 
@@ -96,19 +98,20 @@ func init() { numObjectFields = make([]int, int(numObjectTypes)) setNumFieldsForType(rootObjectType, numRootObjectFields) - setNumFieldsForType(counterWithPoliciesType, numCounterWithPoliciesFields) - setNumFieldsForType(batchTimerWithPoliciesType, numBatchTimerWithPoliciesFields) - setNumFieldsForType(gaugeWithPoliciesType, numGaugeWithPoliciesFields) + setNumFieldsForType(counterWithPoliciesListType, numCounterWithPoliciesListFields) + setNumFieldsForType(batchTimerWithPoliciesListType, numBatchTimerWithPoliciesListFields) + setNumFieldsForType(gaugeWithPoliciesListType, numGaugeWithPoliciesListFields) setNumFieldsForType(rawMetricWithPolicyType, numRawMetricWithPolicyFields) setNumFieldsForType(counterType, numCounterFields) setNumFieldsForType(batchTimerType, numBatchTimerFields) setNumFieldsForType(gaugeType, numGaugeFields) setNumFieldsForType(metricType, numMetricFields) + setNumFieldsForType(defaultPoliciesListType, numDefaultStagedPoliciesListFields) + setNumFieldsForType(customPoliciesListType, numCustomStagedPoliciesListFields) + setNumFieldsForType(stagedPoliciesType, numStagedPoliciesFields) setNumFieldsForType(policyType, numPolicyFields) setNumFieldsForType(knownResolutionType, numKnownResolutionFields) setNumFieldsForType(unknownResolutionType, numUnknownResolutionFields) setNumFieldsForType(knownRetentionType, numKnownRetentionFields) setNumFieldsForType(unknownRetentionType, numKnownRetentionFields) - setNumFieldsForType(defaultVersionedPoliciesType, numDefaultVersionedPolicyFields) - setNumFieldsForType(customVersionedPoliciesType, numCustomVersionedPolicyFields) } diff --git a/protocol/msgpack/types.go b/protocol/msgpack/types.go index 6d370cc..be21228 100644 --- a/protocol/msgpack/types.go +++ b/protocol/msgpack/types.go @@ -23,7 +23,6 @@ package msgpack import ( "bytes" "io" - "time" "github.com/m3db/m3metrics/metric" "github.com/m3db/m3metrics/metric/aggregated" @@ -49,12 +48,12 @@ type Buffer interface { // Encoder is an encoder. type Encoder interface { - // EncodeTime encodes a time value. - EncodeTime(value time.Time) error - // EncodeInt64 encodes an int64 value. EncodeInt64(value int64) error + // EncodeBool encodes a boolean value. + EncodeBool(value bool) error + // EncodeFloat64 encodes a float64 value. EncodeFloat64(value float64) error @@ -121,12 +120,12 @@ type encoderBase interface { // encodeChunkedID encodes a chunked ID. encodeChunkedID(id metric.ChunkedID) - // encodeTime encodes a time. - encodeTime(t time.Time) - // encodeVarint encodes an integer value as varint. encodeVarint(value int64) + // encodeBool encodes a boolean value. + encodeBool(value bool) + // encodeFloat64 encodes a float64 value. encodeFloat64(value float64) @@ -169,12 +168,12 @@ type iteratorBase interface { // decodeID decodes an ID. decodeID() metric.ID - // decodeTime decodes a time. - decodeTime() time.Time - // decodeVarint decodes a variable-width integer value. decodeVarint() int64 + // decodeBool decodes a boolean value. + decodeBool() bool + // decodeFloat64 decodes a float64 value. decodeFloat64() float64 @@ -196,19 +195,19 @@ type iteratorBase interface { // checkNumFieldsForTypeWithActual compares the given number of actual fields with // the number of expected fields for a given object type. 
- checkNumFieldsForTypeWithActual(objType objectType, numActualFields int) (int, int, bool) + checkNumFieldsForTypeWithActual(objType objectType, numActualFields int) (int, bool) } // UnaggregatedEncoder is an encoder for encoding different types of unaggregated metrics. type UnaggregatedEncoder interface { - // EncodeCounterWithPolicies encodes a counter with applicable policies. - EncodeCounterWithPolicies(cp unaggregated.CounterWithPolicies) error + // EncodeCounterWithPoliciesList encodes a counter with applicable policies list. + EncodeCounterWithPoliciesList(cp unaggregated.CounterWithPoliciesList) error - // EncodeBatchTimerWithPolicies encodes a batched timer with applicable policies. - EncodeBatchTimerWithPolicies(btp unaggregated.BatchTimerWithPolicies) error + // EncodeBatchTimerWithPoliciesList encodes a batched timer with applicable policies list. + EncodeBatchTimerWithPoliciesList(btp unaggregated.BatchTimerWithPoliciesList) error - // EncodeGaugeWithPolicies encodes a gauge with applicable policies. - EncodeGaugeWithPolicies(gp unaggregated.GaugeWithPolicies) error + // EncodeGaugeWithPoliciesList encodes a gauge with applicable policies list. + EncodeGaugeWithPoliciesList(gp unaggregated.GaugeWithPoliciesList) error // Encoder returns the encoder. Encoder() BufferedEncoder @@ -222,9 +221,9 @@ type UnaggregatedIterator interface { // Next returns true if there are more items to decode. Next() bool - // Value returns the current metric and applicable policies. + // Value returns the current metric and applicable policies list. // The returned value remains valid until the next Next() call. - Value() (unaggregated.MetricUnion, policy.VersionedPolicies) + Value() (unaggregated.MetricUnion, policy.PoliciesList) // Err returns the error encountered during decoding, if any. Err() error diff --git a/protocol/msgpack/unaggregated_encoder.go b/protocol/msgpack/unaggregated_encoder.go index 63ee187..3b3ae49 100644 --- a/protocol/msgpack/unaggregated_encoder.go +++ b/protocol/msgpack/unaggregated_encoder.go @@ -27,27 +27,27 @@ import ( // Various object-level encoding functions to facilitate testing. type encodeRootObjectFn func(objType objectType) -type encodeCounterWithPoliciesFn func(cp unaggregated.CounterWithPolicies) -type encodeBatchTimerWithPoliciesFn func(btp unaggregated.BatchTimerWithPolicies) -type encodeGaugeWithPoliciesFn func(gp unaggregated.GaugeWithPolicies) +type encodeCounterWithPoliciesListFn func(cp unaggregated.CounterWithPoliciesList) +type encodeBatchTimerWithPoliciesListFn func(btp unaggregated.BatchTimerWithPoliciesList) +type encodeGaugeWithPoliciesListFn func(gp unaggregated.GaugeWithPoliciesList) type encodeCounterFn func(c unaggregated.Counter) type encodeBatchTimerFn func(bt unaggregated.BatchTimer) type encodeGaugeFn func(g unaggregated.Gauge) -type encodeVersionedPoliciesFn func(vp policy.VersionedPolicies) +type encodePoliciesListFn func(spl policy.PoliciesList) // unaggregatedEncoder uses MessagePack for encoding different types of unaggregated metrics. // It is not thread-safe. 
type unaggregatedEncoder struct { encoderBase - encodeRootObjectFn encodeRootObjectFn - encodeCounterWithPoliciesFn encodeCounterWithPoliciesFn - encodeBatchTimerWithPoliciesFn encodeBatchTimerWithPoliciesFn - encodeGaugeWithPoliciesFn encodeGaugeWithPoliciesFn - encodeCounterFn encodeCounterFn - encodeBatchTimerFn encodeBatchTimerFn - encodeGaugeFn encodeGaugeFn - encodeVersionedPoliciesFn encodeVersionedPoliciesFn + encodeRootObjectFn encodeRootObjectFn + encodeCounterWithPoliciesListFn encodeCounterWithPoliciesListFn + encodeBatchTimerWithPoliciesListFn encodeBatchTimerWithPoliciesListFn + encodeGaugeWithPoliciesListFn encodeGaugeWithPoliciesListFn + encodeCounterFn encodeCounterFn + encodeBatchTimerFn encodeBatchTimerFn + encodeGaugeFn encodeGaugeFn + encodePoliciesListFn encodePoliciesListFn } // NewUnaggregatedEncoder creates a new unaggregated encoder. @@ -55,13 +55,13 @@ func NewUnaggregatedEncoder(encoder BufferedEncoder) UnaggregatedEncoder { enc := &unaggregatedEncoder{encoderBase: newBaseEncoder(encoder)} enc.encodeRootObjectFn = enc.encodeRootObject - enc.encodeCounterWithPoliciesFn = enc.encodeCounterWithPolicies - enc.encodeBatchTimerWithPoliciesFn = enc.encodeBatchTimerWithPolicies - enc.encodeGaugeWithPoliciesFn = enc.encodeGaugeWithPolicies + enc.encodeCounterWithPoliciesListFn = enc.encodeCounterWithPoliciesList + enc.encodeBatchTimerWithPoliciesListFn = enc.encodeBatchTimerWithPoliciesList + enc.encodeGaugeWithPoliciesListFn = enc.encodeGaugeWithPoliciesList enc.encodeCounterFn = enc.encodeCounter enc.encodeBatchTimerFn = enc.encodeBatchTimer enc.encodeGaugeFn = enc.encodeGauge - enc.encodeVersionedPoliciesFn = enc.encodeVersionedPolicies + enc.encodePoliciesListFn = enc.encodePoliciesList return enc } @@ -69,30 +69,30 @@ func NewUnaggregatedEncoder(encoder BufferedEncoder) UnaggregatedEncoder { func (enc *unaggregatedEncoder) Encoder() BufferedEncoder { return enc.encoder() } func (enc *unaggregatedEncoder) Reset(encoder BufferedEncoder) { enc.reset(encoder) } -func (enc *unaggregatedEncoder) EncodeCounterWithPolicies(cp unaggregated.CounterWithPolicies) error { +func (enc *unaggregatedEncoder) EncodeCounterWithPoliciesList(cp unaggregated.CounterWithPoliciesList) error { if err := enc.err(); err != nil { return err } - enc.encodeRootObjectFn(counterWithPoliciesType) - enc.encodeCounterWithPoliciesFn(cp) + enc.encodeRootObjectFn(counterWithPoliciesListType) + enc.encodeCounterWithPoliciesListFn(cp) return enc.err() } -func (enc *unaggregatedEncoder) EncodeBatchTimerWithPolicies(btp unaggregated.BatchTimerWithPolicies) error { +func (enc *unaggregatedEncoder) EncodeBatchTimerWithPoliciesList(btp unaggregated.BatchTimerWithPoliciesList) error { if err := enc.err(); err != nil { return err } - enc.encodeRootObjectFn(batchTimerWithPoliciesType) - enc.encodeBatchTimerWithPoliciesFn(btp) + enc.encodeRootObjectFn(batchTimerWithPoliciesListType) + enc.encodeBatchTimerWithPoliciesListFn(btp) return enc.err() } -func (enc *unaggregatedEncoder) EncodeGaugeWithPolicies(gp unaggregated.GaugeWithPolicies) error { +func (enc *unaggregatedEncoder) EncodeGaugeWithPoliciesList(gp unaggregated.GaugeWithPoliciesList) error { if err := enc.err(); err != nil { return err } - enc.encodeRootObjectFn(gaugeWithPoliciesType) - enc.encodeGaugeWithPoliciesFn(gp) + enc.encodeRootObjectFn(gaugeWithPoliciesListType) + enc.encodeGaugeWithPoliciesListFn(gp) return enc.err() } @@ -102,22 +102,22 @@ func (enc *unaggregatedEncoder) encodeRootObject(objType objectType) { enc.encodeObjectType(objType) } 
-func (enc *unaggregatedEncoder) encodeCounterWithPolicies(cp unaggregated.CounterWithPolicies) { - enc.encodeNumObjectFields(numFieldsForType(counterWithPoliciesType)) +func (enc *unaggregatedEncoder) encodeCounterWithPoliciesList(cp unaggregated.CounterWithPoliciesList) { + enc.encodeNumObjectFields(numFieldsForType(counterWithPoliciesListType)) enc.encodeCounterFn(cp.Counter) - enc.encodeVersionedPoliciesFn(cp.VersionedPolicies) + enc.encodePoliciesListFn(cp.PoliciesList) } -func (enc *unaggregatedEncoder) encodeBatchTimerWithPolicies(btp unaggregated.BatchTimerWithPolicies) { - enc.encodeNumObjectFields(numFieldsForType(batchTimerWithPoliciesType)) +func (enc *unaggregatedEncoder) encodeBatchTimerWithPoliciesList(btp unaggregated.BatchTimerWithPoliciesList) { + enc.encodeNumObjectFields(numFieldsForType(batchTimerWithPoliciesListType)) enc.encodeBatchTimerFn(btp.BatchTimer) - enc.encodeVersionedPoliciesFn(btp.VersionedPolicies) + enc.encodePoliciesListFn(btp.PoliciesList) } -func (enc *unaggregatedEncoder) encodeGaugeWithPolicies(gp unaggregated.GaugeWithPolicies) { - enc.encodeNumObjectFields(numFieldsForType(gaugeWithPoliciesType)) +func (enc *unaggregatedEncoder) encodeGaugeWithPoliciesList(gp unaggregated.GaugeWithPoliciesList) { + enc.encodeNumObjectFields(numFieldsForType(gaugeWithPoliciesListType)) enc.encodeGaugeFn(gp.Gauge) - enc.encodeVersionedPoliciesFn(gp.VersionedPolicies) + enc.encodePoliciesListFn(gp.PoliciesList) } func (enc *unaggregatedEncoder) encodeCounter(c unaggregated.Counter) { @@ -141,22 +141,26 @@ func (enc *unaggregatedEncoder) encodeGauge(g unaggregated.Gauge) { enc.encodeFloat64(g.Value) } -func (enc *unaggregatedEncoder) encodeVersionedPolicies(vp policy.VersionedPolicies) { - // NB(xichen): if this is a default policy, we do not encode the actual policies - // to optimize for the common case. - if vp.IsDefault() { - enc.encodeNumObjectFields(numFieldsForType(defaultVersionedPoliciesType)) - enc.encodeObjectType(defaultVersionedPoliciesType) - enc.encodeVersion(vp.Version) - enc.encodeTime(vp.Cutover) +func (enc *unaggregatedEncoder) encodePoliciesList(pl policy.PoliciesList) { + if pl.IsDefault() { + enc.encodeNumObjectFields(numFieldsForType(defaultPoliciesListType)) + enc.encodeObjectType(defaultPoliciesListType) return } - // Otherwise fallback to encoding the entire object. 
- enc.encodeNumObjectFields(numFieldsForType(customVersionedPoliciesType)) - enc.encodeObjectType(customVersionedPoliciesType) - enc.encodeVersion(vp.Version) - enc.encodeTime(vp.Cutover) - policies := vp.Policies() + enc.encodeNumObjectFields(numFieldsForType(customPoliciesListType)) + enc.encodeObjectType(customPoliciesListType) + numPolicies := len(pl) + enc.encodeArrayLen(numPolicies) + for i := 0; i < numPolicies; i++ { + enc.encodeStagedPolicies(pl[i]) + } +} + +func (enc *unaggregatedEncoder) encodeStagedPolicies(sp policy.StagedPolicies) { + enc.encodeNumObjectFields(numFieldsForType(stagedPoliciesType)) + enc.encodeVarint(sp.CutoverNs) + enc.encodeBool(sp.Tombstoned) + policies := sp.Policies() enc.encodeArrayLen(len(policies)) for _, policy := range policies { enc.encodePolicy(policy) diff --git a/protocol/msgpack/unaggregated_encoder_test.go b/protocol/msgpack/unaggregated_encoder_test.go index c0a0fbd..d4b4904 100644 --- a/protocol/msgpack/unaggregated_encoder_test.go +++ b/protocol/msgpack/unaggregated_encoder_test.go @@ -41,7 +41,7 @@ var ( ) func TestUnaggregatedEncodeCounterWithDefaultPolicies(t *testing.T) { - policies := testDefaultVersionedPolicies + policies := testDefaultStagedPolicies encoder, results := testCapturingUnaggregatedEncoder(t) require.NoError(t, testUnaggregatedEncode(t, encoder, testCounter, policies)) expected := expectedResultsForUnaggregatedMetricWithPolicies(t, testCounter, policies) @@ -49,7 +49,7 @@ func TestUnaggregatedEncodeCounterWithDefaultPolicies(t *testing.T) { } func TestUnaggregatedEncodeBatchTimerWithDefaultPolicies(t *testing.T) { - policies := testDefaultVersionedPolicies + policies := testDefaultStagedPolicies encoder, results := testCapturingUnaggregatedEncoder(t) require.NoError(t, testUnaggregatedEncode(t, encoder, testBatchTimer, policies)) expected := expectedResultsForUnaggregatedMetricWithPolicies(t, testBatchTimer, policies) @@ -57,7 +57,7 @@ func TestUnaggregatedEncodeBatchTimerWithDefaultPolicies(t *testing.T) { } func TestUnaggregatedEncodeGaugeWithDefaultPolicies(t *testing.T) { - policies := testDefaultVersionedPolicies + policies := testDefaultStagedPolicies encoder, results := testCapturingUnaggregatedEncoder(t) require.NoError(t, testUnaggregatedEncode(t, encoder, testGauge, policies)) expected := expectedResultsForUnaggregatedMetricWithPolicies(t, testGauge, policies) @@ -68,19 +68,19 @@ func TestUnaggregatedEncodeAllTypesWithDefaultPolicies(t *testing.T) { var expected []interface{} encoder, results := testCapturingUnaggregatedEncoder(t) for _, input := range testInputWithAllTypesAndDefaultPolicies { - require.NoError(t, testUnaggregatedEncode(t, encoder, input.metric, input.versionedPolicies)) - expected = append(expected, expectedResultsForUnaggregatedMetricWithPolicies(t, input.metric, input.versionedPolicies)...) + require.NoError(t, testUnaggregatedEncode(t, encoder, input.metric, input.policiesList)) + expected = append(expected, expectedResultsForUnaggregatedMetricWithPolicies(t, input.metric, input.policiesList)...) 
} require.Equal(t, expected, *results) } -func TestUnaggregatedEncodeAllTypesWithCustomPolicies(t *testing.T) { +func TestUnaggregatedEncodeAllTypesWithSingleCustomPolicies(t *testing.T) { var expected []interface{} encoder, results := testCapturingUnaggregatedEncoder(t) - for _, input := range testInputWithAllTypesAndCustomPolicies { - require.NoError(t, testUnaggregatedEncode(t, encoder, input.metric, input.versionedPolicies)) - expected = append(expected, expectedResultsForUnaggregatedMetricWithPolicies(t, input.metric, input.versionedPolicies)...) + for _, input := range testInputWithAllTypesAndSingleCustomPolicies { + require.NoError(t, testUnaggregatedEncode(t, encoder, input.metric, input.policiesList)) + expected = append(expected, expectedResultsForUnaggregatedMetricWithPolicies(t, input.metric, input.policiesList)...) } require.Equal(t, expected, *results) @@ -88,7 +88,7 @@ func TestUnaggregatedEncodeAllTypesWithCustomPolicies(t *testing.T) { func TestUnaggregatedEncodeVarintError(t *testing.T) { counter := testCounter - policies := testDefaultVersionedPolicies + policies := testDefaultStagedPolicies // Intentionally return an error when encoding varint. encoder := testUnaggregatedEncoder(t).(*unaggregatedEncoder) @@ -106,7 +106,7 @@ func TestUnaggregatedEncodeVarintError(t *testing.T) { func TestUnaggregatedEncodeFloat64Error(t *testing.T) { gauge := testGauge - policies := testDefaultVersionedPolicies + policies := testDefaultStagedPolicies // Intentionally return an error when encoding float64. encoder := testUnaggregatedEncoder(t).(*unaggregatedEncoder) @@ -124,7 +124,7 @@ func TestUnaggregatedEncodeFloat64Error(t *testing.T) { func TestUnaggregatedEncodeBytesError(t *testing.T) { timer := testBatchTimer - policies := testDefaultVersionedPolicies + policies := testDefaultStagedPolicies // Intentionally return an error when encoding array length. encoder := testUnaggregatedEncoder(t).(*unaggregatedEncoder) @@ -142,13 +142,15 @@ func TestUnaggregatedEncodeBytesError(t *testing.T) { func TestUnaggregatedEncodeArrayLenError(t *testing.T) { gauge := testGauge - policies := policy.CustomVersionedPolicies( - 1, - time.Now(), - []policy.Policy{ - policy.NewPolicy(time.Second, xtime.Second, time.Hour), - }, - ) + policies := policy.PoliciesList{ + policy.NewStagedPolicies( + time.Now().UnixNano(), + false, + []policy.Policy{ + policy.NewPolicy(time.Second, xtime.Second, time.Hour), + }, + ), + } // Intentionally return an error when encoding array length. 
encoder := testUnaggregatedEncoder(t).(*unaggregatedEncoder) @@ -166,7 +168,7 @@ func TestUnaggregatedEncodeArrayLenError(t *testing.T) { func TestUnaggregatedEncoderReset(t *testing.T) { metric := testCounter - policies := testDefaultVersionedPolicies + policies := testDefaultStagedPolicies encoder := testUnaggregatedEncoder(t).(*unaggregatedEncoder) baseEncoder := encoder.encoderBase.(*baseEncoder) @@ -220,21 +222,12 @@ func expectedResultsForPolicy(t *testing.T, p policy.Policy) []interface{} { return results } -func expectedResultsForVersionedPolicies(t *testing.T, vp policy.VersionedPolicies) []interface{} { - if vp.IsDefault() { - return []interface{}{ - numFieldsForType(defaultVersionedPoliciesType), - int64(defaultVersionedPoliciesType), - int64(vp.Version), - vp.Cutover, - } - } - policies := vp.Policies() +func expectedResultsForStagedPolicies(t *testing.T, sp policy.StagedPolicies) []interface{} { + policies := sp.Policies() results := []interface{}{ - numFieldsForType(customVersionedPoliciesType), - int64(customVersionedPoliciesType), - int64(vp.Version), - vp.Cutover, + numFieldsForType(stagedPoliciesType), + sp.CutoverNs, + sp.Tombstoned, len(policies), } for _, p := range policies { @@ -243,10 +236,28 @@ func expectedResultsForVersionedPolicies(t *testing.T, vp policy.VersionedPolici return results } +func expectedResultsForPoliciesList(t *testing.T, pl policy.PoliciesList) []interface{} { + if pl.IsDefault() { + return []interface{}{ + numFieldsForType(defaultPoliciesListType), + int64(defaultPoliciesListType), + } + } + results := []interface{}{ + numFieldsForType(customPoliciesListType), + int64(customPoliciesListType), + len(pl), + } + for _, sp := range pl { + results = append(results, expectedResultsForStagedPolicies(t, sp)...) + } + return results +} + func expectedResultsForUnaggregatedMetricWithPolicies( t *testing.T, m unaggregated.MetricUnion, - vp policy.VersionedPolicies, + pl policy.PoliciesList, ) []interface{} { results := []interface{}{ int64(unaggregatedVersion), @@ -256,16 +267,16 @@ func expectedResultsForUnaggregatedMetricWithPolicies( switch m.Type { case unaggregated.CounterType: results = append(results, []interface{}{ - int64(counterWithPoliciesType), - numFieldsForType(counterWithPoliciesType), + int64(counterWithPoliciesListType), + numFieldsForType(counterWithPoliciesListType), numFieldsForType(counterType), []byte(m.ID), m.CounterVal, }...) case unaggregated.BatchTimerType: results = append(results, []interface{}{ - int64(batchTimerWithPoliciesType), - numFieldsForType(batchTimerWithPoliciesType), + int64(batchTimerWithPoliciesListType), + numFieldsForType(batchTimerWithPoliciesListType), numFieldsForType(batchTimerType), []byte(m.ID), len(m.BatchTimerVal), @@ -275,8 +286,8 @@ func expectedResultsForUnaggregatedMetricWithPolicies( } case unaggregated.GaugeType: results = append(results, []interface{}{ - int64(gaugeWithPoliciesType), - numFieldsForType(gaugeWithPoliciesType), + int64(gaugeWithPoliciesListType), + numFieldsForType(gaugeWithPoliciesListType), numFieldsForType(gaugeType), []byte(m.ID), m.GaugeVal, @@ -285,8 +296,8 @@ func expectedResultsForUnaggregatedMetricWithPolicies( require.Fail(t, fmt.Sprintf("unrecognized metric type %v", m.Type)) } - vpRes := expectedResultsForVersionedPolicies(t, vp) - results = append(results, vpRes...) + plRes := expectedResultsForPoliciesList(t, pl) + results = append(results, plRes...) 
return results } diff --git a/protocol/msgpack/unaggregated_iterator.go b/protocol/msgpack/unaggregated_iterator.go index 58d67b7..a88c9f9 100644 --- a/protocol/msgpack/unaggregated_iterator.go +++ b/protocol/msgpack/unaggregated_iterator.go @@ -45,12 +45,13 @@ type unaggregatedIterator struct { largeFloatsPool pool.FloatsPool iteratorPool UnaggregatedIteratorPool - closed bool - metric unaggregated.MetricUnion - versionedPolicies policy.VersionedPolicies - id metric.ID - policies []policy.Policy - timerValues []float64 + closed bool + metric unaggregated.MetricUnion + policiesList policy.PoliciesList + id metric.ID + timerValues []float64 + cachedPolicies [][]policy.Policy + cachedPoliciesList policy.PoliciesList } // NewUnaggregatedIterator creates a new unaggregated iterator. @@ -77,8 +78,8 @@ func (it *unaggregatedIterator) Reset(reader io.Reader) { it.reset(reader) } -func (it *unaggregatedIterator) Value() (unaggregated.MetricUnion, policy.VersionedPolicies) { - return it.metric, it.versionedPolicies +func (it *unaggregatedIterator) Value() (unaggregated.MetricUnion, policy.PoliciesList) { + return it.metric, it.policiesList } func (it *unaggregatedIterator) Next() bool { @@ -100,7 +101,9 @@ func (it *unaggregatedIterator) Close() { it.closed = true it.reset(emptyReader) it.metric.Reset() - it.versionedPolicies.Reset() + it.policiesList = nil + it.cachedPolicies = nil + it.cachedPoliciesList = nil if it.iteratorPool != nil { it.iteratorPool.Put(it) } @@ -131,8 +134,8 @@ func (it *unaggregatedIterator) decodeRootObject() bool { return false } switch objType { - case counterWithPoliciesType, batchTimerWithPoliciesType, gaugeWithPoliciesType: - it.decodeMetricWithPolicies(objType) + case counterWithPoliciesListType, batchTimerWithPoliciesListType, gaugeWithPoliciesListType: + it.decodeMetricWithPoliciesList(objType) default: it.setErr(fmt.Errorf("unrecognized object type %v", objType)) } @@ -141,23 +144,23 @@ func (it *unaggregatedIterator) decodeRootObject() bool { return it.err() == nil } -func (it *unaggregatedIterator) decodeMetricWithPolicies(objType objectType) { +func (it *unaggregatedIterator) decodeMetricWithPoliciesList(objType objectType) { numExpectedFields, numActualFields, ok := it.checkNumFieldsForType(objType) if !ok { return } switch objType { - case counterWithPoliciesType: + case counterWithPoliciesListType: it.decodeCounter() - case batchTimerWithPoliciesType: + case batchTimerWithPoliciesListType: it.decodeBatchTimer() - case gaugeWithPoliciesType: + case gaugeWithPoliciesListType: it.decodeGauge() default: it.setErr(fmt.Errorf("unrecognized metric with policies type %v", objType)) return } - it.decodeVersionedPolicies() + it.decodePoliciesList() it.skip(numActualFields - numExpectedFields) } @@ -219,37 +222,64 @@ func (it *unaggregatedIterator) decodeGauge() { it.skip(numActualFields - numExpectedFields) } -func (it *unaggregatedIterator) decodeVersionedPolicies() { +func (it *unaggregatedIterator) decodePoliciesList() { numActualFields := it.decodeNumObjectFields() - versionedPoliciesType := it.decodeObjectType() - numExpectedFields, numActualFields, ok := it.checkNumFieldsForTypeWithActual( - versionedPoliciesType, + policiesListType := it.decodeObjectType() + numExpectedFields, ok := it.checkNumFieldsForTypeWithActual( + policiesListType, numActualFields, ) if !ok { return } - version := it.decodeVersion() - cutover := it.decodeTime() - switch versionedPoliciesType { - case defaultVersionedPoliciesType: - it.versionedPolicies = 
policy.DefaultVersionedPolicies(version, cutover) - it.skip(numActualFields - numExpectedFields) - case customVersionedPoliciesType: - numPolicies := it.decodeArrayLen() - if cap(it.policies) < numPolicies { - it.policies = make([]policy.Policy, 0, numPolicies) + switch policiesListType { + case defaultPoliciesListType: + it.policiesList = policy.DefaultPoliciesList + case customPoliciesListType: + numStagedPolicies := it.decodeArrayLen() + if cap(it.cachedPoliciesList) < numStagedPolicies { + it.cachedPoliciesList = make(policy.PoliciesList, 0, numStagedPolicies) + } else { + it.cachedPoliciesList = it.cachedPoliciesList[:0] + } + if len(it.cachedPolicies) < numStagedPolicies { + it.cachedPolicies = make([][]policy.Policy, numStagedPolicies) } else { - it.policies = it.policies[:0] + it.cachedPolicies = it.cachedPolicies[:numStagedPolicies] } - for i := 0; i < numPolicies; i++ { - it.policies = append(it.policies, it.decodePolicy()) + for policyIdx := 0; policyIdx < numStagedPolicies; policyIdx++ { + decodedStagedPolicies := it.decodeStagedPolicies(policyIdx) + it.cachedPoliciesList = append(it.cachedPoliciesList, decodedStagedPolicies) } - it.versionedPolicies = policy.CustomVersionedPolicies(version, cutover, it.policies) - it.skip(numActualFields - numExpectedFields) + it.policiesList = it.cachedPoliciesList default: - it.setErr(fmt.Errorf("unrecognized versioned policies type: %v", versionedPoliciesType)) + it.setErr(fmt.Errorf("unrecognized policies list type: %v", policiesListType)) + } + it.skip(numActualFields - numExpectedFields) +} + +// decodeStagedPolicies decodes a staged policies using the cached policies +// to avoid memory reallocations and returns the decoded staged policies. +// If an error is encountered during decoding, it is stored in the iterator. 
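+// Note that the returned StagedPolicies aliases it.cachedPolicies[policyIdx]; it is
+// only valid until the next decode reuses that slot, which is why callers that retain
+// the result (e.g. the round-trip tests) copy it first.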
+func (it *unaggregatedIterator) decodeStagedPolicies(policyIdx int) policy.StagedPolicies { + numExpectedFields, numActualFields, ok := it.checkNumFieldsForType(stagedPoliciesType) + if !ok { + return policy.EmptyStagedPolicies + } + cutoverNs := it.decodeVarint() + tombstoned := it.decodeBool() + numPolicies := it.decodeArrayLen() + if cap(it.cachedPolicies[policyIdx]) < numPolicies { + it.cachedPolicies[policyIdx] = make([]policy.Policy, 0, numPolicies) + } else { + it.cachedPolicies[policyIdx] = it.cachedPolicies[policyIdx][:0] + } + for i := 0; i < numPolicies; i++ { + it.cachedPolicies[policyIdx] = append(it.cachedPolicies[policyIdx], it.decodePolicy()) } + stagedPolicies := policy.NewStagedPolicies(cutoverNs, tombstoned, it.cachedPolicies[policyIdx]) + it.skip(numActualFields - numExpectedFields) + return stagedPolicies } func (it *unaggregatedIterator) decodeID() metric.ID { diff --git a/protocol/msgpack/unaggregated_iterator_test.go b/protocol/msgpack/unaggregated_iterator_test.go index df62a19..4e22f70 100644 --- a/protocol/msgpack/unaggregated_iterator_test.go +++ b/protocol/msgpack/unaggregated_iterator_test.go @@ -36,40 +36,42 @@ import ( "github.com/stretchr/testify/require" ) -func TestUnaggregatedIteratorDecodeDefaultPolicies(t *testing.T) { +func TestUnaggregatedIteratorDecodeDefaultPoliciesList(t *testing.T) { enc := testUnaggregatedEncoder(t).(*unaggregatedEncoder) - enc.encodeVersionedPolicies(testDefaultVersionedPolicies) + enc.encodePoliciesList(testDefaultStagedPolicies) require.NoError(t, enc.err()) it := testUnaggregatedIterator(t, enc.Encoder().Buffer()).(*unaggregatedIterator) - it.decodeVersionedPolicies() + it.decodePoliciesList() require.NoError(t, it.Err()) - _, vp := it.Value() - require.Equal(t, testDefaultVersionedPolicies, vp) + _, pl := it.Value() + require.Equal(t, testDefaultStagedPolicies, pl) } -func TestUnaggregatedIteratorDecodeCustomPoliciesWithAlloc(t *testing.T) { +func TestUnaggregatedIteratorDecodeSingleCustomPoliciesListWithAlloc(t *testing.T) { enc := testUnaggregatedEncoder(t).(*unaggregatedEncoder) - enc.encodeVersionedPolicies(testCustomVersionedPolicies) + enc.encodePoliciesList(testSingleCustomStagedPolicies) require.NoError(t, enc.err()) it := testUnaggregatedIterator(t, enc.Encoder().Buffer()).(*unaggregatedIterator) - it.decodeVersionedPolicies() + it.decodePoliciesList() require.NoError(t, it.Err()) - _, vp := it.Value() - require.Equal(t, testCustomVersionedPolicies, vp) - require.Equal(t, len(it.policies), len(testCustomVersionedPolicies.Policies())) + _, pl := it.Value() + require.Equal(t, testSingleCustomStagedPolicies, pl) + require.Equal(t, len(it.cachedPolicies), len(testSingleCustomStagedPolicies)) + require.Equal(t, it.cachedPolicies[0], testSingleCustomStagedPolicies[0].Policies()) } -func TestUnaggregatedIteratorDecodeCustomPoliciesNoAlloc(t *testing.T) { +func TestUnaggregatedIteratorDecodeSingleCustomPoliciesNoPoliciesListAlloc(t *testing.T) { enc := testUnaggregatedEncoder(t).(*unaggregatedEncoder) - enc.encodeVersionedPolicies(testCustomVersionedPolicies) + enc.encodePoliciesList(testSingleCustomStagedPolicies) require.NoError(t, enc.err()) it := testUnaggregatedIterator(t, enc.Encoder().Buffer()).(*unaggregatedIterator) - it.policies = make([]policy.Policy, len(testCustomVersionedPolicies.Policies())*3) - it.decodeVersionedPolicies() + it.cachedPoliciesList = make(policy.PoliciesList, len(testSingleCustomStagedPolicies)*3) + it.decodePoliciesList() require.NoError(t, it.Err()) - _, vp := it.Value() - require.Equal(t, 
testCustomVersionedPolicies, vp) - require.Equal(t, len(it.policies), len(testCustomVersionedPolicies.Policies())) + _, pl := it.Value() + require.Equal(t, testSingleCustomStagedPolicies, pl) + require.Equal(t, len(it.cachedPolicies), len(testSingleCustomStagedPolicies)) + require.Equal(t, it.cachedPolicies[0], testSingleCustomStagedPolicies[0].Policies()) } func TestUnaggregatedIteratorDecodeIDDecodeBytesLenError(t *testing.T) { @@ -260,9 +262,9 @@ func TestUnaggregatedIteratorDecodeBatchTimerWithAllocPoolAlloc(t *testing.T) { } func TestUnaggregatedIteratorDecodeNewerVersionThanSupported(t *testing.T) { - input := metricWithPolicies{ - metric: testCounter, - versionedPolicies: testDefaultVersionedPolicies, + input := metricWithPoliciesList{ + metric: testCounter, + policiesList: testDefaultStagedPolicies, } enc := testUnaggregatedEncoder(t).(*unaggregatedEncoder) @@ -272,16 +274,16 @@ func TestUnaggregatedIteratorDecodeNewerVersionThanSupported(t *testing.T) { enc.encodeNumObjectFields(numFieldsForType(rootObjectType)) enc.encodeObjectType(objType) } - require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies)) + require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.policiesList)) // Now restore the encode top-level function and encode another counter. enc.encodeRootObjectFn = enc.encodeRootObject - require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies)) + require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.policiesList)) // Check that we skipped the first counter and successfully decoded the second counter. it := testUnaggregatedIterator(t, bytes.NewBuffer(enc.Encoder().Bytes())) it.(*unaggregatedIterator).ignoreHigherVersion = true - validateUnaggregatedDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) + validateUnaggregatedDecodeResults(t, it, []metricWithPoliciesList{input}, io.EOF) it.Reset(bytes.NewBuffer(enc.Encoder().Bytes())) it.(*unaggregatedIterator).ignoreHigherVersion = false @@ -289,9 +291,9 @@ func TestUnaggregatedIteratorDecodeNewerVersionThanSupported(t *testing.T) { } func TestUnaggregatedIteratorDecodeRootObjectMoreFieldsThanExpected(t *testing.T) { - input := metricWithPolicies{ - metric: testCounter, - versionedPolicies: testDefaultVersionedPolicies, + input := metricWithPoliciesList{ + metric: testCounter, + policiesList: testDefaultStagedPolicies, } enc := testUnaggregatedEncoder(t).(*unaggregatedEncoder) @@ -301,43 +303,43 @@ func TestUnaggregatedIteratorDecodeRootObjectMoreFieldsThanExpected(t *testing.T enc.encodeNumObjectFields(numFieldsForType(rootObjectType) + 1) enc.encodeObjectType(objType) } - testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies) + testUnaggregatedEncode(t, enc, input.metric, input.policiesList) enc.encodeVarint(0) require.NoError(t, enc.err()) it := testUnaggregatedIterator(t, enc.Encoder().Buffer()) // Check that we successfully decoded the counter. 
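+	// The decoder tolerates the extra field because the decode helpers skip
+	// numActualFields - numExpectedFields trailing fields, so unknown fields written
+	// by newer encoders are ignored.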
- validateUnaggregatedDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) + validateUnaggregatedDecodeResults(t, it, []metricWithPoliciesList{input}, io.EOF) } func TestUnaggregatedIteratorDecodeCounterWithPoliciesMoreFieldsThanExpected(t *testing.T) { - input := metricWithPolicies{ - metric: testCounter, - versionedPolicies: testDefaultVersionedPolicies, + input := metricWithPoliciesList{ + metric: testCounter, + policiesList: testDefaultStagedPolicies, } enc := testUnaggregatedEncoder(t).(*unaggregatedEncoder) // Pretend we added an extra int field to the counter with policies object. - enc.encodeCounterWithPoliciesFn = func(cp unaggregated.CounterWithPolicies) { - enc.encodeNumObjectFields(numFieldsForType(counterWithPoliciesType) + 1) + enc.encodeCounterWithPoliciesListFn = func(cp unaggregated.CounterWithPoliciesList) { + enc.encodeNumObjectFields(numFieldsForType(counterWithPoliciesListType) + 1) enc.encodeCounterFn(cp.Counter) - enc.encodeVersionedPoliciesFn(cp.VersionedPolicies) + enc.encodePoliciesListFn(cp.PoliciesList) enc.encodeVarint(0) } - testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies) + testUnaggregatedEncode(t, enc, input.metric, input.policiesList) require.NoError(t, enc.err()) it := testUnaggregatedIterator(t, enc.Encoder().Buffer()) // Check that we successfully decoded the counter. - validateUnaggregatedDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) + validateUnaggregatedDecodeResults(t, it, []metricWithPoliciesList{input}, io.EOF) } func TestUnaggregatedIteratorDecodeCounterMoreFieldsThanExpected(t *testing.T) { - input := metricWithPolicies{ - metric: testCounter, - versionedPolicies: testDefaultVersionedPolicies, + input := metricWithPoliciesList{ + metric: testCounter, + policiesList: testDefaultStagedPolicies, } enc := testUnaggregatedEncoder(t).(*unaggregatedEncoder) @@ -348,18 +350,18 @@ func TestUnaggregatedIteratorDecodeCounterMoreFieldsThanExpected(t *testing.T) { enc.encodeVarint(int64(c.Value)) enc.encodeVarint(0) } - require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies)) + require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.policiesList)) it := testUnaggregatedIterator(t, enc.Encoder().Buffer()) // Check that we successfully decoded the counter. - validateUnaggregatedDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) + validateUnaggregatedDecodeResults(t, it, []metricWithPoliciesList{input}, io.EOF) } func TestUnaggregatedIteratorDecodeBatchTimerMoreFieldsThanExpected(t *testing.T) { - input := metricWithPolicies{ - metric: testBatchTimer, - versionedPolicies: testDefaultVersionedPolicies, + input := metricWithPoliciesList{ + metric: testBatchTimer, + policiesList: testDefaultStagedPolicies, } enc := testUnaggregatedEncoder(t).(*unaggregatedEncoder) @@ -373,18 +375,18 @@ func TestUnaggregatedIteratorDecodeBatchTimerMoreFieldsThanExpected(t *testing.T } enc.encodeVarint(0) } - require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies)) + require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.policiesList)) it := testUnaggregatedIterator(t, enc.Encoder().Buffer()) // Check that we successfully decoded the batch timer. 
- validateUnaggregatedDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) + validateUnaggregatedDecodeResults(t, it, []metricWithPoliciesList{input}, io.EOF) } func TestUnaggregatedIteratorDecodeGaugeMoreFieldsThanExpected(t *testing.T) { - input := metricWithPolicies{ - metric: testGauge, - versionedPolicies: testDefaultVersionedPolicies, + input := metricWithPoliciesList{ + metric: testGauge, + policiesList: testDefaultStagedPolicies, } enc := testUnaggregatedEncoder(t).(*unaggregatedEncoder) @@ -395,64 +397,70 @@ func TestUnaggregatedIteratorDecodeGaugeMoreFieldsThanExpected(t *testing.T) { enc.encodeFloat64(g.Value) enc.encodeVarint(0) } - require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies)) + require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.policiesList)) it := testUnaggregatedIterator(t, enc.Encoder().Buffer()) // Check that we successfully decoded the gauge. - validateUnaggregatedDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) + validateUnaggregatedDecodeResults(t, it, []metricWithPoliciesList{input}, io.EOF) } func TestUnaggregatedIteratorDecodePolicyWithCustomResolution(t *testing.T) { - input := metricWithPolicies{ + input := metricWithPoliciesList{ metric: testGauge, - versionedPolicies: policy.CustomVersionedPolicies( - 1, - time.Now(), - []policy.Policy{ - policy.NewPolicy(3*time.Second, xtime.Second, time.Hour), - }, - ), + policiesList: policy.PoliciesList{ + policy.NewStagedPolicies( + time.Now().UnixNano(), + false, + []policy.Policy{ + policy.NewPolicy(3*time.Second, xtime.Second, time.Hour), + }, + ), + }, } enc := testUnaggregatedEncoder(t) - require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies)) + require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.policiesList)) it := testUnaggregatedIterator(t, enc.Encoder().Buffer()) // Check that we successfully decoded the policy. - validateUnaggregatedDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) + validateUnaggregatedDecodeResults(t, it, []metricWithPoliciesList{input}, io.EOF) } func TestUnaggregatedIteratorDecodePolicyWithCustomRetention(t *testing.T) { - input := metricWithPolicies{ + input := metricWithPoliciesList{ metric: testGauge, - versionedPolicies: policy.CustomVersionedPolicies( - 1, - time.Now(), - []policy.Policy{ - policy.NewPolicy(time.Second, xtime.Second, 289*time.Hour), - }, - ), + policiesList: policy.PoliciesList{ + policy.NewStagedPolicies( + time.Now().UnixNano(), + false, + []policy.Policy{ + policy.NewPolicy(time.Second, xtime.Second, 289*time.Hour), + }, + ), + }, } enc := testUnaggregatedEncoder(t) - require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies)) + require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.policiesList)) it := testUnaggregatedIterator(t, enc.Encoder().Buffer()) // Check that we successfully decoded the policy. 
- validateUnaggregatedDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) + validateUnaggregatedDecodeResults(t, it, []metricWithPoliciesList{input}, io.EOF) } func TestUnaggregatedIteratorDecodePolicyMoreFieldsThanExpected(t *testing.T) { - input := metricWithPolicies{ + input := metricWithPoliciesList{ metric: testGauge, - versionedPolicies: policy.CustomVersionedPolicies( - 1, - time.Now(), - []policy.Policy{ - policy.NewPolicy(time.Second, xtime.Second, time.Hour), - }, - ), + policiesList: policy.PoliciesList{ + policy.NewStagedPolicies( + time.Now().UnixNano(), + true, + []policy.Policy{ + policy.NewPolicy(time.Second, xtime.Second, time.Hour), + }, + ), + }, } enc := testUnaggregatedEncoder(t).(*unaggregatedEncoder) baseEncoder := enc.encoderBase.(*baseEncoder) @@ -464,52 +472,43 @@ func TestUnaggregatedIteratorDecodePolicyMoreFieldsThanExpected(t *testing.T) { baseEncoder.encodeRetention(p.Retention()) baseEncoder.encodeVarint(0) } - require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies)) + require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.policiesList)) it := testUnaggregatedIterator(t, enc.Encoder().Buffer()) // Check that we successfully decoded the policy. - validateUnaggregatedDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) + validateUnaggregatedDecodeResults(t, it, []metricWithPoliciesList{input}, io.EOF) } -func TestUnaggregatedIteratorDecodeVersionedPoliciesMoreFieldsThanExpected(t *testing.T) { - input := metricWithPolicies{ - metric: testGauge, - versionedPolicies: policy.CustomVersionedPolicies( - 1, - time.Now(), - []policy.Policy{ - policy.NewPolicy(time.Second, xtime.Second, time.Hour), - }, - ), +func TestUnaggregatedIteratorDecodePoliciesListMoreFieldsThanExpected(t *testing.T) { + input := metricWithPoliciesList{ + metric: testGauge, + policiesList: testSingleCustomStagedPolicies, } enc := testUnaggregatedEncoder(t).(*unaggregatedEncoder) // Pretend we added an extra int field to the policy object. - enc.encodeVersionedPoliciesFn = func(vp policy.VersionedPolicies) { - enc.encodeNumObjectFields(numFieldsForType(customVersionedPoliciesType) + 1) - enc.encodeObjectType(customVersionedPoliciesType) - enc.encodeVersion(vp.Version) - enc.encodeTime(vp.Cutover) - policies := vp.Policies() - enc.encodeArrayLen(len(policies)) - for _, policy := range policies { - enc.encodePolicy(policy) + enc.encodePoliciesListFn = func(pl policy.PoliciesList) { + enc.encodeNumObjectFields(numFieldsForType(customPoliciesListType) + 1) + enc.encodeObjectType(customPoliciesListType) + enc.encodeArrayLen(len(pl)) + for _, sp := range pl { + enc.encodeStagedPolicies(sp) } enc.encodeVarint(0) } - require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies)) + require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.policiesList)) it := testUnaggregatedIterator(t, enc.Encoder().Buffer()) // Check that we successfully decoded the policy. 
- validateUnaggregatedDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) + validateUnaggregatedDecodeResults(t, it, []metricWithPoliciesList{input}, io.EOF) } func TestUnaggregatedIteratorDecodeCounterFewerFieldsThanExpected(t *testing.T) { - input := metricWithPolicies{ - metric: testCounter, - versionedPolicies: testDefaultVersionedPolicies, + input := metricWithPoliciesList{ + metric: testCounter, + policiesList: testDefaultStagedPolicies, } enc := testUnaggregatedEncoder(t).(*unaggregatedEncoder) @@ -518,7 +517,7 @@ func TestUnaggregatedIteratorDecodeCounterFewerFieldsThanExpected(t *testing.T) enc.encodeNumObjectFields(numFieldsForType(counterType) - 1) enc.encodeID(c.ID) } - require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies)) + require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.policiesList)) it := testUnaggregatedIterator(t, enc.Encoder().Buffer()) @@ -554,12 +553,12 @@ func TestUnaggregatedIteratorClose(t *testing.T) { } func TestUnaggregatedIteratorDecodeInvalidTimeUnit(t *testing.T) { - input := metricWithPolicies{ - metric: testCounter, - versionedPolicies: testVersionedPoliciesWithInvalidTimeUnit, + input := metricWithPoliciesList{ + metric: testCounter, + policiesList: testStagedPoliciesWithInvalidTimeUnit, } enc := testUnaggregatedEncoder(t) - require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies)) + require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.policiesList)) it := testUnaggregatedIterator(t, enc.Encoder().Buffer()) validateUnaggregatedDecodeResults(t, it, nil, errors.New("invalid precision unknown")) } @@ -567,16 +566,16 @@ func TestUnaggregatedIteratorDecodeInvalidTimeUnit(t *testing.T) { func validateUnaggregatedDecodeResults( t *testing.T, it UnaggregatedIterator, - expectedResults []metricWithPolicies, + expectedResults []metricWithPoliciesList, expectedErr error, ) { - var results []metricWithPolicies + var results []metricWithPoliciesList for it.Next() { - value, policies := it.Value() - policies = toVersionedPolicies(t, policies) - results = append(results, metricWithPolicies{ - metric: value, - versionedPolicies: policies, + value, policiesList := it.Value() + policiesList = toPoliciesList(t, policiesList) + results = append(results, metricWithPoliciesList{ + metric: value, + policiesList: policiesList, }) } require.Equal(t, expectedErr, it.Err()) diff --git a/protocol/msgpack/unaggregated_roundtrip_test.go b/protocol/msgpack/unaggregated_roundtrip_test.go index f449020..ce05032 100644 --- a/protocol/msgpack/unaggregated_roundtrip_test.go +++ b/protocol/msgpack/unaggregated_roundtrip_test.go @@ -54,102 +54,109 @@ var ( GaugeVal: 123.456, } - testDefaultVersionedPolicies = policy.DefaultVersionedPolicies( - policy.DefaultPolicyVersion, - time.Now(), - ) - - testCustomVersionedPolicies = policy.CustomVersionedPolicies( - 2, - time.Now(), - []policy.Policy{ - policy.NewPolicy(20*time.Second, xtime.Second, 6*time.Hour), - policy.NewPolicy(time.Minute, xtime.Minute, 2*24*time.Hour), - policy.NewPolicy(10*time.Minute, xtime.Minute, 25*24*time.Hour), - }, - ) + testDefaultStagedPolicies = policy.DefaultPoliciesList - testVersionedPoliciesWithInvalidTimeUnit = policy.CustomVersionedPolicies( - 1, - time.Now(), - []policy.Policy{ - policy.NewPolicy(time.Second, xtime.Unit(100), time.Hour), - }, - ) + testSingleCustomStagedPolicies = policy.PoliciesList{ + policy.NewStagedPolicies( + time.Now().UnixNano(), + false, + []policy.Policy{ + 
policy.NewPolicy(20*time.Second, xtime.Second, 6*time.Hour), + policy.NewPolicy(time.Minute, xtime.Minute, 2*24*time.Hour), + policy.NewPolicy(10*time.Minute, xtime.Minute, 25*24*time.Hour), + }, + ), + } - testInputWithAllTypesAndDefaultPolicies = []metricWithPolicies{ + testStagedPoliciesWithInvalidTimeUnit = policy.PoliciesList{ + policy.NewStagedPolicies( + time.Now().UnixNano(), + true, + []policy.Policy{ + policy.NewPolicy(time.Second, xtime.Unit(100), time.Hour), + }, + ), + } + + testInputWithAllTypesAndDefaultPolicies = []metricWithPoliciesList{ { - metric: testCounter, - versionedPolicies: testDefaultVersionedPolicies, + metric: testCounter, + policiesList: testDefaultStagedPolicies, }, { - metric: testBatchTimer, - versionedPolicies: testDefaultVersionedPolicies, + metric: testBatchTimer, + policiesList: testDefaultStagedPolicies, }, { - metric: testGauge, - versionedPolicies: testDefaultVersionedPolicies, + metric: testGauge, + policiesList: testDefaultStagedPolicies, }, } - testInputWithAllTypesAndCustomPolicies = []metricWithPolicies{ + testInputWithAllTypesAndSingleCustomPolicies = []metricWithPoliciesList{ // Retain this metric at 20 second resolution for 6 hours, // then 1 minute for 2 days, then 10 minutes for 25 days. { metric: testBatchTimer, - versionedPolicies: policy.CustomVersionedPolicies( - 2, - time.Now(), - []policy.Policy{ - policy.NewPolicy(20*time.Second, xtime.Second, 6*time.Hour), - policy.NewPolicy(time.Minute, xtime.Minute, 2*24*time.Hour), - policy.NewPolicy(10*time.Minute, xtime.Minute, 25*24*time.Hour), - }, - ), + policiesList: policy.PoliciesList{ + policy.NewStagedPolicies( + time.Now().UnixNano(), + false, + []policy.Policy{ + policy.NewPolicy(20*time.Second, xtime.Second, 6*time.Hour), + policy.NewPolicy(time.Minute, xtime.Minute, 2*24*time.Hour), + policy.NewPolicy(10*time.Minute, xtime.Minute, 25*24*time.Hour), + }, + ), + }, }, // Retain this metric at 1 second resolution for 1 hour. { metric: testCounter, - versionedPolicies: policy.CustomVersionedPolicies( - 1, - time.Now(), - []policy.Policy{ - policy.NewPolicy(time.Second, xtime.Second, time.Hour), - }, - ), + policiesList: policy.PoliciesList{ + policy.NewStagedPolicies( + time.Now().UnixNano(), + true, + []policy.Policy{ + policy.NewPolicy(time.Second, xtime.Second, time.Hour), + }, + ), + }, }, // Retain this metric at 10 minute resolution for 45 days. 
{ metric: testGauge, - versionedPolicies: policy.CustomVersionedPolicies( - 2, - time.Now(), - []policy.Policy{ - policy.NewPolicy(10*time.Minute, xtime.Minute, 45*24*time.Hour), - }, - ), + policiesList: policy.PoliciesList{ + policy.NewStagedPolicies( + time.Now().UnixNano(), + false, + []policy.Policy{ + policy.NewPolicy(10*time.Minute, xtime.Minute, 45*24*time.Hour), + }, + ), + }, }, } ) func TestUnaggregatedEncodeDecodeCounterWithDefaultPolicies(t *testing.T) { - validateUnaggregatedRoundtrip(t, metricWithPolicies{ - metric: testCounter, - versionedPolicies: testDefaultVersionedPolicies, + validateUnaggregatedRoundtrip(t, metricWithPoliciesList{ + metric: testCounter, + policiesList: testDefaultStagedPolicies, }) } func TestUnaggregatedEncodeDecodeBatchTimerWithDefaultPolicies(t *testing.T) { - validateUnaggregatedRoundtrip(t, metricWithPolicies{ - metric: testBatchTimer, - versionedPolicies: testDefaultVersionedPolicies, + validateUnaggregatedRoundtrip(t, metricWithPoliciesList{ + metric: testBatchTimer, + policiesList: testDefaultStagedPolicies, }) } func TestUnaggregatedEncodeDecodeGaugeWithDefaultPolicies(t *testing.T) { - validateUnaggregatedRoundtrip(t, metricWithPolicies{ - metric: testGauge, - versionedPolicies: testDefaultVersionedPolicies, + validateUnaggregatedRoundtrip(t, metricWithPoliciesList{ + metric: testGauge, + policiesList: testDefaultStagedPolicies, }) } @@ -158,51 +165,53 @@ func TestUnaggregatedEncodeDecodeAllTypesWithDefaultPolicies(t *testing.T) { } func TestUnaggregatedEncodeDecodeAllTypesWithCustomPolicies(t *testing.T) { - validateUnaggregatedRoundtrip(t, testInputWithAllTypesAndCustomPolicies...) + validateUnaggregatedRoundtrip(t, testInputWithAllTypesAndSingleCustomPolicies...) } func TestUnaggregatedEncodeDecodeStress(t *testing.T) { numIter := 10 numMetrics := 10000 allMetrics := []unaggregated.MetricUnion{testCounter, testBatchTimer, testGauge} - allPolicies := []policy.VersionedPolicies{ - testDefaultVersionedPolicies, - policy.CustomVersionedPolicies( - 2, - time.Now(), - []policy.Policy{ - policy.NewPolicy(time.Second, xtime.Second, 6*time.Hour), - policy.NewPolicy(time.Minute, xtime.Minute, 2*24*time.Hour), - }, - ), + allPolicies := []policy.PoliciesList{ + testDefaultStagedPolicies, + policy.PoliciesList{ + policy.NewStagedPolicies( + time.Now().UnixNano(), + false, + []policy.Policy{ + policy.NewPolicy(time.Second, xtime.Second, 6*time.Hour), + policy.NewPolicy(time.Minute, xtime.Minute, 2*24*time.Hour), + }, + ), + }, } encoder := testUnaggregatedEncoder(t) iterator := testUnaggregatedIterator(t, nil) for i := 0; i < numIter; i++ { - var inputs []metricWithPolicies + var inputs []metricWithPoliciesList for j := 0; j < numMetrics; j++ { m := allMetrics[rand.Int63n(int64(len(allMetrics)))] p := allPolicies[rand.Int63n(int64(len(allPolicies)))] - inputs = append(inputs, metricWithPolicies{metric: m, versionedPolicies: p}) + inputs = append(inputs, metricWithPoliciesList{metric: m, policiesList: p}) } validateUnaggregatedRoundtripWithEncoderAndIterator(t, encoder, iterator, inputs...) 
} } -type metricWithPolicies struct { - metric unaggregated.MetricUnion - versionedPolicies policy.VersionedPolicies +type metricWithPoliciesList struct { + metric unaggregated.MetricUnion + policiesList policy.PoliciesList } func testCapturingBaseEncoder(encoder encoderBase) *[]interface{} { baseEncoder := encoder.(*baseEncoder) var result []interface{} - baseEncoder.encodeTimeFn = func(value time.Time) { + baseEncoder.encodeVarintFn = func(value int64) { result = append(result, value) } - baseEncoder.encodeVarintFn = func(value int64) { + baseEncoder.encodeBoolFn = func(value bool) { result = append(result, value) } baseEncoder.encodeFloat64Fn = func(value float64) { @@ -230,22 +239,22 @@ func testUnaggregatedIterator(t *testing.T, reader io.Reader) UnaggregatedIterat return NewUnaggregatedIterator(reader, opts) } -func testUnaggregatedEncode(t *testing.T, encoder UnaggregatedEncoder, m unaggregated.MetricUnion, p policy.VersionedPolicies) error { +func testUnaggregatedEncode(t *testing.T, encoder UnaggregatedEncoder, m unaggregated.MetricUnion, pl policy.PoliciesList) error { switch m.Type { case unaggregated.CounterType: - return encoder.EncodeCounterWithPolicies(unaggregated.CounterWithPolicies{ - Counter: m.Counter(), - VersionedPolicies: p, + return encoder.EncodeCounterWithPoliciesList(unaggregated.CounterWithPoliciesList{ + Counter: m.Counter(), + PoliciesList: pl, }) case unaggregated.BatchTimerType: - return encoder.EncodeBatchTimerWithPolicies(unaggregated.BatchTimerWithPolicies{ - BatchTimer: m.BatchTimer(), - VersionedPolicies: p, + return encoder.EncodeBatchTimerWithPoliciesList(unaggregated.BatchTimerWithPoliciesList{ + BatchTimer: m.BatchTimer(), + PoliciesList: pl, }) case unaggregated.GaugeType: - return encoder.EncodeGaugeWithPolicies(unaggregated.GaugeWithPolicies{ - Gauge: m.Gauge(), - VersionedPolicies: p, + return encoder.EncodeGaugeWithPoliciesList(unaggregated.GaugeWithPoliciesList{ + Gauge: m.Gauge(), + PoliciesList: pl, }) default: return fmt.Errorf("unrecognized metric type %v", m.Type) @@ -266,7 +275,7 @@ func compareUnaggregatedMetric(t *testing.T, expected unaggregated.MetricUnion, } } -func validateUnaggregatedRoundtrip(t *testing.T, inputs ...metricWithPolicies) { +func validateUnaggregatedRoundtrip(t *testing.T, inputs ...metricWithPoliciesList) { encoder := testUnaggregatedEncoder(t) it := testUnaggregatedIterator(t, nil) validateUnaggregatedRoundtripWithEncoderAndIterator(t, encoder, it, inputs...) @@ -276,29 +285,29 @@ func validateUnaggregatedRoundtripWithEncoderAndIterator( t *testing.T, encoder UnaggregatedEncoder, it UnaggregatedIterator, - inputs ...metricWithPolicies, + inputs ...metricWithPoliciesList, ) { - var results []metricWithPolicies + var results []metricWithPoliciesList // Encode the batch of metrics. encoder.Reset(NewBufferedEncoder()) for _, input := range inputs { - testUnaggregatedEncode(t, encoder, input.metric, input.versionedPolicies) + testUnaggregatedEncode(t, encoder, input.metric, input.policiesList) } // Decode the batch of metrics. byteStream := bytes.NewBuffer(encoder.Encoder().Bytes()) it.Reset(byteStream) for it.Next() { - m, p := it.Value() + m, pl := it.Value() - // Make a copy of cached policies because it becomes invalid + // Make a copy of cached policies list because it becomes invalid // on the next Next() call. 
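+		// toPoliciesList below deep-copies both the staged policies slice and each
+		// underlying policy slice, so the captured value survives the buffer reuse.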
- p = toVersionedPolicies(t, p) + pl = toPoliciesList(t, pl) - results = append(results, metricWithPolicies{ - metric: m, - versionedPolicies: p, + results = append(results, metricWithPoliciesList{ + metric: m, + policiesList: pl, }) } @@ -307,27 +316,29 @@ func validateUnaggregatedRoundtripWithEncoderAndIterator( validateMetricsWithPolicies(t, inputs, results) } -func validateMetricsWithPolicies(t *testing.T, inputs, results []metricWithPolicies) { +func validateMetricsWithPolicies(t *testing.T, inputs, results []metricWithPoliciesList) { require.Equal(t, len(inputs), len(results)) for i := 0; i < len(inputs); i++ { compareUnaggregatedMetric(t, inputs[i].metric, results[i].metric) - require.Equal(t, inputs[i].versionedPolicies, results[i].versionedPolicies) + require.Equal(t, inputs[i].policiesList, results[i].policiesList) } } -func toVersionedPolicies(t *testing.T, p policy.VersionedPolicies) policy.VersionedPolicies { - var ( - policies []policy.Policy - versionedPolicies policy.VersionedPolicies - ) - versionedPolicies = p - if versionedPolicies.IsDefault() { - return versionedPolicies - } - policies = make([]policy.Policy, len(p.Policies())) +func toStagedPolicies(t *testing.T, p policy.StagedPolicies) policy.StagedPolicies { + policies := make([]policy.Policy, len(p.Policies())) for i, policy := range p.Policies() { policies[i] = policy } - versionedPolicies = policy.CustomVersionedPolicies(p.Version, p.Cutover, policies) - return versionedPolicies + return policy.NewStagedPolicies(p.CutoverNs, p.Tombstoned, policies) +} + +func toPoliciesList(t *testing.T, pl policy.PoliciesList) policy.PoliciesList { + if pl.IsDefault() { + return policy.DefaultPoliciesList + } + policiesList := make(policy.PoliciesList, 0, len(pl)) + for i := 0; i < len(pl); i++ { + policiesList = append(policiesList, toStagedPolicies(t, pl[i])) + } + return policiesList } diff --git a/protocol/msgpack/wire_format.md b/protocol/msgpack/wire_format.md index 4dd3052..908e3fc 100644 --- a/protocol/msgpack/wire_format.md +++ b/protocol/msgpack/wire_format.md @@ -5,24 +5,24 @@ * Number of root object fields * Root object type * Root object (can be one of the following): - * CounterWithPolicies - * BatchTimerWithPolicies - * GaugeWithPolicies + * CounterWithPoliciesList + * BatchTimerWithPoliciesList + * GaugeWithPoliciesList -* CounterWithPolicies object - * Number of CounterWithPolicies fields +* CounterWithPoliciesList object + * Number of CounterWithPoliciesList fields * Counter object - * VersionedPolicies object + * PoliciesList object -* BatchTimerWithPolicies object - * Number of BatchTimerWithPolicies fields +* BatchTimerWithPoliciesList object + * Number of BatchTimerWithPoliciesList fields * BatchTimer object - * VersionedPolicies object + * PoliciesList object -* GaugeWithPolicies object - * Number of GaugeWithPolicies fields +* GaugeWithPoliciesList object + * Number of GaugeWithPoliciesList fields * Gauge object - * VersionedPolicies object + * PoliciesList object * Counter object * Number of Counter fields @@ -39,16 +39,20 @@ * Gauge ID * Gauge value -* VersionedPolicies object - * Number of VersionedPolicies fields - * Versioned Policies (can be one of the following) - * DefaultVersionedPolicies - * Versioned Policies type - * CustomVersionedPolicies - * Versioned Policies type - * Version - * Cutover - * List of Policy objects +* PoliciesList object + * Number of PoliciesList fields + * PoliciesList (can be one of the following) + * DefaultPoliciesList + * PoliciesList type + * CustomPoliciesList + * 
PoliciesList type + * List of StagedPolicies objects + +* StagedPolicies object + * Number of StagedPolicies fields + * Cutover + * Tombstoned + * List of Policy objects * Policy object * Number of Policy fields diff --git a/rules/result.go b/rules/result.go index 273c4fb..d9bbb77 100644 --- a/rules/result.go +++ b/rules/result.go @@ -21,6 +21,7 @@ package rules import ( + "bytes" "time" "github.com/m3db/m3metrics/policy" @@ -28,35 +29,36 @@ import ( var ( // EmptyMatchResult is the result when no matches were found. - EmptyMatchResult = NewMatchResult(0, 0, timeNsMax, nil, nil) + EmptyMatchResult = NewMatchResult(timeNsMax, policy.DefaultPoliciesList, nil) ) -// RollupResult contains the rollup metric id and the associated policies. +// RollupResult contains the rollup metric id and the associated policies list. type RollupResult struct { - ID []byte - Policies []policy.Policy + ID []byte + PoliciesList policy.PoliciesList } +// RollupResultsByIDAsc sorts rollup results by id in ascending order. +type RollupResultsByIDAsc []RollupResult + +func (a RollupResultsByIDAsc) Len() int { return len(a) } +func (a RollupResultsByIDAsc) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a RollupResultsByIDAsc) Less(i, j int) bool { return bytes.Compare(a[i].ID, a[j].ID) < 0 } + // MatchResult represents a match result. type MatchResult struct { - version int - cutoverNs int64 expireAtNs int64 - mappings []policy.Policy + mappings policy.PoliciesList rollups []RollupResult } // NewMatchResult creates a new match result. func NewMatchResult( - version int, - cutoverNs int64, expireAtNs int64, - mappings []policy.Policy, + mappings policy.PoliciesList, rollups []RollupResult, ) MatchResult { return MatchResult{ - version: version, - cutoverNs: cutoverNs, expireAtNs: expireAtNs, mappings: mappings, rollups: rollups, @@ -66,26 +68,31 @@ func NewMatchResult( // HasExpired returns whether the match result has expired for a given time. func (r *MatchResult) HasExpired(t time.Time) bool { return r.expireAtNs <= t.UnixNano() } -// NumRollups returns the number of rollup result associated with the given id. +// NumRollups returns the number of rollup metrics. func (r *MatchResult) NumRollups() int { return len(r.rollups) } -// Mappings returns the mapping policies for the given id. -func (r *MatchResult) Mappings() policy.VersionedPolicies { - return r.versionedPolicies(r.mappings) +// MappingsAt returns the active mapping policies at a given time. +func (r *MatchResult) MappingsAt(t time.Time) policy.PoliciesList { + return activePoliciesAt(r.mappings, t) } -// Rollups returns the rollup metric id and corresponding policies at a given index. -func (r *MatchResult) Rollups(idx int) ([]byte, policy.VersionedPolicies) { +// RollupsAt returns the rollup metric id and corresponding policies at a given index and time. +func (r *MatchResult) RollupsAt(idx int, t time.Time) RollupResult { rollup := r.rollups[idx] - return rollup.ID, r.versionedPolicies(rollup.Policies) + return RollupResult{ + ID: rollup.ID, + PoliciesList: activePoliciesAt(rollup.PoliciesList, t), + } } -// TODO(xichen): change versioned policies to use int64 instead. -func (r *MatchResult) versionedPolicies(policies []policy.Policy) policy.VersionedPolicies { - // NB(xichen): if there are no policies for this id, we fall - // back to the default mapping policies. 
- if len(policies) == 0 { - return policy.DefaultVersionedPolicies(r.version, time.Unix(0, r.cutoverNs)) +// activePolicies returns the active policies at a given time, assuming +// the input policies are sorted by cutover time in ascending order. +func activePoliciesAt(policies policy.PoliciesList, t time.Time) policy.PoliciesList { + timeNs := t.UnixNano() + for idx := len(policies) - 1; idx >= 0; idx-- { + if policies[idx].CutoverNs <= timeNs { + return policies[idx:] + } } - return policy.CustomVersionedPolicies(r.version, time.Unix(0, r.cutoverNs), policies) + return policies } diff --git a/rules/result_test.go b/rules/result_test.go index e799495..f99331d 100644 --- a/rules/result_test.go +++ b/rules/result_test.go @@ -32,62 +32,64 @@ import ( func TestMatchResult(t *testing.T) { var ( - version = 1 cutoverNs = int64(12345) expireAtNs = int64(67890) - mappings = []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 12*time.Hour), - policy.NewPolicy(time.Minute, xtime.Minute, 24*time.Hour), - policy.NewPolicy(5*time.Minute, xtime.Minute, 48*time.Hour), + mappings = policy.PoliciesList{ + policy.NewStagedPolicies( + cutoverNs, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 12*time.Hour), + policy.NewPolicy(time.Minute, xtime.Minute, 24*time.Hour), + policy.NewPolicy(5*time.Minute, xtime.Minute, 48*time.Hour), + }, + ), } rollups = []RollupResult{ { ID: b("rName1|rtagName1=rtagValue1,rtagName2=rtagValue2"), - Policies: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 12*time.Hour), - policy.NewPolicy(time.Minute, xtime.Minute, 24*time.Hour), - policy.NewPolicy(5*time.Minute, xtime.Minute, 48*time.Hour), + PoliciesList: policy.PoliciesList{ + policy.NewStagedPolicies( + cutoverNs, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 12*time.Hour), + policy.NewPolicy(time.Minute, xtime.Minute, 24*time.Hour), + policy.NewPolicy(5*time.Minute, xtime.Minute, 48*time.Hour), + }, + ), }, }, { - ID: b("rName2|rtagName1=rtagValue1"), - Policies: []policy.Policy{}, + ID: b("rName2|rtagName1=rtagValue1"), + PoliciesList: policy.PoliciesList{policy.NewStagedPolicies(cutoverNs, false, nil)}, }, } ) - res := NewMatchResult(version, cutoverNs, expireAtNs, mappings, rollups) + res := NewMatchResult(expireAtNs, mappings, rollups) require.False(t, res.HasExpired(time.Unix(0, 0))) require.True(t, res.HasExpired(time.Unix(0, 100000))) - expectedMappings := policy.CustomVersionedPolicies(version, time.Unix(0, 12345), mappings) - require.Equal(t, expectedMappings, res.Mappings()) + require.Equal(t, mappings, res.MappingsAt(time.Unix(0, 0))) var ( expectedRollupIDs = [][]byte{ b("rName1|rtagName1=rtagValue1,rtagName2=rtagValue2"), b("rName2|rtagName1=rtagValue1"), } - expectedRollupPolicies = []policy.VersionedPolicies{ - policy.CustomVersionedPolicies( - 1, - time.Unix(0, 12345), - []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 12*time.Hour), - policy.NewPolicy(time.Minute, xtime.Minute, 24*time.Hour), - policy.NewPolicy(5*time.Minute, xtime.Minute, 48*time.Hour), - }, - ), - policy.DefaultVersionedPolicies(1, time.Unix(0, 12345)), + expectedRollupPolicies = policy.PoliciesList{ + rollups[0].PoliciesList[0], + rollups[1].PoliciesList[0], } rollupIDs [][]byte - rollupPolicies []policy.VersionedPolicies + rollupPolicies policy.PoliciesList ) require.Equal(t, 2, res.NumRollups()) for i := 0; i < 2; i++ { - id, policies := res.Rollups(i) - rollupIDs = append(rollupIDs, id) - rollupPolicies = 
append(rollupPolicies, policies) + rollup := res.RollupsAt(i, time.Unix(0, 0)) + rollupIDs = append(rollupIDs, rollup.ID) + rollupPolicies = append(rollupPolicies, rollup.PoliciesList...) } require.Equal(t, expectedRollupIDs, rollupIDs) require.Equal(t, expectedRollupPolicies, rollupPolicies) diff --git a/rules/ruleset.go b/rules/ruleset.go index d6880a1..927befd 100644 --- a/rules/ruleset.go +++ b/rules/ruleset.go @@ -48,12 +48,11 @@ type TagPair struct { // Matcher matches metrics against rules to determine applicable policies. type Matcher interface { - // Match returns applicable policies given a metric id for a given time. - Match(id []byte, t time.Time) MatchResult + // MatchAll returns all applicable policies given a metric id between [from, to). + MatchAll(id []byte, from time.Time, to time.Time) MatchResult } type activeRuleSet struct { - version int iterFn filters.NewSortedTagIteratorFn newIDFn NewIDFn mappingRules []*mappingRule @@ -62,7 +61,6 @@ type activeRuleSet struct { } func newActiveRuleSet( - version int, iterFn filters.NewSortedTagIteratorFn, newIDFn NewIDFn, mappingRules []*mappingRule, @@ -80,38 +78,48 @@ func newActiveRuleSet( } } - cutoverTimeAsc := make([]int64, 0, len(uniqueCutoverTimes)) + cutoverTimesAsc := make([]int64, 0, len(uniqueCutoverTimes)) for t := range uniqueCutoverTimes { - cutoverTimeAsc = append(cutoverTimeAsc, t) + cutoverTimesAsc = append(cutoverTimesAsc, t) } - sort.Sort(int64Asc(cutoverTimeAsc)) + sort.Sort(int64Asc(cutoverTimesAsc)) return &activeRuleSet{ - version: version, iterFn: iterFn, newIDFn: newIDFn, mappingRules: mappingRules, rollupRules: rollupRules, - cutoverTimesAsc: cutoverTimeAsc, + cutoverTimesAsc: cutoverTimesAsc, } } -func (as *activeRuleSet) Match(id []byte, t time.Time) MatchResult { - timeNs := t.UnixNano() - mappingCutoverNs, mappingPolicies := as.matchMappings(id, timeNs) - rollupCutoverNs, rollupResults := as.matchRollups(id, timeNs) +// NB(xichen): could make this more efficient by keeping track of matched rules +// at previous iteration and incrementally update match results. +func (as *activeRuleSet) MatchAll(id []byte, from time.Time, to time.Time) MatchResult { + var ( + fromNs = from.UnixNano() + toNs = to.UnixNano() + currMappingResults = policy.PoliciesList{as.matchMappings(id, fromNs)} + currRollupResults = as.matchRollups(id, fromNs) + nextIdx = as.nextCutoverIdx(fromNs) + nextCutoverNs = as.cutoverNsAt(nextIdx) + ) - // NB(xichen): we take the latest cutover time across all rules matched. This is - // used to determine whether the match result is valid for a given time. - cutoverNs := int64(math.Max(float64(mappingCutoverNs), float64(rollupCutoverNs))) + for nextIdx < len(as.cutoverTimesAsc) && nextCutoverNs < toNs { + nextMappingPolicies := as.matchMappings(id, nextCutoverNs) + nextRollupResults := as.matchRollups(id, nextCutoverNs) + currMappingResults = mergeMappingResults(currMappingResults, nextMappingPolicies) + currRollupResults = mergeRollupResults(currRollupResults, nextRollupResults, nextCutoverNs) + nextIdx++ + nextCutoverNs = as.cutoverNsAt(nextIdx) + } // The result expires when it reaches the first cutover time after t among all // active rules because the metric may then be matched against a different set of rules. 
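+	// In the [from, to) form this means the result stays valid until the first rule
+	// cutover at or after `to` (timeNsMax if there is none). For example, with cutovers
+	// at 10000 and 20000 and a match range of [5000, 15000), the stage active at 5000
+	// and the stage cut over at 10000 are both captured (collapsed into one entry when
+	// their policies are identical) and the result expires at 20000.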
- expireAtNs := as.nextCutover(timeNs) - return NewMatchResult(as.version, cutoverNs, expireAtNs, mappingPolicies, rollupResults) + return NewMatchResult(nextCutoverNs, currMappingResults, currRollupResults) } -func (as *activeRuleSet) matchMappings(id []byte, timeNs int64) (int64, []policy.Policy) { +func (as *activeRuleSet) matchMappings(id []byte, timeNs int64) policy.StagedPolicies { // TODO(xichen): pool the policies. var ( cutoverNs int64 @@ -119,10 +127,7 @@ func (as *activeRuleSet) matchMappings(id []byte, timeNs int64) (int64, []policy ) for _, mappingRule := range as.mappingRules { snapshot := mappingRule.ActiveSnapshot(timeNs) - if snapshot == nil { - continue - } - if !snapshot.filter.Matches(id) { + if snapshot == nil || snapshot.tombstoned || !snapshot.filter.Matches(id) { continue } if cutoverNs < snapshot.cutoverNs { @@ -130,10 +135,11 @@ func (as *activeRuleSet) matchMappings(id []byte, timeNs int64) (int64, []policy } policies = append(policies, snapshot.policies...) } - return cutoverNs, resolvePolicies(policies) + resolved := resolvePolicies(policies) + return policy.NewStagedPolicies(cutoverNs, false, resolved) } -func (as *activeRuleSet) matchRollups(id []byte, timeNs int64) (int64, []RollupResult) { +func (as *activeRuleSet) matchRollups(id []byte, timeNs int64) []RollupResult { // TODO(xichen): pool the rollup targets. var ( cutoverNs int64 @@ -141,10 +147,7 @@ func (as *activeRuleSet) matchRollups(id []byte, timeNs int64) (int64, []RollupR ) for _, rollupRule := range as.rollupRules { snapshot := rollupRule.ActiveSnapshot(timeNs) - if snapshot == nil { - continue - } - if !snapshot.filter.Matches(id) { + if snapshot == nil || snapshot.tombstoned || !snapshot.filter.Matches(id) { continue } if cutoverNs < snapshot.cutoverNs { @@ -173,11 +176,11 @@ func (as *activeRuleSet) matchRollups(id []byte, timeNs int64) (int64, []RollupR rollups[i].Policies = resolvePolicies(rollups[i].Policies) } - return cutoverNs, as.toRollupResults(id, rollups) + return as.toRollupResults(id, cutoverNs, rollups) } // toRollupResults encodes rollup target name and values into ids for each rollup target. -func (as *activeRuleSet) toRollupResults(id []byte, targets []rollupTarget) []RollupResult { +func (as *activeRuleSet) toRollupResults(id []byte, cutoverNs int64, targets []rollupTarget) []RollupResult { // NB(r): This is n^2 however this should be quite fast still as // long as there is not an absurdly high number of rollup // targets for any given ID and that iterfn is alloc free. @@ -186,6 +189,9 @@ func (as *activeRuleSet) toRollupResults(id []byte, targets []rollupTarget) []Ro // any given ID would match a relatively low number of rollups. // TODO(xichen): pool tag pairs and rollup results. + if len(targets) == 0 { + return nil + } var tagPairs []TagPair rollups := make([]RollupResult, 0, len(targets)) for _, target := range targets { @@ -202,18 +208,19 @@ func (as *activeRuleSet) toRollupResults(id []byte, targets []rollupTarget) []Ro iter.Close() } result := RollupResult{ - ID: as.newIDFn(target.Name, tagPairs), - Policies: target.Policies, + ID: as.newIDFn(target.Name, tagPairs), + PoliciesList: policy.PoliciesList{policy.NewStagedPolicies(cutoverNs, false, target.Policies)}, } rollups = append(rollups, result) } + sort.Sort(RollupResultsByIDAsc(rollups)) return rollups } -// nextCutover returns the next cutover time after t. +// nextCutoverIdx returns the next snapshot index whose cutover time is after t. // NB(xichen): not using sort.Search to avoid a lambda capture. 
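+// The returned index equals len(cutoverTimesAsc) when no cutover is after t, and
+// cutoverNsAt maps that out-of-range index to timeNsMax so it can be used directly
+// as an expiry time.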
-func (as *activeRuleSet) nextCutover(t int64) int64 { +func (as *activeRuleSet) nextCutoverIdx(t int64) int { i, j := 0, len(as.cutoverTimesAsc) for i < j { h := i + (j-i)/2 @@ -223,8 +230,13 @@ func (as *activeRuleSet) nextCutover(t int64) int64 { j = h } } - if i < len(as.cutoverTimesAsc) { - return as.cutoverTimesAsc[i] + return i +} + +// cutoverNsAt returns the cutover time at given index. +func (as *activeRuleSet) cutoverNsAt(idx int) int64 { + if idx < len(as.cutoverTimesAsc) { + return as.cutoverTimesAsc[idx] } return timeNsMax } @@ -315,7 +327,7 @@ func (rs *ruleSet) ActiveSet(t time.Time) Matcher { activeRule := rollupRule.ActiveRule(timeNs) rollupRules = append(rollupRules, activeRule) } - return newActiveRuleSet(rs.version, rs.iterFn, rs.newIDFn, mappingRules, rollupRules) + return newActiveRuleSet(rs.iterFn, rs.newIDFn, mappingRules, rollupRules) } // resolvePolicies resolves the conflicts among policies if any, following the rules below: @@ -353,6 +365,80 @@ func resolvePolicies(policies []policy.Policy) []policy.Policy { return policies[:curr+1] } +func mergeMappingResults( + currMappingResults policy.PoliciesList, + nextMappingPolicies policy.StagedPolicies, +) policy.PoliciesList { + currMappingPolicies := currMappingResults[len(currMappingResults)-1] + if currMappingPolicies.SamePolicies(nextMappingPolicies) { + return currMappingResults + } + currMappingResults = append(currMappingResults, nextMappingPolicies) + return currMappingResults +} + +func mergeRollupResults( + currRollupResults []RollupResult, + nextRollupResults []RollupResult, + nextCutoverNs int64, +) []RollupResult { + var ( + numCurrRollupResults = len(currRollupResults) + numNextRollupResults = len(nextRollupResults) + currRollupIdx int + nextRollupIdx int + ) + + for currRollupIdx < numCurrRollupResults && nextRollupIdx < numNextRollupResults { + currRollupResult := currRollupResults[currRollupIdx] + nextRollupResult := nextRollupResults[nextRollupIdx] + + // If the current and the next rollup result have the same id, we merge their policies. + compareResult := bytes.Compare(currRollupResult.ID, nextRollupResult.ID) + if compareResult == 0 { + currRollupPolicies := currRollupResult.PoliciesList[len(currRollupResult.PoliciesList)-1] + nextRollupPolicies := nextRollupResult.PoliciesList[0] + if !currRollupPolicies.SamePolicies(nextRollupPolicies) { + currRollupResults[currRollupIdx].PoliciesList = append(currRollupResults[currRollupIdx].PoliciesList, nextRollupPolicies) + } + currRollupIdx++ + nextRollupIdx++ + continue + } + + // If the current id is smaller, it means the id is deleted in the next rollup result. + if compareResult < 0 { + tombstonedPolicies := policy.NewStagedPolicies(nextCutoverNs, true, nil) + currRollupResults[currRollupIdx].PoliciesList = append(currRollupResults[currRollupIdx].PoliciesList, tombstonedPolicies) + currRollupIdx++ + continue + } + + // Otherwise the current id is larger, meaning a new id is added in the next rollup result. + currRollupResults = append(currRollupResults, nextRollupResult) + nextRollupIdx++ + } + + // If there are leftover ids in the current rollup result, these ids must have been deleted + // in the next rollup result. 
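+	// Each leftover id therefore gets a tombstoned StagedPolicies stamped with
+	// nextCutoverNs, mirroring the compareResult < 0 branch above.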
+ for currRollupIdx < numCurrRollupResults { + tombstonedPolicies := policy.NewStagedPolicies(nextCutoverNs, true, nil) + currRollupResults[currRollupIdx].PoliciesList = append(currRollupResults[currRollupIdx].PoliciesList, tombstonedPolicies) + currRollupIdx++ + } + + // If there are additional ids in the next rollup result, these ids must have been added + // in the next rollup result. + for nextRollupIdx < numNextRollupResults { + nextRollupResult := nextRollupResults[nextRollupIdx] + currRollupResults = append(currRollupResults, nextRollupResult) + nextRollupIdx++ + } + + sort.Sort(RollupResultsByIDAsc(currRollupResults)) + return currRollupResults +} + type int64Asc []int64 func (a int64Asc) Len() int { return len(a) } diff --git a/rules/ruleset_test.go b/rules/ruleset_test.go index 2b63742..f8f38c4 100644 --- a/rules/ruleset_test.go +++ b/rules/ruleset_test.go @@ -38,47 +38,58 @@ func TestActiveRuleSetMatchMappingRules(t *testing.T) { { id: "mtagName1=mtagValue1", matchAt: time.Unix(0, 25000), - cutoverNs: 22000, expireAtNs: 30000, - result: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 12*time.Hour), - policy.NewPolicy(time.Minute, xtime.Minute, 24*time.Hour), - policy.NewPolicy(5*time.Minute, xtime.Minute, 48*time.Hour), + result: policy.PoliciesList{ + policy.NewStagedPolicies( + 22000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 12*time.Hour), + policy.NewPolicy(time.Minute, xtime.Minute, 24*time.Hour), + policy.NewPolicy(5*time.Minute, xtime.Minute, 48*time.Hour), + }, + ), }, }, { id: "mtagName1=mtagValue1", matchAt: time.Unix(0, 35000), - cutoverNs: 35000, expireAtNs: 100000, - result: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 2*time.Hour), - policy.NewPolicy(30*time.Second, xtime.Second, 6*time.Hour), - policy.NewPolicy(45*time.Second, xtime.Second, 12*time.Hour), + result: policy.PoliciesList{ + policy.NewStagedPolicies( + 34000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 2*time.Hour), + policy.NewPolicy(30*time.Second, xtime.Second, 6*time.Hour), + }, + ), }, }, { id: "mtagName1=mtagValue2", matchAt: time.Unix(0, 25000), - cutoverNs: 24000, expireAtNs: 30000, - result: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 24*time.Hour), + result: policy.PoliciesList{ + policy.NewStagedPolicies( + 24000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 24*time.Hour), + }, + ), }, }, { id: "mtagName1=mtagValue3", matchAt: time.Unix(0, 25000), - cutoverNs: 0, expireAtNs: 30000, - result: policy.DefaultVersionedPolicies(1, time.Unix(0, 25000)).Policies(), + result: policy.DefaultPoliciesList, }, } - version := 1 mappingRules := testMappingRules(t) as := newActiveRuleSet( - version, filters.NewMockSortedTagIterator, mockNewID, mappingRules, @@ -87,11 +98,9 @@ func TestActiveRuleSetMatchMappingRules(t *testing.T) { expectedCutovers := []int64{10000, 15000, 20000, 22000, 24000, 30000, 34000, 35000, 100000} require.Equal(t, expectedCutovers, as.cutoverTimesAsc) for _, input := range inputs { - res := as.Match(b(input.id), input.matchAt) - require.Equal(t, 1, res.version) - require.Equal(t, input.cutoverNs, res.cutoverNs) + res := as.MatchAll(b(input.id), input.matchAt, input.matchAt.Add(time.Nanosecond)) require.Equal(t, input.expireAtNs, res.expireAtNs) - require.Equal(t, input.result, res.Mappings().Policies()) + require.Equal(t, input.result, res.MappingsAt(time.Unix(0, 0))) } } @@ -100,21 +109,32 @@ func 
TestActiveRuleSetMatchRollupRules(t *testing.T) { { id: "rtagName1=rtagValue1,rtagName2=rtagValue2,rtagName3=rtagValue3", matchAt: time.Unix(0, 25000), - cutoverNs: 22000, expireAtNs: 30000, result: []RollupResult{ { ID: b("rName1|rtagName1=rtagValue1,rtagName2=rtagValue2"), - Policies: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 12*time.Hour), - policy.NewPolicy(time.Minute, xtime.Minute, 24*time.Hour), - policy.NewPolicy(5*time.Minute, xtime.Minute, 48*time.Hour), + PoliciesList: policy.PoliciesList{ + policy.NewStagedPolicies( + 22000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 12*time.Hour), + policy.NewPolicy(time.Minute, xtime.Minute, 24*time.Hour), + policy.NewPolicy(5*time.Minute, xtime.Minute, 48*time.Hour), + }, + ), }, }, { ID: b("rName2|rtagName1=rtagValue1"), - Policies: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 24*time.Hour), + PoliciesList: policy.PoliciesList{ + policy.NewStagedPolicies( + 22000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 24*time.Hour), + }, + ), }, }, }, @@ -122,13 +142,18 @@ func TestActiveRuleSetMatchRollupRules(t *testing.T) { { id: "rtagName1=rtagValue2", matchAt: time.Unix(0, 25000), - cutoverNs: 24000, expireAtNs: 30000, result: []RollupResult{ { ID: b("rName3|rtagName1=rtagValue2"), - Policies: []policy.Policy{ - policy.NewPolicy(time.Minute, xtime.Minute, time.Hour), + PoliciesList: policy.PoliciesList{ + policy.NewStagedPolicies( + 24000, + false, + []policy.Policy{ + policy.NewPolicy(time.Minute, xtime.Minute, time.Hour), + }, + ), }, }, }, @@ -136,16 +161,13 @@ func TestActiveRuleSetMatchRollupRules(t *testing.T) { { id: "rtagName5=rtagValue5", matchAt: time.Unix(0, 25000), - cutoverNs: 0, expireAtNs: 30000, result: []RollupResult{}, }, } - version := 1 rollupRules := testRollupRules(t) as := newActiveRuleSet( - version, filters.NewMockSortedTagIterator, mockNewID, nil, @@ -154,15 +176,14 @@ func TestActiveRuleSetMatchRollupRules(t *testing.T) { expectedCutovers := []int64{10000, 15000, 20000, 22000, 24000, 30000, 34000, 35000, 100000} require.Equal(t, expectedCutovers, as.cutoverTimesAsc) for _, input := range inputs { - res := as.Match(b(input.id), input.matchAt) - require.Equal(t, 1, res.version) - require.Equal(t, input.cutoverNs, res.cutoverNs) + res := as.MatchAll(b(input.id), input.matchAt, input.matchAt.Add(time.Nanosecond)) require.Equal(t, input.expireAtNs, res.expireAtNs) require.Equal(t, len(input.result), res.NumRollups()) for i := 0; i < len(input.result); i++ { - id, policies := res.Rollups(i) + rollup := res.RollupsAt(i, time.Unix(0, 0)) + id, policies := rollup.ID, rollup.PoliciesList require.Equal(t, input.result[i].ID, id) - require.Equal(t, input.result[i].Policies, policies.Policies()) + require.Equal(t, input.result[i].PoliciesList, policies) } } } @@ -210,61 +231,85 @@ func TestRuleSetActiveSet(t *testing.T) { { id: "mtagName1=mtagValue1", matchAt: time.Unix(0, 25000), - cutoverNs: 22000, expireAtNs: 30000, - result: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 12*time.Hour), - policy.NewPolicy(time.Minute, xtime.Minute, 24*time.Hour), - policy.NewPolicy(5*time.Minute, xtime.Minute, 48*time.Hour), + result: policy.PoliciesList{ + policy.NewStagedPolicies( + 22000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 12*time.Hour), + policy.NewPolicy(time.Minute, xtime.Minute, 24*time.Hour), + policy.NewPolicy(5*time.Minute, xtime.Minute, 48*time.Hour), + }, + ), }, 
}, { id: "mtagName1=mtagValue1", matchAt: time.Unix(0, 35000), - cutoverNs: 35000, expireAtNs: 100000, - result: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 2*time.Hour), - policy.NewPolicy(30*time.Second, xtime.Second, 6*time.Hour), - policy.NewPolicy(45*time.Second, xtime.Second, 12*time.Hour), + result: policy.PoliciesList{ + policy.NewStagedPolicies( + 34000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 2*time.Hour), + policy.NewPolicy(30*time.Second, xtime.Second, 6*time.Hour), + }, + ), }, }, { id: "mtagName1=mtagValue2", matchAt: time.Unix(0, 25000), - cutoverNs: 24000, expireAtNs: 30000, - result: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 24*time.Hour), + result: policy.PoliciesList{ + policy.NewStagedPolicies( + 24000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 24*time.Hour), + }, + ), }, }, { id: "mtagName1=mtagValue3", matchAt: time.Unix(0, 25000), - cutoverNs: 0, expireAtNs: 30000, - result: policy.DefaultVersionedPolicies(1, time.Unix(0, 25000)).Policies(), + result: policy.DefaultPoliciesList, }, }, rollupInputs: []testRollupResultsData{ { id: "rtagName1=rtagValue1,rtagName2=rtagValue2,rtagName3=rtagValue3", matchAt: time.Unix(0, 25000), - cutoverNs: 22000, expireAtNs: 30000, result: []RollupResult{ { ID: b("rName1|rtagName1=rtagValue1,rtagName2=rtagValue2"), - Policies: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 12*time.Hour), - policy.NewPolicy(time.Minute, xtime.Minute, 24*time.Hour), - policy.NewPolicy(5*time.Minute, xtime.Minute, 48*time.Hour), + PoliciesList: policy.PoliciesList{ + policy.NewStagedPolicies( + 22000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 12*time.Hour), + policy.NewPolicy(time.Minute, xtime.Minute, 24*time.Hour), + policy.NewPolicy(5*time.Minute, xtime.Minute, 48*time.Hour), + }, + ), }, }, { ID: b("rName2|rtagName1=rtagValue1"), - Policies: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 24*time.Hour), + PoliciesList: policy.PoliciesList{ + policy.NewStagedPolicies( + 22000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 24*time.Hour), + }, + ), }, }, }, @@ -272,13 +317,18 @@ func TestRuleSetActiveSet(t *testing.T) { { id: "rtagName1=rtagValue2", matchAt: time.Unix(0, 25000), - cutoverNs: 24000, expireAtNs: 30000, result: []RollupResult{ { ID: b("rName3|rtagName1=rtagValue2"), - Policies: []policy.Policy{ - policy.NewPolicy(time.Minute, xtime.Minute, time.Hour), + PoliciesList: policy.PoliciesList{ + policy.NewStagedPolicies( + 24000, + false, + []policy.Policy{ + policy.NewPolicy(time.Minute, xtime.Minute, time.Hour), + }, + ), }, }, }, @@ -286,7 +336,6 @@ func TestRuleSetActiveSet(t *testing.T) { { id: "rtagName5=rtagValue5", matchAt: time.Unix(0, 25000), - cutoverNs: 0, expireAtNs: 30000, result: []RollupResult{}, }, @@ -298,44 +347,56 @@ func TestRuleSetActiveSet(t *testing.T) { { id: "mtagName1=mtagValue1", matchAt: time.Unix(0, 35000), - cutoverNs: 35000, expireAtNs: 100000, - result: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 2*time.Hour), - policy.NewPolicy(30*time.Second, xtime.Second, 6*time.Hour), - policy.NewPolicy(45*time.Second, xtime.Second, 12*time.Hour), + result: policy.PoliciesList{ + policy.NewStagedPolicies( + 34000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 2*time.Hour), + policy.NewPolicy(30*time.Second, xtime.Second, 6*time.Hour), + }, + ), }, }, { id: 
"mtagName1=mtagValue2", matchAt: time.Unix(0, 35000), - cutoverNs: 24000, expireAtNs: 100000, - result: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 24*time.Hour), + result: policy.PoliciesList{ + policy.NewStagedPolicies( + 24000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 24*time.Hour), + }, + ), }, }, { id: "mtagName1=mtagValue3", matchAt: time.Unix(0, 35000), - cutoverNs: 0, expireAtNs: 100000, - result: policy.DefaultVersionedPolicies(1, time.Unix(0, 35000)).Policies(), + result: policy.DefaultPoliciesList, }, }, rollupInputs: []testRollupResultsData{ { id: "rtagName1=rtagValue1,rtagName2=rtagValue2,rtagName3=rtagValue3", matchAt: time.Unix(0, 35000), - cutoverNs: 35000, expireAtNs: 100000, result: []RollupResult{ { ID: b("rName1|rtagName1=rtagValue1,rtagName2=rtagValue2"), - Policies: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 2*time.Hour), - policy.NewPolicy(30*time.Second, xtime.Second, 6*time.Hour), - policy.NewPolicy(45*time.Second, xtime.Second, 12*time.Hour), + PoliciesList: policy.PoliciesList{ + policy.NewStagedPolicies( + 34000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 2*time.Hour), + policy.NewPolicy(30*time.Second, xtime.Second, 6*time.Hour), + }, + ), }, }, }, @@ -343,13 +404,18 @@ func TestRuleSetActiveSet(t *testing.T) { { id: "rtagName1=rtagValue2", matchAt: time.Unix(0, 35000), - cutoverNs: 24000, expireAtNs: 100000, result: []RollupResult{ { ID: b("rName3|rtagName1=rtagValue2"), - Policies: []policy.Policy{ - policy.NewPolicy(time.Minute, xtime.Minute, time.Hour), + PoliciesList: policy.PoliciesList{ + policy.NewStagedPolicies( + 24000, + false, + []policy.Policy{ + policy.NewPolicy(time.Minute, xtime.Minute, time.Hour), + }, + ), }, }, }, @@ -357,7 +423,6 @@ func TestRuleSetActiveSet(t *testing.T) { { id: "rtagName5=rtagValue5", matchAt: time.Unix(0, 35000), - cutoverNs: 0, expireAtNs: 100000, result: []RollupResult{}, }, @@ -369,49 +434,67 @@ func TestRuleSetActiveSet(t *testing.T) { { id: "mtagName1=mtagValue1", matchAt: time.Unix(0, 250000), - cutoverNs: 100000, expireAtNs: timeNsMax, - result: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 24*time.Hour), + result: policy.PoliciesList{ + policy.NewStagedPolicies( + 100000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 24*time.Hour), + }, + ), }, }, { id: "mtagName1=mtagValue2", matchAt: time.Unix(0, 250000), - cutoverNs: 24000, expireAtNs: timeNsMax, - result: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 24*time.Hour), + result: policy.PoliciesList{ + policy.NewStagedPolicies( + 24000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 24*time.Hour), + }, + ), }, }, { id: "mtagName1=mtagValue3", matchAt: time.Unix(0, 250000), - cutoverNs: 0, expireAtNs: timeNsMax, - result: policy.DefaultVersionedPolicies(1, time.Unix(0, 250000)).Policies(), + result: policy.DefaultPoliciesList, }, }, - rollupInputs: []testRollupResultsData{ { id: "rtagName1=rtagValue1,rtagName2=rtagValue2,rtagName3=rtagValue3", matchAt: time.Unix(0, 250000), - cutoverNs: 100000, expireAtNs: timeNsMax, result: []RollupResult{ { ID: b("rName1|rtagName1=rtagValue1,rtagName2=rtagValue2"), - Policies: []policy.Policy{ - policy.NewPolicy(10*time.Second, xtime.Second, 2*time.Hour), - policy.NewPolicy(30*time.Second, xtime.Second, 6*time.Hour), - policy.NewPolicy(45*time.Second, xtime.Second, 12*time.Hour), + PoliciesList: policy.PoliciesList{ + 
policy.NewStagedPolicies( + 100000, + false, + []policy.Policy{ + policy.NewPolicy(10*time.Second, xtime.Second, 2*time.Hour), + policy.NewPolicy(30*time.Second, xtime.Second, 6*time.Hour), + }, + ), }, }, { ID: b("rName3|rtagName1=rtagValue1,rtagName2=rtagValue2"), - Policies: []policy.Policy{ - policy.NewPolicy(time.Minute, xtime.Minute, time.Hour), + PoliciesList: policy.PoliciesList{ + policy.NewStagedPolicies( + 100000, + false, + []policy.Policy{ + policy.NewPolicy(time.Minute, xtime.Minute, time.Hour), + }, + ), }, }, }, @@ -419,13 +502,18 @@ func TestRuleSetActiveSet(t *testing.T) { { id: "rtagName1=rtagValue2", matchAt: time.Unix(0, 250000), - cutoverNs: 24000, expireAtNs: timeNsMax, result: []RollupResult{ { ID: b("rName3|rtagName1=rtagValue2"), - Policies: []policy.Policy{ - policy.NewPolicy(time.Minute, xtime.Minute, time.Hour), + PoliciesList: policy.PoliciesList{ + policy.NewStagedPolicies( + 24000, + false, + []policy.Policy{ + policy.NewPolicy(time.Minute, xtime.Minute, time.Hour), + }, + ), }, }, }, @@ -433,7 +521,6 @@ func TestRuleSetActiveSet(t *testing.T) { { id: "rtagName5=rtagValue5", matchAt: time.Unix(0, 250000), - cutoverNs: 0, expireAtNs: timeNsMax, result: []RollupResult{}, }, @@ -444,22 +531,19 @@ func TestRuleSetActiveSet(t *testing.T) { for _, inputs := range allInputs { as := newRuleSet.ActiveSet(inputs.activeSetTime) for _, input := range inputs.mappingInputs { - res := as.Match(b(input.id), input.matchAt) - require.Equal(t, 1, res.version) - require.Equal(t, input.cutoverNs, res.cutoverNs) + res := as.MatchAll(b(input.id), input.matchAt, input.matchAt.Add(time.Nanosecond)) require.Equal(t, input.expireAtNs, res.expireAtNs) - require.Equal(t, input.result, res.Mappings().Policies()) + require.Equal(t, input.result, res.MappingsAt(time.Unix(0, 0))) } for _, input := range inputs.rollupInputs { - res := as.Match(b(input.id), input.matchAt) - require.Equal(t, 1, res.version) - require.Equal(t, input.cutoverNs, res.cutoverNs) + res := as.MatchAll(b(input.id), input.matchAt, input.matchAt.Add(time.Nanosecond)) require.Equal(t, input.expireAtNs, res.expireAtNs) require.Equal(t, len(input.result), res.NumRollups()) for i := 0; i < len(input.result); i++ { - id, policies := res.Rollups(i) + rollup := res.RollupsAt(i, time.Unix(0, 0)) + id, policies := rollup.ID, rollup.PoliciesList require.Equal(t, input.result[i].ID, id) - require.Equal(t, input.result[i].Policies, policies.Policies()) + require.Equal(t, input.result[i].PoliciesList, policies) } } } @@ -1464,15 +1548,13 @@ func testRollupRulesConfig() []*schema.RollupRule { type testMappingsData struct { id string matchAt time.Time - cutoverNs int64 expireAtNs int64 - result []policy.Policy + result policy.PoliciesList } type testRollupResultsData struct { id string matchAt time.Time - cutoverNs int64 expireAtNs int64 result []RollupResult }
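Note for reviewers: the fixtures above compare MatchAll results against policy.PoliciesList values built with policy.NewStagedPolicies, so a small standalone sketch of how these pieces compose may help when reading the expected results. This is only an illustration of the types introduced in this patch, not code from the diff; the import paths (github.com/m3db/m3metrics/policy and the xtime alias for github.com/m3db/m3x/time) are assumed from the surrounding repository layout.

```go
package main

import (
	"fmt"
	"time"

	// Assumed import paths; not part of this diff.
	"github.com/m3db/m3metrics/policy"
	xtime "github.com/m3db/m3x/time"
)

func main() {
	// A PoliciesList records how a metric's policies change over time:
	// one StagedPolicies entry per cutover, each carrying the cutover time
	// in nanoseconds, a tombstoned flag, and the policies then in effect.
	list := policy.PoliciesList{
		// Policies taking effect at t=22000ns.
		policy.NewStagedPolicies(
			22000,
			false,
			[]policy.Policy{
				policy.NewPolicy(10*time.Second, xtime.Second, 12*time.Hour),
				policy.NewPolicy(time.Minute, xtime.Minute, 24*time.Hour),
			},
		),
		// A later cutover at t=34000ns supersedes the previous stage.
		policy.NewStagedPolicies(
			34000,
			false,
			[]policy.Policy{
				policy.NewPolicy(10*time.Second, xtime.Second, 2*time.Hour),
			},
		),
		// A tombstoned stage (nil policies) marks the metric as no longer
		// matched from this cutover onward, which is how the rollup-result
		// merging code above handles ids that disappear at the next cutover.
		policy.NewStagedPolicies(35000, true, nil),
	}

	for i, staged := range list {
		fmt.Printf("stage %d: %+v\n", i, staged)
	}
}
```

The updated tests then assert that MatchAll(id, from, to) yields exactly such lists through MappingsAt and RollupsAt, replacing the single Policies() slice returned by the old Match/Mappings path.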