forked from kubesphere/kubekey
/
machine_helpers.go
277 lines (236 loc) · 14.1 KB
/
machine_helpers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"context"
. "github.com/onsi/gomega"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
. "sigs.k8s.io/cluster-api/test/framework/ginkgoextensions"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/kubesphere/kubekey/v3/test/e2e/framework/internal/log"
)
// GetMachinesByMachineDeploymentsInput is the input for GetMachinesByMachineDeployments.
type GetMachinesByMachineDeploymentsInput struct {
// Lister is used to list the Machine objects; it must not be nil.
Lister Lister
// ClusterName is the name of the Cluster; it is passed to byClusterOptions and must not be empty.
ClusterName string
// Namespace is the namespace of the Cluster; it is passed to byClusterOptions and must not be empty.
Namespace string
// MachineDeployment is the MachineDeployment from which the additional list options
// are derived (via machineDeploymentOptions).
MachineDeployment clusterv1.MachineDeployment
}
// GetMachinesByMachineDeployments lists the Machine objects of a cluster that belong to the given
// machine deployment. The list options are the cluster options (byClusterOptions) followed by the
// options derived from input.MachineDeployment (machineDeploymentOptions). The list call is retried
// until it succeeds; the assertion fails once retryableOperationTimeout is exceeded.
// Important! This method relies on labels that are created by the CAPI controllers during the first
// reconciliation, so callers must make sure that reconciliation has already happened before calling it.
func GetMachinesByMachineDeployments(ctx context.Context, input GetMachinesByMachineDeploymentsInput) []clusterv1.Machine {
	Expect(ctx).NotTo(BeNil(), "ctx is required for GetMachinesByMachineDeployments")
	Expect(input.Lister).ToNot(BeNil(), "Invalid argument. input.Lister can't be nil when calling GetMachinesByMachineDeployments")
	Expect(input.ClusterName).ToNot(BeEmpty(), "Invalid argument. input.ClusterName can't be empty when calling GetMachinesByMachineDeployments")
	Expect(input.Namespace).ToNot(BeEmpty(), "Invalid argument. input.Namespace can't be empty when calling GetMachinesByMachineDeployments")
	// NOTE(review): MachineDeployment is a struct value, so this nil check always passes.
	Expect(input.MachineDeployment).ToNot(BeNil(), "Invalid argument. input.MachineDeployment can't be nil when calling GetMachinesByMachineDeployments")

	listOptions := append(byClusterOptions(input.ClusterName, input.Namespace), machineDeploymentOptions(input.MachineDeployment)...)

	var found clusterv1.MachineList
	listMachines := func() error {
		return input.Lister.List(ctx, &found, listOptions...)
	}
	Eventually(listMachines, retryableOperationTimeout, retryableOperationInterval).Should(Succeed(), "Failed to list MachineList object for Cluster %s", klog.KRef(input.Namespace, input.ClusterName))

	return found.Items
}
// GetMachinesByMachineHealthCheckInput is the input for GetMachinesByMachineHealthCheck.
type GetMachinesByMachineHealthCheckInput struct {
// Lister is used to list the Machine objects; it must not be nil.
Lister Lister
// ClusterName is the name of the Cluster; it is passed to byClusterOptions and must not be empty.
ClusterName string
// MachineHealthCheck provides the namespace used for the lookup and the additional
// list options (via machineHealthCheckOptions); it must not be nil.
MachineHealthCheck *clusterv1.MachineHealthCheck
}
// GetMachinesByMachineHealthCheck returns the Machine objects of a cluster that match the list
// options derived from input.MachineHealthCheck (see machineHealthCheckOptions).
// Machines are looked up in the namespace of the MachineHealthCheck. The list call is retried
// until it succeeds; the assertion fails once retryableOperationTimeout is exceeded.
func GetMachinesByMachineHealthCheck(ctx context.Context, input GetMachinesByMachineHealthCheckInput) []clusterv1.Machine {
	Expect(ctx).NotTo(BeNil(), "ctx is required for GetMachinesByMachineHealthCheck")
	Expect(input.Lister).ToNot(BeNil(), "Invalid argument. input.Lister can't be nil when calling GetMachinesByMachineHealthCheck")
	Expect(input.ClusterName).ToNot(BeEmpty(), "Invalid argument. input.ClusterName can't be empty when calling GetMachinesByMachineHealthCheck")
	Expect(input.MachineHealthCheck).ToNot(BeNil(), "Invalid argument. input.MachineHealthCheck can't be nil when calling GetMachinesByMachineHealthCheck")

	// The MachineHealthCheck's own namespace scopes the lookup.
	namespace := input.MachineHealthCheck.Namespace

	opts := byClusterOptions(input.ClusterName, namespace)
	opts = append(opts, machineHealthCheckOptions(*input.MachineHealthCheck)...)

	machineList := &clusterv1.MachineList{}
	Eventually(func() error {
		return input.Lister.List(ctx, machineList, opts...)
	}, retryableOperationTimeout, retryableOperationInterval).Should(Succeed(), "Failed to list MachineList object for Cluster %s", klog.KRef(namespace, input.ClusterName))

	return machineList.Items
}
// GetControlPlaneMachinesByClusterInput is the input for GetControlPlaneMachinesByCluster.
type GetControlPlaneMachinesByClusterInput struct {
// Lister is used to list the Machine objects; it must not be nil.
Lister Lister
// ClusterName is the name of the Cluster; it is passed to byClusterOptions and must not be empty.
ClusterName string
// Namespace is the namespace of the Cluster; it is passed to byClusterOptions and must not be empty.
Namespace string
}
// GetControlPlaneMachinesByCluster lists the Machine objects of a cluster, using the cluster
// options (byClusterOptions) combined with controlPlaneMachineOptions. The list call is retried
// until it succeeds; the assertion fails once retryableOperationTimeout is exceeded.
// Important! This method relies on labels that are created by the CAPI controllers during the first
// reconciliation, so callers must make sure that reconciliation has already happened before calling it.
func GetControlPlaneMachinesByCluster(ctx context.Context, input GetControlPlaneMachinesByClusterInput) []clusterv1.Machine {
	Expect(ctx).NotTo(BeNil(), "ctx is required for GetControlPlaneMachinesByCluster")
	Expect(input.Lister).ToNot(BeNil(), "Invalid argument. input.Lister can't be nil when calling GetControlPlaneMachinesByCluster")
	Expect(input.ClusterName).ToNot(BeEmpty(), "Invalid argument. input.ClusterName can't be empty when calling GetControlPlaneMachinesByCluster")
	Expect(input.Namespace).ToNot(BeEmpty(), "Invalid argument. input.Namespace can't be empty when calling GetControlPlaneMachinesByCluster")

	listOpts := byClusterOptions(input.ClusterName, input.Namespace)
	listOpts = append(listOpts, controlPlaneMachineOptions()...)

	var result clusterv1.MachineList
	Eventually(func() error {
		return input.Lister.List(ctx, &result, listOpts...)
	}, retryableOperationTimeout, retryableOperationInterval).Should(Succeed(), "Failed to list MachineList object for Cluster %s", klog.KRef(input.Namespace, input.ClusterName))

	return result.Items
}
// WaitForControlPlaneMachinesToBeUpgradedInput is the input for WaitForControlPlaneMachinesToBeUpgraded.
type WaitForControlPlaneMachinesToBeUpgradedInput struct {
// Lister is used to list the control-plane Machine objects; it must not be nil.
Lister Lister
// Cluster is the Cluster whose Name and Namespace select the machines to inspect.
Cluster *clusterv1.Cluster
// KubernetesUpgradeVersion is the version each machine's Spec.Version is compared against; it must not be empty.
KubernetesUpgradeVersion string
// MachineCount is the number of upgraded machines the wait expects; it must be greater than 0.
MachineCount int
}
// WaitForControlPlaneMachinesToBeUpgraded waits until every control-plane machine of the cluster
// reports input.KubernetesUpgradeVersion in Spec.Version and has the MachineNodeHealthy condition
// set to true, and the number of such machines equals input.MachineCount.
// The optional intervals are forwarded to Eventually (timeout and polling interval).
func WaitForControlPlaneMachinesToBeUpgraded(ctx context.Context, input WaitForControlPlaneMachinesToBeUpgradedInput, intervals ...interface{}) {
	Expect(ctx).NotTo(BeNil(), "ctx is required for WaitForControlPlaneMachinesToBeUpgraded")
	Expect(input.Lister).ToNot(BeNil(), "Invalid argument. input.Lister can't be nil when calling WaitForControlPlaneMachinesToBeUpgraded")
	// input.Cluster is dereferenced below, so reject nil up front instead of panicking.
	Expect(input.Cluster).ToNot(BeNil(), "Invalid argument. input.Cluster can't be nil when calling WaitForControlPlaneMachinesToBeUpgraded")
	Expect(input.KubernetesUpgradeVersion).ToNot(BeEmpty(), "Invalid argument. input.KubernetesUpgradeVersion can't be empty when calling WaitForControlPlaneMachinesToBeUpgraded")
	Expect(input.MachineCount).To(BeNumerically(">", 0), "Invalid argument. input.MachineCount can't be smaller than 1 when calling WaitForControlPlaneMachinesToBeUpgraded")

	Byf("Ensuring all control-plane machines have upgraded kubernetes version %s", input.KubernetesUpgradeVersion)

	Eventually(func() (int, error) {
		machines := GetControlPlaneMachinesByCluster(ctx, GetControlPlaneMachinesByClusterInput{
			Lister:      input.Lister,
			ClusterName: input.Cluster.Name,
			Namespace:   input.Cluster.Namespace,
		})

		upgraded := 0
		for i := range machines {
			m := &machines[i]
			// Spec.Version is a pointer; a machine without a version is treated as not upgraded.
			if m.Spec.Version == nil || *m.Spec.Version != input.KubernetesUpgradeVersion {
				continue
			}
			if conditions.IsTrue(m, clusterv1.MachineNodeHealthyCondition) {
				upgraded++
			}
		}

		// Any machine that is not upgraded and healthy yet keeps the wait going.
		if len(machines) > upgraded {
			return 0, errors.New("old nodes remain")
		}
		return upgraded, nil
	}, intervals...).Should(Equal(input.MachineCount), "Timed out waiting for all control-plane machines in Cluster %s to be upgraded to kubernetes version %s", klog.KObj(input.Cluster), input.KubernetesUpgradeVersion)
}
// WaitForMachineDeploymentMachinesToBeUpgradedInput is the input for WaitForMachineDeploymentMachinesToBeUpgraded.
type WaitForMachineDeploymentMachinesToBeUpgradedInput struct {
// Lister is used to list the Machine objects; it must not be nil.
Lister Lister
// Cluster is the Cluster whose Name and Namespace select the machines to inspect; it must not be nil.
Cluster *clusterv1.Cluster
// KubernetesUpgradeVersion is the version each machine's Spec.Version is compared against.
KubernetesUpgradeVersion string
// MachineCount is the number of upgraded machines the wait expects; it must be greater than 0.
MachineCount int
// MachineDeployment is the MachineDeployment whose machines are inspected.
MachineDeployment clusterv1.MachineDeployment
}
// WaitForMachineDeploymentMachinesToBeUpgraded waits until every machine belonging to
// input.MachineDeployment reports input.KubernetesUpgradeVersion in Spec.Version and the number
// of such machines equals input.MachineCount.
// The optional intervals are forwarded to Eventually (timeout and polling interval).
func WaitForMachineDeploymentMachinesToBeUpgraded(ctx context.Context, input WaitForMachineDeploymentMachinesToBeUpgradedInput, intervals ...interface{}) {
	Expect(ctx).NotTo(BeNil(), "ctx is required for WaitForMachineDeploymentMachinesToBeUpgraded")
	Expect(input.Lister).ToNot(BeNil(), "Invalid argument. input.Lister can't be nil when calling WaitForMachineDeploymentMachinesToBeUpgraded")
	Expect(input.Cluster).ToNot(BeNil(), "Invalid argument. input.Cluster can't be nil when calling WaitForMachineDeploymentMachinesToBeUpgraded")
	// KubernetesUpgradeVersion is a string, so it is checked for emptiness rather than nil.
	Expect(input.KubernetesUpgradeVersion).ToNot(BeEmpty(), "Invalid argument. input.KubernetesUpgradeVersion can't be empty when calling WaitForMachineDeploymentMachinesToBeUpgraded")
	// input.MachineDeployment is a struct value and can never be nil, so it needs no nil check.
	Expect(input.MachineCount).To(BeNumerically(">", 0), "Invalid argument. input.MachineCount can't be smaller than 1 when calling WaitForMachineDeploymentMachinesToBeUpgraded")

	log.Logf("Ensuring all MachineDeployment Machines have upgraded kubernetes version %s", input.KubernetesUpgradeVersion)

	Eventually(func() (int, error) {
		machines := GetMachinesByMachineDeployments(ctx, GetMachinesByMachineDeploymentsInput{
			Lister:            input.Lister,
			ClusterName:       input.Cluster.Name,
			Namespace:         input.Cluster.Namespace,
			MachineDeployment: input.MachineDeployment,
		})

		upgraded := 0
		for i := range machines {
			version := machines[i].Spec.Version
			// Spec.Version is a pointer; a machine without a version is treated as not upgraded.
			if version != nil && *version == input.KubernetesUpgradeVersion {
				upgraded++
			}
		}

		// Any machine still on another version keeps the wait going.
		if len(machines) > upgraded {
			return 0, errors.New("old nodes remain")
		}
		return upgraded, nil
	}, intervals...).Should(Equal(input.MachineCount), "Timed out waiting for all MachineDeployment %s Machines to be upgraded to kubernetes version %s", klog.KObj(&input.MachineDeployment), input.KubernetesUpgradeVersion)
}
// PatchNodeConditionInput is the input for PatchNodeCondition.
type PatchNodeConditionInput struct {
// ClusterProxy is used to obtain the workload cluster client; it must not be nil.
ClusterProxy ClusterProxy
// Cluster is the Cluster whose Namespace and Name identify the workload cluster; it must not be nil.
Cluster *clusterv1.Cluster
// NodeCondition is the condition appended to the node's Status.Conditions.
NodeCondition corev1.NodeCondition
// Machine is the Machine whose Status.NodeRef identifies the node to patch; the NodeRef must be set.
Machine clusterv1.Machine
}
// PatchNodeCondition appends input.NodeCondition to the status conditions of the workload-cluster
// node referenced by input.Machine.Status.NodeRef and patches the node.
// Both fetching the node and applying the patch are retried until they succeed; the assertions
// fail once retryableOperationTimeout is exceeded.
func PatchNodeCondition(ctx context.Context, input PatchNodeConditionInput) {
	Expect(ctx).NotTo(BeNil(), "ctx is required for PatchNodeCondition")
	Expect(input.ClusterProxy).ToNot(BeNil(), "Invalid argument. input.ClusterProxy can't be nil when calling PatchNodeCondition")
	Expect(input.Cluster).ToNot(BeNil(), "Invalid argument. input.Cluster can't be nil when calling PatchNodeCondition")
	// NodeCondition and Machine are struct values and can never be nil; the NodeRef is what must be set.
	Expect(input.Machine.Status.NodeRef).ToNot(BeNil(), "Invalid argument. input.Machine.Status.NodeRef can't be nil when calling PatchNodeCondition")

	log.Logf("Patching the node condition to the node")

	nodeRef := input.Machine.Status.NodeRef
	nodeKey := types.NamespacedName{Name: nodeRef.Name, Namespace: nodeRef.Namespace}

	node := &corev1.Node{}
	Eventually(func() error {
		return input.ClusterProxy.GetWorkloadCluster(ctx, input.Cluster.Namespace, input.Cluster.Name).GetClient().Get(ctx, nodeKey, node)
	}, retryableOperationTimeout, retryableOperationInterval).Should(Succeed(), "Failed to get node %s", nodeRef.Name)

	// The helper is created before the node is modified so that the patch contains only the new condition.
	patchHelper, err := patch.NewHelper(node, input.ClusterProxy.GetWorkloadCluster(ctx, input.Cluster.Namespace, input.Cluster.Name).GetClient())
	Expect(err).ToNot(HaveOccurred())

	node.Status.Conditions = append(node.Status.Conditions, input.NodeCondition)
	Eventually(func() error {
		return patchHelper.Patch(ctx, node)
	}, retryableOperationTimeout, retryableOperationInterval).Should(Succeed(), "Failed to patch node %s", nodeRef.Name)
}
// MachineStatusCheck is a type that operates a status check on a Machine.
// It returns nil when the Machine passes the check and a non-nil error otherwise.
type MachineStatusCheck func(p *clusterv1.Machine) error
// WaitForMachineStatusCheckInput is the input for WaitForMachineStatusCheck.
type WaitForMachineStatusCheckInput struct {
// Getter is used to fetch the current state of the Machine on every poll.
Getter Getter
// Machine identifies the Machine to check through its Namespace and Name; it must not be nil.
Machine *clusterv1.Machine
// StatusChecks are run in order against the fetched Machine; the list must not be empty.
StatusChecks []MachineStatusCheck
}
// WaitForMachineStatusCheck waits until all input.StatusChecks pass for the machine.
// On every poll the machine is re-read through input.Getter and the checks are run in order;
// a failed read or a failed check causes another poll rather than aborting the wait.
// The optional intervals are forwarded to Eventually (timeout and polling interval).
func WaitForMachineStatusCheck(ctx context.Context, input WaitForMachineStatusCheckInput, intervals ...interface{}) {
	Expect(ctx).NotTo(BeNil(), "ctx is required for WaitForMachineStatusCheck")
	Expect(input.Getter).ToNot(BeNil(), "Invalid argument. input.Getter can't be nil when calling WaitForMachineStatusCheck")
	Expect(input.Machine).ToNot(BeNil(), "Invalid argument. input.Machine can't be nil when calling WaitForMachineStatusCheck")
	Expect(input.StatusChecks).ToNot(BeEmpty(), "Invalid argument. input.StatusCheck can't be empty when calling WaitForMachineStatusCheck")

	key := client.ObjectKey{
		Namespace: input.Machine.Namespace,
		Name:      input.Machine.Name,
	}

	Eventually(func() (bool, error) {
		machine := &clusterv1.Machine{}
		// Return the error instead of asserting on it so that Eventually retries transient failures.
		if err := input.Getter.Get(ctx, key, machine); err != nil {
			return false, err
		}
		for _, statusCheck := range input.StatusChecks {
			if err := statusCheck(machine); err != nil {
				return false, err
			}
		}
		return true, nil
	}, intervals...).Should(BeTrue(), "Timed out waiting for Machine %s to pass all status checks", klog.KObj(input.Machine))
}
// MachineNodeRefCheck returns a MachineStatusCheck that passes when the machine's
// Status.NodeRef is set and reports an error naming the machine otherwise.
func MachineNodeRefCheck() MachineStatusCheck {
	check := func(m *clusterv1.Machine) error {
		if m.Status.NodeRef != nil {
			return nil
		}
		return errors.Errorf("NodeRef is not assigned to the machine %s", klog.KObj(m))
	}
	return check
}
// MachinePhaseCheck returns a MachineStatusCheck that passes when the machine's
// Status.Phase equals expectedPhase and reports an error naming the machine and
// the expected phase otherwise.
func MachinePhaseCheck(expectedPhase string) MachineStatusCheck {
	check := func(m *clusterv1.Machine) error {
		if m.Status.Phase == expectedPhase {
			return nil
		}
		return errors.Errorf("Machine %s is not in phase %s", klog.KObj(m), expectedPhase)
	}
	return check
}