Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Add types to support timed metrics aggregation (#203)
Browse files Browse the repository at this point in the history
  • Loading branch information
cw9 committed Sep 24, 2018
1 parent dcfe68f commit e1a7251
Show file tree
Hide file tree
Showing 11 changed files with 602 additions and 89 deletions.
18 changes: 16 additions & 2 deletions encoding/protobuf/unaggregated_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type unaggregatedEncoder struct {
bm metricpb.BatchTimerWithMetadatas
gm metricpb.GaugeWithMetadatas
fm metricpb.ForwardedMetricWithMetadata
tm metricpb.TimedMetricWithMetadata
buf []byte
used int

Expand Down Expand Up @@ -124,6 +125,8 @@ func (enc *unaggregatedEncoder) EncodeMessage(msg encoding.UnaggregatedMessageUn
return enc.encodeGaugeWithMetadatas(msg.GaugeWithMetadatas)
case encoding.ForwardedMetricWithMetadataType:
return enc.encodeForwardedMetricWithMetadata(msg.ForwardedMetricWithMetadata)
case encoding.TimedMetricWithMetadataType:
return enc.encodeTimedMetricWithMetadata(msg.TimedMetricWithMetadata)
default:
return fmt.Errorf("unknown message type: %v", msg.Type)
}
Expand All @@ -145,7 +148,7 @@ func (enc *unaggregatedEncoder) encodeBatchTimerWithMetadatas(bm unaggregated.Ba
return fmt.Errorf("batch timer with metadatas proto conversion failed: %v", err)
}
mm := metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS,
Type: metricpb.MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS,
BatchTimerWithMetadatas: &enc.bm,
}
return enc.encodeMetricWithMetadatas(mm)
Expand All @@ -167,12 +170,23 @@ func (enc *unaggregatedEncoder) encodeForwardedMetricWithMetadata(fm aggregated.
return fmt.Errorf("forwarded metric with metadata proto conversion failed: %v", err)
}
mm := metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA,
Type: metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA,
ForwardedMetricWithMetadata: &enc.fm,
}
return enc.encodeMetricWithMetadatas(mm)
}

func (enc *unaggregatedEncoder) encodeTimedMetricWithMetadata(tm aggregated.TimedMetricWithMetadata) error {
if err := tm.ToProto(&enc.tm); err != nil {
return fmt.Errorf("timed metric with metadata proto conversion failed: %v", err)
}
mm := metricpb.MetricWithMetadatas{
Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA,
TimedMetricWithMetadata: &enc.tm,
}
return enc.encodeMetricWithMetadatas(mm)
}

func (enc *unaggregatedEncoder) encodeMetricWithMetadatas(pb metricpb.MetricWithMetadatas) error {
msgSize := pb.Size()
if msgSize > enc.maxMessageSize {
Expand Down
3 changes: 3 additions & 0 deletions encoding/protobuf/unaggregated_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ func (it *unaggregatedIterator) decodeMessage(size int) error {
case metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA:
it.msg.Type = encoding.ForwardedMetricWithMetadataType
it.err = it.msg.ForwardedMetricWithMetadata.FromProto(it.pb.ForwardedMetricWithMetadata)
case metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA:
it.msg.Type = encoding.TimedMetricWithMetadataType
it.err = it.msg.TimedMetricWithMetadata.FromProto(it.pb.TimedMetricWithMetadata)
default:
it.err = fmt.Errorf("unrecognized message type: %v", it.pb.Type)
}
Expand Down
2 changes: 2 additions & 0 deletions encoding/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
BatchTimerWithMetadatasType
GaugeWithMetadatasType
ForwardedMetricWithMetadataType
TimedMetricWithMetadataType
)

// UnaggregatedMessageUnion is a union of different types of unaggregated messages.
Expand All @@ -49,6 +50,7 @@ type UnaggregatedMessageUnion struct {
BatchTimerWithMetadatas unaggregated.BatchTimerWithMetadatas
GaugeWithMetadatas unaggregated.GaugeWithMetadatas
ForwardedMetricWithMetadata aggregated.ForwardedMetricWithMetadata
TimedMetricWithMetadata aggregated.TimedMetricWithMetadata
}

// ByteReadScanner is capable of reading and scanning bytes.
Expand Down
Loading

0 comments on commit e1a7251

Please sign in to comment.