forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 1
/
registry.go
340 lines (289 loc) · 10.2 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executor
import (
"encoding/json"
"errors"
"sync"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/pkg/api"
log "github.com/golang/glog"
)
type (
podEventType int
PodEvent struct {
pod *api.Pod
taskID string
eventType podEventType
}
// Registry is a state store for pod task metadata. Clients are expected to watch() the
// event stream to observe changes over time.
Registry interface {
// Update modifies the registry's iternal representation of the pod; it may also
// modify the pod argument itself. An update may fail because either a pod isn't
// labeled with a task ID, the task ID is unknown, or the nature of the update may
// be incompatible with what's supported in kubernetes-mesos.
Update(pod *api.Pod) (*PodEvent, error)
// Remove the task from this registry, returns an error if the taskID is unknown.
Remove(taskID string) error
// bind associates a taskID with a pod, triggers the binding API on the k8s apiserver
// and stores the resulting pod-task metadata.
bind(taskID string, pod *api.Pod) error
// watch returns the event stream of the registry. clients are expected to read this
// stream otherwise the event buffer will fill up and registry ops will block.
watch() <-chan *PodEvent
// return true if there are no tasks registered
empty() bool
// return the api.Pod registered to the given taskID or else nil
pod(taskID string) *api.Pod
// shutdown any related async processing and clear the internal state of the registry
shutdown()
}
registryImpl struct {
client *clientset.Clientset
updates chan *PodEvent
m sync.RWMutex
boundTasks map[string]*api.Pod
}
)
var (
errCreateBindingFailed = errors.New(messages.CreateBindingFailure)
errAnnotationUpdateFailure = errors.New(messages.AnnotationUpdateFailure)
errUnknownTask = errors.New("unknown task ID")
errUnsupportedUpdate = errors.New("pod update allowed by k8s is incompatible with this version of k8s-mesos")
)
const (
PodEventBound podEventType = iota
PodEventUpdated
PodEventDeleted
PodEventIncompatibleUpdate
updatesBacklogSize = 200
)
func IsUnsupportedUpdate(err error) bool {
return err == errUnsupportedUpdate
}
func (rp *PodEvent) Task() string {
return rp.taskID
}
func (rp *PodEvent) Pod() *api.Pod {
return rp.pod
}
func (rp *PodEvent) FormatShort() string {
return "task '" + rp.taskID + "' pod '" + rp.pod.Namespace + "/" + rp.pod.Name + "'"
}
func NewRegistry(client *clientset.Clientset) Registry {
r := ®istryImpl{
client: client,
updates: make(chan *PodEvent, updatesBacklogSize),
boundTasks: make(map[string]*api.Pod),
}
return r
}
func (r *registryImpl) watch() <-chan *PodEvent {
return r.updates
}
func taskIDFor(pod *api.Pod) (taskID string, err error) {
taskID = pod.Annotations[meta.TaskIdKey]
if taskID == "" {
err = errUnknownTask
}
return
}
func (r *registryImpl) shutdown() {
//TODO(jdef) flesh this out
r.m.Lock()
defer r.m.Unlock()
r.boundTasks = map[string]*api.Pod{}
}
func (r *registryImpl) empty() bool {
r.m.RLock()
defer r.m.RUnlock()
return len(r.boundTasks) == 0
}
func (r *registryImpl) pod(taskID string) *api.Pod {
r.m.RLock()
defer r.m.RUnlock()
return r.boundTasks[taskID]
}
func (r *registryImpl) Remove(taskID string) error {
r.m.Lock()
defer r.m.Unlock()
pod, ok := r.boundTasks[taskID]
if !ok {
return errUnknownTask
}
delete(r.boundTasks, taskID)
r.updates <- &PodEvent{
pod: pod,
taskID: taskID,
eventType: PodEventDeleted,
}
log.V(1).Infof("unbound task %v from pod %v/%v", taskID, pod.Namespace, pod.Name)
return nil
}
func (r *registryImpl) Update(pod *api.Pod) (*PodEvent, error) {
// Don't do anything for pods without task anotation which means:
// - "pre-scheduled" pods which have a NodeName set to this node without being scheduled already.
// - static/mirror pods: they'll never have a TaskID annotation, and we don't expect them to ever change.
// - all other pods that haven't passed through the launch-task-binding phase, which would set annotations.
taskID, err := taskIDFor(pod)
if err != nil {
// There also could be a race between the overall launch-task process and this update, but here we
// will never be able to process such a stale update because the "update pod" that we're receiving
// in this func won't yet have a task ID annotation. It follows that we can safely drop such a stale
// update on the floor because we'll get another update later that, in addition to the changes that
// we're dropping now, will also include the changes from the binding process.
log.V(5).Infof("ignoring pod update for %s/%s because %s annotation is missing", pod.Namespace, pod.Name, meta.TaskIdKey)
return nil, err
}
// be a good citizen: copy the arg before making any changes to it
clone, err := api.Scheme.DeepCopy(pod)
if err != nil {
return nil, err
}
pod = clone.(*api.Pod)
r.m.Lock()
defer r.m.Unlock()
oldPod, ok := r.boundTasks[taskID]
if !ok {
return nil, errUnknownTask
}
registeredPod := &PodEvent{
pod: pod,
taskID: taskID,
eventType: PodEventUpdated,
}
// TODO(jdef) would be nice to only execute this logic based on the presence of
// some particular annotation:
// - preserve the original container port spec since the k8sm scheduler
// has likely changed it.
if !copyPorts(pod, oldPod) {
// TODO(jdef) the state of "pod" is possibly inconsistent at this point.
// we don't care for the moment - we might later.
registeredPod.eventType = PodEventIncompatibleUpdate
r.updates <- registeredPod
log.Warningf("pod containers changed in an incompatible way; aborting update")
return registeredPod, errUnsupportedUpdate
}
// update our internal copy and broadcast the change
r.boundTasks[taskID] = pod
r.updates <- registeredPod
log.V(1).Infof("updated task %v pod %v/%v", taskID, pod.Namespace, pod.Name)
return registeredPod, nil
}
// copyPorts copies the container pod specs from src to dest and returns
// true if all ports (in both dest and src) are accounted for, otherwise
// false. if returning false then it's possible that only a partial copy
// has been performed.
func copyPorts(dest, src *api.Pod) bool {
containers := src.Spec.Containers
ctPorts := make(map[string][]api.ContainerPort, len(containers))
for i := range containers {
ctPorts[containers[i].Name] = containers[i].Ports
}
containers = dest.Spec.Containers
for i := range containers {
name := containers[i].Name
if ports, found := ctPorts[name]; found {
containers[i].Ports = ports
delete(ctPorts, name)
} else {
// old pod spec is missing this container?!
return false
}
}
if len(ctPorts) > 0 {
// new pod spec has containers that aren't in the old pod spec
return false
}
return true
}
func (r *registryImpl) bind(taskID string, pod *api.Pod) error {
// validate taskID matches that of the annotation
annotatedTaskID, err := taskIDFor(pod)
if err != nil {
log.Warning("failed to bind: missing task ID annotation for pod ", pod.Namespace+"/"+pod.Name)
return errCreateBindingFailed
}
if annotatedTaskID != taskID {
log.Warningf("failed to bind: expected task-id %v instead of %v for pod %v/%v", taskID, annotatedTaskID, pod.Namespace, pod.Name)
return errCreateBindingFailed
}
// record this as a bound task for now so that we can avoid racing with the mesos pod source, who is
// watching the apiserver for pod updates and will verify pod-task validity with us upon receiving such
boundSuccessfully := false
defer func() {
if !boundSuccessfully {
r.m.Lock()
defer r.m.Unlock()
delete(r.boundTasks, taskID)
}
}()
func() {
r.m.Lock()
defer r.m.Unlock()
r.boundTasks[taskID] = pod
}()
if pod.Spec.NodeName == "" {
//HACK(jdef): cloned binding construction from k8s plugin/pkg/scheduler/framework.go
binding := &api.Binding{
ObjectMeta: api.ObjectMeta{
Namespace: pod.Namespace,
Name: pod.Name,
Annotations: make(map[string]string),
},
Target: api.ObjectReference{
Kind: "Node",
Name: pod.Annotations[meta.BindingHostKey],
},
}
// forward the annotations that the scheduler wants to apply
for k, v := range pod.Annotations {
binding.Annotations[k] = v
}
// create binding on apiserver
log.Infof("Binding task %v pod '%v/%v' to '%v' with annotations %+v...",
taskID, pod.Namespace, pod.Name, binding.Target.Name, binding.Annotations)
ctx := api.WithNamespace(api.NewContext(), binding.Namespace)
err := r.client.CoreClient.Post().Namespace(api.NamespaceValue(ctx)).Resource("bindings").Body(binding).Do().Error()
if err != nil {
log.Warningf("failed to bind task %v pod %v/%v: %v", taskID, pod.Namespace, pod.Name, err)
return errCreateBindingFailed
}
} else {
// post annotations update to apiserver
patch := struct {
Metadata struct {
Annotations map[string]string `json:"annotations"`
} `json:"metadata"`
}{}
patch.Metadata.Annotations = pod.Annotations
patchJson, _ := json.Marshal(patch)
log.V(4).Infof("Patching annotations %v of task %v pod %v/%v: %v", pod.Annotations, taskID, pod.Namespace, pod.Name, string(patchJson))
err := r.client.CoreClient.Patch(api.MergePatchType).RequestURI(pod.SelfLink).Body(patchJson).Do().Error()
if err != nil {
log.Errorf("Error updating annotations of ready-to-launch task %v pod %v/%v: %v", taskID, pod.Namespace, pod.Name, err)
return errAnnotationUpdateFailure
}
}
boundSuccessfully = true
r.updates <- &PodEvent{
pod: pod,
taskID: taskID,
eventType: PodEventBound,
}
log.V(1).Infof("bound task %v to pod %v/%v", taskID, pod.Namespace, pod.Name)
return nil
}