/
cluster_management.go
523 lines (471 loc) · 19.3 KB
/
cluster_management.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
/*
Copyright 2021 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package multicluster
import (
"bytes"
"context"
"fmt"
"github.com/briandowns/spinner"
"github.com/oam-dev/cluster-register/pkg/hub"
"github.com/oam-dev/cluster-register/pkg/spoke"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
ocmclusterv1 "open-cluster-management.io/api/cluster/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
clusterv1alpha1 "github.com/oam-dev/cluster-gateway/pkg/apis/cluster/v1alpha1"
"github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1"
"github.com/oam-dev/kubevela/apis/types"
"github.com/oam-dev/kubevela/pkg/policy/envbinding"
"github.com/oam-dev/kubevela/pkg/utils"
velaerrors "github.com/oam-dev/kubevela/pkg/utils/errors"
cmdutil "github.com/oam-dev/kubevela/pkg/utils/util"
)
// KubeClusterConfig info for cluster management
//
// It bundles what is needed to register one managed cluster: the name it is
// registered under, an optional namespace to create in it, and the kubeconfig
// pieces it was built from. When produced by LoadKubeClusterConfigFromFile, the
// embedded Cluster and AuthInfo are the entries selected by the kubeconfig's
// current-context.
type KubeClusterConfig struct {
	// ClusterName is the name the cluster is registered under; Validate rejects
	// an empty name and ClusterLocalName.
	ClusterName string
	// CreateNamespace, when non-empty, is ensured in the managed cluster after
	// registration (currently only by RegisterByVelaSecret).
	CreateNamespace string
	// Config is the full kubeconfig the cluster settings were loaded from.
	*clientcmdapi.Config
	// Cluster supplies the API server endpoint (Server) and CA data of the
	// managed cluster.
	*clientcmdapi.Cluster
	// AuthInfo supplies the credentials (bearer token or client certificate/key)
	// used to access the managed cluster.
	*clientcmdapi.AuthInfo
	// Logs records intermediate logs (which do not return error) during running
	Logs bytes.Buffer
}
// SetClusterName overrides the registered cluster name with clusterName.
// An empty clusterName is ignored and the current name is kept. The receiver
// is returned so calls can be chained.
func (clusterConfig *KubeClusterConfig) SetClusterName(clusterName string) *KubeClusterConfig {
	if clusterName == "" {
		return clusterConfig
	}
	clusterConfig.ClusterName = clusterName
	return clusterConfig
}
// SetCreateNamespace records the namespace that RegisterByVelaSecret should
// ensure in the managed cluster. Passing "" disables namespace creation.
// The receiver is returned so calls can be chained.
func (clusterConfig *KubeClusterConfig) SetCreateNamespace(createNamespace string) *KubeClusterConfig {
	cfg := clusterConfig
	cfg.CreateNamespace = createNamespace
	return cfg
}
// Validate reports whether the config can be used to join a cluster. The
// cluster name must be set, and it must not be the name reserved for the local
// (control-plane) cluster.
func (clusterConfig *KubeClusterConfig) Validate() error {
	name := clusterConfig.ClusterName
	if name == "" {
		return errors.Errorf("ClusterName cannot be empty")
	}
	if name == ClusterLocalName {
		return errors.Errorf("ClusterName cannot be `%s`, it is reserved as the local cluster", ClusterLocalName)
	}
	return nil
}
// RegisterByVelaSecret create cluster secrets for KubeVela to use
//
// It stores the endpoint, CA data and credentials of the cluster in an Opaque
// Secret named after ClusterName in ClusterGatewaySecretNamespace. The Secret is
// labelled with the credential type. A bearer token is preferred; otherwise the
// inline client certificate/key pair is used.
//
// If CreateNamespace is set, that namespace is then ensured in the new cluster.
// The Secret is removed again if this fails, so the registration is not left
// half-done.
//
// An error is returned in these cases:
//   - the cluster name is already taken;
//   - the kubeconfig carries neither a token nor inline client certificate/key
//     data;
//   - any API call fails.
func (clusterConfig *KubeClusterConfig) RegisterByVelaSecret(ctx context.Context, cli client.Client) error {
	if err := ensureClusterNotExists(ctx, cli, clusterConfig.ClusterName); err != nil {
		return errors.Wrapf(err, "cannot use cluster name %s", clusterConfig.ClusterName)
	}
	var credentialType clusterv1alpha1.CredentialType
	data := map[string][]byte{
		"endpoint": []byte(clusterConfig.Cluster.Server),
		"ca.crt":   clusterConfig.Cluster.CertificateAuthorityData,
	}
	authInfo := clusterConfig.AuthInfo
	switch {
	case len(authInfo.Token) > 0:
		credentialType = clusterv1alpha1.CredentialTypeServiceAccountToken
		data["token"] = []byte(authInfo.Token)
	case len(authInfo.ClientCertificateData) > 0 && len(authInfo.ClientKeyData) > 0:
		credentialType = clusterv1alpha1.CredentialTypeX509Certificate
		data["tls.crt"] = authInfo.ClientCertificateData
		data["tls.key"] = authInfo.ClientKeyData
	default:
		// Without a token or an inline cert/key pair the Secret would hold no usable
		// credential. The cluster would then be registered yet unreachable. Fail
		// loudly instead.
		return errors.Errorf("cluster %s has neither a bearer token nor inline client certificate/key data in its kubeconfig, "+
			"only token and x509 client certificate credentials are supported", clusterConfig.ClusterName)
	}
	secret := &corev1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      clusterConfig.ClusterName,
			Namespace: ClusterGatewaySecretNamespace,
			Labels: map[string]string{
				clusterv1alpha1.LabelKeyClusterCredentialType: string(credentialType),
			},
		},
		Type: corev1.SecretTypeOpaque,
		Data: data,
	}
	if err := cli.Create(ctx, secret); err != nil {
		return errors.Wrapf(err, "failed to add cluster to kubernetes")
	}
	// TODO(somefive): create namespace now only work for cluster secret
	if clusterConfig.CreateNamespace != "" {
		if err := ensureNamespaceExists(ctx, cli, clusterConfig.ClusterName, clusterConfig.CreateNamespace); err != nil {
			// Best-effort rollback of the Secret. The namespace error is the one the
			// caller needs, so a failed delete is deliberately not reported.
			_ = cli.Delete(ctx, secret)
			return errors.Wrapf(err, "failed to ensure %s namespace installed in cluster %s", clusterConfig.CreateNamespace, clusterConfig.ClusterName)
		}
	}
	return nil
}
// RegisterClusterManagedByOCM create ocm managed cluster for use
// TODO(somefive): OCM ManagedCluster only support cli join now
//
// Registration runs in stages:
//  1. check the hub environment;
//  2. bootstrap the spoke;
//  3. wait for the registration operator;
//  4. wait for the registration agent;
//  5. wait for the CSR;
//  6. approve the CSR and accept the cluster.
//
// Each of stages 1-5 shows a tracking spinner built by
// args.trackingSpinnerFactory. A stage's success message is only printed once
// that stage completes. If a stage fails, its spinner is stopped silently before
// the error is returned, so no spinner keeps drawing and no false success line
// appears. All API calls use ctx.
//
// args.inClusterBootstrap must be non-nil; JoinClusterByKubeConfig checks this
// before calling.
func (clusterConfig *KubeClusterConfig) RegisterClusterManagedByOCM(ctx context.Context, args *JoinClusterArgs) error {
	newTrackingSpinner := args.trackingSpinnerFactory

	// tracker is the spinner of the stage in progress, or nil between stages.
	var tracker *spinner.Spinner
	startTracker := func(msg, finalMsg string) {
		tracker = newTrackingSpinner(msg)
		tracker.FinalMSG = finalMsg
		tracker.Start()
	}
	finishTracker := func() {
		tracker.Stop()
		tracker = nil
	}
	// On an early (error) return, stop the in-flight spinner and suppress its
	// success message.
	defer func() {
		if tracker != nil {
			tracker.FinalMSG = ""
			tracker.Stop()
		}
	}()

	hubCluster, err := hub.NewHubCluster(args.hubConfig)
	if err != nil {
		return errors.Wrap(err, "fail to create client connect to hub cluster")
	}

	startTracker("Checking the environment of hub cluster..", "Hub cluster all set, continue registration.\n")
	crdName := apitypes.NamespacedName{Name: "managedclusters." + ocmclusterv1.GroupName}
	if err := hubCluster.Client.Get(ctx, crdName, &apiextensionsv1.CustomResourceDefinition{}); err != nil {
		return errors.Wrapf(err, "fail to find CRD %s in hub cluster", crdName.Name)
	}
	clusters, err := ListVirtualClusters(ctx, hubCluster.Client)
	if err != nil {
		return err
	}
	for _, cluster := range clusters {
		if cluster.Name == clusterConfig.ClusterName && cluster.Accepted {
			return errors.Errorf("you have register a cluster named %s", clusterConfig.ClusterName)
		}
	}
	finishTracker()

	spokeRestConf, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
		return clusterConfig.Config, nil
	})
	if err != nil {
		return errors.Wrap(err, "fail to convert spoke-cluster kubeconfig")
	}

	startTracker("Building registration config for the managed cluster", "Successfully prepared registration config.\n")
	overridingRegistrationEndpoint := ""
	if !*args.inClusterBootstrap {
		args.ioStreams.Infof("Using the api endpoint from hub kubeconfig %q as registration entry.\n", args.hubConfig.Host)
		overridingRegistrationEndpoint = args.hubConfig.Host
	}
	hubKubeToken, err := hubCluster.GenerateHubClusterKubeConfig(ctx, overridingRegistrationEndpoint)
	if err != nil {
		return errors.Wrap(err, "fail to generate the token for spoke-cluster")
	}
	spokeCluster, err := spoke.NewSpokeCluster(clusterConfig.ClusterName, spokeRestConf, hubKubeToken)
	if err != nil {
		return errors.Wrap(err, "fail to connect spoke cluster")
	}
	if err = spokeCluster.InitSpokeClusterEnv(ctx); err != nil {
		return errors.Wrap(err, "fail to prepare the env for spoke-cluster")
	}
	finishTracker()

	startTracker("Waiting for registration operators running: (`kubectl -n open-cluster-management get pod -l app=klusterlet`)",
		"Registration operator successfully deployed.\n")
	if err := spokeCluster.WaitForRegistrationOperatorReady(ctx); err != nil {
		return errors.Wrap(err, "fail to setup registration operator for spoke-cluster")
	}
	finishTracker()

	startTracker("Waiting for registration agent running: (`kubectl -n open-cluster-management-agent get pod -l app=klusterlet-registration-agent`)",
		"Registration agent successfully deployed.\n")
	if err := spokeCluster.WaitForRegistrationAgentReady(ctx); err != nil {
		return errors.Wrap(err, "fail to setup registration agent for spoke-cluster")
	}
	finishTracker()

	startTracker("Waiting for CSRs created (`kubectl get csr -l open-cluster-management.io/cluster-name="+spokeCluster.Name+"`)",
		"Successfully found corresponding CSR from the agent.\n")
	if err := hubCluster.WaitForCSRCreated(ctx, spokeCluster.Name); err != nil {
		return errors.Wrap(err, "failed found CSR created by registration agent")
	}
	finishTracker()

	args.ioStreams.Infof("Approving the CSR for cluster %q.\n", spokeCluster.Name)
	if err := hubCluster.ApproveCSR(ctx, spokeCluster.Name); err != nil {
		return errors.Wrap(err, "fail to approve the CSR created by registration agent")
	}
	ready, err := hubCluster.WaitForSpokeClusterReady(ctx, clusterConfig.ClusterName)
	if err != nil {
		// Keep the underlying cause instead of replacing it with a generic message.
		return errors.Wrap(err, "fail to waiting for register request")
	}
	if !ready {
		return errors.Errorf("fail to waiting for register request")
	}
	if err = hubCluster.RegisterSpokeCluster(ctx, spokeCluster.Name); err != nil {
		return errors.Wrap(err, "fail to approve spoke cluster")
	}
	return nil
}
// LoadKubeClusterConfigFromFile create KubeClusterConfig from kubeconfig file
//
// The Cluster and AuthInfo referenced by the kubeconfig's current-context are
// selected, and the cluster name defaults to that context's cluster name.
//
// CA, client certificate and client key files that those two entries reference
// by path are read and inlined into the matching *Data fields. This makes the
// result self-contained, because RegisterByVelaSecret only copies the inline
// data.
//
// If the server endpoint cannot be normalized it is kept unchanged. The parse
// error is written to Logs instead of failing the load.
func LoadKubeClusterConfigFromFile(filepath string) (*KubeClusterConfig, error) {
	config, err := clientcmd.LoadFromFile(filepath)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to get kubeconfig")
	}
	if len(config.CurrentContext) == 0 {
		return nil, fmt.Errorf("current-context is not set")
	}
	kubeContext, ok := config.Contexts[config.CurrentContext]
	if !ok {
		return nil, fmt.Errorf("current-context %s not found", config.CurrentContext)
	}
	cluster, ok := config.Clusters[kubeContext.Cluster]
	if !ok {
		return nil, fmt.Errorf("cluster %s not found", kubeContext.Cluster)
	}
	authInfo, ok := config.AuthInfos[kubeContext.AuthInfo]
	if !ok {
		return nil, fmt.Errorf("authInfo %s not found", kubeContext.AuthInfo)
	}
	// Flatten only the selected entries, so a broken file reference in an
	// unrelated context cannot fail the load. The map values are pointers, so
	// the inlined data lands in the entries held by config as well.
	selected := &clientcmdapi.Config{
		Clusters:  map[string]*clientcmdapi.Cluster{kubeContext.Cluster: cluster},
		AuthInfos: map[string]*clientcmdapi.AuthInfo{kubeContext.AuthInfo: authInfo},
	}
	if err := clientcmdapi.FlattenConfig(selected); err != nil {
		return nil, errors.Wrapf(err, "failed to load certificate files referenced by kubeconfig %s", filepath)
	}
	clusterConfig := &KubeClusterConfig{
		ClusterName: kubeContext.Cluster,
		Config:      config,
		Cluster:     cluster,
		AuthInfo:    authInfo,
	}
	if endpoint, err := utils.ParseAPIServerEndpoint(cluster.Server); err == nil {
		cluster.Server = endpoint
	} else {
		_, _ = fmt.Fprintf(&clusterConfig.Logs, "failed to parse server endpoint: %v", err)
	}
	return clusterConfig, nil
}
// Cluster management engines accepted by JoinClusterEngineOption.
const (
	// ClusterGateWayEngine cluster-gateway cluster management solution.
	// Clusters are registered by creating a credential Secret
	// (RegisterByVelaSecret). This is the default engine of newJoinClusterArgs.
	ClusterGateWayEngine = "cluster-gateway"
	// OCMEngine ocm cluster management solution.
	// Clusters are registered as OCM ManagedClusters (RegisterClusterManagedByOCM).
	OCMEngine = "ocm"
)
// JoinClusterArgs args for join cluster
//
// Built by newJoinClusterArgs from JoinClusterOption values.
type JoinClusterArgs struct {
	// engine selects the registration path: ClusterGateWayEngine or OCMEngine.
	engine string
	// createNamespace, when non-empty, is ensured in the joined cluster.
	createNamespace string
	// ioStreams receives informational output during OCM registration.
	ioStreams cmdutil.IOStreams
	// hubConfig is the rest config of the hub cluster (OCM only).
	hubConfig *rest.Config
	// inClusterBootstrap must be non-nil for OCM. When it points to false,
	// hubConfig.Host is used as the registration endpoint.
	inClusterBootstrap *bool
	// trackingSpinnerFactory builds the progress spinner shown for each OCM
	// registration stage.
	trackingSpinnerFactory func(string) *spinner.Spinner
}
// newJoinClusterArgs folds options, in order, into fresh JoinClusterArgs, so a
// later option overrides an earlier one. The engine starts as
// ClusterGateWayEngine.
func newJoinClusterArgs(options ...JoinClusterOption) *JoinClusterArgs {
	args := new(JoinClusterArgs)
	args.engine = ClusterGateWayEngine
	for i := range options {
		options[i].ApplyToArgs(args)
	}
	return args
}
// JoinClusterOption option for join cluster
//
// Options are applied in the order given by newJoinClusterArgs.
type JoinClusterOption interface {
	// ApplyToArgs writes this option's setting into args.
	ApplyToArgs(args *JoinClusterArgs)
}
// JoinClusterCreateNamespaceOption names a namespace to create in the joined
// cluster. The empty string means no namespace is created.
type JoinClusterCreateNamespaceOption string

// ApplyToArgs stores the namespace name in args.
func (op JoinClusterCreateNamespaceOption) ApplyToArgs(args *JoinClusterArgs) {
	namespace := string(op)
	args.createNamespace = namespace
}
// JoinClusterEngineOption selects the cluster management engine used for the
// join, either ClusterGateWayEngine or OCMEngine.
type JoinClusterEngineOption string

// ApplyToArgs records the chosen engine in args.
func (op JoinClusterEngineOption) ApplyToArgs(args *JoinClusterArgs) {
	engine := string(op)
	args.engine = engine
}
// JoinClusterOCMOptions options used when joining clusters by ocm, only support cli for now
type JoinClusterOCMOptions struct {
	// IoStreams receives informational messages printed during registration.
	IoStreams cmdutil.IOStreams
	// HubConfig is the rest config used to connect to the hub cluster.
	HubConfig *rest.Config
	// InClusterBootstrap must be non-nil when joining by ocm. When it points to
	// false, HubConfig.Host is used as the registration endpoint.
	InClusterBootstrap *bool
	// TrackingSpinnerFactory creates the progress spinner shown for each
	// registration stage.
	TrackingSpinnerFactory func(string) *spinner.Spinner
}
// ApplyToArgs copies every OCM-specific setting into args.
func (op JoinClusterOCMOptions) ApplyToArgs(args *JoinClusterArgs) {
	args.trackingSpinnerFactory = op.TrackingSpinnerFactory
	args.inClusterBootstrap = op.InClusterBootstrap
	args.hubConfig = op.HubConfig
	args.ioStreams = op.IoStreams
}
// JoinClusterByKubeConfig add child cluster by kubeconfig path, return cluster info and error
//
// The steps are:
//  1. load the kubeconfig at kubeconfigPath;
//  2. rename the cluster to clusterName when that is non-empty;
//  3. validate the config;
//  4. register it with the engine selected by options (ClusterGateWayEngine by
//     default).
//
// If OCM registration fails, the loaded config is still returned alongside the
// error. For every other failure the returned config is nil. An unknown engine
// is rejected.
func JoinClusterByKubeConfig(ctx context.Context, cli client.Client, kubeconfigPath string, clusterName string, options ...JoinClusterOption) (*KubeClusterConfig, error) {
	args := newJoinClusterArgs(options...)
	clusterConfig, err := LoadKubeClusterConfigFromFile(kubeconfigPath)
	if err != nil {
		return nil, err
	}
	if err := clusterConfig.SetClusterName(clusterName).SetCreateNamespace(args.createNamespace).Validate(); err != nil {
		return nil, err
	}
	switch args.engine {
	case ClusterGateWayEngine:
		if err := clusterConfig.RegisterByVelaSecret(ctx, cli); err != nil {
			return nil, err
		}
	case OCMEngine:
		if args.inClusterBootstrap == nil {
			// Build a fresh error: err is nil here, and errors.Wrapf(nil, ...)
			// returns nil, which would hand the caller (nil, nil).
			return nil, errors.New("failed to determine the registration endpoint for the hub cluster " +
				"when parsing --in-cluster-bootstrap flag")
		}
		if err := clusterConfig.RegisterClusterManagedByOCM(ctx, args); err != nil {
			return clusterConfig, err
		}
	default:
		return nil, errors.Errorf("unsupported cluster management engine %q, expect %q or %q", args.engine, ClusterGateWayEngine, OCMEngine)
	}
	return clusterConfig, nil
}
// DetachClusterArgs args for detaching cluster
//
// Built by newDetachClusterArgs from DetachClusterOption values.
type DetachClusterArgs struct {
	// managedClusterKubeConfigPath is the kubeconfig path of the managed cluster.
	// DetachCluster requires it for OCM managed clusters.
	managedClusterKubeConfigPath string
}
// newDetachClusterArgs folds the given options, in order, into fresh
// DetachClusterArgs.
func newDetachClusterArgs(options ...DetachClusterOption) *DetachClusterArgs {
	args := new(DetachClusterArgs)
	for i := range options {
		options[i].ApplyToArgs(args)
	}
	return args
}
// DetachClusterOption option for detach cluster
//
// Options are applied in the order given by newDetachClusterArgs.
type DetachClusterOption interface {
	// ApplyToArgs writes this option's setting into args.
	ApplyToArgs(args *DetachClusterArgs)
}
// DetachClusterManagedClusterKubeConfigPathOption supplies the kubeconfig path
// of the managed cluster. It is required when detaching an OCM managed cluster.
type DetachClusterManagedClusterKubeConfigPathOption string

// ApplyToArgs records the kubeconfig path in args.
func (op DetachClusterManagedClusterKubeConfigPathOption) ApplyToArgs(args *DetachClusterArgs) {
	kubeconfigPath := string(op)
	args.managedClusterKubeConfigPath = kubeconfigPath
}
// DetachCluster detaches the cluster named clusterName. It returns an error if
// the cluster is still used by an application.
//
// Secret-based clusters (x509 certificate or service account token) are
// detached by deleting their cluster Secret. getMutableClusterSecret refuses
// this while an application placement still references the cluster.
//
// OCM managed clusters require DetachClusterManagedClusterKubeConfigPathOption:
//  1. the OCM agent environment is cleaned from the managed cluster using that
//     kubeconfig;
//  2. the ManagedCluster object is deleted from the hub (an already-missing
//     ManagedCluster is not an error).
//
// Any other cluster type is rejected instead of being silently reported as
// detached.
func DetachCluster(ctx context.Context, cli client.Client, clusterName string, options ...DetachClusterOption) error {
	args := newDetachClusterArgs(options...)
	if clusterName == ClusterLocalName {
		return ErrReservedLocalClusterName
	}
	vc, err := GetVirtualCluster(ctx, cli, clusterName)
	if err != nil {
		return err
	}
	switch vc.Type {
	case clusterv1alpha1.CredentialTypeX509Certificate, clusterv1alpha1.CredentialTypeServiceAccountToken:
		clusterSecret, err := getMutableClusterSecret(ctx, cli, clusterName)
		if err != nil {
			return errors.Wrapf(err, "cluster %s is not mutable now", clusterName)
		}
		if err := cli.Delete(ctx, clusterSecret); err != nil {
			return errors.Wrapf(err, "failed to detach cluster %s", clusterName)
		}
	case types.CredentialTypeOCMManagedCluster:
		if args.managedClusterKubeConfigPath == "" {
			return errors.New("kubeconfig-path must be set to detach ocm managed cluster")
		}
		config, err := clientcmd.LoadFromFile(args.managedClusterKubeConfigPath)
		if err != nil {
			return err
		}
		restConfig, err := clientcmd.BuildConfigFromKubeconfigGetter("", func() (*clientcmdapi.Config, error) {
			return config, nil
		})
		if err != nil {
			return err
		}
		if err = spoke.CleanSpokeClusterEnv(restConfig); err != nil {
			return err
		}
		managedCluster := ocmclusterv1.ManagedCluster{ObjectMeta: metav1.ObjectMeta{Name: clusterName}}
		// Use the caller's ctx (not context.Background) so cancellation and
		// deadlines also apply to this hub-side delete.
		if err = cli.Delete(ctx, &managedCluster); err != nil && !apierrors.IsNotFound(err) {
			return err
		}
	default:
		return errors.Errorf("cannot detach cluster %s: unsupported cluster type %s", clusterName, vc.Type)
	}
	return nil
}
// RenameCluster rename cluster
//
// Only clusters backed by a cluster Secret can be renamed, and only while no
// application placement references them (see getMutableClusterSecret). The
// name ClusterLocalName and names already in use are rejected.
//
// The Secret is re-created under newClusterName before the old one is deleted.
// A failure part-way therefore never leaves the cluster without any Secret.
func RenameCluster(ctx context.Context, k8sClient client.Client, oldClusterName string, newClusterName string) error {
	if newClusterName == ClusterLocalName {
		return ErrReservedLocalClusterName
	}
	clusterSecret, err := getMutableClusterSecret(ctx, k8sClient, oldClusterName)
	if err != nil {
		return errors.Wrapf(err, "cluster %s is not mutable now", oldClusterName)
	}
	if err := ensureClusterNotExists(ctx, k8sClient, newClusterName); err != nil {
		return errors.Wrapf(err, "cannot set cluster name to %s", newClusterName)
	}
	// Keep only name, namespace, labels and annotations. Server-populated
	// metadata such as resourceVersion and uid must be empty for Create. The
	// deep copy keeps the new object's maps independent of clusterSecret.
	renamedSecret := clusterSecret.DeepCopy()
	renamedSecret.ObjectMeta = metav1.ObjectMeta{
		Name:        newClusterName,
		Namespace:   ClusterGatewaySecretNamespace,
		Labels:      renamedSecret.Labels,
		Annotations: renamedSecret.Annotations,
	}
	if err := k8sClient.Create(ctx, renamedSecret); err != nil {
		return errors.Wrapf(err, "failed to rename cluster from %s to %s", oldClusterName, newClusterName)
	}
	if err := k8sClient.Delete(ctx, clusterSecret); err != nil {
		// Best-effort rollback so the cluster is not left registered under both
		// names. The delete error above is the one reported.
		_ = k8sClient.Delete(ctx, renamedSecret)
		return errors.Wrapf(err, "failed to rename cluster from %s to %s", oldClusterName, newClusterName)
	}
	return nil
}
// AliasCluster sets aliasName as the alias of the cluster named clusterName and
// persists the change. The reserved local cluster cannot be aliased.
func AliasCluster(ctx context.Context, cli client.Client, clusterName string, aliasName string) error {
	if clusterName == ClusterLocalName {
		return ErrReservedLocalClusterName
	}
	vc, err := GetVirtualCluster(ctx, cli, clusterName)
	if err != nil {
		return err
	}
	target := vc.Object
	setClusterAlias(target, aliasName)
	return cli.Update(ctx, target)
}
// ensureClusterNotExists succeeds only when the control plane knows no cluster
// named clusterName. An existing cluster yields ErrClusterExists. Any other
// lookup failure is returned unchanged.
func ensureClusterNotExists(ctx context.Context, c client.Client, clusterName string) error {
	_, err := GetVirtualCluster(ctx, c, clusterName)
	switch {
	case err == nil:
		return ErrClusterExists
	case IsClusterNotExists(err):
		return nil
	default:
		return err
	}
}
// ensureNamespaceExists ensures vela namespace to be installed in child cluster
//
// Requests use ContextWithClusterName(ctx, clusterName), so they target the
// child cluster. The namespace is created only when the lookup reports
// NotFound. An AlreadyExists error from Create means someone else created the
// namespace concurrently, and is treated as success.
func ensureNamespaceExists(ctx context.Context, c client.Client, clusterName string, createNamespace string) error {
	remoteCtx := ContextWithClusterName(ctx, clusterName)
	err := c.Get(remoteCtx, apitypes.NamespacedName{Name: createNamespace}, &corev1.Namespace{})
	if err == nil {
		return nil
	}
	if !apierrors.IsNotFound(err) {
		return errors.Wrapf(err, "failed to check if namespace %s exists", createNamespace)
	}
	namespace := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: createNamespace}}
	if err := c.Create(remoteCtx, namespace); err != nil && !apierrors.IsAlreadyExists(err) {
		return errors.Wrapf(err, "failed to create namespace %s", createNamespace)
	}
	return nil
}
// getMutableClusterSecret loads the Secret of clusterName from
// ClusterGatewaySecretNamespace and confirms it may be changed. The Secret must
// carry the credential-type label, and no application's env-binding placement
// may reference the cluster. Every conflicting application is listed in the
// returned error.
// TODO(somefive): should rework the logic of checking application cluster usage
func getMutableClusterSecret(ctx context.Context, c client.Client, clusterName string) (*corev1.Secret, error) {
	secretKey := apitypes.NamespacedName{Namespace: ClusterGatewaySecretNamespace, Name: clusterName}
	clusterSecret := &corev1.Secret{}
	if err := c.Get(ctx, secretKey, clusterSecret); err != nil {
		return nil, errors.Wrapf(err, "failed to find target cluster secret %s", clusterName)
	}
	// Reading from a nil labels map yields "", covering both "no labels" and
	// "label missing or empty".
	if clusterSecret.GetLabels()[clusterv1alpha1.LabelKeyClusterCredentialType] == "" {
		return nil, fmt.Errorf("invalid cluster secret %s: cluster credential type label %s is not set", clusterName, clusterv1alpha1.LabelKeyClusterCredentialType)
	}

	appList := &v1beta1.ApplicationList{}
	if err := c.List(ctx, appList); err != nil {
		return nil, errors.Wrap(err, "failed to find applications to check clusters")
	}
	usageErrs := velaerrors.ErrorList{}
	for i := range appList.Items {
		app := &appList.Items[i]
		// Status lookup works on a copy. Applications without a readable
		// env-binding status are skipped.
		status, err := envbinding.GetEnvBindingPolicyStatus(app.DeepCopy(), "")
		if err != nil || status == nil {
			continue
		}
		for _, env := range status.Envs {
			for _, placement := range env.Placements {
				if placement.Cluster != clusterName {
					continue
				}
				usageErrs = append(usageErrs, fmt.Errorf("application %s/%s (env: %s) is currently using cluster %s", app.Namespace, app.Name, env.Env, clusterName))
			}
		}
	}
	if usageErrs.HasError() {
		return nil, errors.Wrapf(usageErrs, "cluster %s is in use now", clusterName)
	}
	return clusterSecret, nil
}