Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Add new types to help debug forwarding delay
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Jul 10, 2018
1 parent d92603d commit 59bb8ba
Show file tree
Hide file tree
Showing 12 changed files with 431 additions and 72 deletions.
10 changes: 10 additions & 0 deletions encoding/protobuf/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func resetMetricWithMetadatasProto(pb *metricpb.MetricWithMetadatas) {
resetBatchTimerWithMetadatasProto(pb.BatchTimerWithMetadatas)
resetGaugeWithMetadatasProto(pb.GaugeWithMetadatas)
resetForwardedMetricWithMetadataProto(pb.ForwardedMetricWithMetadata)
resetRawBytesWithConnWriteTime(pb.RawBytesWithConnWriteTime)
}

func resetCounterWithMetadatasProto(pb *metricpb.CounterWithMetadatas) {
Expand Down Expand Up @@ -71,6 +72,14 @@ func resetForwardedMetricWithMetadataProto(pb *metricpb.ForwardedMetricWithMetad
resetForwardMetadata(&pb.Metadata)
}

func resetRawBytesWithConnWriteTime(pb *metricpb.RawBytesWithConnWriteTime) {
if pb == nil {
return
}
pb.Data = pb.Data[:0]
pb.ConnWriteAtNanos = 0
}

func resetCounter(pb *metricpb.Counter) {
if pb == nil {
return
Expand Down Expand Up @@ -121,4 +130,5 @@ func resetForwardMetadata(pb *metricpb.ForwardMetadata) {
pb.Pipeline.Ops = pb.Pipeline.Ops[:0]
pb.SourceId = 0
pb.NumForwardedTimes = 0
pb.FlushAtNanos = 0
}
12 changes: 12 additions & 0 deletions encoding/protobuf/unaggregated_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type unaggregatedEncoder struct {
bm metricpb.BatchTimerWithMetadatas
gm metricpb.GaugeWithMetadatas
fm metricpb.ForwardedMetricWithMetadata
rt metricpb.RawBytesWithConnWriteTime
buf []byte
used int

Expand Down Expand Up @@ -124,6 +125,8 @@ func (enc *unaggregatedEncoder) EncodeMessage(msg encoding.UnaggregatedMessageUn
return enc.encodeGaugeWithMetadatas(msg.GaugeWithMetadatas)
case encoding.ForwardedMetricWithMetadataType:
return enc.encodeForwardedMetricWithMetadata(msg.ForwardedMetricWithMetadata)
case encoding.RawBytesWithConnWriteTimeType:
return enc.encodeRawBytesWithConnWriteTime(msg.RawBytesWithConnWriteTime)
default:
return fmt.Errorf("unknown message type: %v", msg.Type)
}
Expand Down Expand Up @@ -173,6 +176,15 @@ func (enc *unaggregatedEncoder) encodeForwardedMetricWithMetadata(fm aggregated.
return enc.encodeMetricWithMetadatas(mm)
}

func (enc *unaggregatedEncoder) encodeRawBytesWithConnWriteTime(rt aggregated.RawBytesWithConnWriteTime) error {
rt.ToProto(&enc.rt)
mm := metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_RAW_BYTES_WITH_CONN_WRITE_TIME,
RawBytesWithConnWriteTime: &enc.rt,
}
return enc.encodeMetricWithMetadatas(mm)
}

func (enc *unaggregatedEncoder) encodeMetricWithMetadatas(pb metricpb.MetricWithMetadatas) error {
msgSize := pb.Size()
if msgSize > enc.maxMessageSize {
Expand Down
4 changes: 4 additions & 0 deletions encoding/protobuf/unaggregated_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ var (
}),
SourceID: 1234,
NumForwardedTimes: 3,
FlushAtNanos: 1234,
}
testForwardMetadata2 = metadata.ForwardMetadata{
AggregationID: aggregation.MustCompressTypes(aggregation.Sum),
Expand All @@ -218,6 +219,7 @@ var (
}),
SourceID: 897,
NumForwardedTimes: 2,
FlushAtNanos: 5678,
}
testCounter1Proto = metricpb.Counter{
Id: []byte("testCounter1"),
Expand Down Expand Up @@ -428,6 +430,7 @@ var (
},
SourceId: 1234,
NumForwardedTimes: 3,
FlushAtNanos: 1234,
}
testForwardMetadata2Proto = metricpb.ForwardMetadata{
AggregationId: aggregationpb.AggregationID{Id: aggregation.MustCompressTypes(aggregation.Sum)[0]},
Expand Down Expand Up @@ -459,6 +462,7 @@ var (
},
SourceId: 897,
NumForwardedTimes: 2,
FlushAtNanos: 5678,
}
testCmpOpts = []cmp.Option{
cmpopts.EquateEmpty(),
Expand Down
3 changes: 3 additions & 0 deletions encoding/protobuf/unaggregated_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ func (it *unaggregatedIterator) decodeMessage(size int) error {
case metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA:
it.msg.Type = encoding.ForwardedMetricWithMetadataType
it.err = it.msg.ForwardedMetricWithMetadata.FromProto(it.pb.ForwardedMetricWithMetadata)
case metricpb.MetricWithMetadatas_RAW_BYTES_WITH_CONN_WRITE_TIME:
it.msg.Type = encoding.RawBytesWithConnWriteTimeType
it.err = it.msg.RawBytesWithConnWriteTime.FromProto(it.pb.RawBytesWithConnWriteTime)
default:
it.err = fmt.Errorf("unrecognized message type: %v", it.pb.Type)
}
Expand Down
24 changes: 24 additions & 0 deletions encoding/protobuf/unaggregated_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) {
ForwardedMetric: testForwardedMetric1,
ForwardMetadata: testForwardMetadata1,
},
aggregated.RawBytesWithConnWriteTime{
Data: []byte("foobar"),
ConnWriteAtNanos: 12345,
},
unaggregated.CounterWithMetadatas{
Counter: testCounter2,
StagedMetadatas: testStagedMetadatas1,
Expand All @@ -252,6 +256,10 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) {
ForwardedMetric: testForwardedMetric2,
ForwardMetadata: testForwardMetadata1,
},
aggregated.RawBytesWithConnWriteTime{
Data: []byte("baz"),
ConnWriteAtNanos: 67890,
},
unaggregated.CounterWithMetadatas{
Counter: testCounter1,
StagedMetadatas: testStagedMetadatas2,
Expand All @@ -268,6 +276,10 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) {
ForwardedMetric: testForwardedMetric1,
ForwardMetadata: testForwardMetadata2,
},
aggregated.RawBytesWithConnWriteTime{
Data: []byte("foobarbaz"),
ConnWriteAtNanos: 13579,
},
unaggregated.CounterWithMetadatas{
Counter: testCounter2,
StagedMetadatas: testStagedMetadatas2,
Expand All @@ -284,6 +296,10 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) {
ForwardedMetric: testForwardedMetric2,
ForwardMetadata: testForwardMetadata2,
},
aggregated.RawBytesWithConnWriteTime{
Data: []byte("oqiwjrajsdfadf"),
ConnWriteAtNanos: 24680,
},
}

numIter := 1000
Expand Down Expand Up @@ -312,6 +328,11 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) {
Type: encoding.ForwardedMetricWithMetadataType,
ForwardedMetricWithMetadata: input,
}
case aggregated.RawBytesWithConnWriteTime:
msg = encoding.UnaggregatedMessageUnion{
Type: encoding.RawBytesWithConnWriteTimeType,
RawBytesWithConnWriteTime: input,
}
default:
require.Fail(t, "unrecognized type %T", input)
}
Expand Down Expand Up @@ -343,6 +364,9 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) {
case aggregated.ForwardedMetricWithMetadata:
require.Equal(t, encoding.ForwardedMetricWithMetadataType, res.Type)
require.True(t, cmp.Equal(expectedRes, res.ForwardedMetricWithMetadata, testCmpOpts...))
case aggregated.RawBytesWithConnWriteTime:
require.Equal(t, encoding.RawBytesWithConnWriteTimeType, res.Type)
require.True(t, cmp.Equal(expectedRes, res.RawBytesWithConnWriteTime, testCmpOpts...))
default:
require.Fail(t, "unknown input type: %T", inputs[j])
}
Expand Down
2 changes: 2 additions & 0 deletions encoding/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
BatchTimerWithMetadatasType
GaugeWithMetadatasType
ForwardedMetricWithMetadataType
RawBytesWithConnWriteTimeType
)

// UnaggregatedMessageUnion is a union of different types of unaggregated messages.
Expand All @@ -49,6 +50,7 @@ type UnaggregatedMessageUnion struct {
BatchTimerWithMetadatas unaggregated.BatchTimerWithMetadatas
GaugeWithMetadatas unaggregated.GaugeWithMetadatas
ForwardedMetricWithMetadata aggregated.ForwardedMetricWithMetadata
RawBytesWithConnWriteTime aggregated.RawBytesWithConnWriteTime
}

// ByteReadScanner is capable of reading and scanning bytes.
Expand Down
Loading

0 comments on commit 59bb8ba

Please sign in to comment.