-
Notifications
You must be signed in to change notification settings - Fork 94
/
grpc.go
197 lines (160 loc) · 5.06 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"bytes"
"fmt"
"net"
"os"
"strings"
"sync"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"k8s.io/klog"
"github.com/container-storage-interface/spec/lib/go/csi"
)
// parseEndpoint should have a valid prefix(unix/tcp) to return a valid endpoint parts
func parseEndpoint(ep string) (string, string, error) {
if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
s := strings.SplitN(ep, "://", 2)
if s[1] != "" {
return s[0], s[1], nil
}
}
return "", "", fmt.Errorf("invalid endpoint: %v", ep)
}
//filters if the logd are informative or pollutant
func isInfotrmativeLog(info string) bool {
// add the messages that pollute logs to the array
var msgsToFilter = [][]byte{
[]byte("NodeGetVolumeStats"),
[]byte("NodeGetCapabilities"),
}
// checks for message in request
for _, msg := range msgsToFilter {
if bytes.Contains([]byte(info), msg) {
return false
}
}
return true
}
// logGRPC logs all the grpc related errors, i.e the final errors
// which are returned to the grpc clients
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
log := isInfotrmativeLog(info.FullMethod)
if log {
klog.Infof("GRPC call: %s requests %s", info.FullMethod, protosanitizer.StripSecrets(req))
}
resp, err := handler(ctx, req)
if log {
if err != nil {
klog.Errorf("GRPC error: %v", err)
} else {
klog.Infof("GRPC response: %s", protosanitizer.StripSecrets(resp))
}
}
return resp, err
}
// NonBlockingGRPCServer defines Non blocking GRPC server interfaces
type NonBlockingGRPCServer interface {
// Start services at the endpoint
Start()
// Waits for the service to stop
Wait()
// Stops the service gracefully
Stop()
// Stops the service forcefully
ForceStop()
}
// NewNonBlockingGRPCServer returns a new instance of NonBlockingGRPCServer
func NewNonBlockingGRPCServer(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) NonBlockingGRPCServer {
return &nonBlockingGRPCServer{
endpoint: ep,
idntyServer: ids,
ctrlServer: cs,
agentServer: ns}
}
// NonBlocking server
// dont block the execution for a task to complete.
// use wait group to wait for all the tasks dispatched.
type nonBlockingGRPCServer struct {
wg sync.WaitGroup
server *grpc.Server
endpoint string
idntyServer csi.IdentityServer
ctrlServer csi.ControllerServer
agentServer csi.NodeServer
}
// Start grpc server for serving CSI endpoints
func (s *nonBlockingGRPCServer) Start() {
s.wg.Add(1)
go s.serve(s.endpoint, s.idntyServer, s.ctrlServer, s.agentServer)
}
// Wait for the service to stop
func (s *nonBlockingGRPCServer) Wait() {
s.wg.Wait()
}
// Stop the service forcefully
func (s *nonBlockingGRPCServer) Stop() {
s.server.GracefulStop()
}
// ForceStop the service
func (s *nonBlockingGRPCServer) ForceStop() {
s.server.Stop()
}
// serve starts serving requests at the provided endpoint based on the type of
// plugin. In this function all the csi related interfaces are provided by
// container-storage-interface
func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
proto, addr, err := parseEndpoint(endpoint)
if err != nil {
klog.Fatal(err.Error())
}
// Clear off the addr if it is already present, this is done to remove stale
// entries, as this path is shared with the OS and will be the same
// everytime the plugin restarts, its possible that the last instance leaves
// a stale entry
if proto == "unix" {
addr = "/" + addr
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
klog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
}
}
listener, err := net.Listen(proto, addr)
if err != nil {
klog.Fatalf("Failed to listen: %v", err)
}
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(logGRPC),
}
// Create a new grpc server, all the request from csi client to
// create/delete/... will hit this server
server := grpc.NewServer(opts...)
s.server = server
if ids != nil {
csi.RegisterIdentityServer(server, ids)
}
if cs != nil {
csi.RegisterControllerServer(server, cs)
}
if ns != nil {
csi.RegisterNodeServer(server, ns)
}
klog.Infof("Listening for connections on address: %#v", listener.Addr())
// Start serving requests on the grpc server created
err = server.Serve(listener)
if err != nil {
klog.Fatal(err.Error())
}
}