Skip to content

Commit

Permalink
Add stats to observerware
Browse files Browse the repository at this point in the history
Use pally to add metrics to the logging middleware.
  • Loading branch information
Akshay Shah committed Apr 24, 2017
1 parent a52b6f8 commit b551c9b
Show file tree
Hide file tree
Showing 7 changed files with 581 additions and 34 deletions.
56 changes: 51 additions & 5 deletions dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
package yarpc

import (
"context"
"fmt"
"sync"
"time"

"go.uber.org/yarpc/api/middleware"
"go.uber.org/yarpc/api/transport"
Expand All @@ -31,14 +33,21 @@ import (
"go.uber.org/yarpc/internal/inboundmiddleware"
"go.uber.org/yarpc/internal/observerware"
"go.uber.org/yarpc/internal/outboundmiddleware"
"go.uber.org/yarpc/internal/pally"
"go.uber.org/yarpc/internal/request"
intsync "go.uber.org/yarpc/internal/sync"

"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/uber-go/tally"
"go.uber.org/multierr"
"go.uber.org/zap"
)

// _tallyPushInterval is the default sleep between pushes to Tally metrics.
// NewDispatcher passes it to the registry's Push call when a TallyScope is
// configured. At some point, we may want this to be configurable.
const _tallyPushInterval = 500 * time.Millisecond

// Config specifies the parameters of a new Dispatcher constructed via
// NewDispatcher.
type Config struct {
Expand Down Expand Up @@ -73,6 +82,14 @@ type Config struct {
// ZapLogger provides a logger for the dispatcher. The default logger is a
// no-op.
ZapLogger *zap.Logger

// TallyScope provides a push-based metrics implementation for the
// dispatcher. By default, metrics are collected in memory but not pushed.
TallyScope tally.Scope

// Enabling PrometheusExport tees YARPC's internal metrics to Prometheus's
// package-global default registry.
PrometheusExport bool
}

// Inbounds contains a list of inbound transports. Each inbound transport
Expand Down Expand Up @@ -117,9 +134,28 @@ func NewDispatcher(cfg Config) *Dispatcher {
zap.Namespace("yarpc"), // isolate yarpc's keys
zap.String("dispatcher", cfg.Name),
)
cfg = addObservingMiddleware(cfg, logger)
}

pallyOpts := []pally.RegistryOption{
pally.Labeled(pally.Labels{
"component": "yarpc",
"dispatcher": cfg.Name,
}),
}
if cfg.PrometheusExport {
pallyOpts = append(pallyOpts, pally.Federated(prometheus.DefaultRegisterer))
}
registry := pally.NewRegistry(pallyOpts...)
var stopPush context.CancelFunc
if cfg.TallyScope != nil {
stop, err := registry.Push(cfg.TallyScope, _tallyPushInterval)
if err != nil {
logger.Error("Failed to start pushing metrics to Tally.", zap.Error(err))
}
stopPush = stop
}
cfg = addObservingMiddleware(cfg, logger, registry)

return &Dispatcher{
name: cfg.Name,
table: middleware.ApplyRouteTable(NewMapRouter(cfg.Name), cfg.RouterMiddleware),
Expand All @@ -128,11 +164,13 @@ func NewDispatcher(cfg Config) *Dispatcher {
transports: collectTransports(cfg.Inbounds, cfg.Outbounds),
inboundMiddleware: cfg.InboundMiddleware,
log: logger,
registry: registry,
stopRegistryPush: stopPush,
}
}

func addObservingMiddleware(cfg Config, logger *zap.Logger) Config {
observer := observerware.New(logger, observerware.NewNopContextExtractor())
func addObservingMiddleware(cfg Config, logger *zap.Logger, registry *pally.Registry) Config {
observer := observerware.New(logger, registry, observerware.NewNopContextExtractor())

cfg.InboundMiddleware.Unary = inboundmiddleware.UnaryChain(observer, cfg.InboundMiddleware.Unary)
cfg.InboundMiddleware.Oneway = inboundmiddleware.OnewayChain(observer, cfg.InboundMiddleware.Oneway)
Expand Down Expand Up @@ -225,8 +263,9 @@ type Dispatcher struct {

inboundMiddleware InboundMiddleware

// TODO (shah): add a *pally.Registry too.
log *zap.Logger
log *zap.Logger
registry *pally.Registry
stopRegistryPush context.CancelFunc
}

// Inbounds returns a copy of the list of inbounds for this RPC object.
Expand Down Expand Up @@ -450,6 +489,13 @@ func (d *Dispatcher) Stop() error {
return err
}

// Stop pushing metrics to Tally.
if d.stopRegistryPush != nil {
d.log.Debug("Stopping metrics push loop.")
d.stopRegistryPush()
d.log.Debug("Stopped metrics push loop.")
}

d.log.Debug("Unregistering debug pages.")
removeDispatcherFromDebugPages(d)
d.log.Debug("Unregistered debug pages.")
Expand Down
6 changes: 6 additions & 0 deletions internal/inboundmiddleware/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
func UnaryChain(mw ...middleware.UnaryInbound) middleware.UnaryInbound {
unchained := make([]middleware.UnaryInbound, 0, len(mw))
for _, m := range mw {
if m == nil {
continue
}
if c, ok := m.(unaryChain); ok {
unchained = append(unchained, c...)
continue
Expand Down Expand Up @@ -77,6 +80,9 @@ func (x unaryChainExec) Handle(ctx context.Context, req *transport.Request, resw
func OnewayChain(mw ...middleware.OnewayInbound) middleware.OnewayInbound {
unchained := make([]middleware.OnewayInbound, 0, len(mw))
for _, m := range mw {
if m == nil {
continue
}
if c, ok := m.(onewayChain); ok {
unchained = append(unchained, c...)
continue
Expand Down
102 changes: 102 additions & 0 deletions internal/observerware/call.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package observerware

import (
"context"
"time"

"go.uber.org/yarpc/api/transport"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// call carries the state of a single in-flight RPC so that End can log it
// and record metrics once the request has finished.
type call struct {
// edge supplies the logger and the counters/latency metrics that endLogs
// and endStats write to.
edge *edge
// extract is invoked with ctx in endLogs to produce one extra log field.
extract ContextExtractor

// started is subtracted from _timeNow() in End to compute the latency.
started time.Time
// ctx is the request context handed to extract.
ctx context.Context
// req is logged under the "request" key.
req *transport.Request
// rpcType is logged (via String) under the "rpcType" key.
rpcType transport.Type
// inbound selects the log message: true logs "Handled inbound request.",
// false logs "Made outbound call.".
inbound bool
}

// End marks the call as finished. It measures the time elapsed since the
// call started and forwards that latency, together with the outcome, to the
// logging and metrics helpers (in that order).
func (c call) End(err error, isApplicationError bool) {
	latency := _timeNow().Sub(c.started)

	c.endLogs(latency, err, isApplicationError)
	c.endStats(latency, err, isApplicationError)
}

// endLogs writes a single debug-level log entry describing the finished
// call. Nothing is built or written when the edge's logger reports that
// debug-level entries are disabled (Check returns nil).
//
// The entry carries the RPC type, the request, the latency, whether the call
// succeeded, the field produced by the context extractor, and an "error"
// field: the fixed string "application_error" for application errors,
// otherwise the transport error itself.
func (c call) endLogs(elapsed time.Duration, err error, isApplicationError bool) {
	var msg string
	if c.inbound {
		msg = "Handled inbound request."
	} else {
		msg = "Made outbound call."
	}

	entry := c.edge.logger.Check(zap.DebugLevel, msg)
	if entry == nil {
		return
	}

	succeeded := err == nil && !isApplicationError

	// Application errors are reported as a fixed marker string rather than
	// as the (possibly nil) transport error.
	errField := zap.Error(err)
	if isApplicationError {
		errField = zap.String("error", "application_error")
	}

	entry.Write(
		zap.String("rpcType", c.rpcType.String()),
		zap.Object("request", c.req),
		zap.Duration("latency", elapsed),
		zap.Bool("successful", succeeded),
		c.extract(c.ctx),
		errField,
	)
}

// endStats records the outcome of a finished call in the edge's metrics.
//
// Every call increments the calls counter. Beyond that:
//   - success (nil err and no application error): increments successes and
//     observes elapsed on the success latencies metric;
//   - application error: observes elapsed on the caller-error latencies and
//     increments the "application_error" caller-failure counter;
//   - bad request error: observes elapsed on the caller-error latencies and
//     increments the "bad_request" caller-failure counter;
//   - anything else: observes elapsed on the server-error latencies and, for
//     unexpected errors, increments the "unexpected" server-failure counter.
//
// Failure counters are looked up by label; if the lookup itself fails the
// increment is skipped (metrics are best-effort and must not break the call).
func (c call) endStats(elapsed time.Duration, err error, isApplicationError bool) {
	// TODO: We need a much better way to distinguish between caller and server
	// errors. See T855583.
	c.edge.calls.Inc()
	if err == nil && !isApplicationError {
		c.edge.successes.Inc()
		c.edge.latencies.Observe(elapsed)
		return
	}
	// For now, assume that all application errors are the caller's fault.
	if isApplicationError {
		c.edge.callerErrLatencies.Observe(elapsed)
		// Only touch the counter when the lookup succeeded; the lookup error
		// gets its own name so it does not shadow the call's err.
		if counter, getErr := c.edge.callerFailures.Get("application_error"); getErr == nil {
			counter.Inc()
		}
		return
	}
	// Bad request errors are the caller's fault.
	if transport.IsBadRequestError(err) {
		c.edge.callerErrLatencies.Observe(elapsed)
		if counter, getErr := c.edge.callerFailures.Get("bad_request"); getErr == nil {
			counter.Inc()
		}
		return
	}
	// For now, assume that all other errors are the server's fault.
	c.edge.serverErrLatencies.Observe(elapsed)
	if transport.IsUnexpectedError(err) {
		// Label unexpected errors as such instead of reusing the caller-side
		// "bad_request" label.
		if counter, getErr := c.edge.serverFailures.Get("unexpected"); getErr == nil {
			counter.Inc()
		}
	}
}
Loading

0 comments on commit b551c9b

Please sign in to comment.