-
Notifications
You must be signed in to change notification settings - Fork 568
/
client.go
129 lines (118 loc) · 4.22 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Package client contains GRPC client interceptors for logging.
package client
import (
"context"
"io"
"runtime/trace"
"strings"
"sync/atomic"
"github.com/pachyderm/pachyderm/v2/src/internal/log"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
)
type loggingStream struct {
grpc.ClientStream
done func(...log.Field)
desc *grpc.StreamDesc
task *trace.Task
closedSend atomic.Bool
}
var _ grpc.ClientStream = new(loggingStream)
func (s *loggingStream) RecvMsg(m any) error {
trace.Logf(s.Context(), "grpc client", "grpc client recv %T", m)
if err := s.ClientStream.RecvMsg(m); err != nil {
if err == io.EOF {
s.done(log.Metadata("trailer", s.Trailer()))
return err //nolint:wrapcheck
}
log.Debug(s.Context(), "stream ended unexpectedly", zap.Error(err))
s.done(zap.Error(err), log.Metadata("trailer", s.Trailer()))
return err //nolint:wrapcheck
}
var field log.Field
if p, ok := m.(proto.Message); ok {
field = log.Proto("reply", p)
} else {
field = zap.Any("reply", m)
}
log.Debug(s.Context(), "received message", field)
if !s.desc.ServerStreams && s.closedSend.Load() {
// When the server sends a unary response, the generated client typically only calls
// CloseAndRecv, which means they never call RecvMsg in a way that returns io.EOF to
// indiciate the end of the stream. To handle this case, we consider the stream
// done if we CloseSend'd and this not a streaming server call.
s.done(log.Metadata("trailer", s.Trailer()))
}
return nil
}
func (s *loggingStream) SendMsg(m any) (retErr error) {
trace.Logf(s.Context(), "grpc client", "grpc client send %T", m)
var field log.Field
if p, ok := m.(proto.Message); ok {
field = log.Proto("request", p)
} else {
field = zap.Any("request", m)
}
if err := s.ClientStream.SendMsg(m); err != nil {
log.Debug(s.Context(), "error sending message", zap.Error(err), field)
return err //nolint:wrapcheck
}
log.Debug(s.Context(), "sent message", field)
return nil
}
func (s *loggingStream) CloseSend() error {
trace.Log(s.Context(), "grpc client", "grpc client CloseSend")
err := s.ClientStream.CloseSend()
log.Debug(s.Context(), "send side of stream closed", zap.Error(err))
s.closedSend.Store(true)
return err //nolint:wrapcheck
}
func LogStream(rctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
ctx, done := log.SpanContext(pctx.Child(rctx, "grpcClient.stream", pctx.WithOptions(zap.WithCaller(false))), strings.TrimPrefix(method, "/"))
ctx, task := trace.NewTask(ctx, "grpc client stream"+method)
defer task.End()
underlying, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return underlying, err
}
stream := &loggingStream{
ClientStream: underlying,
desc: desc,
done: done,
task: task,
}
log.Debug(ctx, "stream started", log.OutgoingMetadata(ctx))
go func() {
if md, err := stream.Header(); err != nil {
log.Debug(ctx, "problem getting server headers", zap.Error(err))
} else {
log.Debug(ctx, "server headers", log.Metadata("headers", md))
}
}()
return stream, nil
}
var _ grpc.StreamClientInterceptor = LogStream
func LogUnary(rctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
headers, trailer := make(metadata.MD), make(metadata.MD)
opts = append(opts, grpc.Header(&headers), grpc.Trailer(&trailer))
ctx, done := log.SpanContext(pctx.Child(rctx, "grpcClient", pctx.WithOptions(zap.WithCaller(false))), strings.TrimPrefix(method, "/"))
var reqF, replyF log.Field
if p, ok := req.(proto.Message); ok {
reqF = log.Proto("request", p)
} else {
reqF = zap.Any("request", req)
}
log.Debug(ctx, "request", reqF, log.OutgoingMetadata(ctx))
err := invoker(ctx, method, req, reply, cc, opts...)
if p, ok := reply.(proto.Message); ok {
replyF = log.Proto("reply", p)
} else {
replyF = zap.Any("reply", reply)
}
done(zap.Error(err), log.Metadata("headers", headers), log.Metadata("trailer", trailer), replyF)
return err
}
var _ grpc.UnaryClientInterceptor = LogUnary