forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
controller.go
211 lines (186 loc) · 8.64 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package deployerpod
import (
"fmt"
"sort"
"github.com/golang/glog"
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
kutil "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
deployapi "github.com/openshift/origin/pkg/deploy/api"
deployutil "github.com/openshift/origin/pkg/deploy/util"
)
// DeployerPodController keeps a deployment's status in sync with the deployer pod
// handling the deployment.
//
// All of its collaborators are injected as fields, so the controller itself
// holds no client of its own.
//
// Use the DeployerPodControllerFactory to create this controller.
type DeployerPodController struct {
	// deploymentClient provides access to deployments. Handle uses it to fetch
	// and update the deployment that owns a deployer pod, and
	// cleanupFailedDeployment uses it to list a config's deployments.
	deploymentClient deploymentClient
	// deployerPodsFor returns all deployer pods for the named deployment.
	// Handle calls it only when the owning deployment can no longer be found,
	// to locate the pods that have to be removed.
	deployerPodsFor func(namespace, name string) (*kapi.PodList, error)
	// deletePod deletes a pod. Handle uses it to remove deployer pods whose
	// deployment no longer exists.
	deletePod func(namespace, name string) error
}
// transientError marks a failure that is expected to clear up on its own; it
// is an error which will be retried indefinitely.
type transientError string

