-
Notifications
You must be signed in to change notification settings - Fork 567
/
service_env.go
197 lines (181 loc) · 7.05 KB
/
service_env.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package serviceenv
import (
"errors"
"fmt"
"math"
"net"
"os"
"time"
etcd "github.com/coreos/etcd/clientv3"
"github.com/pachyderm/pachyderm/src/client"
"github.com/pachyderm/pachyderm/src/server/pkg/backoff"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
kube "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"
)
// ServiceEnv is a struct containing connections to other services in the
// cluster. In pachd, there is only one instance of this struct, but tests may
// create more, if they want to create multiple pachyderm "clusters" served in
// separate goroutines.
type ServiceEnv struct {
*Configuration
// pachAddress is the domain name or hostport where pachd can be reached
pachAddress string
// pachClient is the "template" client other clients returned by this library
// are based on. It contains the original GRPC client connection and has no
// ctx and therefore no auth credentials or cancellation
pachClient *client.APIClient
// pachEg coordinates the initialization of pachClient. Note that ServiceEnv
// uses a separate error group for each client, rather than one for all
// three clients, so that pachd can initialize a ServiceEnv inside of its own
// initialization (if GetEtcdClient() blocked on intialization of 'pachClient'
// and pachd/main.go couldn't start the pachd server until GetEtcdClient() had
// returned, then pachd would be unable to start)
pachEg errgroup.Group
// etcdAddress is the domain name or hostport where etcd can be reached
etcdAddress string
// etcdClient is an etcd client that's shared by all users of this environment
etcdClient *etcd.Client
// etcdEg coordinates the initialization of etcdClient (see pachdEg)
etcdEg errgroup.Group
// kubeClient is a kubernetes client that, if initialized, is shared by all
// users of this environment
kubeClient *kube.Clientset
// kubeEg coordinates the initialization of kubeClient (see pachdEg)
kubeEg errgroup.Group
}
// InitPachOnlyEnv initializes this service environment. This dials a GRPC
// connection to pachd only (in a background goroutine), and creates the
// template pachClient used by future calls to GetPachClient.
//
// This call returns immediately, but GetPachClient will block
// until the client is ready.
func InitPachOnlyEnv(config *Configuration) *ServiceEnv {
env := &ServiceEnv{Configuration: config}
env.pachAddress = net.JoinHostPort("127.0.0.1", fmt.Sprintf("%d", env.PeerPort))
env.pachEg.Go(env.initPachClient)
return env // env is not ready yet
}
// InitServiceEnv initializes this service environment. This dials a GRPC
// connection to pachd and etcd (in a background goroutine), and creates the
// template pachClient used by future calls to GetPachClient.
//
// This call returns immediately, but GetPachClient and GetEtcdClient block
// until their respective clients are ready.
func InitServiceEnv(config *Configuration) *ServiceEnv {
env := InitPachOnlyEnv(config)
env.etcdAddress = fmt.Sprintf("http://%s", net.JoinHostPort(env.EtcdHost, env.EtcdPort))
env.etcdEg.Go(env.initEtcdClient)
return env // env is not ready yet
}
// InitWithKube is like InitServiceEnv, but also assumes that it's run inside
// a kubernetes cluster and tries to connect to the kubernetes API server.
func InitWithKube(config *Configuration) *ServiceEnv {
env := InitServiceEnv(config)
env.kubeEg.Go(env.initKubeClient)
return env // env is not ready yet
}
func (env *ServiceEnv) initPachClient() error {
// validate argument
if env.pachAddress == "" {
return errors.New("cannot initialize pach client with empty pach address")
}
// Initialize pach client
return backoff.Retry(func() error {
var err error
env.pachClient, err = client.NewFromAddress(env.pachAddress)
if err != nil {
return fmt.Errorf("failed to initialize pach client: %v", err)
}
return nil
}, backoff.RetryEvery(time.Second).For(time.Minute))
}
func (env *ServiceEnv) initEtcdClient() error {
// validate argument
if env.etcdAddress == "" {
return errors.New("cannot initialize pach client with empty pach address")
}
// Initialize etcd
return backoff.Retry(func() error {
var err error
env.etcdClient, err = etcd.New(etcd.Config{
Endpoints: []string{env.etcdAddress},
// Use a long timeout with Etcd so that Pachyderm doesn't crash loop
// while waiting for etcd to come up (makes startup net faster)
DialOptions: append(client.DefaultDialOptions(), grpc.WithTimeout(3*time.Minute)),
MaxCallSendMsgSize: math.MaxInt32,
MaxCallRecvMsgSize: math.MaxInt32,
})
if err != nil {
return fmt.Errorf("failed to initialize etcd client: %v", err)
}
return nil
}, backoff.RetryEvery(time.Second).For(time.Minute))
}
func (env *ServiceEnv) initKubeClient() error {
return backoff.Retry(func() error {
// Get secure in-cluster config
var kubeAddr string
var ok bool
cfg, err := rest.InClusterConfig()
if err != nil {
// InClusterConfig failed, fall back to insecure config
log.Errorf("falling back to insecure kube client due to error from NewInCluster: %s", err)
kubeAddr, ok = os.LookupEnv("KUBERNETES_PORT_443_TCP_ADDR")
if !ok {
return fmt.Errorf("can't fall back to insecure kube client due to missing env var (failed to retrieve in-cluster config: %v)", err)
}
cfg = &rest.Config{
Host: fmt.Sprintf("%s:443", kubeAddr),
TLSClientConfig: rest.TLSClientConfig{
Insecure: true,
},
}
}
env.kubeClient, err = kube.NewForConfig(cfg)
if err != nil {
return fmt.Errorf("could not initialize kube client: %v", err)
}
return nil
}, backoff.RetryEvery(time.Second).For(time.Minute))
}
// GetPachClient returns a pachd client with the same authentication
// credentials and cancellation as 'ctx' (ensuring that auth credentials are
// propagated through downstream RPCs).
//
// Functions that receive RPCs should call this to convert their RPC context to
// a Pachyderm client, and internal Pachyderm calls should accept clients
// returned by this call.
//
// (Warning) Do not call this function during server setup unless it is in a goroutine.
// A Pachyderm client is not available until the server has been setup.
func (env *ServiceEnv) GetPachClient(ctx context.Context) *client.APIClient {
if err := env.pachEg.Wait(); err != nil {
panic(err) // If env can't connect, there's no sensible way to recover
}
return env.pachClient.WithCtx(ctx)
}
// GetEtcdClient returns the already connected etcd client without modification.
func (env *ServiceEnv) GetEtcdClient() *etcd.Client {
if err := env.etcdEg.Wait(); err != nil {
panic(err) // If env can't connect, there's no sensible way to recover
}
if env.etcdClient == nil {
panic("service env never connected to etcd")
}
return env.etcdClient
}
// GetKubeClient returns the already connected Kubernetes API client without
// modification.
func (env *ServiceEnv) GetKubeClient() *kube.Clientset {
if err := env.kubeEg.Wait(); err != nil {
panic(err) // If env can't connect, there's no sensible way to recover
}
if env.kubeClient == nil {
panic("service env never connected to kubernetes")
}
return env.kubeClient
}