Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Expose support for encoding and decoding metrics without policies
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerome Froelich committed Feb 11, 2017
1 parent fd4158e commit bc9b97a
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 35 deletions.
9 changes: 9 additions & 0 deletions protocol/msgpack/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,15 @@ type iteratorBase interface {

// UnaggregatedEncoder is an encoder for encoding different types of unaggregated metrics
type UnaggregatedEncoder interface {
// EncodeCounter encodes a counter
EncodeCounter(cp unaggregated.Counter) error

// EncodeBatchTimer encodes a batched timer
EncodeBatchTimer(btp unaggregated.BatchTimer) error

// EncodeGauge encodes a gauge
EncodeGauge(gp unaggregated.Gauge) error

// EncodeCounterWithPolicies encodes a counter with applicable policies
EncodeCounterWithPolicies(cp unaggregated.CounterWithPolicies) error

Expand Down
27 changes: 27 additions & 0 deletions protocol/msgpack/unaggregated_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,33 @@ func NewUnaggregatedEncoder(encoder BufferedEncoder) UnaggregatedEncoder {
func (enc *unaggregatedEncoder) Encoder() BufferedEncoder { return enc.encoder() }
func (enc *unaggregatedEncoder) Reset(encoder BufferedEncoder) { enc.reset(encoder) }

func (enc *unaggregatedEncoder) EncodeCounter(c unaggregated.Counter) error {
if err := enc.err(); err != nil {
return err
}
enc.encodeRootObjectFn(counterType)
enc.encodeCounterFn(c)
return enc.err()
}

func (enc *unaggregatedEncoder) EncodeBatchTimer(bt unaggregated.BatchTimer) error {
if err := enc.err(); err != nil {
return err
}
enc.encodeRootObjectFn(batchTimerType)
enc.encodeBatchTimerFn(bt)
return enc.err()
}

func (enc *unaggregatedEncoder) EncodeGauge(gp unaggregated.Gauge) error {
if err := enc.err(); err != nil {
return err
}
enc.encodeRootObjectFn(gaugeType)
enc.encodeGaugeFn(gp)
return enc.err()
}

func (enc *unaggregatedEncoder) EncodeCounterWithPolicies(cp unaggregated.CounterWithPolicies) error {
if err := enc.err(); err != nil {
return err
Expand Down
89 changes: 74 additions & 15 deletions protocol/msgpack/unaggregated_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,44 @@ func testCapturingUnaggregatedEncoder(t *testing.T) (UnaggregatedEncoder, *[]int
return encoder, result
}

func expectedResultsForUnaggregatedMetric(t *testing.T, m unaggregated.MetricUnion) []interface{} {
results := []interface{}{
int64(unaggregatedVersion),
numFieldsForType(rootObjectType),
}

switch m.Type {
case unaggregated.CounterType:
results = append(results, []interface{}{
int64(counterType),
numFieldsForType(counterType),
[]byte(m.ID),
m.CounterVal,
}...)
case unaggregated.BatchTimerType:
results = append(results, []interface{}{
int64(batchTimerType),
numFieldsForType(batchTimerType),
[]byte(m.ID),
len(m.BatchTimerVal),
}...)
for _, v := range m.BatchTimerVal {
results = append(results, v)
}
case unaggregated.GaugeType:
results = append(results, []interface{}{
int64(gaugeType),
numFieldsForType(gaugeType),
[]byte(m.ID),
m.GaugeVal,
}...)
default:
require.Fail(t, fmt.Sprintf("unrecognized metric type %v", m.Type))
}

return results
}

func expectedResultsForPolicy(t *testing.T, p policy.Policy) []interface{} {
results := []interface{}{numFieldsForType(policyType)}

Expand Down Expand Up @@ -142,26 +180,47 @@ func expectedResultsForUnaggregatedMetricWithPolicies(t *testing.T, m unaggregat
return results
}

func TestUnaggregatedEncodeCounter(t *testing.T) {
encoder, results := testCapturingUnaggregatedEncoder(t)
require.NoError(t, testUnaggregatedEncode(t, encoder, testCounter))
expected := expectedResultsForUnaggregatedMetric(t, testCounter)
require.Equal(t, expected, *results)
}

func TestUnaggregatedEncodeBatchTimer(t *testing.T) {
encoder, results := testCapturingUnaggregatedEncoder(t)
require.NoError(t, testUnaggregatedEncode(t, encoder, testBatchTimer))
expected := expectedResultsForUnaggregatedMetric(t, testBatchTimer)
require.Equal(t, expected, *results)
}

func TestUnaggregatedEncodeGauge(t *testing.T) {
encoder, results := testCapturingUnaggregatedEncoder(t)
require.NoError(t, testUnaggregatedEncode(t, encoder, testGauge))
expected := expectedResultsForUnaggregatedMetric(t, testGauge)
require.Equal(t, expected, *results)
}

func TestUnaggregatedEncodeCounterWithDefaultPolicies(t *testing.T) {
policies := policy.DefaultVersionedPolicies
encoder, results := testCapturingUnaggregatedEncoder(t)
require.NoError(t, testUnaggregatedEncode(t, encoder, testCounter, policies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, encoder, testCounter, policies))
expected := expectedResultsForUnaggregatedMetricWithPolicies(t, testCounter, policies)
require.Equal(t, expected, *results)
}

func TestUnaggregatedEncodeBatchTimerWithDefaultPolicies(t *testing.T) {
policies := policy.DefaultVersionedPolicies
encoder, results := testCapturingUnaggregatedEncoder(t)
require.NoError(t, testUnaggregatedEncode(t, encoder, testBatchTimer, policies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, encoder, testBatchTimer, policies))
expected := expectedResultsForUnaggregatedMetricWithPolicies(t, testBatchTimer, policies)
require.Equal(t, expected, *results)
}

func TestUnaggregatedEncodeGaugeWithDefaultPolicies(t *testing.T) {
policies := policy.DefaultVersionedPolicies
encoder, results := testCapturingUnaggregatedEncoder(t)
require.NoError(t, testUnaggregatedEncode(t, encoder, testGauge, policies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, encoder, testGauge, policies))
expected := expectedResultsForUnaggregatedMetricWithPolicies(t, testGauge, policies)
require.Equal(t, expected, *results)
}
Expand All @@ -170,7 +229,7 @@ func TestUnaggregatedEncodeAllTypesWithDefaultPolicies(t *testing.T) {
var expected []interface{}
encoder, results := testCapturingUnaggregatedEncoder(t)
for _, input := range testInputWithAllTypesAndDefaultPolicies {
require.NoError(t, testUnaggregatedEncode(t, encoder, input.metric, input.versionedPolicies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, encoder, input.metric, input.versionedPolicies))
expected = append(expected, expectedResultsForUnaggregatedMetricWithPolicies(t, input.metric, input.versionedPolicies)...)
}

Expand All @@ -181,7 +240,7 @@ func TestUnaggregatedEncodeAllTypesWithCustomPolicies(t *testing.T) {
var expected []interface{}
encoder, results := testCapturingUnaggregatedEncoder(t)
for _, input := range testInputWithAllTypesAndCustomPolicies {
require.NoError(t, testUnaggregatedEncode(t, encoder, input.metric, input.versionedPolicies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, encoder, input.metric, input.versionedPolicies))
expected = append(expected, expectedResultsForUnaggregatedMetricWithPolicies(t, input.metric, input.versionedPolicies)...)
}

Expand All @@ -200,10 +259,10 @@ func TestUnaggregatedEncodeVarintError(t *testing.T) {
}

// Assert the error is expected
require.Equal(t, errTestVarint, testUnaggregatedEncode(t, encoder, counter, policies))
require.Equal(t, errTestVarint, testUnaggregatedEncodeWithPolicies(t, encoder, counter, policies))

// Assert re-encoding doesn't change the error
require.Equal(t, errTestVarint, testUnaggregatedEncode(t, encoder, counter, policies))
require.Equal(t, errTestVarint, testUnaggregatedEncodeWithPolicies(t, encoder, counter, policies))
}

func TestUnaggregatedEncodeFloat64Error(t *testing.T) {
Expand All @@ -218,10 +277,10 @@ func TestUnaggregatedEncodeFloat64Error(t *testing.T) {
}

// Assert the error is expected
require.Equal(t, errTestFloat64, testUnaggregatedEncode(t, encoder, gauge, policies))
require.Equal(t, errTestFloat64, testUnaggregatedEncodeWithPolicies(t, encoder, gauge, policies))

// Assert re-encoding doesn't change the error
require.Equal(t, errTestFloat64, testUnaggregatedEncode(t, encoder, gauge, policies))
require.Equal(t, errTestFloat64, testUnaggregatedEncodeWithPolicies(t, encoder, gauge, policies))
}

func TestUnaggregatedEncodeBytesError(t *testing.T) {
Expand All @@ -236,10 +295,10 @@ func TestUnaggregatedEncodeBytesError(t *testing.T) {
}

// Assert the error is expected
require.Equal(t, errTestBytes, testUnaggregatedEncode(t, encoder, timer, policies))
require.Equal(t, errTestBytes, testUnaggregatedEncodeWithPolicies(t, encoder, timer, policies))

// Assert re-encoding doesn't change the error
require.Equal(t, errTestBytes, testUnaggregatedEncode(t, encoder, timer, policies))
require.Equal(t, errTestBytes, testUnaggregatedEncodeWithPolicies(t, encoder, timer, policies))
}

func TestUnaggregatedEncodeArrayLenError(t *testing.T) {
Expand All @@ -263,10 +322,10 @@ func TestUnaggregatedEncodeArrayLenError(t *testing.T) {
}

// Assert the error is expected
require.Equal(t, errTestArrayLen, testUnaggregatedEncode(t, encoder, gauge, policies))
require.Equal(t, errTestArrayLen, testUnaggregatedEncodeWithPolicies(t, encoder, gauge, policies))

// Assert re-encoding doesn't change the error
require.Equal(t, errTestArrayLen, testUnaggregatedEncode(t, encoder, gauge, policies))
require.Equal(t, errTestArrayLen, testUnaggregatedEncodeWithPolicies(t, encoder, gauge, policies))
}

func TestUnaggregatedEncoderReset(t *testing.T) {
Expand All @@ -276,8 +335,8 @@ func TestUnaggregatedEncoderReset(t *testing.T) {
encoder := testUnaggregatedEncoder(t).(*unaggregatedEncoder)
baseEncoder := encoder.encoderBase.(*baseEncoder)
baseEncoder.encodeErr = errTestVarint
require.Equal(t, errTestVarint, testUnaggregatedEncode(t, encoder, metric, policies))
require.Equal(t, errTestVarint, testUnaggregatedEncodeWithPolicies(t, encoder, metric, policies))

encoder.Reset(NewBufferedEncoder())
require.NoError(t, testUnaggregatedEncode(t, encoder, metric, policies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, encoder, metric, policies))
}
25 changes: 20 additions & 5 deletions protocol/msgpack/unaggregated_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func (it *unaggregatedIterator) Reset(reader io.Reader) {
it.reset(reader)
}

// func (it *unaggregatedIterator) Metric() unaggregated.MetricUnion {
// return it.metric
// }

func (it *unaggregatedIterator) Value() (unaggregated.MetricUnion, policy.VersionedPolicies) {
return it.metric, it.versionedPolicies
}
Expand Down Expand Up @@ -119,7 +123,7 @@ func (it *unaggregatedIterator) decodeRootObject() bool {
return false
}
switch objType {
case counterWithPoliciesType, batchTimerWithPoliciesType, gaugeWithPoliciesType:
case counterType, counterWithPoliciesType, batchTimerType, batchTimerWithPoliciesType, gaugeType, gaugeWithPoliciesType:
it.decodeMetricWithPolicies(objType)
default:
it.setErr(fmt.Errorf("unrecognized object type %v", objType))
Expand All @@ -134,18 +138,29 @@ func (it *unaggregatedIterator) decodeMetricWithPolicies(objType objectType) {
if !ok {
return
}

switch objType {
case counterWithPoliciesType:
case counterType, counterWithPoliciesType:
it.decodeCounter()
case batchTimerWithPoliciesType:
case batchTimerType, batchTimerWithPoliciesType:
it.decodeBatchTimer()
case gaugeWithPoliciesType:
case gaugeType, gaugeWithPoliciesType:
it.decodeGauge()
default:
it.setErr(fmt.Errorf("unrecognized metric with policies type %v", objType))
return
}
it.decodeVersionedPolicies()

var decodePolicy bool
switch objType {
case counterWithPoliciesType, batchTimerWithPoliciesType, gaugeWithPoliciesType:
decodePolicy = true
}

if decodePolicy {
it.decodeVersionedPolicies()
}

it.skip(numActualFields - numExpectedFields)
}

Expand Down
26 changes: 13 additions & 13 deletions protocol/msgpack/unaggregated_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ func TestUnaggregatedIteratorDecodeNewerVersionThanSupported(t *testing.T) {
enc.encodeNumObjectFields(numFieldsForType(rootObjectType))
enc.encodeObjectType(objType)
}
require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, enc, input.metric, input.versionedPolicies))

// Now restore the encode top-level function and encode another counter
enc.encodeRootObjectFn = enc.encodeRootObject
require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, enc, input.metric, input.versionedPolicies))

// Check that we skipped the first counter and successfully decoded the second counter
it := testUnaggregatedIterator(t, bytes.NewBuffer(enc.Encoder().Buffer.Bytes()))
Expand All @@ -94,7 +94,7 @@ func TestUnaggregatedIteratorDecodeRootObjectMoreFieldsThanExpected(t *testing.T
enc.encodeNumObjectFields(numFieldsForType(rootObjectType) + 1)
enc.encodeObjectType(objType)
}
testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies)
testUnaggregatedEncodeWithPolicies(t, enc, input.metric, input.versionedPolicies)
enc.encodeVarint(0)
require.NoError(t, enc.err())

Expand All @@ -117,7 +117,7 @@ func TestUnaggregatedIteratorDecodeCounterWithPoliciesMoreFieldsThanExpected(t *
enc.encodeCounterFn(cp.Counter)
enc.encodeVersionedPoliciesFn(cp.VersionedPolicies)
}
testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies)
testUnaggregatedEncodeWithPolicies(t, enc, input.metric, input.versionedPolicies)
enc.encodeVarint(0)
require.NoError(t, enc.err())

Expand All @@ -141,7 +141,7 @@ func TestUnaggregatedIteratorDecodeCounterMoreFieldsThanExpected(t *testing.T) {
enc.encodeVarint(int64(c.Value))
enc.encodeVarint(0)
}
require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, enc, input.metric, input.versionedPolicies))

it := testUnaggregatedIterator(t, enc.Encoder().Buffer)

Expand All @@ -166,7 +166,7 @@ func TestUnaggregatedIteratorDecodeBatchTimerMoreFieldsThanExpected(t *testing.T
}
enc.encodeVarint(0)
}
require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, enc, input.metric, input.versionedPolicies))

