-
Notifications
You must be signed in to change notification settings - Fork 1
/
client_interceptor.go
143 lines (121 loc) · 4.18 KB
/
client_interceptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package middleware
import (
"context"
"time"
"github.com/leondevpt/wallet/trxservice/pkg/metatext"
"github.com/leondevpt/wallet/trxservice/pkg/setting"
"github.com/apex/log"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc"
)
// ClientTracing returns a gRPC unary client interceptor that starts a client
// span on setting.Tracer for every outgoing call and injects that span's
// context into the outgoing request metadata.
//
// If ctx already carries a span, the new span is started as its child. The
// span is named after the gRPC method, tagged with component "gRPC" and the
// RPC-client span kind, and finished when the call returns. The error returned
// by the invoker is passed through unchanged.
func ClientTracing() grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		var spanOpts []opentracing.StartSpanOption
		if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
			spanOpts = append(spanOpts, opentracing.ChildOf(parentSpan.Context()))
		}
		spanOpts = append(spanOpts,
			opentracing.Tag{Key: string(ext.Component), Value: "gRPC"},
			ext.SpanKindRPCClient,
		)

		span := setting.Tracer.StartSpan(method, spanOpts...)
		defer span.Finish()

		md, ok := metadata.FromOutgoingContext(ctx)
		if ok {
			// Work on a copy so the injection below never writes into
			// metadata that the caller's context may still be sharing.
			md = md.Copy()
		} else {
			md = metadata.New(nil)
		}

		// Injection is best-effort: if it fails the call simply goes out
		// without trace headers, so the error is deliberately not propagated.
		_ = setting.Tracer.Inject(span.Context(), opentracing.TextMap, metatext.MetadataTextMap{md})

		newCtx := opentracing.ContextWithSpan(metadata.NewOutgoingContext(ctx, md), span)
		return invoker(newCtx, method, req, reply, cc, opts...)
	}
}
// UnaryContextTimeout returns a gRPC unary client interceptor that runs each
// call with the context produced by defaultContextTimeout. When that helper
// hands back a cancel function, it is invoked once the call has returned.
func UnaryContextTimeout() grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		callCtx, release := defaultContextTimeout(ctx)
		if release == nil {
			// No new deadline was attached, so there is nothing to release.
			return invoker(callCtx, method, req, resp, cc, opts...)
		}
		defer release()
		return invoker(callCtx, method, req, resp, cc, opts...)
	}
}
// StreamContextTimeout returns a gRPC stream client interceptor that opens
// each stream with the context produced by defaultContextTimeout.
//
// The context handed to the streamer governs the stream for as long as it is
// in use, so it must not be cancelled when this interceptor returns. The
// cancel function is therefore called immediately only when the stream could
// not be created; otherwise the returned stream is wrapped so the context is
// released once RecvMsg reports that the stream has terminated. If the caller
// never drains the stream, the timeout itself eventually releases the context.
func StreamContextTimeout() grpc.StreamClientInterceptor {
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		ctx, cancel := defaultContextTimeout(ctx)
		stream, err := streamer(ctx, desc, cc, method, opts...)
		if cancel == nil {
			// The caller supplied its own deadline; nothing to manage here.
			return stream, err
		}
		if err != nil {
			cancel()
			return stream, err
		}
		return &timeoutClientStream{ClientStream: stream, cancel: cancel}, nil
	}
}

// timeoutClientStream wraps a grpc.ClientStream created under a context owned
// by StreamContextTimeout and releases that context when the stream ends.
type timeoutClientStream struct {
	grpc.ClientStream
	cancel context.CancelFunc
}

// RecvMsg forwards to the wrapped stream. Any non-nil error (io.EOF included)
// marks the end of the stream, at which point the owned context is cancelled.
// Calling the cancel function more than once is harmless.
func (s *timeoutClientStream) RecvMsg(m interface{}) error {
	err := s.ClientStream.RecvMsg(m)
	if err != nil {
		s.cancel()
	}
	return err
}
// defaultContextTimeout makes sure the returned context has a deadline.
//
// If ctx already has one, ctx is returned unchanged together with a nil cancel
// function, so callers must nil-check before calling it. Otherwise a child
// context with a 60-second timeout is returned along with its cancel function.
func defaultContextTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
	if _, hasDeadline := ctx.Deadline(); hasDeadline {
		// Respect the caller's own deadline; there is nothing for us to cancel.
		return ctx, nil
	}
	return context.WithTimeout(ctx, 60*time.Second)
}
// ClientInterceptor returns a gRPC unary client interceptor that traces each
// outgoing call with the supplied tracer: it starts a client span named after
// the method, injects the span context into a copy of the outgoing metadata,
// invokes the call, and logs (then returns) any error from the call.
// https://godoc.org/google.golang.org/grpc#UnaryClientInterceptor
func ClientInterceptor(tracer opentracing.Tracer) grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, request, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		// The span of an RPC call's server side and the span of the RPC
		// client form a ChildOf relationship; here the client span is made a
		// child of whatever span ctx already carries, if any.
		var parent opentracing.SpanContext
		if active := opentracing.SpanFromContext(ctx); active != nil {
			parent = active.Context()
		}

		clientSpan := tracer.StartSpan(
			method,
			opentracing.ChildOf(parent),
			opentracing.Tag{Key: string(ext.Component), Value: "gRPC Client"},
			ext.SpanKindRPCClient,
		)
		defer clientSpan.Finish()

		// Use existing outgoing metadata (copied) or start from an empty set.
		outgoing, found := metadata.FromOutgoingContext(ctx)
		if found {
			outgoing = outgoing.Copy()
		} else {
			outgoing = metadata.New(nil)
		}

		// MDCarrier is the custom carrier used to write into the metadata.
		if injectErr := tracer.Inject(clientSpan.Context(), opentracing.TextMap, MDCarrier{outgoing}); injectErr != nil {
			log.Errorf("inject span error :%v", injectErr.Error())
		}

		callErr := invoker(metadata.NewOutgoingContext(ctx, outgoing), method, request, reply, cc, opts...)
		if callErr != nil {
			log.Errorf("call error : %v", callErr.Error())
		}
		return callErr
	}
}
/*
func RequestIDClientInterceptor() grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string, req, resp interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) (err error) {
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.Pairs()
}
value := ctx.Value(trace.RequestID)
if requestID, ok := value.(string); ok && requestID != "" {
md[string(trace.RequestID)] = []string{requestID}
}
return invoker(metadata.NewOutgoingContext(ctx, md), method, req, resp, cc, opts...)
}
}
*/