This repository has been archived by the owner on Jul 18, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
unidling_controller.go
373 lines (310 loc) · 13.4 KB
/
unidling_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
package controller
import (
"encoding/json"
"fmt"
"sync"
"time"
unidlingapi "github.com/openshift/origin/pkg/unidling/api"
unidlingutil "github.com/openshift/origin/pkg/unidling/util"
deployclient "github.com/openshift/origin/pkg/deploy/generated/internalclientset/typed/apps/internalversion"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
kextapi "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
kextclient "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/extensions/v1beta1"
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/util/workqueue"
"github.com/golang/glog"
)
// MaxRetries is the requeue threshold awaitRequest uses to stop retrying an
// unidling request that keeps failing with a retryable error; once reached, the
// request is logged and dropped.
const MaxRetries = 5
// lastFiredCache records, per target (the namespace/name of a NeedPods event's
// involved object), the LastTimestamp of the most recent NeedPods event seen for
// that target. The embedded RWMutex guards items; all methods take it.
type lastFiredCache struct {
	sync.RWMutex
	// items maps a target to the newest event timestamp recorded for it.
	items map[types.NamespacedName]time.Time
}
// Get reports the last-fired time stored for info. When nothing has been
// stored for info, the zero time.Time is returned.
func (c *lastFiredCache) Get(info types.NamespacedName) time.Time {
	c.RLock()
	lastFired := c.items[info]
	c.RUnlock()
	return lastFired
}
// Clear drops whatever last-fired time is stored for info; it is a no-op when
// nothing is stored.
func (c *lastFiredCache) Clear(info types.NamespacedName) {
	c.Lock()
	delete(c.items, info)
	c.Unlock()
}
// AddIfNewer stores newLastFired for info unless the time already stored for
// info is at or after newLastFired. It reports whether the store happened.
func (c *lastFiredCache) AddIfNewer(info types.NamespacedName, newLastFired time.Time) bool {
	c.Lock()
	defer c.Unlock()

	if existing, found := c.items[info]; found && !existing.Before(newLastFired) {
		// what we already hold is at least as recent; keep it
		return false
	}
	c.items[info] = newLastFired
	return true
}
// UnidlingController reacts to NeedPods events: for the service each event refers
// to, it scales the scalables recorded in the service's endpoints annotations back
// up and then updates or removes those idling annotations.
type UnidlingController struct {
	// controller is the informer that feeds NeedPods events to the event handlers.
	controller cache.Controller
	// scaleNamespacer is handed to the scale annotater used to read and update scales.
	scaleNamespacer kextclient.ScalesGetter
	// endpointsNamespacer reads and updates the endpoints carrying the idling annotations.
	endpointsNamespacer kcoreclient.EndpointsGetter
	// queue holds the namespace/name keys of services waiting to be unidled.
	queue workqueue.RateLimitingInterface
	// lastFiredCache tracks the newest NeedPods timestamp seen for each key.
	lastFiredCache *lastFiredCache
	// TODO: remove these once we get the scale-source functionality in the scale endpoints
	dcNamespacer deployclient.DeploymentConfigsGetter
	rcNamespacer kcoreclient.ReplicationControllersGetter
}
// NewUnidlingController returns an UnidlingController backed by an informer over
// Events in all namespaces. The informer is restricted by a field selector to
// events whose reason is unidlingapi.NeedPodsReason.
//
// Added and updated events are routed to addEvent/updateEvent. Deleted events
// (including tombstones) are routed to checkAndClearFromCache, which only prunes
// the last-fired cache. The informer resyncs every resyncPeriod and is not started
// until Run is called.
func NewUnidlingController(scaleNS kextclient.ScalesGetter, endptsNS kcoreclient.EndpointsGetter, evtNS kcoreclient.EventsGetter, dcNamespacer deployclient.DeploymentConfigsGetter, rcNamespacer kcoreclient.ReplicationControllersGetter, resyncPeriod time.Duration) *UnidlingController {
	// let the API server filter out everything but NeedPods events
	needPodsOnly := fields.Set{"reason": unidlingapi.NeedPodsReason}.AsSelector()

	uc := &UnidlingController{
		scaleNamespacer:     scaleNS,
		endpointsNamespacer: endptsNS,
		queue:               workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		lastFiredCache:      &lastFiredCache{items: make(map[types.NamespacedName]time.Time)},
		dcNamespacer:        dcNamespacer,
		rcNamespacer:        rcNamespacer,
	}

	eventSource := &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			opts.FieldSelector = needPodsOnly.String()
			return evtNS.Events(metav1.NamespaceAll).List(opts)
		},
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			opts.FieldSelector = needPodsOnly.String()
			return evtNS.Events(metav1.NamespaceAll).Watch(opts)
		},
	}

	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    uc.addEvent,
		UpdateFunc: uc.updateEvent,
		// deletions only matter for pruning the cache of last-seen times
		DeleteFunc: uc.checkAndClearFromCache,
	}

	_, uc.controller = cache.NewInformer(eventSource, &kapi.Event{}, resyncPeriod, handlers)
	return uc
}
// addEvent is the informer's add handler: an *kapi.Event is passed on to
// enqueueEvent, and anything else is reported via utilruntime.HandleError.
func (c *UnidlingController) addEvent(obj interface{}) {
	if evt, isEvent := obj.(*kapi.Event); isEvent {
		c.enqueueEvent(evt)
		return
	}
	utilruntime.HandleError(fmt.Errorf("got non-Event object in event action: %v", obj))
}
// updateEvent is the informer's update handler. Only the new object matters: an
// *kapi.Event is passed on to enqueueEvent, and anything else is reported via
// utilruntime.HandleError.
func (c *UnidlingController) updateEvent(oldObj, newObj interface{}) {
	switch evt := newObj.(type) {
	case *kapi.Event:
		c.enqueueEvent(evt)
	default:
		utilruntime.HandleError(fmt.Errorf("got non-Event object in event action: %v", newObj))
	}
}
// checkAndClearFromCache is the informer's delete handler. It accepts either an
// *kapi.Event or a cache.DeletedFinalStateUnknown tombstone wrapping one, and
// passes the event to clearEventFromCache. Any other shape is reported via
// utilruntime.HandleError.
func (c *UnidlingController) checkAndClearFromCache(obj interface{}) {
	var evt *kapi.Event

	switch deleted := obj.(type) {
	case *kapi.Event:
		evt = deleted
	case cache.DeletedFinalStateUnknown:
		inner, isEvent := deleted.Obj.(*kapi.Event)
		if !isEvent {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not an Event in event action: %v", obj))
			return
		}
		evt = inner
	default:
		utilruntime.HandleError(fmt.Errorf("got non-event, non-tombstone object in event action: %v", obj))
		return
	}

	c.clearEventFromCache(evt)
}
// clearEventFromCache drops the lastFiredCache entry keyed by the namespace/name
// of the event's involved object. Events whose reason is not NeedPods are ignored.
func (c *UnidlingController) clearEventFromCache(event *kapi.Event) {
	if event.Reason == unidlingapi.NeedPodsReason {
		c.lastFiredCache.Clear(types.NamespacedName{
			Namespace: event.InvolvedObject.Namespace,
			Name:      event.InvolvedObject.Name,
		})
	}
}
// enqueueEvent ignores every event whose reason is not NeedPods. For a NeedPods
// event, it records the event's LastTimestamp under the involved object's
// namespace/name. Only when that timestamp is newer than the one already recorded
// is the namespace/name added to the processing queue.
func (c *UnidlingController) enqueueEvent(event *kapi.Event) {
	if event.Reason != unidlingapi.NeedPodsReason {
		return
	}

	target := types.NamespacedName{Namespace: event.InvolvedObject.Namespace, Name: event.InvolvedObject.Name}
	if !c.lastFiredCache.AddIfNewer(target, event.LastTimestamp.Time) {
		// we've already queued something at least this recent
		return
	}
	c.queue.Add(target)
}
// Run starts the event informer and the request-processing worker, then returns
// without blocking. When stopCh is closed, the informer and worker loop stop. The
// work queue is also shut down at that point: otherwise the worker would stay
// blocked in queue.Get forever, and wait.Until could never observe the stop signal.
func (c *UnidlingController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	go c.controller.Run(stopCh)
	go wait.Until(c.processRequests, time.Second, stopCh)

	// Shutting down the queue makes the pending queue.Get in awaitRequest return
	// stop=true, so processRequests (and therefore wait.Until) can exit.
	go func() {
		<-stopCh
		c.queue.ShutDown()
	}()
}
// processRequests keeps handing requests to awaitRequest and returns as soon as
// awaitRequest reports that processing should stop.
func (c *UnidlingController) processRequests() {
	for c.awaitRequest() {
		// each iteration processes one queued request
	}
}
// awaitRequest awaits a new request on the queue, and sends it off for processing.
// If more requests on the queue should be processed, it returns true. If we should
// stop processing (the queue has been shut down), it returns false.
//
// A request that fails with a retryable error is requeued with rate limiting, at
// most MaxRetries times. Every other outcome (success, non-retryable failure, or
// exhausted retries) calls Forget, so the rate limiter stops tracking the request.
func (c *UnidlingController) awaitRequest() bool {
	infoRaw, stop := c.queue.Get()
	if stop {
		return false
	}
	defer c.queue.Done(infoRaw)

	info := infoRaw.(types.NamespacedName)
	lastFired := c.lastFiredCache.Get(info)

	retry, err := c.handleRequest(info, lastFired)
	if err == nil {
		// we succeeded in unidling (or there was nothing to do), so tell the
		// rate limiter to stop tracking this request
		c.queue.Forget(infoRaw)
		return true
	}

	// check to see if we think the error was transient (e.g. server error on the update request),
	// and if not, do not retry
	if !retry {
		utilruntime.HandleError(fmt.Errorf("Unable to process unidling event for %s/%s (at %s), will not retry: %v", info.Namespace, info.Name, lastFired, err))
		// Also drop any failure history left by earlier retries of this key, so it does
		// not inflate the backoff or eat into the retry budget of a later request.
		c.queue.Forget(infoRaw)
		return true
	}

	// Otherwise we were at least partially unsuccessful in unidling, so we requeue the
	// request to process later -- but don't try to process failing requests forever.
	// NumRequeues counts the rate-limited requeues so far, so ">=" caps them at MaxRetries.
	if c.queue.NumRequeues(infoRaw) >= MaxRetries {
		utilruntime.HandleError(fmt.Errorf("Unable to process unidling event for %s/%s (at %s), will not retry again: %v", info.Namespace, info.Name, lastFired, err))
		c.queue.Forget(infoRaw)
		return true
	}

	glog.V(4).Infof("Unable to fully process unidling request for %s/%s (at %s), will retry: %v", info.Namespace, info.Name, lastFired, err)
	c.queue.AddRateLimited(infoRaw)
	return true
}
// handleRequest handles a single request to unidle the service named by info.
// After checking the validity of the request, it examines the service's endpoints
// to determine which scalables to scale. It then scales them up and removes them
// from the endpoints' list of idled scalables.
//
// If it is unable to properly process the request, it returns a boolean indicating
// whether or not we should retry later, as well as an error. For example, if we're
// unable to parse an annotation, retrying later won't help, so it returns false.
//
// Scalables that no longer exist are dropped from the list. A scalable that could
// not be fetched or scaled for any other reason is kept in the list. In that case
// the endpoints are still updated to reflect the scalables that were handled, and
// (true, err) is returned so the caller requeues the request and retries the rest.
func (c *UnidlingController) handleRequest(info types.NamespacedName, lastFired time.Time) (bool, error) {
	// fetch the endpoints associated with the service in question
	targetEndpoints, err := c.endpointsNamespacer.Endpoints(info.Namespace).Get(info.Name, metav1.GetOptions{})
	if err != nil {
		return true, fmt.Errorf("unable to retrieve endpoints: %v", err)
	}

	// make sure we actually were idled...
	idledTimeRaw, wasIdled := targetEndpoints.Annotations[unidlingapi.IdledAtAnnotation]
	if !wasIdled {
		glog.V(5).Infof("UnidlingController received a NeedPods event for a service that was not idled, ignoring")
		return false, nil
	}

	// ...and make sure this request was to wake up from the most recent idling, and not a previous one
	idledTime, err := time.Parse(time.RFC3339, idledTimeRaw)
	if err != nil {
		// retrying here won't help, we're just stuck as idle since we can't parse the idled time
		return false, fmt.Errorf("unable to check idled-at time: %v", err)
	}
	if lastFired.Before(idledTime) {
		glog.V(5).Infof("UnidlingController received an out-of-date NeedPods event, ignoring")
		return false, nil
	}

	// The idled scalables are recorded as a JSON-encoded list in an annotation on the endpoints.
	var targetScalables []unidlingapi.RecordedScaleReference
	if targetScalablesStr, hasTargetScalables := targetEndpoints.Annotations[unidlingapi.UnidleTargetAnnotation]; hasTargetScalables {
		if err = json.Unmarshal([]byte(targetScalablesStr), &targetScalables); err != nil {
			// retrying here won't help, we're just stuck as idled since we can't parse the idled scalables list
			return false, fmt.Errorf("unable to unmarshal target scalable references: %v", err)
		}
	} else {
		glog.V(4).Infof("Service %s/%s had no scalables to unidle", info.Namespace, info.Name)
	}

	// scalables still in this set once the loop below finishes have not been unidled yet
	remaining := make(map[unidlingapi.RecordedScaleReference]struct{}, len(targetScalables))
	for _, ref := range targetScalables {
		remaining[ref] = struct{}{}
	}

	deleteIdlingAnnotations := func(_ int32, annotations map[string]string) {
		delete(annotations, unidlingapi.IdledAtAnnotation)
		delete(annotations, unidlingapi.PreviousScaleAnnotation)
	}
	scaleAnnotater := unidlingutil.NewScaleAnnotater(c.scaleNamespacer, c.dcNamespacer, c.rcNamespacer, deleteIdlingAnnotations)
	// none of the updater's inputs vary per scalable, so build it once instead of per iteration
	updater := unidlingutil.NewScaleUpdater(kapi.Codecs.LegacyCodec(kapi.Registry.EnabledVersions()...), info.Namespace, c.dcNamespacer, c.rcNamespacer)

	// retryErr remembers the latest non-NotFound failure so the request gets retried
	var retryErr error
	for _, scalableRef := range targetScalables {
		var scale *kextapi.Scale
		var obj runtime.Object

		obj, scale, err = scaleAnnotater.GetObjectWithScale(info.Namespace, scalableRef.CrossGroupObjectReference)
		if err != nil {
			if errors.IsNotFound(err) {
				utilruntime.HandleError(fmt.Errorf("%s %q does not exist, removing from list of scalables while unidling service %s/%s: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
				delete(remaining, scalableRef)
			} else {
				utilruntime.HandleError(fmt.Errorf("Unable to get scale for %s %q while unidling service %s/%s, will try again later: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
				retryErr = err
			}
			continue
		}

		if scale.Spec.Replicas > 0 {
			// NOTE(review): an already-scaled scalable stays in the remaining list (so the
			// service keeps its idled annotations) -- confirm this is intended.
			glog.V(4).Infof("%s %q is not idle, skipping while unidling service %s/%s", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name)
			continue
		}

		scale.Spec.Replicas = scalableRef.Replicas
		if err = scaleAnnotater.UpdateObjectScale(updater, info.Namespace, scalableRef.CrossGroupObjectReference, obj, scale); err != nil {
			if errors.IsNotFound(err) {
				utilruntime.HandleError(fmt.Errorf("%s %q does not exist, removing from list of scalables while unidling service %s/%s: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
				delete(remaining, scalableRef)
			} else {
				utilruntime.HandleError(fmt.Errorf("Unable to scale up %s %q while unidling service %s/%s: %v", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name, err))
				retryErr = err
			}
			continue
		}

		glog.V(4).Infof("Scaled up %s %q while unidling service %s/%s", scalableRef.Kind, scalableRef.Name, info.Namespace, info.Name)
		delete(remaining, scalableRef)
	}

	// Rebuild the list in its original order (rather than random map order) so the
	// annotation is deterministic; deleting as we go also drops duplicate entries.
	newAnnotationList := make([]unidlingapi.RecordedScaleReference, 0, len(remaining))
	for _, ref := range targetScalables {
		if _, stillIdle := remaining[ref]; stillIdle {
			newAnnotationList = append(newAnnotationList, ref)
			delete(remaining, ref)
		}
	}

	if len(newAnnotationList) == 0 {
		delete(targetEndpoints.Annotations, unidlingapi.UnidleTargetAnnotation)
		delete(targetEndpoints.Annotations, unidlingapi.IdledAtAnnotation)
	} else if newAnnotationBytes, marshalErr := json.Marshal(newAnnotationList); marshalErr != nil {
		utilruntime.HandleError(fmt.Errorf("unable to update/remove idle annotations from %s/%s: unable to marshal list of remaining scalables, removing list entirely: %v", info.Namespace, info.Name, marshalErr))
		delete(targetEndpoints.Annotations, unidlingapi.UnidleTargetAnnotation)
		delete(targetEndpoints.Annotations, unidlingapi.IdledAtAnnotation)
	} else {
		targetEndpoints.Annotations[unidlingapi.UnidleTargetAnnotation] = string(newAnnotationBytes)
	}

	if _, err = c.endpointsNamespacer.Endpoints(info.Namespace).Update(targetEndpoints); err != nil {
		return true, fmt.Errorf("unable to update/remove idle annotations from %s/%s: %v", info.Namespace, info.Name, err)
	}

	// The endpoints now reflect what we managed to unidle; report any transient
	// failure so the caller requeues the request and retries the rest.
	if retryErr != nil {
		return true, fmt.Errorf("unable to unidle all scalables for service %s/%s: %v", info.Namespace, info.Name, retryErr)
	}
	return false, nil
}