forked from pachyderm/pachyderm
/
server.go
127 lines (115 loc) · 3.62 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package grpcutil
import (
"errors"
"fmt"
"math"
"net"
"os"
"path"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"github.com/pachyderm/pachyderm/src/client/pkg/tracing"
log "github.com/sirupsen/logrus"
)
const (
// TLSVolumePath is the path at which the tls cert and private key (if any)
// will be mounted in the pachd pod
TLSVolumePath = "/pachd-tls-cert"
// TLSCertFile is the name of the mounted file containing a TLS certificate
// that identifies pachd
TLSCertFile = "tls.crt"
// TLSKeyFile is the name of the mounted file containing a private key
// corresponding to the public certificate in TLSCertFile
TLSKeyFile = "tls.key"
)
var (
// ErrMustSpecifyRegisterFunc is used when a register func is nil.
ErrMustSpecifyRegisterFunc = errors.New("must specify registerFunc")
// ErrMustSpecifyPort is used when a port is 0
ErrMustSpecifyPort = errors.New("must specify port on which to serve")
)
// ServerOptions represent optional fields for serving.
type ServerOptions struct {
Port uint16
MaxMsgSize int
Cancel chan struct{}
RegisterFunc func(*grpc.Server) error
// If set, grpcutil may enable TLS. This should be set for public ports that
// serve GRPC services to 3rd party clients.
//
// If set, the criterion for actually serving over TLS is:
// if a signed TLS cert and corresponding private key in 'TLSVolumePath',
// this will serve GRPC traffic over TLS. If either are missing this will
// serve GRPC traffic over unencrypted HTTP,
//
// TODO make the TLS cert and key path a parameter, as pachd will need
// multiple certificates for multiple ports
PublicPortTLSAllowed bool
}
// Serve serves stuff.
func Serve(
servers ...ServerOptions,
) (retErr error) {
for _, server := range servers {
if server.RegisterFunc == nil {
return ErrMustSpecifyRegisterFunc
}
if server.Port == 0 {
return ErrMustSpecifyPort
}
opts := []grpc.ServerOption{
grpc.MaxConcurrentStreams(math.MaxUint32),
grpc.MaxRecvMsgSize(server.MaxMsgSize),
grpc.MaxSendMsgSize(server.MaxMsgSize),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 5 * time.Second,
PermitWithoutStream: true,
}),
grpc.UnaryInterceptor(tracing.UnaryServerInterceptor()),
grpc.StreamInterceptor(tracing.StreamServerInterceptor()),
}
if server.PublicPortTLSAllowed {
// Validate environment
certPath := path.Join(TLSVolumePath, TLSCertFile)
keyPath := path.Join(TLSVolumePath, TLSKeyFile)
_, certPathStatErr := os.Stat(certPath)
_, keyPathStatErr := os.Stat(keyPath)
if certPathStatErr != nil {
log.Warnf("TLS disabled: could not stat public cert at %s: %v", certPath, certPathStatErr)
}
if keyPathStatErr != nil {
log.Warnf("TLS disabled: could not stat private key at %s: %v", keyPath, keyPathStatErr)
}
if certPathStatErr == nil && keyPathStatErr == nil {
// Read TLS cert and key
transportCreds, err := credentials.NewServerTLSFromFile(certPath, keyPath)
if err != nil {
return fmt.Errorf("couldn't build transport creds: %v", err)
}
opts = append(opts, grpc.Creds(transportCreds))
}
}
grpcServer := grpc.NewServer(opts...)
if err := server.RegisterFunc(grpcServer); err != nil {
return err
}
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", server.Port))
if err != nil {
return err
}
if server.Cancel != nil {
go func() {
<-server.Cancel
if err := listener.Close(); err != nil {
fmt.Printf("listener.Close(): %v\n", err)
}
}()
}
if err := grpcServer.Serve(listener); err != nil {
return err
}
}
return nil
}