-
Notifications
You must be signed in to change notification settings - Fork 6
/
server.go
106 lines (89 loc) · 2.64 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package csi
import (
"context"
"net"
"net/url"
"os"
"sync"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
)
// NonBlockingGRPCServer is based on the server written in gcp-filestore-csi-driver.
// It is essentially a wrapper around the gRPC server that sets up the right
// profile for receiving gRPC requests.
type NonBlockingGRPCServer interface {
// Start begins serving the given CSI identity, controller and node services
// on endpoint. The implementation in this file launches the server in a
// goroutine and returns without waiting for it.
Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer)
// Wait blocks the caller on the server started by Start.
Wait()
}
// NewNonBlockingGRPCServer returns a NonBlockingGRPCServer that reports its
// activity through logger. The underlying gRPC server is not created here;
// only the logger is stored.
func NewNonBlockingGRPCServer(logger *zap.Logger) NonBlockingGRPCServer {
	srv := new(nonBlockingGRPCServer)
	srv.logger = logger
	return srv
}
// nonBlockingGRPCServer is the NonBlockingGRPCServer implementation returned
// by NewNonBlockingGRPCServer.
type nonBlockingGRPCServer struct {
// wg is incremented by Start and waited on by Wait.
wg sync.WaitGroup
// server is the gRPC server created by serve; it is nil until serve has
// built it.
server *grpc.Server
// logger receives lifecycle messages and per-call request/response logs.
logger *zap.Logger
}
// Start launches the gRPC server for endpoint in a background goroutine and
// returns immediately. The wait group is incremented before the goroutine is
// spawned so that a subsequent Wait observes the running server.
//
// ids, cs and ns are handed to serve unchanged.
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
	s.wg.Add(1)
	go func() {
		s.serve(endpoint, ids, cs, ns)
	}()
}
// Wait blocks until the wait group that Start increments has been released.
func (s *nonBlockingGRPCServer) Wait() {
s.wg.Wait()
}
// serve runs the gRPC server for endpoint and blocks until it stops.
//
// endpoint is parsed as a URL and only the "unix" and "tcp" schemes are
// accepted. For "unix" the socket path is the URL path, and an existing file
// at that path is removed before listening; for "tcp" the URL host part is
// used as the listen address. Each of ids, cs and ns is registered on the
// server only when it is non-nil.
//
// Every setup failure, and a non-nil error from Serve, is reported through
// logger.Fatal. serve must be started via Start: it releases the wait group
// entry that Start added, so that Wait unblocks once the server has stopped.
func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
	// Balance the s.wg.Add(1) performed by Start; without this Wait could
	// never return, even after a clean shutdown of the server.
	defer s.wg.Done()

	u, err := url.Parse(endpoint)
	if err != nil {
		s.logger.Fatal(err.Error())
		return
	}

	// Fatal normally terminates the process. The explicit returns below keep
	// serve from continuing with an unusable address or a nil listener if the
	// logger has been configured not to exit.
	var addr string
	switch u.Scheme {
	case "unix":
		addr = u.Path
		// Clear out a socket file left at this path so Listen can bind to it;
		// a missing file is not an error.
		if err = os.Remove(addr); err != nil && !os.IsNotExist(err) {
			s.logger.Fatal("failed to remove", zap.String("addr", addr), zap.Error(err))
			return
		}
	case "tcp":
		addr = u.Host
	default:
		s.logger.Fatal("endpoint scheme not supported", zap.String("scheme", u.Scheme))
		return
	}

	listener, err := net.Listen(u.Scheme, addr)
	if err != nil {
		s.logger.Fatal("failed to listen", zap.Error(err))
		return
	}
	s.logger.Info("started listening", zap.String("scheme", u.Scheme), zap.String("addr", addr))

	// Every unary call goes through the logging interceptor.
	opts := []grpc.ServerOption{
		grpc.UnaryInterceptor(getInterceptor(s.logger)),
	}
	server := grpc.NewServer(opts...)
	s.server = server

	if ids != nil {
		csi.RegisterIdentityServer(server, ids)
	}
	if cs != nil {
		csi.RegisterControllerServer(server, cs)
	}
	if ns != nil {
		csi.RegisterNodeServer(server, ns)
	}

	// Log the bound address rather than the listener object itself, which
	// carries no readable information when serialized.
	s.logger.Info("Listening for connections", zap.String("addr", listener.Addr().String()))

	if err = server.Serve(listener); err != nil {
		s.logger.Fatal("Failed to start server", zap.Error(err))
	}
}
// getInterceptor builds a unary server interceptor that logs each call
// through l: the method name and request at debug level before the handler
// runs, then either the handler's error at error level or its response at
// debug level. The handler's response and error are returned unchanged.
func getInterceptor(l *zap.Logger) grpc.UnaryServerInterceptor {
	logCall := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		l.Debug("grpc call", zap.String("method", info.FullMethod), zap.Any("req", req))

		reply, callErr := handler(ctx, req)
		if callErr != nil {
			l.Error("grpc error", zap.Error(callErr))
			return reply, callErr
		}

		l.Debug("grpc response", zap.Any("resp", reply))
		return reply, callErr
	}
	return logCall
}