This repository has been archived by the owner on Dec 20, 2022. It is now read-only.
/
grpc.go
130 lines (111 loc) · 3.39 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package handler
import (
"context"
"io"
"net"
"strconv"
"strings"
"sync"
"github.com/kpango/glg"
"github.com/mwitkow/grpc-proxy/proxy"
"golang.org/x/sync/singleflight"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
authorizerd "github.com/yahoojapan/athenz-authorizer/v5"
"github.com/yahoojapan/authorization-proxy/v4/config"
"github.com/yahoojapan/authorization-proxy/v4/service"
)
const (
gRPC = "grpc"
)
type GRPCHandler struct {
proxyCfg config.Proxy
roleCfg config.RoleToken
authorizationd service.Authorizationd
connMap sync.Map
group singleflight.Group
}
func NewGRPC(opts ...GRPCOption) (grpc.StreamHandler, io.Closer) {
gh := new(GRPCHandler)
for _, opt := range append(defaultGRPCOptions, opts...) {
opt(gh)
}
if !strings.EqualFold(gh.proxyCfg.Scheme, gRPC) {
return nil, nil
}
dialOpts := []grpc.DialOption{
grpc.WithCodec(proxy.Codec()),
grpc.WithInsecure(),
}
target := net.JoinHostPort(gh.proxyCfg.Host, strconv.Itoa(int(gh.proxyCfg.Port)))
return proxy.TransparentHandler(func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ctx, nil, status.Errorf(codes.Unauthenticated, ErrGRPCMetadataNotFound)
}
rts := md.Get(gh.roleCfg.RoleAuthHeader)
if len(rts) == 0 {
return ctx, nil, status.Errorf(codes.Unauthenticated, ErrRoleTokenNotFound)
}
p, err := gh.authorizationd.AuthorizeRoleToken(ctx, rts[0], gRPC, fullMethodName)
if err != nil {
return ctx, nil, status.Errorf(codes.Unauthenticated, err.Error())
}
ctx = metadata.AppendToOutgoingContext(ctx,
"X-Athenz-Principal", p.Name(),
"X-Athenz-Role", strings.Join(p.Roles(), ","),
"X-Athenz-Domain", p.Domain(),
"X-Athenz-Issued-At", strconv.FormatInt(p.IssueTime(), 10),
"X-Athenz-Expires-At", strconv.FormatInt(p.ExpiryTime(), 10))
if c, ok := p.(authorizerd.OAuthAccessToken); ok {
ctx = metadata.AppendToOutgoingContext(ctx, "X-Athenz-Client-ID", c.ClientID())
}
conn, err := gh.dialContext(ctx, target, dialOpts...)
return ctx, conn, err
}), gh
}
func (gh *GRPCHandler) Close() error {
gh.connMap.Range(func(target, v interface{}) bool {
if conn, ok := v.(*grpc.ClientConn); ok {
if err := conn.Close(); err != nil {
glg.Warnf("failed to close connection. target: %s, err: %v", target, err)
}
gh.connMap.Delete(target)
}
return true
})
return nil
}
func (gh *GRPCHandler) dialContext(ctx context.Context, target string, dialOpts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
if v, ok := gh.connMap.Load(target); ok {
if conn, ok = v.(*grpc.ClientConn); ok && isHealthy(conn) {
return conn, nil
}
}
v, err, _ := gh.group.Do(target, func() (interface{}, error) {
conn, err := grpc.DialContext(ctx, target, dialOpts...)
if err != nil {
return nil, err
}
gh.connMap.Store(target, conn)
return conn, nil
})
if err == nil {
if conn, ok := v.(*grpc.ClientConn); ok {
return conn, nil
}
}
return grpc.DialContext(ctx, target, dialOpts...)
}
func isHealthy(conn *grpc.ClientConn) bool {
glg.Debugf("conn.GetState(): %s", conn.GetState().String())
switch conn.GetState() {
case connectivity.Ready, connectivity.Idle, connectivity.Connecting:
return true
default:
return false
}
}