-
Notifications
You must be signed in to change notification settings - Fork 6
/
upgrade.go
352 lines (294 loc) · 12.4 KB
/
upgrade.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
package osd
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/Masterminds/semver/v3"
clustersmgmtv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"
"github.com/openshift/osde2e-common/pkg/clients/openshift"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/e2e-framework/klient/k8s"
"sigs.k8s.io/e2e-framework/klient/wait"
"sigs.k8s.io/e2e-framework/klient/wait/conditions"
)
const (
	// Name and namespace of the managed-upgrade-operator (MUO) deployment
	// that this provider restarts and whose UpgradeConfig it polls.
	managedUpgradeOperatorDeploymentName = "managed-upgrade-operator"
	managedUpgradeOperatorNamespace = "openshift-managed-upgrade-operator"
	// versionGateLabel is the OCM label identifying OCP version gates.
	versionGateLabel = "api.openshift.com/gate-ocp"
	// upgradeMaxAttempts bounds the upgrade status polling loop in
	// OCMUpgrade: 1080 attempts at upgradeDelay seconds apart (~3 hours).
	upgradeMaxAttempts = 1080
	upgradeDelay = 10 // seconds between upgrade status polls
)
// upgradeError wraps the underlying failure from any stage of an OSD
// cluster upgrade so callers receive a uniformly prefixed error.
type upgradeError struct {
	err error // root cause reported by the failing upgrade step
}

