Skip to content

Commit

Permalink
Extend logging middleware to support oneway procedures
Browse files Browse the repository at this point in the history
Extend the current unary-only logging middleware to also support oneway RPCs.
This is a part of T841919.

Next steps: expose the logger in the config shape, modify our internal wrapper
to populate that field, and (later on) build interop wrappers between bark and zap.
  • Loading branch information
Akshay Shah committed Apr 14, 2017
1 parent 61c91fa commit a4e3e0e
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 110 deletions.
5 changes: 4 additions & 1 deletion dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,13 @@ func NewDispatcher(cfg Config) *Dispatcher {
}

func addObservingMiddleware(cfg Config, logger *zap.Logger) Config {
observer := observerware.NewMiddleware(logger, observerware.NewNopContextExtractor())
observer := observerware.New(logger, observerware.NewNopContextExtractor())

cfg.InboundMiddleware.Unary = inboundmiddleware.UnaryChain(observer, cfg.InboundMiddleware.Unary)
cfg.InboundMiddleware.Oneway = inboundmiddleware.OnewayChain(observer, cfg.InboundMiddleware.Oneway)

cfg.OutboundMiddleware.Unary = outboundmiddleware.UnaryChain(observer, cfg.OutboundMiddleware.Unary)
cfg.OutboundMiddleware.Oneway = outboundmiddleware.OnewayChain(observer, cfg.OutboundMiddleware.Oneway)

return cfg
}
Expand Down
15 changes: 13 additions & 2 deletions internal/observerware/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
"go.uber.org/yarpc/api/transport"
)

type fakeAck string

func (a fakeAck) String() string { return string(a) }

type fakeHandler struct {
err error
}
Expand All @@ -35,7 +39,7 @@ func (h fakeHandler) Handle(_ context.Context, _ *transport.Request, _ transport
return h.err
}

