forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
deployment_controller.go
211 lines (184 loc) · 8.46 KB
/
deployment_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package controller
import (
"github.com/golang/glog"
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kerrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
deployapi "github.com/openshift/origin/pkg/deploy/api"
deployutil "github.com/openshift/origin/pkg/deploy/util"
)
// DeploymentController performs a deployment by creating a pod which is defined by a strategy.
// The status of the resulting deployment will follow the status of the corresponding pod.
//
// Deployments are represented by a ReplicationController.
type DeploymentController struct {
// ContainerCreator makes the container for the deployment pod based on the strategy.
ContainerCreator DeploymentContainerCreator
// DeploymentInterface provides access to deployments.
DeploymentInterface dcDeploymentInterface
// PodInterface provides access to pods.
PodInterface dcPodInterface
// NextDeployment blocks until the next deployment is available.
NextDeployment func() *kapi.ReplicationController
// NextPod blocks until the next pod is available.
NextPod func() *kapi.Pod
// DeploymentStore is a cache of deployments.
DeploymentStore cache.Store
// Environment is a set of environment which should be injected into all deployment pod
// containers, in addition to whatever environment is specified by the ContainerCreator.
Environment []kapi.EnvVar
// UseLocalImages configures the ImagePullPolicy for containers in the deployment pod.
UseLocalImages bool
// Codec is used to decode DeploymentConfigs.
Codec runtime.Codec
// Stop is an optional channel that controls when the controller exits.
Stop <-chan struct{}
}
// DeploymentContainerCreator knows how to create a deployment pod's container based on
// the deployment's strategy.
type DeploymentContainerCreator interface {
CreateContainer(*deployapi.DeploymentStrategy) *kapi.Container
}
type dcDeploymentInterface interface {
UpdateDeployment(namespace string, deployment *kapi.ReplicationController) (*kapi.ReplicationController, error)
}
type dcPodInterface interface {
CreatePod(namespace string, pod *kapi.Pod) (*kapi.Pod, error)
DeletePod(namespace, id string) error
}
// Run begins watching and synchronizing deployment states.
func (dc *DeploymentController) Run() {
go util.Until(func() { dc.HandleDeployment() }, 0, dc.Stop)
go util.Until(func() { dc.HandlePod() }, 0, dc.Stop)
}
// HandleDeployment processes a new deployment and creates a new Pod which implements the specific
// deployment behavior. The deployment and pod are correlated with annotations. If the pod was
// successfully created, the deployment's status is transitioned to pending; otherwise, the status
// is transitioned to failed.
func (dc *DeploymentController) HandleDeployment() {
deployment := dc.NextDeployment()
if deployment.Annotations[deployapi.DeploymentStatusAnnotation] != string(deployapi.DeploymentStatusNew) {
glog.V(4).Infof("Ignoring deployment %s with non-New status", deployment.Name)
return
}
// TODO: transition to a failed state? seems like yes since this is probably not recoverable
var deploymentPod *kapi.Pod
var deploymentPodError error
if deploymentPod, deploymentPodError = dc.makeDeploymentPod(deployment); deploymentPodError != nil {
glog.V(0).Infof("Failed to make deployment pod for %s: %v", deployment.Name, deploymentPodError)
return
}
nextStatus := deployment.Annotations[deployapi.DeploymentStatusAnnotation]
if pod, err := dc.PodInterface.CreatePod(deployment.Namespace, deploymentPod); err != nil {
// If the pod already exists, it's possible that a previous CreatePod succeeded but
// the deployment state update failed and now we're re-entering.
if kerrors.IsAlreadyExists(err) {
nextStatus = string(deployapi.DeploymentStatusPending)
} else {
glog.Infof("Error creating pod for deployment %s: %v", deployment.Name, err)
nextStatus = string(deployapi.DeploymentStatusFailed)
}
} else {
glog.V(2).Infof("Created pod %s for deployment %s", pod.Name, deployment.Name)
deployment.Annotations[deployapi.DeploymentPodAnnotation] = pod.Name
nextStatus = string(deployapi.DeploymentStatusPending)
}
deployment.Annotations[deployapi.DeploymentStatusAnnotation] = nextStatus
glog.V(2).Infof("Updating deployment %s status %s -> %s", deployment.Name, deployment.Status, nextStatus)
if _, err := dc.DeploymentInterface.UpdateDeployment(deployment.Namespace, deployment); err != nil {
glog.V(2).Infof("Failed to update deployment %s: %v", deployment.Name, err)
}
}
// HandlePod reconciles a pod's current state with its associated deployment and updates the
// deployment appropriately.
func (dc *DeploymentController) HandlePod() {
pod := dc.NextPod()
// Verify the assumption that we'll be given only pods correlated to a deployment
deploymentID, hasDeploymentID := pod.Annotations[deployapi.DeploymentAnnotation]
if !hasDeploymentID {
glog.V(2).Infof("Unexpected state: Pod %s has no deployment annotation; skipping", pod.Name)
return
}
deploymentObj, deploymentExists, err := dc.DeploymentStore.Get(&kapi.ReplicationController{ObjectMeta: kapi.ObjectMeta{Name: deploymentID, Namespace: pod.Namespace}})
if err != nil {
glog.Errorf("Unable to retrieve deployment from store: %v", err)
return
}
if !deploymentExists {
glog.V(2).Infof("Couldn't find deployment %s associated with pod %s", deploymentID, pod.Name)
return
}
deployment := deploymentObj.(*kapi.ReplicationController)
nextDeploymentStatus := deployment.Annotations[deployapi.DeploymentStatusAnnotation]
switch pod.Status.Phase {
case kapi.PodRunning:
nextDeploymentStatus = string(deployapi.DeploymentStatusRunning)
case kapi.PodSucceeded, kapi.PodFailed:
nextDeploymentStatus = string(deployapi.DeploymentStatusComplete)
// Detect failure based on the container state
for _, info := range pod.Status.Info {
if info.State.Termination != nil && info.State.Termination.ExitCode != 0 {
nextDeploymentStatus = string(deployapi.DeploymentStatusFailed)
}
}
// Automatically clean up successful pods
if nextDeploymentStatus == string(deployapi.DeploymentStatusComplete) {
if err := dc.PodInterface.DeletePod(deployment.Namespace, pod.Name); err != nil {
glog.V(4).Infof("Couldn't delete completed pod %s for deployment %s: %#v", pod.Name, deployment.Name, err)
} else {
glog.V(4).Infof("Deleted completed pod %s for deployment %s", pod.Name, deployment.Name)
}
}
}
if deployment.Annotations[deployapi.DeploymentStatusAnnotation] != nextDeploymentStatus {
glog.V(2).Infof("Updating deployment %s status %s -> %s", deployment.Name, deployment.Annotations[deployapi.DeploymentStatusAnnotation], nextDeploymentStatus)
deployment.Annotations[deployapi.DeploymentStatusAnnotation] = nextDeploymentStatus
if _, err := dc.DeploymentInterface.UpdateDeployment(pod.Namespace, deployment); err != nil {
glog.V(2).Infof("Failed to update deployment %v: %v", deployment.Name, err)
}
}
}
// makeDeploymentPod creates a pod which implements deployment behavior. The pod is correlated to
// the deployment with an annotation.
func (dc *DeploymentController) makeDeploymentPod(deployment *kapi.ReplicationController) (*kapi.Pod, error) {
var deploymentConfig *deployapi.DeploymentConfig
var decodeError error
if deploymentConfig, decodeError = deployutil.DecodeDeploymentConfig(deployment, dc.Codec); decodeError != nil {
return nil, decodeError
}
container := dc.ContainerCreator.CreateContainer(&deploymentConfig.Template.Strategy)
// Combine the container environment, controller environment, and deployment values into
// the pod's environment.
envVars := container.Env
envVars = append(envVars, kapi.EnvVar{Name: "OPENSHIFT_DEPLOYMENT_NAME", Value: deployment.Name})
envVars = append(envVars, kapi.EnvVar{Name: "OPENSHIFT_DEPLOYMENT_NAMESPACE", Value: deployment.Namespace})
for _, env := range dc.Environment {
envVars = append(envVars, env)
}
pod := &kapi.Pod{
ObjectMeta: kapi.ObjectMeta{
Annotations: map[string]string{
deployapi.DeploymentAnnotation: deployment.Name,
},
},
Spec: kapi.PodSpec{
Containers: []kapi.Container{
{
Name: "deployment",
Command: container.Command,
Image: container.Image,
Env: envVars,
},
},
RestartPolicy: kapi.RestartPolicy{
Never: &kapi.RestartPolicyNever{},
},
},
}
if dc.UseLocalImages {
pod.Spec.Containers[0].ImagePullPolicy = kapi.PullIfNotPresent
}
return pod, nil
}