forked from kubernetes-sigs/cluster-api
/
pivot.go
431 lines (360 loc) · 15.6 KB
/
pivot.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package phases
import (
"io"
"strings"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/klog"
clusterv1 "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1"
)
type sourceClient interface {
Delete(string) error
DeleteMachineClass(namespace, name string) error
ForceDeleteCluster(string, string) error
ForceDeleteMachine(string, string) error
ForceDeleteMachineDeployment(string, string) error
ForceDeleteMachineSet(namespace, name string) error
GetClusters(string) ([]*clusterv1.Cluster, error)
GetMachineClasses(string) ([]*clusterv1.MachineClass, error)
GetMachineDeployments(string) ([]*clusterv1.MachineDeployment, error)
GetMachineDeploymentsForCluster(*clusterv1.Cluster) ([]*clusterv1.MachineDeployment, error)
GetMachines(namespace string) ([]*clusterv1.Machine, error)
GetMachineSets(namespace string) ([]*clusterv1.MachineSet, error)
GetMachineSetsForCluster(*clusterv1.Cluster) ([]*clusterv1.MachineSet, error)
GetMachineSetsForMachineDeployment(*clusterv1.MachineDeployment) ([]*clusterv1.MachineSet, error)
GetMachinesForCluster(*clusterv1.Cluster) ([]*clusterv1.Machine, error)
GetMachinesForMachineSet(*clusterv1.MachineSet) ([]*clusterv1.Machine, error)
ScaleStatefulSet(string, string, int32) error
WaitForClusterV1alpha1Ready() error
}
type targetClient interface {
Apply(string) error
CreateClusterObject(*clusterv1.Cluster) error
CreateMachineClass(*clusterv1.MachineClass) error
CreateMachineDeployments([]*clusterv1.MachineDeployment, string) error
CreateMachines([]*clusterv1.Machine, string) error
CreateMachineSets([]*clusterv1.MachineSet, string) error
EnsureNamespace(string) error
GetMachineDeployment(namespace, name string) (*clusterv1.MachineDeployment, error)
GetMachineSet(string, string) (*clusterv1.MachineSet, error)
WaitForClusterV1alpha1Ready() error
}
// Pivot deploys the provided provider components to a target cluster and then migrates
// all cluster-api resources from the source cluster to the target cluster
func Pivot(source sourceClient, target targetClient, providerComponents string) error {
klog.Info("Applying Cluster API Provider Components to Target Cluster")
if err := target.Apply(providerComponents); err != nil {
return errors.Wrap(err, "unable to apply cluster api controllers")
}
klog.Info("Pivoting Cluster API objects from bootstrap to target cluster.")
if err := pivot(source, target, providerComponents); err != nil {
return errors.Wrap(err, "unable to pivot cluster API objects")
}
return nil
}
func pivot(from sourceClient, to targetClient, providerComponents string) error {
// TODO: Attempt to handle rollback in case of pivot failure
klog.V(4).Info("Ensuring cluster v1alpha1 resources are available on the source cluster")
if err := from.WaitForClusterV1alpha1Ready(); err != nil {
return errors.New("cluster v1alpha1 resource not ready on source cluster")
}
klog.V(4).Info("Ensuring cluster v1alpha1 resources are available on the target cluster")
if err := to.WaitForClusterV1alpha1Ready(); err != nil {
return errors.New("cluster v1alpha1 resource not ready on target cluster")
}
klog.V(4).Info("Parsing list of cluster-api controllers from provider components")
controllers, err := parseControllers(providerComponents)
if err != nil {
return errors.Wrap(err, "Failed to extract Cluster API Controllers from the provider components")
}
// Scale down the controller managers in the source cluster
for _, controller := range controllers {
klog.V(4).Infof("Scaling down controller %s/%s", controller.Namespace, controller.Name)
if err := from.ScaleStatefulSet(controller.Namespace, controller.Name, 0); err != nil {
return errors.Wrapf(err, "Failed to scale down %s/%s", controller.Namespace, controller.Name)
}
}
klog.V(4).Info("Retrieving list of MachineClasses to move")
machineClasses, err := from.GetMachineClasses("")
if err != nil {
return err
}
if err := copyMachineClasses(from, to, machineClasses); err != nil {
return err
}
klog.V(4).Info("Retrieving list of Clusters to move")
clusters, err := from.GetClusters("")
if err != nil {
return err
}
if err := moveClusters(from, to, clusters); err != nil {
return err
}
klog.V(4).Info("Retrieving list of MachineDeployments not associated with a Cluster to move")
machineDeployments, err := from.GetMachineDeployments("")
if err != nil {
return err
}
if err := moveMachineDeployments(from, to, machineDeployments); err != nil {
return err
}
klog.V(4).Info("Retrieving list of MachineSets not associated with a MachineDeployment or a Cluster to move")
machineSets, err := from.GetMachineSets("")
if err != nil {
return err
}
if err := moveMachineSets(from, to, machineSets); err != nil {
return err
}
klog.V(4).Infof("Retrieving list of Machines not associated with a MachineSet or a Cluster to move")
machines, err := from.GetMachines("")
if err != nil {
return err
}
if err := moveMachines(from, to, machines); err != nil {
return err
}
if err := deleteMachineClasses(from, machineClasses); err != nil {
return err
}
klog.V(4).Infof("Deleting provider components from source cluster")
if err := from.Delete(providerComponents); err != nil {
klog.Warningf("Could not delete the provider components from the source cluster: %v", err)
}
return nil
}
func moveClusters(from sourceClient, to targetClient, clusters []*clusterv1.Cluster) error {
clusterNames := make([]string, 0, len(clusters))
for _, c := range clusters {
clusterNames = append(clusterNames, c.Name)
}
klog.V(4).Infof("Preparing to move Clusters: %v", clusterNames)
for _, c := range clusters {
if err := moveCluster(from, to, c); err != nil {
return errors.Wrapf(err, "Failed to move cluster: %s/%s", c.Namespace, c.Name)
}
}
return nil
}
func deleteMachineClasses(client sourceClient, machineClasses []*clusterv1.MachineClass) error {
machineClassNames := make([]string, 0, len(machineClasses))
for _, mc := range machineClasses {
machineClassNames = append(machineClassNames, mc.Name)
}
klog.V(4).Infof("Preparing to delete MachineClasses: %v", machineClassNames)
for _, mc := range machineClasses {
if err := deleteMachineClass(client, mc); err != nil {
return errors.Wrapf(err, "failed to delete MachineClass %s:%s", mc.Namespace, mc.Name)
}
}
return nil
}
func deleteMachineClass(client sourceClient, machineClass *clusterv1.MachineClass) error {
// New objects cannot have a specified resource version. Clear it out.
machineClass.SetResourceVersion("")
if err := client.DeleteMachineClass(machineClass.Namespace, machineClass.Name); err != nil {
return errors.Wrapf(err, "error deleting MachineClass %s/%s from source cluster", machineClass.Namespace, machineClass.Name)
}
klog.V(4).Infof("Successfully deleted MachineClass %s/%s from source cluster", machineClass.Namespace, machineClass.Name)
return nil
}
func copyMachineClasses(from sourceClient, to targetClient, machineClasses []*clusterv1.MachineClass) error {
machineClassNames := make([]string, 0, len(machineClasses))
for _, mc := range machineClasses {
machineClassNames = append(machineClassNames, mc.Name)
}
klog.V(4).Infof("Preparing to copy MachineClasses: %v", machineClassNames)
for _, mc := range machineClasses {
if err := copyMachineClass(from, to, mc); err != nil {
return errors.Wrapf(err, "failed to copy MachineClass %s:%s", mc.Namespace, mc.Name)
}
}
return nil
}
func copyMachineClass(from sourceClient, to targetClient, machineClass *clusterv1.MachineClass) error {
// New objects cannot have a specified resource version. Clear it out.
machineClass.SetResourceVersion("")
if err := to.CreateMachineClass(machineClass); err != nil {
return errors.Wrapf(err, "error copying MachineClass %s/%s to target cluster", machineClass.Namespace, machineClass.Name)
}
klog.V(4).Infof("Successfully copied MachineClass %s/%s", machineClass.Namespace, machineClass.Name)
return nil
}
func moveCluster(from sourceClient, to targetClient, cluster *clusterv1.Cluster) error {
klog.V(4).Infof("Moving Cluster %s/%s", cluster.Namespace, cluster.Name)
klog.V(4).Infof("Ensuring namespace %q exists on target cluster", cluster.Namespace)
if err := to.EnsureNamespace(cluster.Namespace); err != nil {
return errors.Wrapf(err, "unable to ensure namespace %q in target cluster", cluster.Namespace)
}
// New objects cannot have a specified resource version. Clear it out.
cluster.SetResourceVersion("")
if err := to.CreateClusterObject(cluster); err != nil {
return errors.Wrapf(err, "error copying Cluster %s/%s to target cluster", cluster.Namespace, cluster.Name)
}
klog.V(4).Infof("Retrieving list of MachineDeployments to move for Cluster %s/%s", cluster.Namespace, cluster.Name)
machineDeployments, err := from.GetMachineDeploymentsForCluster(cluster)
if err != nil {
return err
}
if err := moveMachineDeployments(from, to, machineDeployments); err != nil {
return err
}
klog.V(4).Infof("Retrieving list of MachineSets not associated with a MachineDeployment to move for Cluster %s/%s", cluster.Namespace, cluster.Name)
machineSets, err := from.GetMachineSetsForCluster(cluster)
if err != nil {
return err
}
if err := moveMachineSets(from, to, machineSets); err != nil {
return err
}
klog.V(4).Infof("Retrieving list of Machines not associated with a MachineSet to move for Cluster %s/%s", cluster.Namespace, cluster.Name)
machines, err := from.GetMachinesForCluster(cluster)
if err != nil {
return err
}
if err := moveMachines(from, to, machines); err != nil {
return err
}
if err := from.ForceDeleteCluster(cluster.Namespace, cluster.Name); err != nil {
return errors.Wrapf(err, "error force deleting cluster %s/%s", cluster.Namespace, cluster.Name)
}
klog.V(4).Infof("Successfully moved Cluster %s/%s", cluster.Namespace, cluster.Name)
return nil
}
func moveMachineDeployments(from sourceClient, to targetClient, machineDeployments []*clusterv1.MachineDeployment) error {
machineDeploymentNames := make([]string, 0, len(machineDeployments))
for _, md := range machineDeployments {
machineDeploymentNames = append(machineDeploymentNames, md.Name)
}
klog.V(4).Infof("Preparing to move MachineDeployments: %v", machineDeploymentNames)
for _, md := range machineDeployments {
if err := moveMachineDeployment(from, to, md); err != nil {
return errors.Wrapf(err, "failed to move MachineDeployment %s:%s", md.Namespace, md.Name)
}
}
return nil
}
func moveMachineDeployment(from sourceClient, to targetClient, md *clusterv1.MachineDeployment) error {
klog.V(4).Infof("Moving MachineDeployment %s/%s", md.Namespace, md.Name)
klog.V(4).Infof("Retrieving list of MachineSets for MachineDeployment %s/%s", md.Namespace, md.Name)
machineSets, err := from.GetMachineSetsForMachineDeployment(md)
if err != nil {
return err
}
if err := moveMachineSets(from, to, machineSets); err != nil {
return err
}
// New objects cannot have a specified resource version. Clear it out.
md.SetResourceVersion("")
// Remove owner reference. This currently assumes that the only owner reference would be a Cluster.
md.SetOwnerReferences(nil)
if err := to.CreateMachineDeployments([]*clusterv1.MachineDeployment{md}, md.Namespace); err != nil {
return errors.Wrapf(err, "error copying MachineDeployment %s/%s to target cluster", md.Namespace, md.Name)
}
if err := from.ForceDeleteMachineDeployment(md.Namespace, md.Name); err != nil {
return errors.Wrapf(err, "error force deleting MachineDeployment %s/%s from source cluster", md.Namespace, md.Name)
}
klog.V(4).Infof("Successfully moved MachineDeployment %s/%s", md.Namespace, md.Name)
return nil
}
func moveMachineSets(from sourceClient, to targetClient, machineSets []*clusterv1.MachineSet) error {
machineSetNames := make([]string, 0, len(machineSets))
for _, ms := range machineSets {
machineSetNames = append(machineSetNames, ms.Name)
}
klog.V(4).Infof("Preparing to move MachineSets: %v", machineSetNames)
for _, ms := range machineSets {
if err := moveMachineSet(from, to, ms); err != nil {
return errors.Wrapf(err, "failed to move MachineSet %s:%s", ms.Namespace, ms.Name)
}
}
return nil
}
func moveMachineSet(from sourceClient, to targetClient, ms *clusterv1.MachineSet) error {
klog.V(4).Infof("Moving MachineSet %s/%s", ms.Namespace, ms.Name)
klog.V(4).Infof("Retrieving list of Machines for MachineSet %s/%s", ms.Namespace, ms.Name)
machines, err := from.GetMachinesForMachineSet(ms)
if err != nil {
return err
}
if err := moveMachines(from, to, machines); err != nil {
return err
}
// New objects cannot have a specified resource version. Clear it out.
ms.SetResourceVersion("")
// Remove owner reference. This currently assumes that the only owner references would be a MachineDeployment and/or a Cluster.
ms.SetOwnerReferences(nil)
if err := to.CreateMachineSets([]*clusterv1.MachineSet{ms}, ms.Namespace); err != nil {
return errors.Wrapf(err, "error copying MachineSet %s/%s to target cluster", ms.Namespace, ms.Name)
}
if err := from.ForceDeleteMachineSet(ms.Namespace, ms.Name); err != nil {
return errors.Wrapf(err, "error force deleting MachineSet %s/%s from source cluster", ms.Namespace, ms.Name)
}
klog.V(4).Infof("Successfully moved MachineSet %s/%s", ms.Namespace, ms.Name)
return nil
}
func moveMachines(from sourceClient, to targetClient, machines []*clusterv1.Machine) error {
machineNames := make([]string, 0, len(machines))
for _, m := range machines {
if m.DeletionTimestamp != nil {
klog.V(4).Infof("Skipping to move deleted machine: %q", m.Name)
continue
}
machineNames = append(machineNames, m.Name)
}
klog.V(4).Infof("Preparing to move Machines: %v", machineNames)
for _, m := range machines {
if m.DeletionTimestamp != nil {
continue
}
if err := moveMachine(from, to, m); err != nil {
return errors.Wrapf(err, "failed to move Machine %s:%s", m.Namespace, m.Name)
}
}
return nil
}
func moveMachine(from sourceClient, to targetClient, m *clusterv1.Machine) error {
klog.V(4).Infof("Moving Machine %s/%s", m.Namespace, m.Name)
// New objects cannot have a specified resource version. Clear it out.
m.SetResourceVersion("")
// Remove owner reference. This currently assumes that the only owner references would be a MachineSet and/or a Cluster.
m.SetOwnerReferences(nil)
if err := to.CreateMachines([]*clusterv1.Machine{m}, m.Namespace); err != nil {
return errors.Wrapf(err, "error copying Machine %s/%s to target cluster", m.Namespace, m.Name)
}
if err := from.ForceDeleteMachine(m.Namespace, m.Name); err != nil {
return errors.Wrapf(err, "error force deleting Machine %s/%s from source cluster", m.Namespace, m.Name)
}
klog.V(4).Infof("Successfully moved Machine %s/%s", m.Namespace, m.Name)
return nil
}
func parseControllers(providerComponents string) ([]*appsv1.StatefulSet, error) {
decoder := yaml.NewYAMLOrJSONDecoder(strings.NewReader(providerComponents), 32)
controllers := []*appsv1.StatefulSet{}
for {
var out appsv1.StatefulSet
err := decoder.Decode(&out)
if err == io.EOF {
break
} else if err != nil {
return nil, err
}
if out.TypeMeta.Kind == "StatefulSet" {
controllers = append(controllers, &out)
}
}
return controllers, nil
}