Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Encode/decode timed metric with forward metadata (#159)
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Jun 7, 2018
1 parent 5f1ccc3 commit 3fc41f2
Show file tree
Hide file tree
Showing 18 changed files with 1,977 additions and 172 deletions.
29 changes: 29 additions & 0 deletions encoding/protobuf/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func resetMetricWithMetadatasProto(pb *metricpb.MetricWithMetadatas) {
resetCounterWithMetadatasProto(pb.CounterWithMetadatas)
resetBatchTimerWithMetadatasProto(pb.BatchTimerWithMetadatas)
resetGaugeWithMetadatasProto(pb.GaugeWithMetadatas)
resetTimedMetricWithForwardMetadataProto(pb.TimedMetricWithForwardMetadata)
}

func resetCounterWithMetadatasProto(pb *metricpb.CounterWithMetadatas) {
Expand All @@ -62,6 +63,14 @@ func resetGaugeWithMetadatasProto(pb *metricpb.GaugeWithMetadatas) {
resetMetadatas(&pb.Metadatas)
}

func resetTimedMetricWithForwardMetadataProto(pb *metricpb.TimedMetricWithForwardMetadata) {
if pb == nil {
return
}
resetTimedMetric(&pb.Metric)
resetForwardMetadata(&pb.Metadata)
}

func resetCounter(pb *metricpb.Counter) {
if pb == nil {
return
Expand All @@ -86,9 +95,29 @@ func resetGauge(pb *metricpb.Gauge) {
pb.Value = 0.0
}

func resetTimedMetric(pb *metricpb.TimedMetric) {
if pb == nil {
return
}
pb.Id = pb.Id[:0]
pb.TimeNanos = 0
pb.Value = 0.0
}

func resetMetadatas(pb *metricpb.StagedMetadatas) {
if pb == nil {
return
}
pb.Metadatas = pb.Metadatas[:0]
}

func resetForwardMetadata(pb *metricpb.ForwardMetadata) {
if pb == nil {
return
}
pb.AggregationId.Reset()
pb.StoragePolicy.Reset()
pb.Pipeline.Ops = pb.Pipeline.Ops[:0]
pb.SourceId = 0
pb.NumForwardedTimes = 0
}
67 changes: 67 additions & 0 deletions encoding/protobuf/reset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ package protobuf
import (
"testing"

"github.com/m3db/m3metrics/generated/proto/aggregationpb"
"github.com/m3db/m3metrics/generated/proto/metricpb"
"github.com/m3db/m3metrics/generated/proto/pipelinepb"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -53,6 +55,16 @@ var (
Id: []byte{},
Value: 0.0,
}
testTimedMetricBeforeResetProto = metricpb.TimedMetric{
Id: []byte("testTimedMetric"),
TimeNanos: 1234,
Value: 23.234,
}
testTimedMetricAfterResetProto = metricpb.TimedMetric{
Id: []byte{},
TimeNanos: 0,
Value: 0.0,
}
testMetadatasBeforeResetProto = metricpb.StagedMetadatas{
Metadatas: []metricpb.StagedMetadata{
{
Expand All @@ -66,6 +78,30 @@ var (
testMetadatasAfterResetProto = metricpb.StagedMetadatas{
Metadatas: []metricpb.StagedMetadata{},
}
testForwardMetadataBeforeResetProto = metricpb.ForwardMetadata{
Pipeline: pipelinepb.AppliedPipeline{
Ops: []pipelinepb.AppliedPipelineOp{
{
Type: pipelinepb.AppliedPipelineOp_ROLLUP,
Rollup: &pipelinepb.AppliedRollupOp{
Id: []byte("foo"),
AggregationId: aggregationpb.AggregationID{
Id: 12,
},
},
},
},
},
SourceId: 342,
NumForwardedTimes: 23,
}
testForwardMetadataAfterResetProto = metricpb.ForwardMetadata{
Pipeline: pipelinepb.AppliedPipeline{
Ops: []pipelinepb.AppliedPipelineOp{},
},
SourceId: 0,
NumForwardedTimes: 0,
}
)

func TestResetMetricWithMetadatasProtoNilProto(t *testing.T) {
Expand Down Expand Up @@ -135,6 +171,27 @@ func TestResetMetricWithMetadatasProtoOnlyGauge(t *testing.T) {
require.True(t, cap(input.GaugeWithMetadatas.Metadatas.Metadatas) > 0)
}

func TestResetMetricWithMetadatasProtoOnlyTimedMetric(t *testing.T) {
input := &metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_FORWARD_METADATA,
TimedMetricWithForwardMetadata: &metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetricBeforeResetProto,
Metadata: testForwardMetadataBeforeResetProto,
},
}
expected := &metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_UNKNOWN,
TimedMetricWithForwardMetadata: &metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetricAfterResetProto,
Metadata: testForwardMetadataAfterResetProto,
},
}
resetMetricWithMetadatasProto(input)
require.Equal(t, expected, input)
require.True(t, cap(input.TimedMetricWithForwardMetadata.Metric.Id) > 0)
require.True(t, cap(input.TimedMetricWithForwardMetadata.Metadata.Pipeline.Ops) > 0)
}

func TestResetMetricWithMetadatasProtoAll(t *testing.T) {
input := &metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_GAUGE_WITH_METADATAS,
Expand All @@ -150,6 +207,10 @@ func TestResetMetricWithMetadatasProtoAll(t *testing.T) {
Gauge: testGaugeBeforeResetProto,
Metadatas: testMetadatasBeforeResetProto,
},
TimedMetricWithForwardMetadata: &metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetricBeforeResetProto,
Metadata: testForwardMetadataBeforeResetProto,
},
}
expected := &metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_UNKNOWN,
Expand All @@ -165,6 +226,10 @@ func TestResetMetricWithMetadatasProtoAll(t *testing.T) {
Gauge: testGaugeAfterResetProto,
Metadatas: testMetadatasAfterResetProto,
},
TimedMetricWithForwardMetadata: &metricpb.TimedMetricWithForwardMetadata{
Metric: testTimedMetricAfterResetProto,
Metadata: testForwardMetadataAfterResetProto,
},
}
resetMetricWithMetadatasProto(input)
require.Equal(t, expected, input)
Expand All @@ -174,4 +239,6 @@ func TestResetMetricWithMetadatasProtoAll(t *testing.T) {
require.True(t, cap(input.BatchTimerWithMetadatas.Metadatas.Metadatas) > 0)
require.True(t, cap(input.GaugeWithMetadatas.Gauge.Id) > 0)
require.True(t, cap(input.GaugeWithMetadatas.Metadatas.Metadatas) > 0)
require.True(t, cap(input.TimedMetricWithForwardMetadata.Metric.Id) > 0)
require.True(t, cap(input.TimedMetricWithForwardMetadata.Metadata.Pipeline.Ops) > 0)
}
15 changes: 15 additions & 0 deletions encoding/protobuf/unaggregated_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/m3db/m3metrics/encoding"
"github.com/m3db/m3metrics/generated/proto/metricpb"
"github.com/m3db/m3metrics/metric/aggregated"
"github.com/m3db/m3metrics/metric/unaggregated"
"github.com/m3db/m3x/pool"
)
Expand Down Expand Up @@ -65,6 +66,7 @@ type unaggregatedEncoder struct {
cm metricpb.CounterWithMetadatas
bm metricpb.BatchTimerWithMetadatas
gm metricpb.GaugeWithMetadatas
tm metricpb.TimedMetricWithForwardMetadata
buf []byte
used int

Expand Down Expand Up @@ -120,6 +122,8 @@ func (enc *unaggregatedEncoder) EncodeMessage(msg encoding.UnaggregatedMessageUn
return enc.encodeBatchTimerWithMetadatas(msg.BatchTimerWithMetadatas)
case encoding.GaugeWithMetadatasType:
return enc.encodeGaugeWithMetadatas(msg.GaugeWithMetadatas)
case encoding.TimedMetricWithForwardMetadataType:
return enc.encodeTimedMetricWithForwardMetadata(msg.TimedMetricWithForwardMetadata)
default:
return fmt.Errorf("unknown message type: %v", msg.Type)
}
Expand Down Expand Up @@ -158,6 +162,17 @@ func (enc *unaggregatedEncoder) encodeGaugeWithMetadatas(gm unaggregated.GaugeWi
return enc.encodeMetricWithMetadatas(mm)
}

func (enc *unaggregatedEncoder) encodeTimedMetricWithForwardMetadata(tm aggregated.MetricWithForwardMetadata) error {
if err := tm.ToProto(&enc.tm); err != nil {
return fmt.Errorf("timed metric with forward metadata proto conversion failed: %v", err)
}
mm := metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_FORWARD_METADATA,
TimedMetricWithForwardMetadata: &enc.tm,
}
return enc.encodeMetricWithMetadatas(mm)
}

func (enc *unaggregatedEncoder) encodeMetricWithMetadatas(pb metricpb.MetricWithMetadatas) error {
msgSize := pb.Size()
if msgSize > enc.maxMessageSize {
Expand Down
Loading

0 comments on commit 3fc41f2

Please sign in to comment.