it := testUnaggregatedIterator(t, enc.Encoder().Buffer)

Expand All @@ -188,7 +188,7 @@ func TestUnaggregatedIteratorDecodeGaugeMoreFieldsThanExpected(t *testing.T) {
enc.encodeFloat64(g.Value)
enc.encodeVarint(0)
}
require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, enc, input.metric, input.versionedPolicies))

it := testUnaggregatedIterator(t, enc.Encoder().Buffer)

Expand All @@ -211,7 +211,7 @@ func TestUnaggregatedIteratorDecodePolicyWithCustomResolution(t *testing.T) {
},
}
enc := testUnaggregatedEncoder(t)
require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, enc, input.metric, input.versionedPolicies))

it := testUnaggregatedIterator(t, enc.Encoder().Buffer)

Expand All @@ -234,7 +234,7 @@ func TestUnaggregatedIteratorDecodePolicyWithCustomRetention(t *testing.T) {
},
}
enc := testUnaggregatedEncoder(t)
require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, enc, input.metric, input.versionedPolicies))

it := testUnaggregatedIterator(t, enc.Encoder().Buffer)

Expand Down Expand Up @@ -266,7 +266,7 @@ func TestUnaggregatedIteratorDecodePolicyMoreFieldsThanExpected(t *testing.T) {
baseEncoder.encodeRetention(p.Retention)
baseEncoder.encodeVarint(0)
}
require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, enc, input.metric, input.versionedPolicies))

