Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Reset protobuf before reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
Chao Wang committed Sep 29, 2018
1 parent e1a7251 commit 5c6aed3
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 0 deletions.
27 changes: 27 additions & 0 deletions encoding/protobuf/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func resetMetricWithMetadatasProto(pb *metricpb.MetricWithMetadatas) {
resetBatchTimerWithMetadatasProto(pb.BatchTimerWithMetadatas)
resetGaugeWithMetadatasProto(pb.GaugeWithMetadatas)
resetForwardedMetricWithMetadataProto(pb.ForwardedMetricWithMetadata)
resetTimedMetricWithMetadataProto(pb.TimedMetricWithMetadata)
}

func resetCounterWithMetadatasProto(pb *metricpb.CounterWithMetadatas) {
Expand Down Expand Up @@ -71,6 +72,14 @@ func resetForwardedMetricWithMetadataProto(pb *metricpb.ForwardedMetricWithMetad
resetForwardMetadata(&pb.Metadata)
}

func resetTimedMetricWithMetadataProto(pb *metricpb.TimedMetricWithMetadata) {
if pb == nil {
return
}
resetTimedMetric(&pb.Metric)
resetTimedMetadata(&pb.Metadata)
}

func resetCounter(pb *metricpb.Counter) {
if pb == nil {
return
Expand Down Expand Up @@ -105,6 +114,16 @@ func resetForwardedMetric(pb *metricpb.ForwardedMetric) {
pb.Values = pb.Values[:0]
}

func resetTimedMetric(pb *metricpb.TimedMetric) {
if pb == nil {
return
}
pb.Type = metricpb.MetricType_UNKNOWN
pb.Id = pb.Id[:0]
pb.TimeNanos = 0
pb.Value = 0
}

func resetMetadatas(pb *metricpb.StagedMetadatas) {
if pb == nil {
return
Expand All @@ -122,3 +141,11 @@ func resetForwardMetadata(pb *metricpb.ForwardMetadata) {
pb.SourceId = 0
pb.NumForwardedTimes = 0
}

func resetTimedMetadata(pb *metricpb.TimedMetadata) {
if pb == nil {
return
}
pb.AggregationId.Reset()
pb.StoragePolicy.Reset()
}
142 changes: 142 additions & 0 deletions encoding/protobuf/unaggregated_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,18 @@ var (
TimeNanos: 145668,
Values: []float64{563.875, -23.87},
}
testTimedMetric1 = aggregated.Metric{
Type: metric.CounterType,
ID: []byte("testTimedMetric1"),
TimeNanos: 8259,
Value: 3234,
}
testTimedMetric2 = aggregated.Metric{
Type: metric.GaugeType,
ID: []byte("testTimedMetric2"),
TimeNanos: 82590,
Value: 0,
}
testStagedMetadatas1 = metadata.StagedMetadatas{
{
CutoverNanos: 1234,
Expand Down Expand Up @@ -219,6 +231,14 @@ var (
SourceID: 897,
NumForwardedTimes: 2,
}
testTimedMetadata1 = metadata.TimedMetadata{
AggregationID: aggregation.DefaultID,
StoragePolicy: policy.NewStoragePolicy(time.Minute, xtime.Minute, 12*time.Hour),
}
testTimedMetadata2 = metadata.TimedMetadata{
AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
StoragePolicy: policy.NewStoragePolicy(10*time.Second, xtime.Second, 6*time.Hour),
}
testCounter1Proto = metricpb.Counter{
Id: []byte("testCounter1"),
Value: 123,
Expand Down Expand Up @@ -255,6 +275,18 @@ var (
TimeNanos: 145668,
Values: []float64{563.875, -23.87},
}
testTimedMetric1Proto = metricpb.TimedMetric{
Type: metricpb.MetricType_COUNTER,
Id: []byte("testTimedMetric1"),
TimeNanos: 8259,
Value: 3234,
}
testTimedMetric2Proto = metricpb.TimedMetric{
Type: metricpb.MetricType_GAUGE,
Id: []byte("testTimedMetric2"),
TimeNanos: 82590,
Value: 0,
}
testStagedMetadatas1Proto = metricpb.StagedMetadatas{
Metadatas: []metricpb.StagedMetadata{
{
Expand Down Expand Up @@ -460,6 +492,30 @@ var (
SourceId: 897,
NumForwardedTimes: 2,
}
testTimedMetadata1Proto = metricpb.TimedMetadata{
AggregationId: aggregationpb.AggregationID{Id: 0},
StoragePolicy: policypb.StoragePolicy{
Resolution: &policypb.Resolution{
WindowSize: time.Minute.Nanoseconds(),
Precision: time.Minute.Nanoseconds(),
},
Retention: &policypb.Retention{
Period: (12 * time.Hour).Nanoseconds(),
},
},
}
testTimedMetadata2Proto = metricpb.TimedMetadata{
AggregationId: aggregationpb.AggregationID{Id: aggregation.MustCompressTypes(aggregation.Sum)[0]},
StoragePolicy: policypb.StoragePolicy{
Resolution: &policypb.Resolution{
WindowSize: 10 * time.Second.Nanoseconds(),
Precision: time.Second.Nanoseconds(),
},
Retention: &policypb.Retention{
Period: (6 * time.Hour).Nanoseconds(),
},
},
}
testCmpOpts = []cmp.Option{
cmpopts.EquateEmpty(),
cmp.AllowUnexported(policy.StoragePolicy{}),
Expand Down Expand Up @@ -706,6 +762,66 @@ func TestUnaggregatedEncoderEncodeForwardedMetricWithMetadata(t *testing.T) {
}
}

func TestUnaggregatedEncoderEncodeTimedMetricWithMetadata(t *testing.T) {
inputs := []aggregated.TimedMetricWithMetadata{
{
Metric: testTimedMetric1,
TimedMetadata: testTimedMetadata1,
},
{
Metric: testTimedMetric1,
TimedMetadata: testTimedMetadata2,
},
{
Metric: testTimedMetric2,
TimedMetadata: testTimedMetadata1,
},
{
Metric: testTimedMetric2,
TimedMetadata: testTimedMetadata2,
},
}
expected := []metricpb.TimedMetricWithMetadata{
{
Metric: testTimedMetric1Proto,
Metadata: testTimedMetadata1Proto,
},
{
Metric: testTimedMetric1Proto,
Metadata: testTimedMetadata2Proto,
},
{
Metric: testTimedMetric2Proto,
Metadata: testTimedMetadata1Proto,
},
{
Metric: testTimedMetric2Proto,
Metadata: testTimedMetadata2Proto,
},
}

var (
sizeRes int
pbRes metricpb.MetricWithMetadatas
)
enc := NewUnaggregatedEncoder(NewUnaggregatedOptions())
enc.(*unaggregatedEncoder).encodeMessageSizeFn = func(size int) { sizeRes = size }
enc.(*unaggregatedEncoder).encodeMessageFn = func(pb metricpb.MetricWithMetadatas) error { pbRes = pb; return nil }
for i, input := range inputs {
require.NoError(t, enc.EncodeMessage(encoding.UnaggregatedMessageUnion{
Type: encoding.TimedMetricWithMetadataType,
TimedMetricWithMetadata: input,
}))
expectedProto := metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA,
TimedMetricWithMetadata: &expected[i],
}
expectedMsgSize := expectedProto.Size()
require.Equal(t, expectedMsgSize, sizeRes)
require.Equal(t, expectedProto, pbRes)
}
}

func TestUnaggregatedEncoderStress(t *testing.T) {
inputs := []interface{}{
unaggregated.CounterWithMetadatas{
Expand All @@ -724,6 +840,10 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
ForwardedMetric: testForwardedMetric1,
ForwardMetadata: testForwardMetadata1,
},
aggregated.TimedMetricWithMetadata{
Metric: testTimedMetric1,
TimedMetadata: testTimedMetadata1,
},
unaggregated.CounterWithMetadatas{
Counter: testCounter2,
StagedMetadatas: testStagedMetadatas1,
Expand Down Expand Up @@ -772,6 +892,10 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
ForwardedMetric: testForwardedMetric2,
ForwardMetadata: testForwardMetadata2,
},
aggregated.TimedMetricWithMetadata{
Metric: testTimedMetric2,
TimedMetadata: testTimedMetadata2,
},
}

expected := []interface{}{
Expand All @@ -791,6 +915,10 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
Metric: testForwardedMetric1Proto,
Metadata: testForwardMetadata1Proto,
},
metricpb.TimedMetricWithMetadata{
Metric: testTimedMetric1Proto,
Metadata: testTimedMetadata1Proto,
},
metricpb.CounterWithMetadatas{
Counter: testCounter2Proto,
Metadatas: testStagedMetadatas1Proto,
Expand Down Expand Up @@ -839,6 +967,10 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
Metric: testForwardedMetric2Proto,
Metadata: testForwardMetadata2Proto,
},
metricpb.TimedMetricWithMetadata{
Metric: testTimedMetric2Proto,
Metadata: testTimedMetadata2Proto,
},
}

var (
Expand Down Expand Up @@ -897,6 +1029,16 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
Type: metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA,
ForwardedMetricWithMetadata: &res,
}
case aggregated.TimedMetricWithMetadata:
msg = encoding.UnaggregatedMessageUnion{
Type: encoding.TimedMetricWithMetadataType,
TimedMetricWithMetadata: input,
}
res := expected[i].(metricpb.TimedMetricWithMetadata)
expectedProto = metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA,
TimedMetricWithMetadata: &res,
}
default:
require.Fail(t, "unrecognized type %T", input)
}
Expand Down
62 changes: 62 additions & 0 deletions encoding/protobuf/unaggregated_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,52 @@ func TestUnaggregatedIteratorDecodeForwardedMetricWithMetadata(t *testing.T) {
require.Equal(t, len(inputs), i)
}

func TestUnaggregatedIteratorDecodeTimedMetricWithMetadata(t *testing.T) {
inputs := []aggregated.TimedMetricWithMetadata{
{
Metric: testTimedMetric1,
TimedMetadata: testTimedMetadata1,
},
{
Metric: testTimedMetric2,
TimedMetadata: testTimedMetadata1,
},
{
Metric: testTimedMetric1,
TimedMetadata: testTimedMetadata2,
},
{
Metric: testTimedMetric2,
TimedMetadata: testTimedMetadata2,
},
}

enc := NewUnaggregatedEncoder(NewUnaggregatedOptions())
for _, input := range inputs {
require.NoError(t, enc.EncodeMessage(encoding.UnaggregatedMessageUnion{
Type: encoding.TimedMetricWithMetadataType,
TimedMetricWithMetadata: input,
}))
}
dataBuf := enc.Relinquish()
defer dataBuf.Close()

var (
i int
stream = bytes.NewReader(dataBuf.Bytes())
)
it := NewUnaggregatedIterator(stream, NewUnaggregatedOptions())
defer it.Close()
for it.Next() {
res := it.Current()
require.Equal(t, encoding.TimedMetricWithMetadataType, res.Type)
require.Equal(t, inputs[i], res.TimedMetricWithMetadata)
i++
}
require.Equal(t, io.EOF, it.Err())
require.Equal(t, len(inputs), i)
}

func TestUnaggregatedIteratorDecodeStress(t *testing.T) {
inputs := []interface{}{
unaggregated.CounterWithMetadatas{
Expand All @@ -236,6 +282,10 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) {
ForwardedMetric: testForwardedMetric1,
ForwardMetadata: testForwardMetadata1,
},
aggregated.TimedMetricWithMetadata{
Metric: testTimedMetric1,
TimedMetadata: testTimedMetadata1,
},
unaggregated.CounterWithMetadatas{
Counter: testCounter2,
StagedMetadatas: testStagedMetadatas1,
Expand Down Expand Up @@ -284,6 +334,10 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) {
ForwardedMetric: testForwardedMetric2,
ForwardMetadata: testForwardMetadata2,
},
aggregated.TimedMetricWithMetadata{
Metric: testTimedMetric2,
TimedMetadata: testTimedMetadata2,
},
}

numIter := 1000
Expand Down Expand Up @@ -312,6 +366,11 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) {
Type: encoding.ForwardedMetricWithMetadataType,
ForwardedMetricWithMetadata: input,
}
case aggregated.TimedMetricWithMetadata:
msg = encoding.UnaggregatedMessageUnion{
Type: encoding.TimedMetricWithMetadataType,
TimedMetricWithMetadata: input,
}
default:
require.Fail(t, "unrecognized type %T", input)
}
Expand Down Expand Up @@ -343,6 +402,9 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) {
case aggregated.ForwardedMetricWithMetadata:
require.Equal(t, encoding.ForwardedMetricWithMetadataType, res.Type)
require.True(t, cmp.Equal(expectedRes, res.ForwardedMetricWithMetadata, testCmpOpts...))
case aggregated.TimedMetricWithMetadata:
require.Equal(t, encoding.TimedMetricWithMetadataType, res.Type)
require.True(t, cmp.Equal(expectedRes, res.TimedMetricWithMetadata, testCmpOpts...))
default:
require.Fail(t, "unknown input type: %T", inputs[j])
}
Expand Down

0 comments on commit 5c6aed3

Please sign in to comment.