generated from things-labs/cicd-go-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
transport.go
157 lines (132 loc) · 3.91 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package grpc
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"github.com/things-go/dyn/transport"
)
var _ transport.Transporter = (*Transport)(nil)
// Transport is a gRPC transport.
type Transport struct {
fullMethod string
clientIp string
requestHeader header
responseHeader header
}
// Kind returns the transport kind.
func (tr *Transport) Kind() transport.Kind {
return transport.GRPC
}
// FullPath Service full method or path
func (tr *Transport) FullPath() string {
return tr.fullMethod
}
// ClientIp client ip
func (tr *Transport) ClientIp() string {
return tr.clientIp
}
// RequestHeader returns the request header.
func (tr *Transport) RequestHeader() transport.Header {
return tr.requestHeader
}
// ResponseHeader returns the reply header.
func (tr *Transport) ResponseHeader() transport.Header {
return tr.responseHeader
}
type header metadata.MD
// Len returns the number of items in header.
func (h header) Len() int { return metadata.MD(h).Len() }
// Get returns the value associated with the passed key.
func (h header) Get(key string) string {
vals := metadata.MD(h).Get(key)
if len(vals) > 0 {
return vals[0]
}
return ""
}
// Add adds the key, value pair to the header.
func (h header) Add(key, value string) { metadata.MD(h).Append(key, value) }
// Set stores the key-value pair.
func (h header) Set(key string, value string) { metadata.MD(h).Set(key, value) }
// Append adds the values to key k, not overwriting what was already stored at
// that key.
//
// k is converted to lowercase before storing in header.
func (h header) Append(key string, vals ...string) {
metadata.MD(h).Append(key, vals...)
}
// Delete removes the values for a given key k which is converted to lowercase
// before removing it from header.
func (h header) Delete(key string) { metadata.MD(h).Delete(key) }
// Keys lists the keys stored in this carrier.
func (h header) Keys() []string {
keys := make([]string, 0, len(h))
for k := range metadata.MD(h) {
keys = append(keys, k)
}
return keys
}
// Clone returns a copy of h or nil if h is nil.
func (h header) Clone() transport.Header { return transport.Header(header(metadata.MD(h).Copy())) }
// UnaryServerInterceptor is a gRPC unary server interceptor
func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
md, _ := metadata.FromIncomingContext(ctx)
p, _ := peer.FromContext(ctx)
clientIp := ""
if p != nil {
clientIp = p.Addr.String()
}
responseHeader := metadata.MD{}
ctx = transport.WithValueTransporter(ctx, &Transport{
info.FullMethod,
clientIp,
header(md),
header(responseHeader),
})
reply, err := handler(ctx, req)
if len(responseHeader) > 0 {
_ = grpc.SetHeader(ctx, responseHeader)
}
return reply, err
}
}
// wrappedStream is rewrite grpc stream's context
type wrappedStream struct {
grpc.ServerStream
ctx context.Context
}
func NewWrappedStream(ctx context.Context, stream grpc.ServerStream) grpc.ServerStream {
return &wrappedStream{
stream,
ctx,
}
}
func (w *wrappedStream) Context() context.Context {
return w.ctx
}
// StreamServerInterceptor is a gRPC stream server interceptor
func StreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
md, _ := metadata.FromIncomingContext(ss.Context())
p, _ := peer.FromContext(ss.Context())
clientIp := ""
if p != nil {
clientIp = p.Addr.String()
}
responseHeader := metadata.MD{}
ctx := transport.WithValueTransporter(ss.Context(), &Transport{
info.FullMethod,
clientIp,
header(md),
header(responseHeader),
})
ws := NewWrappedStream(ctx, ss)
err := handler(srv, ws)
if len(responseHeader) > 0 {
_ = grpc.SetHeader(ctx, responseHeader)
}
return err
}
}