// Error implements the error interface, prefixing the wrapped message with a
// fixed description of where the failure happened.
func (e transientError) Error() string {
	const prefix = "transient error handling deployer pod: "
	return prefix + string(e)
}
// Handle syncs pod's status with any associated deployment.
//
// Pods that carry no deployment name, or that are not the main deployer pod
// of their deployment, are ignored and nil is returned.
//
// If the owning deployment no longer exists, every deployer pod for that
// deployment (including pod itself) is deleted on a best-effort basis and nil
// is returned; individual delete failures are only logged.
//
// Otherwise the deployment's status annotation is advanced from the pod phase:
//
//   - PodRunning: the status becomes Running.
//   - PodSucceeded: the status becomes Complete, or Failed when any container
//     terminated with a non-zero exit code. On Complete the desired-replicas
//     annotation is removed.
//   - PodFailed: unless the deployment is already Failed,
//     cleanupFailedDeployment is run, which itself persists the Failed status.
//
// A non-nil return value signals that the work was not finished. A
// transientError is returned when cleaning up a failed deployment fails; all
// other failures are returned as plain errors. How the caller retries them is
// decided outside this file.
func (c *DeployerPodController) Handle(pod *kapi.Pod) error {
	// Find the deployment associated with the deployer pod.
	deploymentName := deployutil.DeploymentNameFor(pod)
	if len(deploymentName) == 0 {
		return nil
	}

	// Reject updates to anything but the main deployer pod
	// TODO: Find a way to filter this on the watch side.
	if pod.Name != deployutil.DeployerPodNameForDeployment(deploymentName) {
		return nil
	}

	deployment, err := c.deploymentClient.getDeployment(pod.Namespace, deploymentName)
	if err != nil {
		// Some retrieval error occurred. Retry. The underlying error is
		// included so the cause is not lost.
		if !kerrors.IsNotFound(err) {
			return fmt.Errorf("couldn't get deployment %s/%s which owns deployer pod %s/%s: %v", pod.Namespace, deploymentName, pod.Namespace, pod.Name, err)
		}

		// The deployment for this pod has disappeared: clean up this and any
		// other deployer pods, then bail out.
		// Find all the deployer pods for the deployment (including this one).
		deployers, err := c.deployerPodsFor(pod.Namespace, deploymentName)
		if err != nil {
			// Retry. The deployment could not be fetched on this path, so it
			// is identified by the namespace and name we already know rather
			// than by dereferencing the (missing) deployment object.
			return fmt.Errorf("couldn't get deployer pods for %s/%s: %v", pod.Namespace, deploymentName, err)
		}

		// Delete all deployers.
		for i := range deployers.Items {
			deployer := &deployers.Items[i]
			err := c.deletePod(deployer.Namespace, deployer.Name)
			switch {
			case err == nil:
				// TODO: Should this fire an event?
				glog.V(2).Infof("Deleted orphaned deployer pod %s/%s", deployer.Namespace, deployer.Name)
			case !kerrors.IsNotFound(err):
				// TODO: Should this fire an event?
				glog.V(2).Infof("Couldn't delete orphaned deployer pod %s/%s: %v", deployer.Namespace, deployer.Name, err)
			}
		}
		return nil
	}

	currentStatus := deployutil.DeploymentStatusFor(deployment)
	nextStatus := currentStatus

	switch pod.Status.Phase {
	case kapi.PodRunning:
		nextStatus = deployapi.DeploymentStatusRunning
	case kapi.PodSucceeded:
		// Detect failure based on the container state
		nextStatus = deployapi.DeploymentStatusComplete
		for _, info := range pod.Status.ContainerStatuses {
			if info.State.Terminated != nil && info.State.Terminated.ExitCode != 0 {
				nextStatus = deployapi.DeploymentStatusFailed
			}
		}
		if nextStatus == deployapi.DeploymentStatusComplete {
			delete(deployment.Annotations, deployapi.DesiredReplicasAnnotation)
		}
	case kapi.PodFailed:
		// if the deployment is already marked Failed, do not attempt clean up again
		if currentStatus != deployapi.DeploymentStatusFailed {
			// clean up will also update the deployment status to Failed
			// failure to clean up will result in retries and
			// the deployment will not be marked Failed
			// Note: this will prevent new deployments from being created for this config
			if err := c.cleanupFailedDeployment(deployment); err != nil {
				return transientError(fmt.Sprintf("couldn't clean up failed deployment: %v", err))
			}
		}
	}

	if currentStatus != nextStatus {
		// Writing to a nil map panics, so make sure the annotations exist
		// before recording the new status.
		if deployment.Annotations == nil {
			deployment.Annotations = make(map[string]string)
		}
		deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(nextStatus)
		if _, err := c.deploymentClient.updateDeployment(deployment.Namespace, deployment); err != nil {
			// The deployment vanished in the meantime; nothing left to sync.
			if kerrors.IsNotFound(err) {
				return nil
			}
			return fmt.Errorf("couldn't update Deployment %s to status %s: %v", deployutil.LabelForDeployment(deployment), nextStatus, err)
		}
		glog.V(4).Infof("Updated Deployment %s status from %s to %s", deployutil.LabelForDeployment(deployment), currentStatus, nextStatus)
	}

	return nil
}
// cleanupFailedDeployment rolls back the effects of a failed deployment.
//
// It lists the deployments belonging to the same deployment config and, when
// the failed deployment is the latest one and its desired replica count is
// known, scales the most recent Complete deployment back up to that count.
// It then sets the failed deployment's replicas to 0 and records the Failed
// status in its annotations.
//
// If the desired replica count cannot be determined the problem is reported
// through kutil.HandleError and only the scale-down/mark-Failed step runs.
//
// A deployment that has disappeared (NotFound) while being updated is not
// treated as an error: if the completed deployment is gone there is nothing to
// scale back up, and the failed deployment is still marked Failed; if the
// failed deployment itself is gone, nil is returned. Any other list or update
// failure is returned as an error.
func (c *DeployerPodController) cleanupFailedDeployment(deployment *kapi.ReplicationController) error {
	// Scale down the current failed deployment
	configName := deployutil.DeploymentConfigNameFor(deployment)
	existingDeployments, err := c.deploymentClient.listDeploymentsForConfig(deployment.Namespace, configName)
	if err != nil {
		return fmt.Errorf("couldn't list Deployments for DeploymentConfig %s: %v", configName, err)
	}

	desiredReplicas, ok := deployutil.DeploymentDesiredReplicas(deployment)
	if !ok {
		// if desired replicas could not be found, then log the error
		// and update the failed deployment
		// this cannot be treated as a transient error
		kutil.HandleError(fmt.Errorf("Could not determine desired replicas from %s to reset replicas for last completed deployment", deployutil.LabelForDeployment(deployment)))
	}

	if ok && len(existingDeployments.Items) > 0 {
		sort.Sort(deployutil.DeploymentsByLatestVersionDesc(existingDeployments.Items))
		// Iterate by index to avoid copying every ReplicationController.
		for i := range existingDeployments.Items {
			existing := &existingDeployments.Items[i]

			// if a newer deployment exists:
			// - set the replicas for the current failed deployment to 0
			// - there is no point in scaling up the last completed deployment
			// since that will be scaled down by the later deployment
			if i == 0 && existing.Name != deployment.Name {
				break
			}

			// the latest completed deployment is the one that needs to be scaled back up
			if deployutil.DeploymentStatusFor(existing) != deployapi.DeploymentStatusComplete {
				continue
			}
			if existing.Spec.Replicas == desiredReplicas {
				break
			}

			// scale back the completed deployment to the target of the failed deployment
			existing.Spec.Replicas = desiredReplicas
			if _, err := c.deploymentClient.updateDeployment(existing.Namespace, existing); err != nil {
				if kerrors.IsNotFound(err) {
					// The completed deployment no longer exists, so there is
					// nothing to scale back up. Do not return here: the failed
					// deployment below must still be scaled down and marked
					// Failed.
					break
				}
				return fmt.Errorf("couldn't update replicas to %d for deployment %s: %v", desiredReplicas, deployutil.LabelForDeployment(existing), err)
			}
			glog.V(4).Infof("Updated replicas to %d for deployment %s", desiredReplicas, deployutil.LabelForDeployment(existing))
			break
		}
	}

	// set the replicas for the failed deployment to 0
	// and set the status to Failed
	deployment.Spec.Replicas = 0
	// Writing to a nil map panics, so make sure the annotations exist first.
	if deployment.Annotations == nil {
		deployment.Annotations = make(map[string]string)
	}
	deployment.Annotations[deployapi.DeploymentStatusAnnotation] = string(deployapi.DeploymentStatusFailed)
	if _, err := c.deploymentClient.updateDeployment(deployment.Namespace, deployment); err != nil {
		// The failed deployment itself is gone; nothing left to clean up.
		if kerrors.IsNotFound(err) {
			return nil
		}
		return fmt.Errorf("couldn't scale down the deployment %s and mark it as failed: %v", deployutil.LabelForDeployment(deployment), err)
	}
	glog.V(4).Infof("Scaled down the deployment %s and marked it as failed", deployutil.LabelForDeployment(deployment))
	return nil
}
// deploymentClient abstracts access to deployments. In this file deployments
// are represented as ReplicationControllers.
type deploymentClient interface {
	// getDeployment fetches the deployment with the given namespace and name.
	// Callers in this file distinguish a not-found error (checked with
	// kerrors.IsNotFound) from other retrieval errors.
	getDeployment(namespace, name string) (*kapi.ReplicationController, error)
	// updateDeployment persists the given deployment in the given namespace.
	// Callers in this file ignore the returned object and only inspect the
	// error.
	updateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error)
	// listDeploymentsForConfig should return deployments associated with the
	// provided config.
	listDeploymentsForConfig(namespace, configName string) (*kapi.ReplicationControllerList, error)
}
// deploymentClientImpl is a pluggable deploymentClient: each interface method
// simply forwards to the function stored in the corresponding field.
type deploymentClientImpl struct {
	// getDeploymentFunc backs getDeployment.
	getDeploymentFunc func(namespace, name string) (*kapi.ReplicationController, error)
	// updateDeploymentFunc backs updateDeployment.
	updateDeploymentFunc func(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error)
	// listDeploymentsForConfigFunc backs listDeploymentsForConfig.
	listDeploymentsForConfigFunc func(namespace, configName string) (*kapi.ReplicationControllerList, error)
}
// getDeployment forwards the lookup to the injected getDeploymentFunc and
// returns its result unchanged.
func (i *deploymentClientImpl) getDeployment(namespace, name string) (*kapi.ReplicationController, error) {
	fetch := i.getDeploymentFunc
	return fetch(namespace, name)
}
// updateDeployment forwards the update to the injected updateDeploymentFunc
// and returns its result unchanged.
func (i *deploymentClientImpl) updateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error) {
	update := i.updateDeploymentFunc
	return update(namespace, deployment)
}
// listDeploymentsForConfig forwards the query to the injected
// listDeploymentsForConfigFunc and returns its result unchanged.
func (i *deploymentClientImpl) listDeploymentsForConfig(namespace, configName string) (*kapi.ReplicationControllerList, error) {
	list := i.listDeploymentsForConfigFunc
	return list(namespace, configName)
}