Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Encoding and decoding logic for forwarded metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Jun 24, 2018
1 parent a5ecee8 commit 48f934c
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 163 deletions.
11 changes: 6 additions & 5 deletions encoding/protobuf/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func resetMetricWithMetadatasProto(pb *metricpb.MetricWithMetadatas) {
resetCounterWithMetadatasProto(pb.CounterWithMetadatas)
resetBatchTimerWithMetadatasProto(pb.BatchTimerWithMetadatas)
resetGaugeWithMetadatasProto(pb.GaugeWithMetadatas)
resetTimedMetricWithForwardMetadataProto(pb.TimedMetricWithForwardMetadata)
resetForwardedMetricWithMetadataProto(pb.ForwardedMetricWithMetadata)
}

func resetCounterWithMetadatasProto(pb *metricpb.CounterWithMetadatas) {
Expand All @@ -63,11 +63,11 @@ func resetGaugeWithMetadatasProto(pb *metricpb.GaugeWithMetadatas) {
resetMetadatas(&pb.Metadatas)
}

func resetTimedMetricWithForwardMetadataProto(pb *metricpb.TimedMetricWithForwardMetadata) {
func resetForwardedMetricWithMetadataProto(pb *metricpb.ForwardedMetricWithMetadata) {
if pb == nil {
return
}
resetTimedMetric(&pb.Metric)
resetForwardedMetric(&pb.Metric)
resetForwardMetadata(&pb.Metadata)
}

Expand Down Expand Up @@ -95,13 +95,14 @@ func resetGauge(pb *metricpb.Gauge) {
pb.Value = 0.0
}

func resetTimedMetric(pb *metricpb.TimedMetric) {
func resetForwardedMetric(pb *metricpb.ForwardedMetric) {
if pb == nil {
return
}
pb.Type = metricpb.MetricType_UNKNOWN
pb.Id = pb.Id[:0]
pb.TimeNanos = 0
pb.Value = 0.0
pb.Values = pb.Values[:0]
}

func resetMetadatas(pb *metricpb.StagedMetadatas) {
Expand Down
42 changes: 23 additions & 19 deletions encoding/protobuf/reset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,17 @@ var (
Id: []byte{},
Value: 0.0,
}
testTimedMetricBeforeResetProto = metricpb.TimedMetric{
Id: []byte("testTimedMetric"),
testForwardedMetricBeforeResetProto = metricpb.ForwardedMetric{
Type: metricpb.MetricType_COUNTER,
Id: []byte("testForwardedMetric"),
TimeNanos: 1234,
Value: 23.234,
Values: []float64{1.23, -4.56},
}
testTimedMetricAfterResetProto = metricpb.TimedMetric{
testForwardedMetricAfterResetProto = metricpb.ForwardedMetric{
Type: metricpb.MetricType_UNKNOWN,
Id: []byte{},
TimeNanos: 0,
Value: 0.0,
Values: []float64{},
}
testMetadatasBeforeResetProto = metricpb.StagedMetadatas{
Metadatas: []metricpb.StagedMetadata{
Expand Down Expand Up @@ -171,25 +173,26 @@ func TestResetMetricWithMetadatasProtoOnlyGauge(t *testing.T) {
require.True(t, cap(input.GaugeWithMetadatas.Metadatas.Metadatas) > 0)
}

func TestResetMetricWithMetadatasProtoOnlyTimedMetric(t *testing.T) {
func TestResetMetricWithMetadatasProtoOnlyForwardedMetric(t *testing.T) {
input := &metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_FORWARD_METADATA,
TimedMetricWithForwardMetadata: &metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetricBeforeResetProto,
Type: metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA,
ForwardedMetricWithMetadata: &metricpb.ForwardedMetricWithMetadata{
Metric: testForwardedMetricBeforeResetProto,
Metadata: testForwardMetadataBeforeResetProto,
},
}
expected := &metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_UNKNOWN,
TimedMetricWithForwardMetadata: &metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetricAfterResetProto,
ForwardedMetricWithMetadata: &metricpb.ForwardedMetricWithMetadata{
Metric: testForwardedMetricAfterResetProto,
Metadata: testForwardMetadataAfterResetProto,
},
}
resetMetricWithMetadatasProto(input)
require.Equal(t, expected, input)
require.True(t, cap(input.TimedMetricWithForwardMetadata.Metric.Id) > 0)
require.True(t, cap(input.TimedMetricWithForwardMetadata.Metadata.Pipeline.Ops) > 0)
require.True(t, cap(input.ForwardedMetricWithMetadata.Metric.Id) > 0)
require.True(t, cap(input.ForwardedMetricWithMetadata.Metric.Values) > 0)
require.True(t, cap(input.ForwardedMetricWithMetadata.Metadata.Pipeline.Ops) > 0)
}

func TestResetMetricWithMetadatasProtoAll(t *testing.T) {
Expand All @@ -207,8 +210,8 @@ func TestResetMetricWithMetadatasProtoAll(t *testing.T) {
Gauge: testGaugeBeforeResetProto,
Metadatas: testMetadatasBeforeResetProto,
},
TimedMetricWithForwardMetadata: &metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetricBeforeResetProto,
ForwardedMetricWithMetadata: &metricpb.ForwardedMetricWithMetadata{
Metric: testForwardedMetricBeforeResetProto,
Metadata: testForwardMetadataBeforeResetProto,
},
}
Expand All @@ -226,8 +229,8 @@ func TestResetMetricWithMetadatasProtoAll(t *testing.T) {
Gauge: testGaugeAfterResetProto,
Metadatas: testMetadatasAfterResetProto,
},
TimedMetricWithForwardMetadata: &metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetricAfterResetProto,
ForwardedMetricWithMetadata: &metricpb.ForwardedMetricWithMetadata{
Metric: testForwardedMetricAfterResetProto,
Metadata: testForwardMetadataAfterResetProto,
},
}
Expand All @@ -239,6 +242,7 @@ func TestResetMetricWithMetadatasProtoAll(t *testing.T) {
require.True(t, cap(input.BatchTimerWithMetadatas.Metadatas.Metadatas) > 0)
require.True(t, cap(input.GaugeWithMetadatas.Gauge.Id) > 0)
require.True(t, cap(input.GaugeWithMetadatas.Metadatas.Metadatas) > 0)
require.True(t, cap(input.TimedMetricWithForwardMetadata.Metric.Id) > 0)
require.True(t, cap(input.TimedMetricWithForwardMetadata.Metadata.Pipeline.Ops) > 0)
require.True(t, cap(input.ForwardedMetricWithMetadata.Metric.Id) > 0)
require.True(t, cap(input.ForwardedMetricWithMetadata.Metric.Values) > 0)
require.True(t, cap(input.ForwardedMetricWithMetadata.Metadata.Pipeline.Ops) > 0)
}
16 changes: 8 additions & 8 deletions encoding/protobuf/unaggregated_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type unaggregatedEncoder struct {
cm metricpb.CounterWithMetadatas
bm metricpb.BatchTimerWithMetadatas
gm metricpb.GaugeWithMetadatas
tm metricpb.TimedMetricWithForwardMetadata
fm metricpb.ForwardedMetricWithMetadata
buf []byte
used int

Expand Down Expand Up @@ -122,8 +122,8 @@ func (enc *unaggregatedEncoder) EncodeMessage(msg encoding.UnaggregatedMessageUn
return enc.encodeBatchTimerWithMetadatas(msg.BatchTimerWithMetadatas)
case encoding.GaugeWithMetadatasType:
return enc.encodeGaugeWithMetadatas(msg.GaugeWithMetadatas)
case encoding.TimedMetricWithForwardMetadataType:
return enc.encodeTimedMetricWithForwardMetadata(msg.TimedMetricWithForwardMetadata)
case encoding.ForwardedMetricWithMetadataType:
return enc.encodeForwardedMetricWithMetadata(msg.ForwardedMetricWithMetadata)
default:
return fmt.Errorf("unknown message type: %v", msg.Type)
}
Expand Down Expand Up @@ -162,13 +162,13 @@ func (enc *unaggregatedEncoder) encodeGaugeWithMetadatas(gm unaggregated.GaugeWi
return enc.encodeMetricWithMetadatas(mm)
}

func (enc *unaggregatedEncoder) encodeTimedMetricWithForwardMetadata(tm aggregated.MetricWithForwardMetadata) error {
if err := tm.ToProto(&enc.tm); err != nil {
return fmt.Errorf("timed metric with forward metadata proto conversion failed: %v", err)
func (enc *unaggregatedEncoder) encodeForwardedMetricWithMetadata(fm aggregated.ForwardedMetricWithMetadata) error {
if err := fm.ToProto(&enc.fm); err != nil {
return fmt.Errorf("forwarded metric with metadata proto conversion failed: %v", err)
}
mm := metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_FORWARD_METADATA,
TimedMetricWithForwardMetadata: &enc.tm,
Type: metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA,
ForwardedMetricWithMetadata: &enc.fm,
}
return enc.encodeMetricWithMetadatas(mm)
}
Expand Down
98 changes: 49 additions & 49 deletions encoding/protobuf/unaggregated_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ var (
ID: []byte("testGauge2"),
Value: 234231.345,
}
testTimedMetric1 = aggregated.Metric{
testForwardedMetric1 = aggregated.ForwardedMetric{
Type: metric.CounterType,
ID: []byte("testTimedMetric1"),
ID: []byte("testForwardedMetric1"),
TimeNanos: 8259,
Value: 29234.29934,
Values: []float64{1, 3234, -12},
}
testTimedMetric2 = aggregated.Metric{
testForwardedMetric2 = aggregated.ForwardedMetric{
Type: metric.TimerType,
ID: []byte("testTimedMetric2"),
ID: []byte("testForwardedMetric2"),
TimeNanos: 145668,
Value: 563.875,
Values: []float64{563.875, -23.87},
}
testStagedMetadatas1 = metadata.StagedMetadatas{
{
Expand Down Expand Up @@ -243,17 +243,17 @@ var (
Id: []byte("testGauge2"),
Value: 234231.345,
}
testTimedMetric1Proto = metricpb.TimedMetric{
testForwardedMetric1Proto = metricpb.ForwardedMetric{
Type: metricpb.MetricType_COUNTER,
Id: []byte("testTimedMetric1"),
Id: []byte("testForwardedMetric1"),
TimeNanos: 8259,
Value: 29234.29934,
Values: []float64{1, 3234, -12},
}
testTimedMetric2Proto = metricpb.TimedMetric{
testForwardedMetric2Proto = metricpb.ForwardedMetric{
Type: metricpb.MetricType_TIMER,
Id: []byte("testTimedMetric2"),
Id: []byte("testForwardedMetric2"),
TimeNanos: 145668,
Value: 563.875,
Values: []float64{563.875, -23.87},
}
testStagedMetadatas1Proto = metricpb.StagedMetadatas{
Metadatas: []metricpb.StagedMetadata{
Expand Down Expand Up @@ -646,40 +646,40 @@ func TestUnaggregatedEncoderEncodeGaugeWithMetadatas(t *testing.T) {
}
}

func TestUnaggregatedEncoderEncodeTimedMetricWithForwardMetadata(t *testing.T) {
inputs := []aggregated.MetricWithForwardMetadata{
func TestUnaggregatedEncoderEncodeForwardedMetricWithMetadata(t *testing.T) {
inputs := []aggregated.ForwardedMetricWithMetadata{
{
Metric: testTimedMetric1,
ForwardedMetric: testForwardedMetric1,
ForwardMetadata: testForwardMetadata1,
},
{
Metric: testTimedMetric1,
ForwardedMetric: testForwardedMetric1,
ForwardMetadata: testForwardMetadata2,
},
{
Metric: testTimedMetric2,
ForwardedMetric: testForwardedMetric2,
ForwardMetadata: testForwardMetadata1,
},
{
Metric: testTimedMetric2,
ForwardedMetric: testForwardedMetric2,
ForwardMetadata: testForwardMetadata2,
},
}
expected := []metricpb.TimedMetricWithForwardMetadata{
expected := []metricpb.ForwardedMetricWithMetadata{
{
Metric: testTimedMetric1Proto,
Metric: testForwardedMetric1Proto,
Metadata: testForwardMetadata1Proto,
},
{
Metric: testTimedMetric1Proto,
Metric: testForwardedMetric1Proto,
Metadata: testForwardMetadata2Proto,
},
{
Metric: testTimedMetric2Proto,
Metric: testForwardedMetric2Proto,
Metadata: testForwardMetadata1Proto,
},
{
Metric: testTimedMetric2Proto,
Metric: testForwardedMetric2Proto,
Metadata: testForwardMetadata2Proto,
},
}
Expand All @@ -693,12 +693,12 @@ func TestUnaggregatedEncoderEncodeTimedMetricWithForwardMetadata(t *testing.T) {
enc.(*unaggregatedEncoder).encodeMessageFn = func(pb metricpb.MetricWithMetadatas) error { pbRes = pb; return nil }
for i, input := range inputs {
require.NoError(t, enc.EncodeMessage(encoding.UnaggregatedMessageUnion{
Type: encoding.TimedMetricWithForwardMetadataType,
TimedMetricWithForwardMetadata: input,
Type: encoding.ForwardedMetricWithMetadataType,
ForwardedMetricWithMetadata: input,
}))
expectedProto := metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_FORWARD_METADATA,
TimedMetricWithForwardMetadata: &expected[i],
Type: metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA,
ForwardedMetricWithMetadata: &expected[i],
}
expectedMsgSize := expectedProto.Size()
require.Equal(t, expectedMsgSize, sizeRes)
Expand All @@ -720,8 +720,8 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
Gauge: testGauge1,
StagedMetadatas: testStagedMetadatas1,
},
aggregated.MetricWithForwardMetadata{
Metric: testTimedMetric1,
aggregated.ForwardedMetricWithMetadata{
ForwardedMetric: testForwardedMetric1,
ForwardMetadata: testForwardMetadata1,
},
unaggregated.CounterWithMetadatas{
Expand All @@ -736,8 +736,8 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
Gauge: testGauge2,
StagedMetadatas: testStagedMetadatas1,
},
aggregated.MetricWithForwardMetadata{
Metric: testTimedMetric2,
aggregated.ForwardedMetricWithMetadata{
ForwardedMetric: testForwardedMetric2,
ForwardMetadata: testForwardMetadata1,
},
unaggregated.CounterWithMetadatas{
Expand All @@ -752,8 +752,8 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
Gauge: testGauge1,
StagedMetadatas: testStagedMetadatas2,
},
aggregated.MetricWithForwardMetadata{
Metric: testTimedMetric1,
aggregated.ForwardedMetricWithMetadata{
ForwardedMetric: testForwardedMetric1,
ForwardMetadata: testForwardMetadata2,
},
unaggregated.CounterWithMetadatas{
Expand All @@ -768,8 +768,8 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
Gauge: testGauge2,
StagedMetadatas: testStagedMetadatas2,
},
aggregated.MetricWithForwardMetadata{
Metric: testTimedMetric2,
aggregated.ForwardedMetricWithMetadata{
ForwardedMetric: testForwardedMetric2,
ForwardMetadata: testForwardMetadata2,
},
}
Expand All @@ -787,8 +787,8 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
Gauge: testGauge1Proto,
Metadatas: testStagedMetadatas1Proto,
},
metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetric1Proto,
metricpb.ForwardedMetricWithMetadata{
Metric: testForwardedMetric1Proto,
Metadata: testForwardMetadata1Proto,
},
metricpb.CounterWithMetadatas{
Expand All @@ -803,8 +803,8 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
Gauge: testGauge2Proto,
Metadatas: testStagedMetadatas1Proto,
},
metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetric2Proto,
metricpb.ForwardedMetricWithMetadata{
Metric: testForwardedMetric2Proto,
Metadata: testForwardMetadata1Proto,
},
metricpb.CounterWithMetadatas{
Expand All @@ -819,8 +819,8 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
Gauge: testGauge1Proto,
Metadatas: testStagedMetadatas2Proto,
},
metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetric1Proto,
metricpb.ForwardedMetricWithMetadata{
Metric: testForwardedMetric1Proto,
Metadata: testForwardMetadata2Proto,
},
metricpb.CounterWithMetadatas{
Expand All @@ -835,8 +835,8 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
Gauge: testGauge2Proto,
Metadatas: testStagedMetadatas2Proto,
},
metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetric2Proto,
metricpb.ForwardedMetricWithMetadata{
Metric: testForwardedMetric2Proto,
Metadata: testForwardMetadata2Proto,
},
}
Expand Down Expand Up @@ -887,15 +887,15 @@ func TestUnaggregatedEncoderStress(t *testing.T) {
Type: metricpb.MetricWithMetadatas_GAUGE_WITH_METADATAS,
GaugeWithMetadatas: &res,
}
case aggregated.MetricWithForwardMetadata:
case aggregated.ForwardedMetricWithMetadata:
msg = encoding.UnaggregatedMessageUnion{
Type: encoding.TimedMetricWithForwardMetadataType,
TimedMetricWithForwardMetadata: input,
Type: encoding.ForwardedMetricWithMetadataType,
ForwardedMetricWithMetadata: input,
}
res := expected[i].(metricpb.TimedMetricWithForwardMetadata)
res := expected[i].(metricpb.ForwardedMetricWithMetadata)
expectedProto = metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_FORWARD_METADATA,
TimedMetricWithForwardMetadata: &res,
Type: metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA,
ForwardedMetricWithMetadata: &res,
}
default:
require.Fail(t, "unrecognized type %T", input)
Expand Down
Loading

0 comments on commit 48f934c

Please sign in to comment.