func (h fakeHandler) HandleOneway(_ context.Context, _ *transport.Request, _ transport.ResponseWriter) error {
func (h fakeHandler) HandleOneway(_ context.Context, _ *transport.Request) error {
return h.err
}

Expand All @@ -45,13 +49,20 @@ type fakeOutbound struct {
err error
}

func (o fakeOutbound) Call(_ context.Context, req *transport.Request) (*transport.Response, error) {
func (o fakeOutbound) Call(_ context.Context, _ *transport.Request) (*transport.Response, error) {
if o.err != nil {
return nil, o.err
}
return &transport.Response{}, nil
}

func (o fakeOutbound) CallOneway(_ context.Context, _ *transport.Request) (transport.Ack, error) {
if o.err != nil {
return nil, o.err
}
return fakeAck("ok"), nil
}

func stubTime() func() {
prev := _timeNow
_timeNow = func() time.Time { return time.Time{} }
Expand Down
45 changes: 27 additions & 18 deletions internal/observerware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ type Middleware struct {
extract ContextExtractor
}

// NewMiddleware constructs a Middleware.
func NewMiddleware(logger *zap.Logger, extract ContextExtractor) *Middleware {
// New constructs a Middleware.
func New(logger *zap.Logger, extract ContextExtractor) *Middleware {
return &Middleware{
logger: logger.With(zap.String("rpcType", "unary")),
logger: logger,
extract: extract,
}
}
Expand All @@ -50,34 +50,43 @@ func NewMiddleware(logger *zap.Logger, extract ContextExtractor) *Middleware {
func (m *Middleware) Handle(ctx context.Context, req *transport.Request, w transport.ResponseWriter, h transport.UnaryHandler) error {
start := _timeNow()
err := h.Handle(ctx, req, w)
elapsed := _timeNow().Sub(start)

if ce := m.logger.Check(zap.DebugLevel, "Handled inbound request."); ce != nil {
ce.Write(
m.extract(ctx),
zap.Object("request", req),
zap.Duration("latency", elapsed),
zap.Bool("successful", err == nil),
zap.Error(err),
)
}
m.log(ctx, "Handled inbound request.", "unary", req, _timeNow().Sub(start), err)
return err
}

// Call implements middleware.UnaryOutbound.
func (m *Middleware) Call(ctx context.Context, req *transport.Request, out transport.UnaryOutbound) (*transport.Response, error) {
start := _timeNow()
res, err := out.Call(ctx, req)
elapsed := _timeNow().Sub(start)
m.log(ctx, "Made outbound call.", "unary", req, _timeNow().Sub(start), err)
return res, err
}

if ce := m.logger.Check(zap.DebugLevel, "Made outbound call."); ce != nil {
// HandleOneway implements middleware.OnewayInbound.
func (m *Middleware) HandleOneway(ctx context.Context, req *transport.Request, h transport.OnewayHandler) error {
start := _timeNow()
err := h.HandleOneway(ctx, req)
m.log(ctx, "Handled inbound request.", "oneway", req, _timeNow().Sub(start), err)
return err
}

// CallOneway implements middleware.OnewayOutbound.
func (m *Middleware) CallOneway(ctx context.Context, req *transport.Request, out transport.OnewayOutbound) (transport.Ack, error) {
start := _timeNow()
ack, err := out.CallOneway(ctx, req)
m.log(ctx, "Made outbound call.", "oneway", req, _timeNow().Sub(start), err)
return ack, err
}

func (m *Middleware) log(ctx context.Context, msg, rpcType string, req *transport.Request, elapsed time.Duration, err error) {
if ce := m.logger.Check(zap.DebugLevel, msg); ce != nil {
ce.Write(
m.extract(ctx),
zap.String("rpcType", rpcType),
zap.Object("request", req),
zap.Duration("latency", elapsed),
zap.Bool("successful", err == nil),
zap.Error(err),
m.extract(ctx),
)
}
return res, err
}
158 changes: 69 additions & 89 deletions internal/observerware/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"errors"
"strings"
"testing"
"time"

"go.uber.org/yarpc/api/transport"

Expand All @@ -35,8 +36,9 @@ import (
"go.uber.org/zap/zaptest/observer"
)

var (
_req = &transport.Request{
func TestUnaryInboundMiddleware(t *testing.T) {
defer stubTime()()
req := &transport.Request{
Caller: "caller",
Service: "service",
Encoding: "raw",
Expand All @@ -47,131 +49,109 @@ var (
RoutingDelegate: "routing-delegate",
Body: strings.NewReader("body"),
}
errFailed = errors.New("fail")
)
failed := errors.New("fail")

func TestUnaryInboundMiddleware(t *testing.T) {
defer stubTime()()
tests := []struct {
desc string
handler transport.UnaryHandler
extract ContextExtractor

wantErr bool
desc string
err error // downstream error
wantFields []zapcore.Field
}{
{
desc: "no downstream errors",
handler: fakeHandler{},
extract: NewNopContextExtractor(),
desc: "no downstream errors",
wantFields: []zapcore.Field{
zap.String("rpcType", "unary"),
zap.Skip(),
zap.Object("request", _req),
zap.Object("request", req),
zap.Duration("latency", 0),
zap.Bool("successful", true),
zap.Skip(),
zap.Skip(),
},
},
{
desc: "downstream errors",
extract: NewNopContextExtractor(),
handler: fakeHandler{errFailed},
wantErr: true,
desc: "downstream errors",
err: failed,
wantFields: []zapcore.Field{
zap.String("rpcType", "unary"),
zap.Skip(),
zap.Object("request", _req),
zap.Object("request", req),
zap.Duration("latency", 0),
zap.Bool("successful", false),
zap.Error(errFailed),
zap.Error(failed),
zap.Skip(),
},
},
}

for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
core, logs := observer.New(zapcore.DebugLevel)
mw := NewMiddleware(zap.New(core), tt.extract)
err := mw.Handle(context.Background(), _req, nil /* response writer */, tt.handler)
if tt.wantErr {
core, logs := observer.New(zapcore.DebugLevel)
mw := New(zap.New(core), NewNopContextExtractor())

getLog := func() observer.LoggedEntry {
entries := logs.TakeAll()
require.Equal(t, 1, len(entries), "Unexpected number of logs written.")
e := entries[0]
e.Entry.Time = time.Time{}
return e
}

checkErr := func(err error) {
if tt.err != nil {
assert.Error(t, err, "Expected an error from middleware.")
} else {
assert.NoError(t, err, "Unexpected error from middleware.")
}
require.Equal(t, 1, logs.Len(), "Unexpected number of logs written.")
}

t.Run(tt.desc+", unary inbound", func(t *testing.T) {
err := mw.Handle(context.Background(), req, nil /* response writer */, fakeHandler{tt.err})
checkErr(err)
expected := observer.LoggedEntry{
Entry: zapcore.Entry{
Level: zapcore.DebugLevel,
Message: "Handled inbound request.",
},
Context: tt.wantFields,
Context: append([]zapcore.Field{zap.String("rpcType", "unary")}, tt.wantFields...),
}
assert.Equal(t, expected, logs.AllUntimed()[0], "Unexpected log entry written.")
assert.Equal(t, expected, getLog(), "Unexpected log entry written.")
})
}
}

func TestUnaryOutboundMiddleware(t *testing.T) {
defer stubTime()()
tests := []struct {
desc string
out transport.UnaryOutbound
extract ContextExtractor

wantErr bool
wantFields []zapcore.Field
}{
{
desc: "no downstream errors",
out: fakeOutbound{},
extract: NewNopContextExtractor(),
wantFields: []zapcore.Field{
zap.String("rpcType", "unary"),
zap.Skip(),
zap.Object("request", _req),
zap.Duration("latency", 0),
zap.Bool("successful", true),
zap.Skip(),
},
},
{
desc: "downstream errors",
extract: NewNopContextExtractor(),
out: fakeOutbound{err: errFailed},
wantErr: true,
wantFields: []zapcore.Field{
zap.String("rpcType", "unary"),
zap.Skip(),
zap.Object("request", _req),
zap.Duration("latency", 0),
zap.Bool("successful", false),
zap.Error(errFailed),
},
},
}

for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
core, logs := observer.New(zapcore.DebugLevel)
mw := NewMiddleware(zap.New(core), tt.extract)
res, err := mw.Call(context.Background(), _req, tt.out)
if tt.wantErr {
assert.Nil(t, res, "Expected nil response in error cases.")
assert.Error(t, err, "Expected an error from middleware.")
} else {
assert.NotNil(t, res, "Expected non-nil response in success cases.")
assert.NoError(t, err, "Unexpected error from middleware.")
t.Run(tt.desc+", unary outbound", func(t *testing.T) {
res, err := mw.Call(context.Background(), req, fakeOutbound{err: tt.err})
checkErr(err)
if tt.err == nil {
assert.NotNil(t, res, "Expected non-nil response if call is successful.")
}
expected := observer.LoggedEntry{
Entry: zapcore.Entry{
Level: zapcore.DebugLevel,
Message: "Made outbound call.",
},
Context: append([]zapcore.Field{zap.String("rpcType", "unary")}, tt.wantFields...),
}
assert.Equal(t, expected, getLog(), "Unexpected log entry written.")
})
t.Run(tt.desc+", oneway inbound", func(t *testing.T) {
err := mw.HandleOneway(context.Background(), req, fakeHandler{tt.err})
checkErr(err)
expected := observer.LoggedEntry{
Entry: zapcore.Entry{
Level: zapcore.DebugLevel,
Message: "Handled inbound request.",
},
Context: append([]zapcore.Field{zap.String("rpcType", "oneway")}, tt.wantFields...),
}
assert.Equal(t, expected, getLog(), "Unexpected log entry written.")
})
t.Run(tt.desc+", oneway outbound", func(t *testing.T) {
ack, err := mw.CallOneway(context.Background(), req, fakeOutbound{err: tt.err})
checkErr(err)
if tt.err == nil {
assert.NotNil(t, ack, "Expected non-nil ack if call is successful.")
}
require.Equal(t, 1, logs.Len(), "Unexpected number of logs written.")
expected := observer.LoggedEntry{
Entry: zapcore.Entry{
Level: zapcore.DebugLevel,
Message: "Made outbound call.",
},
Context: tt.wantFields,
Context: append([]zapcore.Field{zap.String("rpcType", "oneway")}, tt.wantFields...),
}
assert.Equal(t, expected, logs.AllUntimed()[0], "Unexpected log entry written.")
assert.Equal(t, expected, getLog(), "Unexpected log entry written.")
})
}
}

0 comments on commit a4e3e0e

Please sign in to comment.