forked from kubernetes-sigs/kube-storage-version-migrator
/
kubemigrator.go
174 lines (158 loc) · 6.16 KB
/
kubemigrator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"reflect"
"time"
"k8s.io/klog/glog"
migrationv1alpha1 "github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/apis/migration/v1alpha1"
migrationclient "github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/clients/clientset"
"github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/migrator"
"github.com/kubernetes-sigs/kube-storage-version-migrator/pkg/migrator/metrics"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
)
// KubeMigrator monitors StorageVersionMigration objects, fulfills the
// migration, and updates the status of the StorageVersionMigration objects.
type KubeMigrator struct {
	// dynamic reads and rewrites the resources being migrated.
	dynamic dynamic.Interface
	// migrationClient reads and updates StorageVersionMigration objects.
	migrationClient migrationclient.Interface
	// migrationInformer caches migrations, indexed by status
	// (see StatusIndex / StatusRunning / StatusPending used in process()).
	migrationInformer cache.SharedIndexInformer
}
// NewKubeMigrator creates a KubeMigrator backed by a status-indexed informer
// over StorageVersionMigration objects.
func NewKubeMigrator(dynamic dynamic.Interface, migrationClient migrationclient.Interface) *KubeMigrator {
	km := &KubeMigrator{
		dynamic:         dynamic,
		migrationClient: migrationClient,
	}
	km.migrationInformer = NewStatusIndexedInformer(migrationClient)
	return km
}
// Run starts the migration informer and processes migrations until stopCh is
// closed. It returns early if the informer cache fails to sync.
func (km *KubeMigrator) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go km.migrationInformer.Run(stopCh)
	if !cache.WaitForCacheSync(stopCh, km.migrationInformer.HasSynced) {
		// Error strings are lowercase and unpunctuated per Go convention.
		utilruntime.HandleError(fmt.Errorf("unable to sync caches"))
		return
	}
	// process handles at most one migration per invocation; poll every second.
	wait.Until(km.process, time.Second, stopCh)
}
// process picks at most one StorageVersionMigration from the informer cache
// and processes it. Migrations already in the "Running" state take priority
// over "Pending" ones.
//
// KubeMigrator has only one worker, so it doesn't need a workqueue to
// guarantee that a single thread processes a StorageVersionMigration.
func (km *KubeMigrator) process() {
	indexer := km.migrationInformer.GetIndexer()
	// Already-running migrations are handled first.
	if running, err := indexer.ByIndex(StatusIndex, StatusRunning); err != nil {
		utilruntime.HandleError(err)
		return
	} else if len(running) > 0 {
		utilruntime.HandleError(km.processOne(running[0]))
		return
	}
	// Otherwise pick up a pending migration, if any.
	if pending, err := indexer.ByIndex(StatusIndex, StatusPending); err != nil {
		utilruntime.HandleError(err)
	} else if len(pending) > 0 {
		utilruntime.HandleError(km.processOne(pending[0]))
	}
}
// processOne runs a single storage version migration to completion and
// records the outcome (Succeeded/Failed condition plus a metric). obj is
// expected to be a *StorageVersionMigration taken from the informer cache.
func (km *KubeMigrator) processOne(obj interface{}) error {
	m, ok := obj.(*migrationv1alpha1.StorageVersionMigration)
	if !ok {
		return fmt.Errorf("expected StorageVersionMigration, got %#v", reflect.TypeOf(obj))
	}
	// get the fresh object from the apiserver to make sure the object
	// still exists, and the object is not completed.
	m, err := km.migrationClient.MigrationV1alpha1().StorageVersionMigrations().Get(m.Name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	// A migration with a terminal condition needs no further work; the
	// informer cache may simply be lagging behind the apiserver.
	if HasCondition(m, migrationv1alpha1.MigrationSucceeded) || HasCondition(m, migrationv1alpha1.MigrationFailed) {
		glog.V(2).Infof("The migration has already completed for %#v", m)
		return nil
	}
	// Mark the migration Running before starting, so a restarted controller
	// can find and prioritize it via the status index.
	m, err = km.updateStatus(m, migrationv1alpha1.MigrationRunning, "")
	if err != nil {
		return err
	}
	progressTracker := migrator.NewProgressTracker(km.migrationClient.MigrationV1alpha1().StorageVersionMigrations(), m.Name)
	core := migrator.NewMigrator(resource(m), km.dynamic, progressTracker)
	// If the storageVersionMigration object is deleted during Run(), Run()
	// will return an error when it tries to write the continueToken into the
	// migration object. Thus, it's not necessary to register a deletion
	// event handler with the migrationInformer to interrupt the Run().
	err = core.Run()
	// Log the migration error (if any) here; the terminal status and metric
	// below are recorded regardless.
	utilruntime.HandleError(err)
	if err == nil {
		_, err = km.updateStatus(m, migrationv1alpha1.MigrationSucceeded, "")
		metrics.Metrics.ObserveSucceededMigration(resource(m).String())
		return err
	}
	_, err = km.updateStatus(m, migrationv1alpha1.MigrationFailed, err.Error())
	metrics.Metrics.ObserveFailedMigration(resource(m).String())
	return err
}
// updateStatus sets the given condition (with the supplied message) on m's
// status, removes the other KNOWN condition types, and writes the status to
// the apiserver. It always retries with exponential backoff, no matter what
// kind of error the apiserver returns, because it's a pity to start the
// entire migration over merely because of a status update failure.
//
// It returns the object as last written by the apiserver (or as last
// refreshed, if the final write failed), together with any terminal error
// from the backoff.
func (km *KubeMigrator) updateStatus(m *migrationv1alpha1.StorageVersionMigration, condition migrationv1alpha1.MigrationConditionType, message string) (*migrationv1alpha1.StorageVersionMigration, error) {
	backoff := wait.Backoff{
		Steps:    6,
		Duration: 10 * time.Millisecond,
		Factor:   5.0,
		Jitter:   0.1,
	}
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		var newConditions []migrationv1alpha1.MigrationCondition
		for _, c := range m.Status.Conditions {
			switch c.Type {
			case migrationv1alpha1.MigrationRunning:
			case migrationv1alpha1.MigrationSucceeded:
			case migrationv1alpha1.MigrationFailed:
			default:
				// keeps unknown conditions
				newConditions = append(newConditions, c)
			}
		}
		newCondition := migrationv1alpha1.MigrationCondition{
			Type:           condition,
			Status:         corev1.ConditionTrue,
			LastUpdateTime: metav1.Now(),
			Message:        message,
		}
		newConditions = append(newConditions, newCondition)
		m.Status.Conditions = newConditions
		updated, err := km.migrationClient.MigrationV1alpha1().StorageVersionMigrations().UpdateStatus(m)
		if err == nil {
			// Keep the server's copy so the returned object carries the new
			// resourceVersion. The original discarded this result and relied
			// on unspecified evaluation order in
			// "return m, wait.ExponentialBackoff(...)", which could yield a
			// stale object to the caller.
			m = updated
			return true, nil
		}
		// Always refresh and retry, no matter what kind of error is returned
		// by the apiserver (e.g. a conflict requires the latest resourceVersion).
		refreshed, getErr := km.migrationClient.MigrationV1alpha1().StorageVersionMigrations().Get(m.Name, metav1.GetOptions{})
		if getErr == nil {
			m = refreshed
		}
		return false, nil
	})
	return m, err
}