From f81720221689b0b53db79de62126e1c1254eadf0 Mon Sep 17 00:00:00 2001 From: Xi Chen Date: Tue, 20 Dec 2016 14:55:02 -0500 Subject: [PATCH] Encoder now supports custom resolutions and retentions --- policy/resolution.go | 6 +- policy/resolution_test.go | 2 +- policy/retention.go | 6 +- policy/retention_test.go | 2 +- protocol/msgpack/roundtrip_test.go | 12 +- protocol/msgpack/schema.go | 27 +++- protocol/msgpack/types.go | 4 - protocol/msgpack/unaggregated_encoder.go | 71 +++++----- protocol/msgpack/unaggregated_encoder_test.go | 40 ++++-- protocol/msgpack/unaggregated_iterator.go | 122 +++++++++++------- .../msgpack/unaggregated_iterator_test.go | 69 ++++++++-- 11 files changed, 246 insertions(+), 115 deletions(-) diff --git a/policy/resolution.go b/policy/resolution.go index 62a5807..d4b6251 100644 --- a/policy/resolution.go +++ b/policy/resolution.go @@ -30,9 +30,9 @@ import ( // ResolutionValue is the resolution value type ResolutionValue int -// List of resolution values currently supported +// List of known resolution values const ( - UnknownResolution ResolutionValue = iota + UnknownResolutionValue ResolutionValue = iota OneSecond TenSeconds OneMinute @@ -63,7 +63,7 @@ func ValueFromResolution(resolution Resolution) (ResolutionValue, error) { if exists { return value, nil } - return UnknownResolution, errUnknownResolution + return UnknownResolutionValue, errUnknownResolution } var ( diff --git a/policy/resolution_test.go b/policy/resolution_test.go index 382f1a4..241d120 100644 --- a/policy/resolution_test.go +++ b/policy/resolution_test.go @@ -53,7 +53,7 @@ func TestValidResolutionValue(t *testing.T) { func TestInvalidResolutionValue(t *testing.T) { inputs := []ResolutionValue{ - UnknownResolution, + UnknownResolutionValue, ResolutionValue(100), } for _, value := range inputs { diff --git a/policy/retention.go b/policy/retention.go index d9d90b4..080081d 100644 --- a/policy/retention.go +++ b/policy/retention.go @@ -28,9 +28,9 @@ import ( // RetentionValue is the retention value type RetentionValue int -// List of retention values currently supported +// List of known retention values const ( - UnknownRetention RetentionValue = iota + UnknownRetentionValue RetentionValue = iota OneHour SixHours TwelveHours @@ -65,7 +65,7 @@ func ValueFromRetention(retention Retention) (RetentionValue, error) { if exists { return value, nil } - return UnknownRetention, errUnknownRetention + return UnknownRetentionValue, errUnknownRetention } var ( diff --git a/policy/retention_test.go b/policy/retention_test.go index 4a954fd..e86e7d2 100644 --- a/policy/retention_test.go +++ b/policy/retention_test.go @@ -59,7 +59,7 @@ func TestValidRetentionValue(t *testing.T) { func TestInvalidRetentionValue(t *testing.T) { inputs := []RetentionValue{ - UnknownRetention, + UnknownRetentionValue, RetentionValue(100), } for _, value := range inputs { diff --git a/protocol/msgpack/roundtrip_test.go b/protocol/msgpack/roundtrip_test.go index c8e86ad..e6d5759 100644 --- a/protocol/msgpack/roundtrip_test.go +++ b/protocol/msgpack/roundtrip_test.go @@ -83,21 +83,25 @@ var ( }, }, }, - // Retain this metric at 10 second resolution for 6 hours, - // then 1 minute resolution for 2 days + // Retain this metric at 100 second resolution for 6 hours, + // then custom resolution for 2 days, then 1 minute for 25 days { metric: testBatchTimer, versionedPolicies: policy.VersionedPolicies{ Version: 2, Policies: []policy.Policy{ { - Resolution: policy.Resolution{Window: time.Duration(10), Precision: xtime.Second}, + Resolution: policy.Resolution{Window: time.Duration(100), Precision: xtime.Second}, Retention: policy.Retention(6 * time.Hour), }, { - Resolution: policy.Resolution{Window: time.Duration(1), Precision: xtime.Minute}, + Resolution: policy.Resolution{Window: time.Duration(1), Precision: xtime.Unit(100)}, Retention: policy.Retention(2 * 24 * time.Hour), }, + { + Resolution: policy.Resolution{Window: time.Duration(1), Precision: xtime.Minute}, + Retention: policy.Retention(25 * 24 * time.Hour), + }, }, }, }, diff --git a/protocol/msgpack/schema.go b/protocol/msgpack/schema.go index eccb998..027ab63 100644 --- a/protocol/msgpack/schema.go +++ b/protocol/msgpack/schema.go @@ -44,6 +44,11 @@ package msgpack type objectType int +const ( + // Current version for encoding unaggregated metrics + unaggregatedVersion int = 1 +) + const ( unknownType = iota @@ -60,8 +65,12 @@ const ( batchTimerType gaugeType policyType - defaultVersionedPolicyType - customVersionedPolicyType + knownResolutionType + unknownResolutionType + knownRetentionType + unknownRetentionType + defaultVersionedPoliciesType + customVersionedPoliciesType // Total number of object types numObjectTypes = iota @@ -76,8 +85,12 @@ const ( numBatchTimerFields = 2 numGaugeFields = 2 numPolicyFields = 2 + numKnownResolutionFields = 2 + numUnknownResolutionFields = 3 + numKnownRetentionFields = 2 + numUnknownRetentionFields = 2 numDefaultVersionedPolicyFields = 1 - numCustomVersionedPolicyFields = 2 + numCustomVersionedPolicyFields = 3 ) var numObjectFields []int @@ -100,7 +113,11 @@ func init() { setNumFieldsForType(batchTimerType, numBatchTimerFields) setNumFieldsForType(gaugeType, numGaugeFields) setNumFieldsForType(policyType, numPolicyFields) - setNumFieldsForType(defaultVersionedPolicyType, numDefaultVersionedPolicyFields) - setNumFieldsForType(customVersionedPolicyType, numCustomVersionedPolicyFields) + setNumFieldsForType(knownResolutionType, numKnownResolutionFields) + setNumFieldsForType(unknownResolutionType, numUnknownResolutionFields) + setNumFieldsForType(knownRetentionType, numKnownRetentionFields) + setNumFieldsForType(unknownRetentionType, numKnownRetentionFields) + setNumFieldsForType(defaultVersionedPoliciesType, numDefaultVersionedPolicyFields) + setNumFieldsForType(customVersionedPoliciesType, numCustomVersionedPolicyFields) } diff --git a/protocol/msgpack/types.go b/protocol/msgpack/types.go index 202b814..69c1663 100644 --- a/protocol/msgpack/types.go +++ b/protocol/msgpack/types.go @@ -32,10 +32,6 @@ import ( "gopkg.in/vmihailenco/msgpack.v2" ) -const ( - supportedVersion int = 1 -) - // BufferedEncoder is an messagePack-based encoder backed by byte buffers type BufferedEncoder struct { *msgpack.Encoder diff --git a/protocol/msgpack/unaggregated_encoder.go b/protocol/msgpack/unaggregated_encoder.go index 8d4c05c..8bcf02d 100644 --- a/protocol/msgpack/unaggregated_encoder.go +++ b/protocol/msgpack/unaggregated_encoder.go @@ -130,7 +130,7 @@ func (enc *unaggregatedEncoder) EncodeGaugeWithPolicies( } func (enc *unaggregatedEncoder) encodeRootObject(objType objectType) { - enc.encodeVersion(supportedVersion) + enc.encodeVersion(unaggregatedVersion) enc.encodeNumObjectFields(numFieldsForType(rootObjectType)) enc.encodeObjectType(objType) } @@ -189,18 +189,55 @@ func (enc *unaggregatedEncoder) encodePolicy(p policy.Policy) { enc.encodeRetention(p.Retention) } +func (enc *unaggregatedEncoder) encodeResolution(resolution policy.Resolution) { + if enc.err != nil { + return + } + // If this is a known resolution, only encode its corresponding value + if resolutionValue, err := policy.ValueFromResolution(resolution); err == nil { + enc.encodeNumObjectFields(numFieldsForType(knownResolutionType)) + enc.encodeObjectType(knownResolutionType) + enc.encodeVarintFn(int64(resolutionValue)) + return + } + // Otherwise encode the entire resolution object + // TODO(xichen): validate the resolution before putting it on the wire + enc.encodeNumObjectFields(numFieldsForType(unknownResolutionType)) + enc.encodeObjectType(unknownResolutionType) + enc.encodeVarintFn(int64(resolution.Window)) + enc.encodeVarintFn(int64(resolution.Precision)) +} + +func (enc *unaggregatedEncoder) encodeRetention(retention policy.Retention) { + if enc.err != nil { + return + } + // If this is a known retention, only encode its corresponding value + if retentionValue, err := policy.ValueFromRetention(retention); err == nil { + enc.encodeNumObjectFields(numFieldsForType(knownRetentionType)) + enc.encodeObjectType(knownRetentionType) + enc.encodeVarintFn(int64(retentionValue)) + return + } + // Otherwise encode the entire retention object + // TODO(xichen): validate the retention before putting it on the wire + enc.encodeNumObjectFields(numFieldsForType(unknownRetentionType)) + enc.encodeObjectType(unknownRetentionType) + enc.encodeVarintFn(int64(retention)) +} + func (enc *unaggregatedEncoder) encodeVersionedPolicies(vp policy.VersionedPolicies) { // NB(xichen): if this is a default policy, we only encode the policy version // and not the actual policies to optimize for the common case where the policies // are the default ones if vp.Version == policy.DefaultPolicyVersion { - enc.encodeNumObjectFields(numFieldsForType(defaultVersionedPolicyType)) - enc.encodeVersion(vp.Version) + enc.encodeNumObjectFields(numFieldsForType(defaultVersionedPoliciesType)) + enc.encodeObjectType(defaultVersionedPoliciesType) return } - // Otherwise fallback to encoding the entire object - enc.encodeNumObjectFields(numFieldsForType(customVersionedPolicyType)) + enc.encodeNumObjectFields(numFieldsForType(customVersionedPoliciesType)) + enc.encodeObjectType(customVersionedPoliciesType) enc.encodeVersion(vp.Version) enc.encodeArrayLenFn(len(vp.Policies)) for _, policy := range vp.Policies { @@ -224,30 +261,6 @@ func (enc *unaggregatedEncoder) encodeID(id metric.ID) { enc.encodeBytesFn([]byte(id)) } -func (enc *unaggregatedEncoder) encodeResolution(resolution policy.Resolution) { - if enc.err != nil { - return - } - resolutionValue, err := policy.ValueFromResolution(resolution) - if err != nil { - enc.err = err - return - } - enc.encodeVarintFn(int64(resolutionValue)) -} - -func (enc *unaggregatedEncoder) encodeRetention(retention policy.Retention) { - if enc.err != nil { - return - } - retentionValue, err := policy.ValueFromRetention(retention) - if err != nil { - enc.err = err - return - } - enc.encodeVarintFn(int64(retentionValue)) -} - // NB(xichen): the underlying msgpack encoder implementation // always cast an integer value to an int64 and encodes integer // values as varints, regardless of the actual integer type diff --git a/protocol/msgpack/unaggregated_encoder_test.go b/protocol/msgpack/unaggregated_encoder_test.go index 2e5b3e0..6ff9673 100644 --- a/protocol/msgpack/unaggregated_encoder_test.go +++ b/protocol/msgpack/unaggregated_encoder_test.go @@ -62,7 +62,7 @@ func testCapturingUnaggregatedEncoder(t *testing.T) (UnaggregatedEncoder, *[]int func getExpectedResultsForMetricWithPolicies(t *testing.T, m *unaggregated.MetricUnion, p policy.VersionedPolicies) []interface{} { results := []interface{}{ - int64(supportedVersion), + int64(unaggregatedVersion), numFieldsForType(rootObjectType), } @@ -100,12 +100,13 @@ func getExpectedResultsForMetricWithPolicies(t *testing.T, m *unaggregated.Metri if p.Version == policy.DefaultPolicyVersion { results = append(results, []interface{}{ - numFieldsForType(defaultVersionedPolicyType), - int64(p.Version), + numFieldsForType(defaultVersionedPoliciesType), + int64(defaultVersionedPoliciesType), }...) } else { results = append(results, []interface{}{ - numFieldsForType(customVersionedPolicyType), + numFieldsForType(customVersionedPoliciesType), + int64(customVersionedPoliciesType), int64(p.Version), len(p.Policies), }...) @@ -113,12 +114,35 @@ func getExpectedResultsForMetricWithPolicies(t *testing.T, m *unaggregated.Metri results = append(results, numFieldsForType(policyType)) resolutionValue, err := policy.ValueFromResolution(p.Resolution) - require.NoError(t, err) - results = append(results, int64(resolutionValue)) + if err == nil { + results = append(results, []interface{}{ + numFieldsForType(knownResolutionType), + int64(knownResolutionType), + int64(resolutionValue), + }...) + } else { + results = append(results, []interface{}{ + numFieldsForType(unknownResolutionType), + int64(unknownResolutionType), + int64(p.Resolution.Window), + int64(p.Resolution.Precision), + }...) + } retentionValue, err := policy.ValueFromRetention(p.Retention) - require.NoError(t, err) - results = append(results, int64(retentionValue)) + if err == nil { + results = append(results, []interface{}{ + numFieldsForType(knownRetentionType), + int64(knownRetentionType), + int64(retentionValue), + }...) + } else { + results = append(results, []interface{}{ + numFieldsForType(unknownRetentionType), + int64(unknownRetentionType), + int64(p.Retention), + }...) + } } } diff --git a/protocol/msgpack/unaggregated_iterator.go b/protocol/msgpack/unaggregated_iterator.go index c43a4c7..767d8f2 100644 --- a/protocol/msgpack/unaggregated_iterator.go +++ b/protocol/msgpack/unaggregated_iterator.go @@ -23,12 +23,14 @@ package msgpack import ( "fmt" "io" + "time" "github.com/m3db/m3metrics/metric" "github.com/m3db/m3metrics/metric/unaggregated" "github.com/m3db/m3metrics/policy" "github.com/m3db/m3metrics/pool" xpool "github.com/m3db/m3x/pool" + "github.com/m3db/m3x/time" "gopkg.in/vmihailenco/msgpack.v2" ) @@ -91,7 +93,7 @@ func (it *unaggregatedIterator) decodeRootObject() bool { } // If the actual version is higher than supported version, we skip // the data for this metric and continue to the next - if version > supportedVersion { + if version > unaggregatedVersion { it.skip(it.decodeNumObjectFields()) return it.Next() } @@ -185,43 +187,93 @@ func (it *unaggregatedIterator) decodePolicy() policy.Policy { return p } -func (it *unaggregatedIterator) decodeVersionedPolicies() { +func (it *unaggregatedIterator) decodeResolution() policy.Resolution { numActualFields := it.decodeNumObjectFields() - version := int(it.decodeVarint()) - if it.err != nil { - return + resolutionType := it.decodeObjectType() + numExpectedFields, numActualFields, ok := it.checkNumFieldsForTypeWithActual( + resolutionType, + numActualFields, + ) + if !ok { + return policy.EmptyResolution + } + switch resolutionType { + case knownResolutionType: + resolutionValue := policy.ResolutionValue(it.decodeVarint()) + it.skip(numActualFields - numExpectedFields) + if it.err != nil { + return policy.EmptyResolution + } + resolution, err := resolutionValue.Resolution() + it.err = err + return resolution + case unknownResolutionType: + window := time.Duration(it.decodeVarint()) + precision := xtime.Unit(it.decodeVarint()) + it.skip(numActualFields - numExpectedFields) + return policy.Resolution{Window: window, Precision: precision} + default: + it.err = fmt.Errorf("unrecognized resolution type %v", resolutionType) + return policy.EmptyResolution } +} - // NB(xichen): if the policy version is the default version, simply - // return the default policies - if version == policy.DefaultPolicyVersion { - numExpectedFields, numActualFields, ok := it.checkNumFieldsForTypeWithActual( - defaultVersionedPolicyType, - numActualFields, - ) - if !ok { - return +func (it *unaggregatedIterator) decodeRetention() policy.Retention { + numActualFields := it.decodeNumObjectFields() + retentionType := it.decodeObjectType() + numExpectedFields, numActualFields, ok := it.checkNumFieldsForTypeWithActual( + retentionType, + numActualFields, + ) + if !ok { + return policy.EmptyRetention + } + switch retentionType { + case knownRetentionType: + retentionValue := policy.RetentionValue(it.decodeVarint()) + it.skip(numActualFields - numExpectedFields) + if it.err != nil { + return policy.EmptyRetention } - it.versionedPolicies = policy.DefaultVersionedPolicies + retention, err := retentionValue.Retention() + it.err = err + return retention + case unknownRetentionType: + retention := policy.Retention(it.decodeVarint()) it.skip(numActualFields - numExpectedFields) - return + return retention + default: + it.err = fmt.Errorf("unrecognized retention type %v", retentionType) + return policy.EmptyRetention } +} - // Otherwise proceed to decoding the entire object +func (it *unaggregatedIterator) decodeVersionedPolicies() { + numActualFields := it.decodeNumObjectFields() + versionedPoliciesType := it.decodeObjectType() numExpectedFields, numActualFields, ok := it.checkNumFieldsForTypeWithActual( - customVersionedPolicyType, + versionedPoliciesType, numActualFields, ) if !ok { return } - numPolicies := it.decodeArrayLen() - policies := it.policiesPool.Get(numPolicies) - for i := 0; i < numPolicies; i++ { - policies = append(policies, it.decodePolicy()) + switch versionedPoliciesType { + case defaultVersionedPoliciesType: + it.versionedPolicies = policy.DefaultVersionedPolicies + it.skip(numActualFields - numExpectedFields) + case customVersionedPoliciesType: + version := int(it.decodeVarint()) + numPolicies := it.decodeArrayLen() + policies := it.policiesPool.Get(numPolicies) + for i := 0; i < numPolicies; i++ { + policies = append(policies, it.decodePolicy()) + } + it.versionedPolicies = policy.VersionedPolicies{Version: version, Policies: policies} + it.skip(numActualFields - numExpectedFields) + default: + it.err = fmt.Errorf("unrecognized versioned policies type: %v", versionedPoliciesType) } - it.versionedPolicies = policy.VersionedPolicies{Version: version, Policies: policies} - it.skip(numActualFields - numExpectedFields) } // checkNumFieldsForType decodes and compares the number of actual fields with @@ -236,10 +288,10 @@ func (it *unaggregatedIterator) checkNumFieldsForTypeWithActual( objType objectType, numActualFields int, ) (int, int, bool) { - numExpectedFields := numFieldsForType(objType) if it.err != nil { return 0, 0, false } + numExpectedFields := numFieldsForType(objType) if numExpectedFields > numActualFields { it.err = fmt.Errorf("number of fields mismatch: expected %d actual %d", numExpectedFields, numActualFields) return 0, 0, false @@ -263,26 +315,6 @@ func (it *unaggregatedIterator) decodeID() metric.ID { return metric.ID(it.decodeBytes()) } -func (it *unaggregatedIterator) decodeResolution() policy.Resolution { - resolutionValue := policy.ResolutionValue(it.decodeVarint()) - resolution, err := resolutionValue.Resolution() - if it.err != nil { - return policy.EmptyResolution - } - it.err = err - return resolution -} - -func (it *unaggregatedIterator) decodeRetention() policy.Retention { - retentionValue := policy.RetentionValue(it.decodeVarint()) - retention, err := retentionValue.Retention() - if it.err != nil { - return policy.EmptyRetention - } - it.err = err - return retention -} - // NB(xichen): the underlying msgpack decoder implementation // always decodes an int64 and looks at the actual decoded // value to determine the width of the integer (a.k.a. varint diff --git a/protocol/msgpack/unaggregated_iterator_test.go b/protocol/msgpack/unaggregated_iterator_test.go index 293e5b4..5d6d8c4 100644 --- a/protocol/msgpack/unaggregated_iterator_test.go +++ b/protocol/msgpack/unaggregated_iterator_test.go @@ -59,7 +59,7 @@ func TestUnaggregatedIteratorDecodeNewerVersionThanSupported(t *testing.T) { // Version encoded is higher than supported version enc.encodeRootObjectFn = func(objType objectType) { - enc.encodeVersion(supportedVersion + 1) + enc.encodeVersion(unaggregatedVersion + 1) enc.encodeNumObjectFields(numFieldsForType(rootObjectType)) enc.encodeObjectType(objType) } @@ -71,7 +71,7 @@ func TestUnaggregatedIteratorDecodeNewerVersionThanSupported(t *testing.T) { it := testUnaggregatedIterator(t, enc.Encoder().Buffer) - // Check that we skipped the first counter and normally decoded the second counter + // Check that we skipped the first counter and successfully decoded the second counter validateDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) } @@ -84,7 +84,7 @@ func TestUnaggregatedIteratorDecodeRootObjectMoreFieldsThanExpected(t *testing.T // Pretend we added an extra int field to the top-level object enc.encodeRootObjectFn = func(objType objectType) { - enc.encodeVersion(supportedVersion) + enc.encodeVersion(unaggregatedVersion) enc.encodeNumObjectFields(numFieldsForType(rootObjectType) + 1) enc.encodeObjectType(objType) } @@ -94,7 +94,7 @@ func TestUnaggregatedIteratorDecodeRootObjectMoreFieldsThanExpected(t *testing.T it := testUnaggregatedIterator(t, enc.Encoder().Buffer) - // Check that we normally decoded the counter + // Check that we successfully decoded the counter validateDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) } @@ -117,7 +117,7 @@ func TestUnaggregatedIteratorDecodeCounterWithPoliciesMoreFieldsThanExpected(t * it := testUnaggregatedIterator(t, enc.Encoder().Buffer) - // Check that we normally decoded the counter + // Check that we successfully decoded the counter validateDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) } @@ -139,7 +139,7 @@ func TestUnaggregatedIteratorDecodeCounterMoreFieldsThanExpected(t *testing.T) { it := testUnaggregatedIterator(t, enc.Encoder().Buffer) - // Check that we normally decoded the counter + // Check that we successfully decoded the counter validateDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) } @@ -167,7 +167,7 @@ func TestUnaggregatedIteratorDecodeBatchTimerMoreFieldsThanExpected(t *testing.T it := testUnaggregatedIterator(t, enc.Encoder().Buffer) - // Check that we normally decoded the batch timer + // Check that we successfully decoded the batch timer validateDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) } @@ -189,7 +189,51 @@ func TestUnaggregatedIteratorDecodeGaugeMoreFieldsThanExpected(t *testing.T) { it := testUnaggregatedIterator(t, enc.Encoder().Buffer) - // Check that we normally decoded the gauge + // Check that we successfully decoded the gauge + validateDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) +} + +func TestUnaggregatedIteratorDecodePolicyWithCustomResolution(t *testing.T) { + input := metricWithPolicies{ + metric: testGauge, + versionedPolicies: policy.VersionedPolicies{ + Version: 1, + Policies: []policy.Policy{ + { + Resolution: policy.Resolution{Window: time.Duration(3), Precision: xtime.Second}, + Retention: policy.Retention(time.Hour), + }, + }, + }, + } + enc := testUnaggregatedEncoder(t) + require.NoError(t, enc.EncodeGaugeWithPolicies(input.metric.Gauge(), input.versionedPolicies)) + + it := testUnaggregatedIterator(t, enc.Encoder().Buffer) + + // Check that we successfully decoded the policy + validateDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) +} + +func TestUnaggregatedIteratorDecodePolicyWithCustomRetention(t *testing.T) { + input := metricWithPolicies{ + metric: testGauge, + versionedPolicies: policy.VersionedPolicies{ + Version: 1, + Policies: []policy.Policy{ + { + Resolution: policy.Resolution{Window: time.Duration(1), Precision: xtime.Second}, + Retention: policy.Retention(289 * time.Hour), + }, + }, + }, + } + enc := testUnaggregatedEncoder(t) + require.NoError(t, enc.EncodeGaugeWithPolicies(input.metric.Gauge(), input.versionedPolicies)) + + it := testUnaggregatedIterator(t, enc.Encoder().Buffer) + + // Check that we successfully decoded the policy validateDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) } @@ -219,7 +263,7 @@ func TestUnaggregatedIteratorDecodePolicyMoreFieldsThanExpected(t *testing.T) { it := testUnaggregatedIterator(t, enc.Encoder().Buffer) - // Check that we normally decoded the policy + // Check that we successfully decoded the policy validateDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) } @@ -240,7 +284,8 @@ func TestUnaggregatedIteratorDecodeVersionedPoliciesMoreFieldsThanExpected(t *te // Pretend we added an extra int field to the policy object enc.encodeVersionedPoliciesFn = func(vp policy.VersionedPolicies) { - enc.encodeNumObjectFields(numFieldsForType(customVersionedPolicyType) + 1) + enc.encodeNumObjectFields(numFieldsForType(customVersionedPoliciesType) + 1) + enc.encodeObjectType(customVersionedPoliciesType) enc.encodeVersion(vp.Version) enc.encodeArrayLenFn(len(vp.Policies)) for _, policy := range vp.Policies { @@ -252,7 +297,7 @@ func TestUnaggregatedIteratorDecodeVersionedPoliciesMoreFieldsThanExpected(t *te it := testUnaggregatedIterator(t, enc.Encoder().Buffer) - // Check that we normally decoded the policy + // Check that we successfully decoded the policy validateDecodeResults(t, it, []metricWithPolicies{input}, io.EOF) } @@ -272,7 +317,7 @@ func TestUnaggregatedIteratorDecodeCounterFewerFieldsThanExpected(t *testing.T) it := testUnaggregatedIterator(t, enc.Encoder().Buffer) - // Check that we normally decoded the counter + // Check that we successfully decoded the counter validateDecodeResults(t, it, nil, errors.New("number of fields mismatch: expected 2 actual 1")) }