fix: use histogram to keep track of request counts (#3240)
* fix: use histogram to keep track of request counts

* feat: add tracetest version to telemetry as an attribute
mathnogueira committed Oct 11, 2023
1 parent dcedf80 commit ad07ca4
Showing 4 changed files with 10 additions and 23 deletions.
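
The change follows one pattern throughout: dedicated OpenTelemetry Int64Counter instruments for request counts are dropped, and the count is taken from histogram samples instead, either from the latency histograms that already exist in the worker, HTTP, and cache code, or from counters converted to histograms in the queue. Below is a minimal sketch of the two instrument styles, assuming the standard go.opentelemetry.io/otel metric API; the meter and instrument names are illustrative only, not taken from this commit.

package main

import (
	"context"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

func main() {
	ctx := context.Background()
	meter := otel.Meter("example.worker") // no-op unless an SDK meter provider is registered

	// Before: a dedicated counter, incremented once per request.
	requestCounter, _ := meter.Int64Counter("example.request.count")
	requestCounter.Add(ctx, 1, metric.WithAttributes(attribute.String("queue.name", "jobs")))

	// After: a latency histogram; its count aggregation already yields the
	// number of requests, so the separate counter becomes redundant.
	latency, _ := meter.Int64Histogram("example.latency", metric.WithUnit("ms"))
	start := time.Now()
	// ... handle the request ...
	latency.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attribute.String("queue.name", "jobs")))
}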
5 changes: 0 additions & 5 deletions server/executor/worker_metric_middleware.go
@@ -19,13 +19,11 @@ type metricWorkerMiddlewareBuilder struct {
 }
 
 type metricWorkerMiddleware struct {
-	requestCounter   metric.Int64Counter
 	latencyHistogram metric.Int64Histogram
 	next             pipeline.StepProcessor[Job]
 }
 
 func NewWorkerMetricMiddlewareBuilder(meter metric.Meter) WorkerMiddlewareBuilder {
-	meter.Int64Counter("tracetest.worker.")
 	return &metricWorkerMiddlewareBuilder{
 		meter: meter,
 	}
@@ -34,11 +32,9 @@ func NewWorkerMetricMiddlewareBuilder(meter metric.Meter) WorkerMiddlewareBuilde
 func (b *metricWorkerMiddlewareBuilder) New(name string, next pipeline.StepProcessor[Job]) pipeline.StepProcessor[Job] {
 	metricPrefix := fmt.Sprintf("tracetest.worker.%s", name)
 
-	requestCounter, _ := b.meter.Int64Counter(fmt.Sprintf("%s.request.count", metricPrefix))
 	latencyHistogram, _ := b.meter.Int64Histogram(fmt.Sprintf("%s.latency", metricPrefix))
 
 	return &metricWorkerMiddleware{
-		requestCounter:   requestCounter,
 		latencyHistogram: latencyHistogram,
 		next:             next,
 	}
@@ -52,7 +48,6 @@ func (m *metricWorkerMiddleware) ProcessItem(ctx context.Context, job Job) {
 		attribute.String("run_state", string(job.Run.State)),
 	)
 
-	m.requestCounter.Add(ctx, 1, metric.WithAttributeSet(attributeSet))
 	start := time.Now()
 	m.next.ProcessItem(ctx, job)
 	latency := time.Since(start)
6 changes: 1 addition & 5 deletions server/http/middleware/metrics.go
@@ -13,7 +13,6 @@ import (
 type httpMetricMiddleware struct {
 	next                     http.Handler
 	requestDurationHistogram metric.Int64Histogram
-	requestCounter           metric.Int64Counter
 }
 
 func (m *httpMetricMiddleware) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -36,7 +35,6 @@ func (m *httpMetricMiddleware) ServeHTTP(w http.ResponseWriter, r *http.Request)
 	}
 
 	m.requestDurationHistogram.Record(r.Context(), duration.Milliseconds(), metric.WithAttributes(metricAttributes...))
-	m.requestCounter.Add(r.Context(), 1, metric.WithAttributes(metricAttributes...))
 }
 
 var _ http.Handler = &httpMetricMiddleware{}
@@ -57,13 +55,11 @@ func (lrw *responseWriter) WriteHeader(code int) {
 
 func NewMetricMiddleware(meter metric.Meter) mux.MiddlewareFunc {
 	return func(next http.Handler) http.Handler {
-		durationHistogram, _ := meter.Int64Histogram("http.server.duration", metric.WithUnit("ms"))
-		requestCounter, _ := meter.Int64Counter("http.server.request.count")
+		durationHistogram, _ := meter.Int64Histogram("http.server.latency", metric.WithUnit("ms"))
 
 		return &httpMetricMiddleware{
			next:                     next,
			requestDurationHistogram: durationHistogram,
-			requestCounter:           requestCounter,
 		}
 	}
 }
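
Not part of this commit, but for orientation: a sketch of how NewMetricMiddleware might be wired into a gorilla/mux router. The middleware import path and the listen address below are assumptions, not confirmed by this diff.

package main

import (
	"net/http"

	"github.com/gorilla/mux"
	"github.com/kubeshop/tracetest/server/http/middleware" // assumed import path
	"go.opentelemetry.io/otel"
)

func main() {
	router := mux.NewRouter()

	// After this commit, the middleware only registers the http.server.latency
	// histogram; request counts come from that histogram's sample count.
	router.Use(middleware.NewMetricMiddleware(otel.Meter("tracetest")))

	router.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	http.ListenAndServe(":8080", router) // arbitrary address for the sketch
}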
18 changes: 9 additions & 9 deletions server/pkg/pipeline/queue.go
@@ -29,11 +29,11 @@ type namedDriver interface {
 }
 
 type Queue[T any] struct {
-	name           string
-	driver         QueueDriver[T]
-	itemProcessor  QueueItemProcessor[T]
-	enqueueCounter metric.Int64Counter
-	listenCounter  metric.Int64Counter
+	name             string
+	driver           QueueDriver[T]
+	itemProcessor    QueueItemProcessor[T]
+	enqueueHistogram metric.Int64Histogram
+	listenHistogram  metric.Int64Histogram
 
 	EnqueuePreprocessorFn func(context.Context, T) T
 	ListenPreprocessorFn  func(context.Context, T) (context.Context, T)
@@ -62,8 +62,8 @@ func NewQueue[T any](driver QueueDriver[T], itemProcessor QueueItemProcessor[T])
 }
 
 func (q *Queue[T]) InitializeMetrics(meter metric.Meter) {
-	q.enqueueCounter, _ = meter.Int64Counter("messaging.enqueue.count")
-	q.listenCounter, _ = meter.Int64Counter("messaging.listen.count")
+	q.enqueueHistogram, _ = meter.Int64Histogram("messaging.enqueue")
+	q.listenHistogram, _ = meter.Int64Histogram("messaging.listen")
 }
 
 func (q *Queue[T]) SetDriver(driver QueueDriver[T]) {
@@ -85,7 +85,7 @@ func (q Queue[T]) Enqueue(ctx context.Context, item T) {
 		item = q.EnqueuePreprocessorFn(ctx, item)
 	}
 
-	q.enqueueCounter.Add(ctx, 1, metric.WithAttributes(
+	q.enqueueHistogram.Record(ctx, 1, metric.WithAttributes(
 		attribute.String("queue.name", q.name),
 	))
 	q.driver.Enqueue(item)
@@ -107,7 +107,7 @@ func (q Queue[T]) Listen(item T) {
 	}
 
 	q.workerPool.Submit(func() {
-		q.listenCounter.Add(ctx, 1, metric.WithAttributes(
+		q.listenHistogram.Record(ctx, 1, metric.WithAttributes(
 			attribute.String("queue.name", q.name),
 		))
 		q.itemProcessor.ProcessItem(ctx, item)
4 changes: 0 additions & 4 deletions server/test/run_repository.go
@@ -36,7 +36,6 @@ type RunRepository interface {
 type Cache struct {
 	cache                 *cache.Cache
 	instanceID            string
-	readRequestCounter    metric.Int64Counter
 	cacheLatencyHistogram metric.Int64Histogram
 }
 
@@ -61,13 +60,11 @@ func NewCache(instanceID string, opts ...CacheOption) *Cache {
 		opt(cacheConfig)
 	}
 
-	readRequestCounter, _ := cacheConfig.meter.Int64Counter("tracetest.cache.request.count")
 	cacheLatencyHistogram, _ := cacheConfig.meter.Int64Histogram("tracetest.cache.latency")
 
 	cache := &Cache{
 		cache:                 cache.New(5*time.Minute, 10*time.Minute),
 		instanceID:            instanceID,
-		readRequestCounter:    readRequestCounter,
 		cacheLatencyHistogram: cacheLatencyHistogram,
 	}
 
@@ -106,7 +103,6 @@ func (c *Cache) Get(ctx context.Context, testID id.ID, runID int) (Run, bool) {
 		attribute.Bool("hit", found),
 	)
 
-	c.readRequestCounter.Add(ctx, 1, metric.WithAttributeSet(attributeSet))
 	c.cacheLatencyHistogram.Record(ctx, duration.Milliseconds(), metric.WithAttributeSet(attributeSet))
 
 	if !found {
