Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Aggregation for timed metrics (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
cw9 committed Sep 27, 2018
1 parent aa0a63a commit 71992c1
Show file tree
Hide file tree
Showing 64 changed files with 3,441 additions and 762 deletions.
12 changes: 7 additions & 5 deletions aggregator/aggregation_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@ import (
)

// aggregationKey uniquely identifies one aggregation stream for a metric:
// which aggregation functions apply, at what storage policy, through which
// applied pipeline, how many times the metric has been forwarded, and how
// the output metric ID is prefixed/suffixed.
//
// NOTE(review): the diff residue left both the pre- and post-commit field
// lists in place (duplicate declarations); this is the post-commit struct.
type aggregationKey struct {
	aggregationID      aggregation.ID
	storagePolicy      policy.StoragePolicy
	pipeline           applied.Pipeline
	numForwardedTimes  int
	idPrefixSuffixType IDPrefixSuffixType
}

// Equal returns true if two aggregation keys are considered equal. All five
// components must match; pipelines are compared via their own Equal method
// since applied.Pipeline is not directly comparable with ==.
//
// NOTE(review): the diff residue left the old final condition
// (`k.numForwardedTimes == other.numForwardedTimes` with no trailing `&&`)
// alongside its replacement; this is the post-commit version, which also
// compares idPrefixSuffixType.
func (k aggregationKey) Equal(other aggregationKey) bool {
	return k.aggregationID == other.aggregationID &&
		k.storagePolicy == other.storagePolicy &&
		k.pipeline.Equal(other.pipeline) &&
		k.numForwardedTimes == other.numForwardedTimes &&
		k.idPrefixSuffixType == other.idPrefixSuffixType
}
20 changes: 17 additions & 3 deletions aggregator/aggregation_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,28 @@ func TestAggregationKeyEqual(t *testing.T) {
},
{
a: aggregationKey{
aggregationID: aggregation.DefaultID,
storagePolicy: policy.NewStoragePolicy(10*time.Second, xtime.Second, 48*time.Hour),
aggregationID: aggregation.DefaultID,
storagePolicy: policy.NewStoragePolicy(10*time.Second, xtime.Second, 48*time.Hour),
idPrefixSuffixType: WithPrefixWithSuffix,
},
b: aggregationKey{
aggregationID: aggregation.DefaultID,
storagePolicy: policy.NewStoragePolicy(10*time.Second, xtime.Second, 48*time.Hour),
idPrefixSuffixType: WithPrefixWithSuffix,
},
expected: true,
},
{
a: aggregationKey{
aggregationID: aggregation.DefaultID,
storagePolicy: policy.NewStoragePolicy(10*time.Second, xtime.Second, 48*time.Hour),
idPrefixSuffixType: NoPrefixNoSuffix,
},
b: aggregationKey{
aggregationID: aggregation.DefaultID,
storagePolicy: policy.NewStoragePolicy(10*time.Second, xtime.Second, 48*time.Hour),
},
expected: true,
expected: false,
},
{
a: aggregationKey{
Expand Down
60 changes: 60 additions & 0 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ type Aggregator interface {
// AddUntimed adds an untimed metric with staged metadatas.
AddUntimed(metric unaggregated.MetricUnion, metas metadata.StagedMetadatas) error

// AddTimed adds a timed metric with metadata.
AddTimed(metric aggregated.Metric, metadata metadata.TimedMetadata) error

// AddForwarded adds a forwarded metric with metadata.
AddForwarded(metric aggregated.ForwardedMetric, metadata metadata.ForwardMetadata) error

Expand Down Expand Up @@ -183,6 +186,25 @@ func (agg *aggregator) AddUntimed(
return nil
}

// AddTimed routes a timed metric (one carrying its own client-assigned
// timestamp) to the shard owning its ID, recording success latency and
// error counters along the way.
func (agg *aggregator) AddTimed(
	metric aggregated.Metric,
	metadata metadata.TimedMetadata,
) error {
	// Capture the start time up front so the success path can report
	// end-to-end call latency.
	start := agg.nowFn()
	agg.metrics.timed.Inc(1)

	shard, shardErr := agg.shardFor(metric.ID)
	if shardErr != nil {
		agg.metrics.addTimed.ReportError(shardErr)
		return shardErr
	}

	if addErr := shard.AddTimed(metric, metadata); addErr != nil {
		agg.metrics.addTimed.ReportError(addErr)
		return addErr
	}

	agg.metrics.addTimed.ReportSuccess(agg.nowFn().Sub(start))
	return nil
}

func (agg *aggregator) AddForwarded(
metric aggregated.ForwardedMetric,
metadata metadata.ForwardMetadata,
Expand Down Expand Up @@ -662,6 +684,39 @@ func (m *aggregatorAddUntimedMetrics) ReportError(err error) {
m.aggregatorAddMetricMetrics.ReportError(err)
}

// aggregatorAddTimedMetrics tracks outcomes of AddTimed calls. It embeds the
// common per-add metrics and adds dedicated counters for the two
// timestamp-validation rejections specific to timed metrics (see ReportError,
// which routes errTooFarInTheFuture/errTooFarInThePast here).
type aggregatorAddTimedMetrics struct {
	aggregatorAddMetricMetrics

	// Counters for metrics rejected because their timestamp fell outside
	// the acceptable window; both are "errors" counters distinguished by a
	// "reason" tag (see newAggregatorAddTimedMetrics).
	tooFarInTheFuture tally.Counter
	tooFarInThePast   tally.Counter
}

// newAggregatorAddTimedMetrics builds the AddTimed metrics bundle under the
// given scope. Each timestamp-rejection reason gets its own "errors" counter,
// distinguished by a "reason" tag so the two failure modes can be told apart
// on dashboards.
func newAggregatorAddTimedMetrics(
	scope tally.Scope,
	samplingRate float64,
) aggregatorAddTimedMetrics {
	futureScope := scope.Tagged(map[string]string{"reason": "too-far-in-the-future"})
	pastScope := scope.Tagged(map[string]string{"reason": "too-far-in-the-past"})

	m := aggregatorAddTimedMetrics{
		aggregatorAddMetricMetrics: newAggregatorAddMetricMetrics(scope, samplingRate),
	}
	m.tooFarInTheFuture = futureScope.Counter("errors")
	m.tooFarInThePast = pastScope.Counter("errors")
	return m
}

// ReportError counts err, attributing the two known timestamp-validation
// rejections to their dedicated tagged counters and delegating every other
// error to the embedded common metrics.
func (m *aggregatorAddTimedMetrics) ReportError(err error) {
	if err == errTooFarInTheFuture {
		m.tooFarInTheFuture.Inc(1)
		return
	}
	if err == errTooFarInThePast {
		m.tooFarInThePast.Inc(1)
		return
	}
	m.aggregatorAddMetricMetrics.ReportError(err)
}

type latencyBucketKey struct {
resolution time.Duration
numForwardedTimes int
Expand Down Expand Up @@ -837,7 +892,9 @@ type aggregatorMetrics struct {
timerBatches tally.Counter
gauges tally.Counter
forwarded tally.Counter
timed tally.Counter
addUntimed aggregatorAddUntimedMetrics
addTimed aggregatorAddTimedMetrics
addForwarded aggregatorAddForwardedMetrics
placement aggregatorPlacementMetrics
shards aggregatorShardsMetrics
Expand All @@ -851,6 +908,7 @@ func newAggregatorMetrics(
maxAllowedForwardingDelayFn MaxAllowedForwardingDelayFn,
) aggregatorMetrics {
addUntimedScope := scope.SubScope("addUntimed")
addTimedScope := scope.SubScope("addTimed")
addForwardedScope := scope.SubScope("addForwarded")
placementScope := scope.SubScope("placement")
shardsScope := scope.SubScope("shards")
Expand All @@ -862,7 +920,9 @@ func newAggregatorMetrics(
timerBatches: scope.Counter("timer-batches"),
gauges: scope.Counter("gauges"),
forwarded: scope.Counter("forwarded"),
timed: scope.Counter("timed"),
addUntimed: newAggregatorAddUntimedMetrics(addUntimedScope, samplingRate),
addTimed: newAggregatorAddTimedMetrics(addTimedScope, samplingRate),
addForwarded: newAggregatorAddForwardedMetrics(addForwardedScope, samplingRate, maxAllowedForwardingDelayFn),
placement: newAggregatorPlacementMetrics(placementScope),
shards: newAggregatorShardsMetrics(shardsScope),
Expand Down
Loading

0 comments on commit 71992c1

Please sign in to comment.