Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Add tests for encoding and decoding metrics without policies
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerome Froelich committed Feb 11, 2017
1 parent bc9b97a commit c2a655b
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 17 deletions.
5 changes: 5 additions & 0 deletions policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ var (
emptyPolicy Policy
emptyVersionedPolicies VersionedPolicies

// UninitializedVersionedPolicies ia an uninitialized VersionedPolicies struct
UninitializedVersionedPolicies = VersionedPolicies{
Version: InitPolicyVersion,
}

// DefaultVersionedPolicies are the default versioned policies
DefaultVersionedPolicies = VersionedPolicies{
Version: DefaultPolicyVersion,
Expand Down
3 changes: 3 additions & 0 deletions protocol/msgpack/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ type UnaggregatedIterator interface {
// Next returns true if there are more items to decode
Next() bool

// Metric returns the current metric
Metric() unaggregated.MetricUnion

// Value returns the current metric and applicable policies
Value() (unaggregated.MetricUnion, policy.VersionedPolicies)

Expand Down
36 changes: 26 additions & 10 deletions protocol/msgpack/unaggregated_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ func (it *unaggregatedIterator) Reset(reader io.Reader) {
it.reset(reader)
}

// func (it *unaggregatedIterator) Metric() unaggregated.MetricUnion {
// return it.metric
// }
func (it *unaggregatedIterator) Metric() unaggregated.MetricUnion {
return it.metric
}

func (it *unaggregatedIterator) Value() (unaggregated.MetricUnion, policy.VersionedPolicies) {
return it.metric, it.versionedPolicies
Expand Down Expand Up @@ -103,6 +103,7 @@ func (it *unaggregatedIterator) decodeRootObject() bool {
if it.err() != nil {
return false
}

// If the actual version is higher than supported version, we skip
// the data for this metric and continue to the next
if version > unaggregatedVersion {
Expand All @@ -123,16 +124,34 @@ func (it *unaggregatedIterator) decodeRootObject() bool {
return false
}
switch objType {
case counterType, counterWithPoliciesType, batchTimerType, batchTimerWithPoliciesType, gaugeType, gaugeWithPoliciesType:
case counterType, batchTimerType, gaugeType:
it.decodeMetric(objType)
case counterWithPoliciesType, batchTimerWithPoliciesType, gaugeWithPoliciesType:
it.decodeMetricWithPolicies(objType)
default:
it.setErr(fmt.Errorf("unrecognized object type %v", objType))
}
it.skip(numActualFields - numExpectedFields)

return it.err() == nil
}

func (it *unaggregatedIterator) decodeMetric(objType objectType) {
switch objType {
case counterType:
it.decodeCounter()
case batchTimerType:
it.decodeBatchTimer()
case gaugeType:
it.decodeGauge()
default:
it.setErr(fmt.Errorf("unrecognized metric with policies type %v", objType))
return
}

// set VersionedPolicies to uninitialiezd
it.versionedPolicies = policy.UninitializedVersionedPolicies
}

func (it *unaggregatedIterator) decodeMetricWithPolicies(objType objectType) {
numExpectedFields, numActualFields, ok := it.checkNumFieldsForType(objType)
if !ok {
Expand All @@ -151,14 +170,11 @@ func (it *unaggregatedIterator) decodeMetricWithPolicies(objType objectType) {
return
}

var decodePolicy bool
switch objType {
case counterWithPoliciesType, batchTimerWithPoliciesType, gaugeWithPoliciesType:
decodePolicy = true
}

if decodePolicy {
it.decodeVersionedPolicies()
default:
it.versionedPolicies = policy.UninitializedVersionedPolicies
}

it.skip(numActualFields - numExpectedFields)
Expand Down
62 changes: 55 additions & 7 deletions protocol/msgpack/unaggregated_roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,49 @@ func compareUnaggregatedMetric(t *testing.T, expected unaggregated.MetricUnion,
}
}

func validateUnaggregatedRoundtrip(t *testing.T, inputs ...metricWithPolicies) {
func validateUnaggregatedRoundtrip(t *testing.T, inputs ...unaggregated.MetricUnion) {
encoder := testUnaggregatedEncoder(t)
it := testUnaggregatedIterator(t, nil)
validateUnaggregatedRoundtripWithEncoderAndIterator(t, encoder, it, inputs...)
}

func validateUnaggregatedRoundtripWithPolicies(t *testing.T, inputs ...metricWithPolicies) {
encoder := testUnaggregatedEncoder(t)
it := testUnaggregatedIterator(t, nil)
validateUnaggregatedRoundtripWithPoliciesWithEncoderAndIterator(t, encoder, it, inputs...)
}

func validateUnaggregatedRoundtripWithEncoderAndIterator(
t *testing.T,
encoder UnaggregatedEncoder,
it UnaggregatedIterator,
inputs ...unaggregated.MetricUnion,
) {
var results []unaggregated.MetricUnion

// Encode the batch of metrics
encoder.Reset(NewBufferedEncoder())
for _, input := range inputs {
testUnaggregatedEncode(t, encoder, input)
}

// Decode the batch of metrics
byteStream := bytes.NewBuffer(encoder.Encoder().Bytes())
it.Reset(byteStream)
for it.Next() {
m := it.Metric()
results = append(results, m)
}

// Assert the results match expectations
require.Equal(t, io.EOF, it.Err())
require.Equal(t, len(inputs), len(results))
for i := 0; i < len(inputs); i++ {
compareUnaggregatedMetric(t, inputs[i], results[i])
}
}

func validateUnaggregatedRoundtripWithPoliciesWithEncoderAndIterator(
t *testing.T,
encoder UnaggregatedEncoder,
it UnaggregatedIterator,
Expand Down Expand Up @@ -264,33 +300,45 @@ func validateUnaggregatedRoundtripWithEncoderAndIterator(
}
}

func TestUnaggregatedEncodeDecodeCounter(t *testing.T) {
validateUnaggregatedRoundtrip(t, testCounter)
}

func TestUnaggregatedEncodeDecodeBatchTimer(t *testing.T) {
validateUnaggregatedRoundtrip(t, testBatchTimer)
}

func TestUnaggregatedEncodeDecodeGauge(t *testing.T) {
validateUnaggregatedRoundtrip(t, testGauge)
}

func TestUnaggregatedEncodeDecodeCounterWithDefaultPolicies(t *testing.T) {
validateUnaggregatedRoundtrip(t, metricWithPolicies{
validateUnaggregatedRoundtripWithPolicies(t, metricWithPolicies{
metric: testCounter,
versionedPolicies: policy.DefaultVersionedPolicies,
})
}

func TestUnaggregatedEncodeDecodeBatchTimerWithDefaultPolicies(t *testing.T) {
validateUnaggregatedRoundtrip(t, metricWithPolicies{
validateUnaggregatedRoundtripWithPolicies(t, metricWithPolicies{
metric: testBatchTimer,
versionedPolicies: policy.DefaultVersionedPolicies,
})
}

func TestUnaggregatedEncodeDecodeGaugeWithDefaultPolicies(t *testing.T) {
validateUnaggregatedRoundtrip(t, metricWithPolicies{
validateUnaggregatedRoundtripWithPolicies(t, metricWithPolicies{
metric: testGauge,
versionedPolicies: policy.DefaultVersionedPolicies,
})
}

func TestUnaggregatedEncodeDecodeAllTypesWithDefaultPolicies(t *testing.T) {
validateUnaggregatedRoundtrip(t, testInputWithAllTypesAndDefaultPolicies...)
validateUnaggregatedRoundtripWithPolicies(t, testInputWithAllTypesAndDefaultPolicies...)
}

func TestUnaggregatedEncodeDecodeAllTypesWithCustomPolicies(t *testing.T) {
validateUnaggregatedRoundtrip(t, testInputWithAllTypesAndCustomPolicies...)
validateUnaggregatedRoundtripWithPolicies(t, testInputWithAllTypesAndCustomPolicies...)
}

func TestUnaggregatedEncodeDecodeStress(t *testing.T) {
Expand Down Expand Up @@ -324,6 +372,6 @@ func TestUnaggregatedEncodeDecodeStress(t *testing.T) {
p := allPolicies[rand.Int63n(int64(len(allPolicies)))]
inputs = append(inputs, metricWithPolicies{metric: m, versionedPolicies: p})
}
validateUnaggregatedRoundtripWithEncoderAndIterator(t, encoder, iterator, inputs...)
validateUnaggregatedRoundtripWithPoliciesWithEncoderAndIterator(t, encoder, iterator, inputs...)
}
}

0 comments on commit c2a655b

Please sign in to comment.