// Error implements the error interface for upgradeError.
func (e *upgradeError) Error() string {
	message := fmt.Sprintf("osd upgrade failed: %v", e.err)
	return message
}
// versionGates fetches the complete list of version gates defined in OCM.
func (o *Provider) versionGates(ctx context.Context) (*clustersmgmtv1.VersionGateList, error) {
	resp, err := o.ClustersMgmt().V1().VersionGates().List().SendContext(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get version gates: %v", err)
	}
	return resp.Items(), nil
}
// getVersionGateID returns the id of the OCP version gate whose raw id
// prefix matches the given major.minor version string.
func (o *Provider) getVersionGateID(ctx context.Context, version string) (string, error) {
	gates, err := o.versionGates(ctx)
	if err != nil {
		return "", fmt.Errorf("failed to get version gate id for version %q, %v", version, err)
	}
	for _, gate := range gates.Slice() {
		if gate.Label() == versionGateLabel && gate.VersionRawIDPrefix() == version {
			return gate.ID(), nil
		}
	}
	return "", fmt.Errorf("no version gate exists for %q", version)
}
// getVersionGateAgreement retrieves a single version gate resource from OCM
// by its id.
func (o *Provider) getVersionGateAgreement(ctx context.Context, versionGateID string) (*clustersmgmtv1.VersionGate, error) {
	resp, err := o.ClustersMgmt().V1().VersionGates().VersionGate(versionGateID).Get().SendContext(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get version gate agreement %q, %v", versionGateID, err)
	}
	return resp.Body(), nil
}
// gateAgreementExistForCluster reports whether the cluster already has a
// gate agreement for the given version gate id.
func (o *Provider) gateAgreementExistForCluster(ctx context.Context, clusterID, gateAgreementID string) (bool, error) {
	resp, err := o.ClustersMgmt().V1().Clusters().Cluster(clusterID).GateAgreements().List().SendContext(ctx)
	if err != nil {
		return false, fmt.Errorf("failed to get cluster %q version gate agreement: %v", clusterID, err)
	}
	for _, agreement := range resp.Items().Slice() {
		if agreement.VersionGate().ID() != gateAgreementID {
			continue
		}
		o.log.Info("Gate agreement exists", clusterIDLoggerKey, clusterID, "gate_agreement_id", gateAgreementID, ocmEnvironmentLoggerKey, o.ocmEnvironment)
		return true, nil
	}
	return false, nil
}
// addGateAgreement adds a version gate agreement to the cluster ocm resource.
// Gate agreements acknowledge that the cluster may be upgraded across minor
// (y-stream) versions; z-stream upgrades never need one.
func (o *Provider) addGateAgreement(ctx context.Context, clusterID string, currentVersion, upgradeVersion semver.Version) error {
	// No minor-version bump means no acknowledgement is required.
	if currentVersion.Minor() >= upgradeVersion.Minor() {
		o.log.Info("Gate agreement not required for z stream upgrades", clusterIDLoggerKey, clusterID, ocmEnvironmentLoggerKey, o.ocmEnvironment)
		return nil
	}

	targetMinor := fmt.Sprintf("%d.%d", upgradeVersion.Major(), upgradeVersion.Minor())
	gateID, err := o.getVersionGateID(ctx, targetMinor)
	if err != nil {
		return err
	}

	// Skip the add when the cluster already holds this agreement.
	alreadyExists, err := o.gateAgreementExistForCluster(ctx, clusterID, gateID)
	if err != nil {
		return err
	}
	if alreadyExists {
		return nil
	}

	gate, err := o.getVersionGateAgreement(ctx, gateID)
	if err != nil {
		return err
	}
	agreement, err := clustersmgmtv1.NewVersionGateAgreement().
		VersionGate(clustersmgmtv1.NewVersionGate().Copy(gate)).
		Build()
	if err != nil {
		return fmt.Errorf("failed to build version gate agreement for cluster %q, %v", clusterID, err)
	}
	if _, err = o.ClustersMgmt().V1().Clusters().Cluster(clusterID).GateAgreements().Add().Body(agreement).SendContext(ctx); err != nil {
		return fmt.Errorf("failed to apply version gate agreement to cluster %q, %v", clusterID, err)
	}
	return nil
}
// initiateUpgrade initiates the upgrade for the cluster with ocm by applying
// an upgrade policy to the cluster. The policy is scheduled manually a few
// minutes in the future so the managed upgrade operator picks it up shortly
// after it is created.
func (o *Provider) initiateUpgrade(ctx context.Context, clusterID, version string) error {
	upgradePolicy, err := clustersmgmtv1.NewUpgradePolicy().Version(version).
		NextRun(time.Now().UTC().Add(7 * time.Minute)).
		ScheduleType("manual").Build()
	if err != nil {
		return fmt.Errorf("failed to build upgrade policy for cluster %q, %v", clusterID, err)
	}
	response, err := o.ClustersMgmt().V1().Clusters().Cluster(clusterID).UpgradePolicies().Add().Body(upgradePolicy).SendContext(ctx)
	if err != nil {
		return fmt.Errorf("failed to apply upgrade policy to cluster %q, %v", clusterID, err)
	}
	// A nil error with a non-201 status is still a failure; report the status
	// code instead of a misleading "<nil>" error value.
	if response.Status() != http.StatusCreated {
		return fmt.Errorf("failed to apply upgrade policy to cluster %q, unexpected status code %d", clusterID, response.Status())
	}
	o.log.Info("Cluster upgrade scheduled!", clusterIDLoggerKey, clusterID, "upgrade_version", response.Body().Version(),
		"upgradeTime", response.Body().NextRun().Format(time.RFC3339), ocmEnvironmentLoggerKey, o.ocmEnvironment)
	return nil
}
// restartManagedUpgradeOperator scales down/up the muo operator to speed up the cluster upgrade start time
func (o *Provider) restartManagedUpgradeOperator(ctx context.Context, client *openshift.Client) error {
	// buildReplicasPatch builds a strategic-merge patch setting spec.replicas.
	buildReplicasPatch := func(replicas int) (*k8s.Patch, error) {
		data, err := json.Marshal(map[string]interface{}{
			"spec": map[string]interface{}{
				"replicas": replicas,
			},
		})
		if err != nil {
			return nil, fmt.Errorf("failed to build patch data to modify deployment %s replicas count: %v", managedUpgradeOperatorDeploymentName, err)
		}
		return &k8s.Patch{
			PatchType: types.StrategicMergePatchType,
			Data:      data,
		}, nil
	}

	deployment := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: managedUpgradeOperatorDeploymentName, Namespace: managedUpgradeOperatorNamespace}}

	// Wait for the deployment to report Available before bouncing it.
	if err := wait.For(conditions.New(client.Resources).DeploymentConditionMatch(deployment, appsv1.DeploymentAvailable, corev1.ConditionTrue)); err != nil {
		return fmt.Errorf("failed to get managed upgrade operator deployment: %v", err)
	}

	// Scale to zero then back to one to force an operator restart.
	steps := []struct {
		replicas  int
		direction string
	}{
		{replicas: 0, direction: "down"},
		{replicas: 1, direction: "up"},
	}
	for _, step := range steps {
		patch, err := buildReplicasPatch(step.replicas)
		if err != nil {
			return err
		}
		if err := client.Patch(ctx, deployment, *patch); err != nil {
			return fmt.Errorf("failed to scale %s %s deployment: %v", step.direction, managedUpgradeOperatorDeploymentName, err)
		}
	}
	o.log.Info("Managed upgrade operator restarted!")
	return nil
}
// managedUpgradeConfigExist waits/checks for the muo upgrade config to exist
// on the cluster. It polls up to six times, 30 seconds apart (~3 minutes),
// and returns an error if the config never appears.
func (o *Provider) managedUpgradeConfigExist(ctx context.Context, dynamicClient *dynamic.DynamicClient) error {
	const (
		attempts = 6
		interval = 30 * time.Second
	)
	for attempt := 1; attempt <= attempts; attempt++ {
		upgradeConfig, err := getManagedUpgradeOperatorConfig(ctx, dynamicClient)
		if err == nil && upgradeConfig != nil {
			return nil
		}
		time.Sleep(interval)
	}
	// Fixed grammar of the original message ("exist the cluster").
	return fmt.Errorf("managed upgrade config does not exist on the cluster")
}
// OCMUpgrade handles the end to end process to upgrade an openshift dedicated
// cluster: it applies the required gate agreement, schedules the upgrade
// policy in OCM, restarts the managed upgrade operator, then polls the muo
// UpgradeConfig status until the upgrade completes, fails, or the polling
// budget (upgradeMaxAttempts * upgradeDelay seconds) is exhausted.
func (o *Provider) OCMUpgrade(ctx context.Context, client *openshift.Client, clusterID string, currentVersion, upgradeVersion semver.Version) error {
	var (
		conditionMessage string
		dynamicClient    *dynamic.DynamicClient
		err              error
		upgradeStatus    string
	)
	if dynamicClient, err = getKubernetesDynamicClient(client); err != nil {
		return &upgradeError{err: err}
	}
	if err = o.addGateAgreement(ctx, clusterID, currentVersion, upgradeVersion); err != nil {
		return &upgradeError{err: err}
	}
	if err = o.initiateUpgrade(ctx, clusterID, upgradeVersion.String()); err != nil {
		return &upgradeError{err: err}
	}
	if err = o.restartManagedUpgradeOperator(ctx, client); err != nil {
		return &upgradeError{err: err}
	}
	if err = o.managedUpgradeConfigExist(ctx, dynamicClient); err != nil {
		return &upgradeError{err: err}
	}
	// errorHandler logs and rate-limits retries whenever an expected key is
	// missing from the upgrade config. It now returns a non-nil error when
	// the key is absent even without a lookup error, so callers reliably
	// `continue` instead of proceeding with zero values.
	errorHandler := func(key string, found bool, err error) error {
		if err == nil && found {
			return nil
		}
		if err == nil {
			err = fmt.Errorf("key %q not found in upgrade config", key)
		}
		o.log.Error(err, "Managed upgrade operator config key is missing", "key", key)
		time.Sleep(10 * time.Second)
		return err
	}
	for i := 1; i <= upgradeMaxAttempts; i++ {
		upgradeConfig, err := getManagedUpgradeOperatorConfig(ctx, dynamicClient)
		// getManagedUpgradeOperatorConfig can return (nil, nil) when no
		// configs exist yet; guard against dereferencing a nil object.
		if err != nil || upgradeConfig == nil {
			o.log.Error(err, "Failed to get managed upgrade operator config")
			time.Sleep(upgradeDelay * time.Second)
			continue
		}
		status, found, err := unstructured.NestedMap(upgradeConfig.Object, "status")
		if errorHandler("status", found, err) != nil {
			continue
		}
		histories, found, err := unstructured.NestedSlice(status, "history")
		if errorHandler("status.history", found, err) != nil {
			continue
		}
		for _, h := range histories {
			entry := h.(map[string]interface{})
			version, found, err := unstructured.NestedString(entry, "version")
			if errorHandler("status.history.[].version", found, err) != nil {
				continue
			}
			if version != upgradeVersion.String() {
				continue
			}
			upgradeStatus, found, err = unstructured.NestedString(entry, "phase")
			if errorHandler("status.history.[].version.phase", found, err) != nil {
				continue
			}
			// "conds" avoids shadowing the imported conditions package.
			conds, found, err := unstructured.NestedSlice(entry, "conditions")
			if len(conds) < 1 {
				// No conditions reported yet (e.g. a pending upgrade);
				// keep the phase and poll again.
				break
			}
			if errorHandler("status.history.[].version.conditions", found, err) != nil {
				continue
			}
			conditionMessage, found, err = unstructured.NestedString(conds[0].(map[string]interface{}), "message")
			if errorHandler("status.history.[].version.message", found, err) != nil {
				continue
			}
			break
		}
		switch upgradeStatus {
		case "":
			o.log.Info("Upgrade has not started yet...", clusterIDLoggerKey, clusterID, ocmEnvironmentLoggerKey, o.ocmEnvironment)
			time.Sleep(upgradeDelay * time.Second)
		// Original code had `case "Failed", clusterIDLoggerKey, clusterID:`
		// — stray log arguments pasted into the case expression, which made
		// the failure branch also match the cluster id string.
		case "Failed":
			o.log.Info("Upgrade failed!", "condition_message", conditionMessage, clusterIDLoggerKey, clusterID, ocmEnvironmentLoggerKey, o.ocmEnvironment)
			return &upgradeError{err: fmt.Errorf("upgrade failed")}
		case "Upgraded":
			o.log.Info("Upgrade complete!", clusterIDLoggerKey, clusterID, ocmEnvironmentLoggerKey, o.ocmEnvironment)
			return nil
		case "Pending":
			o.log.Info("Upgrade is pending...", clusterIDLoggerKey, clusterID, ocmEnvironmentLoggerKey, o.ocmEnvironment)
			time.Sleep(upgradeDelay * time.Second)
		case "Upgrading":
			o.log.Info("Upgrade is in progress", "condition_message", conditionMessage, clusterIDLoggerKey, clusterID, ocmEnvironmentLoggerKey, o.ocmEnvironment)
			time.Sleep(upgradeDelay * time.Second)
		}
	}
	return fmt.Errorf("upgrade is still in progress, failed to finish within max wait attempts")
}
// getKubernetesDynamicClient builds a kubernetes dynamic client from the
// rest config held by the given openshift client.
func getKubernetesDynamicClient(client *openshift.Client) (*dynamic.DynamicClient, error) {
	dc, err := dynamic.NewForConfig(client.GetConfig())
	if err != nil {
		return nil, fmt.Errorf("failed to create kubernetes dynamic client: %w", err)
	}
	return dc, nil
}
// getManagedUpgradeOperatorConfig returns the first UpgradeConfig object in
// the managed upgrade operator namespace. It returns (nil, nil) when the
// list call succeeds but no configs exist yet.
func getManagedUpgradeOperatorConfig(ctx context.Context, dynamicClient *dynamic.DynamicClient) (*unstructured.Unstructured, error) {
	gvr := schema.GroupVersionResource{
		Group:    "upgrade.managed.openshift.io",
		Version:  "v1alpha1",
		Resource: "upgradeconfigs",
	}
	list, err := dynamicClient.Resource(gvr).Namespace(managedUpgradeOperatorNamespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	if len(list.Items) < 1 {
		return nil, nil
	}
	return &list.Items[0], nil
}