it := testUnaggregatedIterator(t, enc.Encoder().Buffer)

Expand Down Expand Up @@ -302,7 +302,7 @@ func TestUnaggregatedIteratorDecodeVersionedPoliciesMoreFieldsThanExpected(t *te
}
enc.encodeVarint(0)
}
require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, enc, input.metric, input.versionedPolicies))

it := testUnaggregatedIterator(t, enc.Encoder().Buffer)

Expand All @@ -322,7 +322,7 @@ func TestUnaggregatedIteratorDecodeCounterFewerFieldsThanExpected(t *testing.T)
enc.encodeNumObjectFields(numFieldsForType(counterType) - 1)
enc.encodeID(c.ID)
}
require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, enc, input.metric, input.versionedPolicies))

it := testUnaggregatedIterator(t, enc.Encoder().Buffer)

Expand Down Expand Up @@ -363,7 +363,7 @@ func TestUnaggregatedIteratorDecodeInvalidTimeUnit(t *testing.T) {
versionedPolicies: testVersionedPoliciesWithInvalidTimeUnit,
}
enc := testUnaggregatedEncoder(t)
require.NoError(t, testUnaggregatedEncode(t, enc, input.metric, input.versionedPolicies))
require.NoError(t, testUnaggregatedEncodeWithPolicies(t, enc, input.metric, input.versionedPolicies))
it := testUnaggregatedIterator(t, enc.Encoder().Buffer)
validateUnaggregatedDecodeResults(t, it, nil, errors.New("invalid precision unknown"))
}
Loading

0 comments on commit bc9b97a

Please sign in to comment.