Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Encode forwarded metrics (#189)
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Jun 25, 2018
1 parent 3b29579 commit ac91954
Show file tree
Hide file tree
Showing 17 changed files with 694 additions and 356 deletions.
13 changes: 7 additions & 6 deletions encoding/protobuf/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func resetMetricWithMetadatasProto(pb *metricpb.MetricWithMetadatas) {
resetCounterWithMetadatasProto(pb.CounterWithMetadatas)
resetBatchTimerWithMetadatasProto(pb.BatchTimerWithMetadatas)
resetGaugeWithMetadatasProto(pb.GaugeWithMetadatas)
resetTimedMetricWithForwardMetadataProto(pb.TimedMetricWithForwardMetadata)
resetForwardedMetricWithMetadataProto(pb.ForwardedMetricWithMetadata)
}

func resetCounterWithMetadatasProto(pb *metricpb.CounterWithMetadatas) {
Expand All @@ -63,11 +63,11 @@ func resetGaugeWithMetadatasProto(pb *metricpb.GaugeWithMetadatas) {
resetMetadatas(&pb.Metadatas)
}

func resetTimedMetricWithForwardMetadataProto(pb *metricpb.TimedMetricWithForwardMetadata) {
func resetForwardedMetricWithMetadataProto(pb *metricpb.ForwardedMetricWithMetadata) {
if pb == nil {
return
}
resetTimedMetric(&pb.Metric)
resetForwardedMetric(&pb.Metric)
resetForwardMetadata(&pb.Metadata)
}

Expand Down Expand Up @@ -95,13 +95,14 @@ func resetGauge(pb *metricpb.Gauge) {
pb.Value = 0.0
}

func resetTimedMetric(pb *metricpb.TimedMetric) {
func resetForwardedMetric(pb *metricpb.ForwardedMetric) {
if pb == nil {
return
}
pb.Type = metricpb.MetricType_UNKNOWN
pb.Id = pb.Id[:0]
pb.TimeNanos = 0
pb.Value = 0.0
pb.Values = pb.Values[:0]
}

func resetMetadatas(pb *metricpb.StagedMetadatas) {
Expand All @@ -118,6 +119,6 @@ func resetForwardMetadata(pb *metricpb.ForwardMetadata) {
pb.AggregationId.Reset()
pb.StoragePolicy.Reset()
pb.Pipeline.Ops = pb.Pipeline.Ops[:0]
pb.SourceId = pb.SourceId[:0]
pb.SourceId = 0
pb.NumForwardedTimes = 0
}
46 changes: 25 additions & 21 deletions encoding/protobuf/reset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,17 @@ var (
Id: []byte{},
Value: 0.0,
}
testTimedMetricBeforeResetProto = metricpb.TimedMetric{
Id: []byte("testTimedMetric"),
testForwardedMetricBeforeResetProto = metricpb.ForwardedMetric{
Type: metricpb.MetricType_COUNTER,
Id: []byte("testForwardedMetric"),
TimeNanos: 1234,
Value: 23.234,
Values: []float64{1.23, -4.56},
}
testTimedMetricAfterResetProto = metricpb.TimedMetric{
testForwardedMetricAfterResetProto = metricpb.ForwardedMetric{
Type: metricpb.MetricType_UNKNOWN,
Id: []byte{},
TimeNanos: 0,
Value: 0.0,
Values: []float64{},
}
testMetadatasBeforeResetProto = metricpb.StagedMetadatas{
Metadatas: []metricpb.StagedMetadata{
Expand Down Expand Up @@ -92,14 +94,14 @@ var (
},
},
},
SourceId: []byte("testForwardSourceBeforeReset"),
SourceId: 342,
NumForwardedTimes: 23,
}
testForwardMetadataAfterResetProto = metricpb.ForwardMetadata{
Pipeline: pipelinepb.AppliedPipeline{
Ops: []pipelinepb.AppliedPipelineOp{},
},
SourceId: []byte{},
SourceId: 0,
NumForwardedTimes: 0,
}
)
Expand Down Expand Up @@ -171,25 +173,26 @@ func TestResetMetricWithMetadatasProtoOnlyGauge(t *testing.T) {
require.True(t, cap(input.GaugeWithMetadatas.Metadatas.Metadatas) > 0)
}

func TestResetMetricWithMetadatasProtoOnlyTimedMetric(t *testing.T) {
func TestResetMetricWithMetadatasProtoOnlyForwardedMetric(t *testing.T) {
input := &metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_FORWARD_METADATA,
TimedMetricWithForwardMetadata: &metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetricBeforeResetProto,
Type: metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA,
ForwardedMetricWithMetadata: &metricpb.ForwardedMetricWithMetadata{
Metric: testForwardedMetricBeforeResetProto,
Metadata: testForwardMetadataBeforeResetProto,
},
}
expected := &metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_UNKNOWN,
TimedMetricWithForwardMetadata: &metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetricAfterResetProto,
ForwardedMetricWithMetadata: &metricpb.ForwardedMetricWithMetadata{
Metric: testForwardedMetricAfterResetProto,
Metadata: testForwardMetadataAfterResetProto,
},
}
resetMetricWithMetadatasProto(input)
require.Equal(t, expected, input)
require.True(t, cap(input.TimedMetricWithForwardMetadata.Metric.Id) > 0)
require.True(t, cap(input.TimedMetricWithForwardMetadata.Metadata.Pipeline.Ops) > 0)
require.True(t, cap(input.ForwardedMetricWithMetadata.Metric.Id) > 0)
require.True(t, cap(input.ForwardedMetricWithMetadata.Metric.Values) > 0)
require.True(t, cap(input.ForwardedMetricWithMetadata.Metadata.Pipeline.Ops) > 0)
}

func TestResetMetricWithMetadatasProtoAll(t *testing.T) {
Expand All @@ -207,8 +210,8 @@ func TestResetMetricWithMetadatasProtoAll(t *testing.T) {
Gauge: testGaugeBeforeResetProto,
Metadatas: testMetadatasBeforeResetProto,
},
TimedMetricWithForwardMetadata: &metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetricBeforeResetProto,
ForwardedMetricWithMetadata: &metricpb.ForwardedMetricWithMetadata{
Metric: testForwardedMetricBeforeResetProto,
Metadata: testForwardMetadataBeforeResetProto,
},
}
Expand All @@ -226,8 +229,8 @@ func TestResetMetricWithMetadatasProtoAll(t *testing.T) {
Gauge: testGaugeAfterResetProto,
Metadatas: testMetadatasAfterResetProto,
},
TimedMetricWithForwardMetadata: &metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetricAfterResetProto,
ForwardedMetricWithMetadata: &metricpb.ForwardedMetricWithMetadata{
Metric: testForwardedMetricAfterResetProto,
Metadata: testForwardMetadataAfterResetProto,
},
}
Expand All @@ -239,6 +242,7 @@ func TestResetMetricWithMetadatasProtoAll(t *testing.T) {
require.True(t, cap(input.BatchTimerWithMetadatas.Metadatas.Metadatas) > 0)
require.True(t, cap(input.GaugeWithMetadatas.Gauge.Id) > 0)
require.True(t, cap(input.GaugeWithMetadatas.Metadatas.Metadatas) > 0)
require.True(t, cap(input.TimedMetricWithForwardMetadata.Metric.Id) > 0)
require.True(t, cap(input.TimedMetricWithForwardMetadata.Metadata.Pipeline.Ops) > 0)
require.True(t, cap(input.ForwardedMetricWithMetadata.Metric.Id) > 0)
require.True(t, cap(input.ForwardedMetricWithMetadata.Metric.Values) > 0)
require.True(t, cap(input.ForwardedMetricWithMetadata.Metadata.Pipeline.Ops) > 0)
}
16 changes: 8 additions & 8 deletions encoding/protobuf/unaggregated_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type unaggregatedEncoder struct {
cm metricpb.CounterWithMetadatas
bm metricpb.BatchTimerWithMetadatas
gm metricpb.GaugeWithMetadatas
tm metricpb.TimedMetricWithForwardMetadata
fm metricpb.ForwardedMetricWithMetadata
buf []byte
used int

Expand Down Expand Up @@ -122,8 +122,8 @@ func (enc *unaggregatedEncoder) EncodeMessage(msg encoding.UnaggregatedMessageUn
return enc.encodeBatchTimerWithMetadatas(msg.BatchTimerWithMetadatas)
case encoding.GaugeWithMetadatasType:
return enc.encodeGaugeWithMetadatas(msg.GaugeWithMetadatas)
case encoding.TimedMetricWithForwardMetadataType:
return enc.encodeTimedMetricWithForwardMetadata(msg.TimedMetricWithForwardMetadata)
case encoding.ForwardedMetricWithMetadataType:
return enc.encodeForwardedMetricWithMetadata(msg.ForwardedMetricWithMetadata)
default:
return fmt.Errorf("unknown message type: %v", msg.Type)
}
Expand Down Expand Up @@ -162,13 +162,13 @@ func (enc *unaggregatedEncoder) encodeGaugeWithMetadatas(gm unaggregated.GaugeWi
return enc.encodeMetricWithMetadatas(mm)
}

func (enc *unaggregatedEncoder) encodeTimedMetricWithForwardMetadata(tm aggregated.MetricWithForwardMetadata) error {
if err := tm.ToProto(&enc.tm); err != nil {
return fmt.Errorf("timed metric with forward metadata proto conversion failed: %v", err)
func (enc *unaggregatedEncoder) encodeForwardedMetricWithMetadata(fm aggregated.ForwardedMetricWithMetadata) error {
if err := fm.ToProto(&enc.fm); err != nil {
return fmt.Errorf("forwarded metric with metadata proto conversion failed: %v", err)
}
mm := metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_FORWARD_METADATA,
TimedMetricWithForwardMetadata: &enc.tm,
Type: metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA,
ForwardedMetricWithMetadata: &enc.fm,
}
return enc.encodeMetricWithMetadatas(mm)
}
Expand Down
Loading

0 comments on commit ac91954

Please sign in to comment.