controller.go
package deploymentconfig

import (
	"fmt"
	"strconv"

	"github.com/golang/glog"

	kapi "k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/client/record"
	kclient "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/runtime"

	osclient "github.com/openshift/origin/pkg/client"
	deployapi "github.com/openshift/origin/pkg/deploy/api"
	deployutil "github.com/openshift/origin/pkg/deploy/util"
)

// DeploymentConfigController is responsible for creating a new deployment
// when:
//
// 1. The config version is > 0, and
// 2. No deployment exists for that version.
//
// The controller reconciles deployments with the replica count specified on
// the config. The active deployment (that is, the latest successful
// deployment) will always be scaled to the config replica count. All other
// deployments will be scaled to zero.
//
// If a new version is observed for which no deployment exists, any running
// deployments will be cancelled. The controller will not attempt to scale
// running deployments.
type DeploymentConfigController struct {
	// kubeClient provides access to Kube resources.
	kubeClient kclient.Interface
	// osClient provides access to OpenShift resources.
	osClient osclient.Interface
	// codec is used to build deployments from configs.
	codec runtime.Codec
	// recorder is used to record events.
	recorder record.EventRecorder
}

// fatalError is an error which can't be retried.
type fatalError string

// transientError is an error which should always be retried (indefinitely).
type transientError string

func (e fatalError) Error() string {
	return fmt.Sprintf("fatal error handling deployment config: %s", string(e))
}

func (e transientError) Error() string {
	return "transient error handling deployment config: " + string(e)
}
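
// The two error kinds above are how Handle signals retry policy to its caller.
// A minimal sketch of how a work-queue caller might react, assuming a
// controller c and a config already fetched from the cache (the factory/run
// loop that does this is not part of this file):
//
//	switch c.Handle(config).(type) {
//	case fatalError:
//		// Give up; retrying will not help until the config changes.
//	case transientError:
//		// Requeue the config and try again later.
//	default:
//		// nil (success) or other errors fall through to the generic retry policy.
//	}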

// NewDeploymentConfigController creates a new DeploymentConfigController.
func NewDeploymentConfigController(kubeClient kclient.Interface, osClient osclient.Interface, codec runtime.Codec, recorder record.EventRecorder) *DeploymentConfigController {
	return &DeploymentConfigController{
		kubeClient: kubeClient,
		osClient:   osClient,
		codec:      codec,
		recorder:   recorder,
	}
}
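
// A minimal wiring sketch, assuming kubeClient, osClient, codec, and recorder
// have already been constructed elsewhere (as the controller factory that owns
// this controller normally would); all of these names are placeholders:
//
//	controller := NewDeploymentConfigController(kubeClient, osClient, codec, recorder)
//	if err := controller.Handle(config); err != nil {
//		glog.Errorf("error handling deployment config %q: %v", deployutil.LabelForDeploymentConfig(config), err)
//	}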

// Handle cancels stale running deployments when a newer config version
// appears, creates a deployment for the latest version once nothing is in
// flight, and otherwise reconciles existing deployments against the config.
func (c *DeploymentConfigController) Handle(config *deployapi.DeploymentConfig) error {
	// There's nothing to reconcile until the version is nonzero.
	if config.Status.LatestVersion == 0 {
		glog.V(5).Infof("Waiting for first version of %q", deployutil.LabelForDeploymentConfig(config))
		return c.updateStatus(config)
	}

	// Find all deployments owned by the deployment config.
	sel := deployutil.ConfigSelector(config.Name)
	existingDeployments, err := c.kubeClient.ReplicationControllers(config.Namespace).List(kapi.ListOptions{LabelSelector: sel})
	if err != nil {
		return err
	}

	latestIsDeployed, latestDeployment := deployutil.LatestDeploymentInfo(config, existingDeployments)

	// If the latest deployment doesn't exist yet, cancel any running
	// deployments to allow them to be superseded by the new config version.
	awaitingCancellations := false
	if !latestIsDeployed {
		for _, deployment := range existingDeployments.Items {
			// Skip deployments with an outcome.
			if deployutil.IsTerminatedDeployment(&deployment) {
				continue
			}
			// Cancel running deployments.
			awaitingCancellations = true
			if !deployutil.IsDeploymentCancelled(&deployment) {
				deployment.Annotations[deployapi.DeploymentCancelledAnnotation] = deployapi.DeploymentCancelledAnnotationValue
				deployment.Annotations[deployapi.DeploymentStatusReasonAnnotation] = deployapi.DeploymentCancelledNewerDeploymentExists
				_, err := c.kubeClient.ReplicationControllers(deployment.Namespace).Update(&deployment)
				if err != nil {
					c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCancellationFailed", "Failed to cancel deployment %q superseded by version %d: %s", deployment.Name, config.Status.LatestVersion, err)
				} else {
					c.recorder.Eventf(config, kapi.EventTypeNormal, "DeploymentCancelled", "Cancelled deployment %q superseded by version %d", deployment.Name, config.Status.LatestVersion)
				}
			}
		}
	}

	// Wait for deployment cancellations before reconciling or creating a new
	// deployment to avoid competing with existing deployment processes.
	if awaitingCancellations {
		c.recorder.Eventf(config, kapi.EventTypeNormal, "DeploymentAwaitingCancellation", "Deployment of version %d awaiting cancellation of older running deployments", config.Status.LatestVersion)
		// Return a transientError so that the deployment config can be requeued.
		return transientError(fmt.Sprintf("found previous inflight deployment for %s - requeuing", deployutil.LabelForDeploymentConfig(config)))
	}

	// If the latest deployment already exists, reconcile existing deployments
	// and return early.
	if latestIsDeployed {
		// If the latest deployment is still running, try again later. We don't
		// want to compete with the deployer.
		if !deployutil.IsTerminatedDeployment(latestDeployment) {
			return c.updateStatus(config)
		}

		return c.reconcileDeployments(existingDeployments, config)
	}

	// If the config is paused we shouldn't create new deployments for it.
	// TODO: Make sure cleanup policy will work for paused configs.
	if config.Spec.Paused {
		return c.updateStatus(config)
	}

	// No deployments are running and the latest deployment doesn't exist, so
	// create the new deployment.
	deployment, err := deployutil.MakeDeployment(config, c.codec)
	if err != nil {
		return fatalError(fmt.Sprintf("couldn't make deployment from (potentially invalid) deployment config %s: %v", deployutil.LabelForDeploymentConfig(config), err))
	}
	created, err := c.kubeClient.ReplicationControllers(config.Namespace).Create(deployment)
	if err != nil {
		// If the deployment was already created, just move on. The cache could be
		// stale, or another process could have already handled this update.
		if errors.IsAlreadyExists(err) {
			return c.updateStatus(config)
		}
		c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentCreationFailed", "Couldn't deploy version %d: %s", config.Status.LatestVersion, err)
		return fmt.Errorf("couldn't create deployment for deployment config %s: %v", deployutil.LabelForDeploymentConfig(config), err)
	}
	c.recorder.Eventf(config, kapi.EventTypeNormal, "DeploymentCreated", "Created new deployment %q for version %d", created.Name, config.Status.LatestVersion)

	return c.updateStatus(config)
}

// reconcileDeployments reconciles existing deployment replica counts which
// could have diverged outside the deployment process (e.g. due to auto or
// manual scaling, or partial deployments). The active deployment is the last
// successful deployment, not necessarily the latest in terms of the config
// version. The active deployment replica count should follow the config, and
// all other deployments should be scaled to zero.
//
// Previously, the scaling behavior was that the config replica count was used
// only for initial deployments and the active deployment had to be scaled up
// directly. To continue supporting that old behavior, we must detect when the
// deployment has been directly manipulated, and if so, preserve the directly
// updated value and sync the config with the deployment.
func (c *DeploymentConfigController) reconcileDeployments(existingDeployments *kapi.ReplicationControllerList, config *deployapi.DeploymentConfig) error {
	latestIsDeployed, latestDeployment := deployutil.LatestDeploymentInfo(config, existingDeployments)
	if !latestIsDeployed {
		// We shouldn't be reconciling if the latest deployment hasn't been
		// created; this is enforced on the calling side, but double checking
		// can't hurt.
		return nil
	}
	activeDeployment := deployutil.ActiveDeployment(config, existingDeployments)

	// Compute the replica count for the active deployment (even if the active
	// deployment doesn't exist). The active replica count is the value that
	// should be assigned to the config, to allow the replica propagation to
	// flow downward from the config.
	//
	// By default we'll assume the config replicas should be used to update the
	// active deployment, except in special cases (like the first sync or
	// externally updated deployments).
	activeReplicas := config.Spec.Replicas
	source := "the deploymentConfig itself (no change)"

	activeDeploymentExists := activeDeployment != nil
	activeDeploymentIsLatest := activeDeploymentExists && activeDeployment.Name == latestDeployment.Name
	latestDesiredReplicas, latestHasDesiredReplicas := deployutil.DeploymentDesiredReplicas(latestDeployment)

	switch {
	case activeDeploymentExists && activeDeploymentIsLatest:
		// The active/latest deployment follows the config unless this is its first
		// sync or an external change to the deployment replicas is detected.
		lastActiveReplicas, hasLastActiveReplicas := deployutil.DeploymentReplicas(activeDeployment)
		if !hasLastActiveReplicas || lastActiveReplicas != activeDeployment.Spec.Replicas {
			activeReplicas = activeDeployment.Spec.Replicas
			source = fmt.Sprintf("the latest/active deployment %q which was scaled directly or has not previously been synced", deployutil.LabelForDeployment(activeDeployment))
		}
	case activeDeploymentExists && !activeDeploymentIsLatest:
		// The active/non-latest deployment follows the config if it was
		// previously synced; if this is the first sync, infer what the config
		// value should be based on either the latest desired replicas or whatever
		// the deployment is currently scaled to.
		_, hasLastActiveReplicas := deployutil.DeploymentReplicas(activeDeployment)
		if hasLastActiveReplicas {
			break
		}
		if latestHasDesiredReplicas {
			activeReplicas = latestDesiredReplicas
			source = fmt.Sprintf("the desired replicas of latest deployment %q which has not been previously synced", deployutil.LabelForDeployment(latestDeployment))
		} else if activeDeployment.Spec.Replicas > 0 {
			activeReplicas = activeDeployment.Spec.Replicas
			source = fmt.Sprintf("the active deployment %q which has not been previously synced", deployutil.LabelForDeployment(activeDeployment))
		}
	case !activeDeploymentExists && latestHasDesiredReplicas:
		// If there's no active deployment, use the latest desired replicas, if available.
		activeReplicas = latestDesiredReplicas
		source = fmt.Sprintf("the desired replicas of latest deployment %q with no active deployment", deployutil.LabelForDeployment(latestDeployment))
	}

	// Bring the config into sync with the deployment. Once we know the config
	// accurately represents the desired replica count of the active deployment,
	// we can safely reconcile deployments.
	//
	// If the deployment config is a test config, never update it based on
	// deployments, since test behavior overrides user scaling.
	switch {
	case config.Spec.Replicas == activeReplicas:
	case config.Spec.Test:
		glog.V(4).Infof("Detected changed replicas for test deploymentConfig %q, ignoring that change", deployutil.LabelForDeploymentConfig(config))
	default:
		oldReplicas := config.Spec.Replicas
		config.Spec.Replicas = activeReplicas
		var err error
		config, err = c.osClient.DeploymentConfigs(config.Namespace).Update(config)
		if err != nil {
			return err
		}
		glog.V(4).Infof("Synced deploymentConfig %q replicas from %d to %d based on %s", deployutil.LabelForDeploymentConfig(config), oldReplicas, activeReplicas, source)
	}

	// Reconcile deployments. The active deployment follows the config, and all
	// other deployments should be scaled to zero.
	for _, deployment := range existingDeployments.Items {
		isActiveDeployment := activeDeployment != nil && deployment.Name == activeDeployment.Name

		oldReplicaCount := deployment.Spec.Replicas
		newReplicaCount := 0
		if isActiveDeployment {
			newReplicaCount = activeReplicas
		}
		if config.Spec.Test {
			glog.V(4).Infof("Deployment config %q is test and deployment %q will be scaled down", deployutil.LabelForDeploymentConfig(config), deployutil.LabelForDeployment(&deployment))
			newReplicaCount = 0
		}
		lastReplicas, hasLastReplicas := deployutil.DeploymentReplicas(&deployment)
		// Only update if necessary.
		if !hasLastReplicas || newReplicaCount != oldReplicaCount || lastReplicas != newReplicaCount {
			deployment.Spec.Replicas = newReplicaCount
			deployment.Annotations[deployapi.DeploymentReplicasAnnotation] = strconv.Itoa(newReplicaCount)
			_, err := c.kubeClient.ReplicationControllers(deployment.Namespace).Update(&deployment)
			if err != nil {
				c.recorder.Eventf(config, kapi.EventTypeWarning, "DeploymentScaleFailed",
					"Failed to scale deployment %q from %d to %d: %s", deployment.Name, oldReplicaCount, newReplicaCount, err)
				return err
			}
			// Only report scaling events if we changed the replica count.
			if oldReplicaCount != newReplicaCount {
				c.recorder.Eventf(config, kapi.EventTypeNormal, "DeploymentScaled",
					"Scaled deployment %q from %d to %d", deployment.Name, oldReplicaCount, newReplicaCount)
			} else {
				glog.V(4).Infof("Updated deployment %q replica annotation to match current replica count %d", deployutil.LabelForDeployment(&deployment), newReplicaCount)
			}
		}
	}

	return c.updateStatus(config)
}

// updateStatus records the observed generation and persists the config status,
// returning a transientError so the config is requeued if the update fails.
func (c *DeploymentConfigController) updateStatus(config *deployapi.DeploymentConfig) error {
	if config.Generation > config.Status.ObservedGeneration {
		config.Status.ObservedGeneration = config.Generation
	}
	if _, err := c.osClient.DeploymentConfigs(config.Namespace).UpdateStatus(config); err != nil {
		glog.V(2).Infof("Cannot update the status for %q: %v", deployutil.LabelForDeploymentConfig(config), err)
		return transientError(fmt.Sprintf("cannot update the status for %q - requeuing", deployutil.LabelForDeploymentConfig(config)))
	}
	glog.V(4).Infof("Updated the status for %q (observed generation: %d)", deployutil.LabelForDeploymentConfig(config), config.Status.ObservedGeneration)
	return nil
}