node.go
// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package node
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
pkgerrors "github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
coordclientset "k8s.io/client-go/kubernetes/typed/coordination/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
"k8s.io/utils/clock"
)
const (
// Annotations with the JSON-serialized last-applied node status and object metadata. Based on kubectl apply.
// Used to calculate the three-way patch.
virtualKubeletLastNodeAppliedNodeStatus = "virtual-kubelet.io/last-applied-node-status"
virtualKubeletLastNodeAppliedObjectMeta = "virtual-kubelet.io/last-applied-object-meta"
)
var (
// ErrConflictingLeaseControllerConfiguration is returned when the lease controller related options have been
// specified multiple times
ErrConflictingLeaseControllerConfiguration = pkgerrors.New("Multiple, conflicting lease configurations have been put into place")
)
// NodeProvider is the interface used for registering a node and updating its
// status in Kubernetes.
//
// Note: Implementers can choose to manage a node themselves, in which case
// they do not need to provide an implementation of this interface.
type NodeProvider interface { //nolint:revive
// Ping checks if the node is still active.
// This is intended to be lightweight as it will be called periodically as a
// heartbeat to keep the node marked as ready in Kubernetes.
Ping(context.Context) error
// NotifyNodeStatus is used to asynchronously monitor the node.
// The passed in callback should be called any time there is a change to the
// node's status.
// This will generally trigger a call to the Kubernetes API server to update
// the status.
//
// NotifyNodeStatus should not block callers.
NotifyNodeStatus(ctx context.Context, cb func(*corev1.Node))
}
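// The following is a hypothetical, minimal NodeProvider sketch added purely for
// illustration (it is not part of the original file): it reports the node healthy
// for as long as the context is alive and replays whatever node object it is handed
// through the controller's callback. All names here are illustrative assumptions.
type exampleStaticProvider struct {
	notify func(*corev1.Node)
}
// Ping reports the node as healthy until the context is cancelled.
func (p *exampleStaticProvider) Ping(ctx context.Context) error {
	return ctx.Err()
}
// NotifyNodeStatus stores the controller callback so SetStatus below can push updates.
func (p *exampleStaticProvider) NotifyNodeStatus(_ context.Context, cb func(*corev1.Node)) {
	p.notify = cb
}
// SetStatus pushes a new desired node object to the controller, if Run has started.
func (p *exampleStaticProvider) SetStatus(node *corev1.Node) {
	if p.notify != nil {
		p.notify(node)
	}
}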
// NewNodeController creates a new node controller.
// This does not have any side-effects on the system or kubernetes.
//
// Use the node's `Run` method to register and run the loops to update the node
// in Kubernetes.
//
// Note: If there are multiple NodeControllerOpts that apply to the same
// underlying option, the last NodeControllerOpt wins.
func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface, opts ...NodeControllerOpt) (*NodeController, error) {
n := &NodeController{
p: p,
serverNode: node,
nodes: nodes,
chReady: make(chan struct{}),
chDone: make(chan struct{}),
}
for _, o := range opts {
if err := o(n); err != nil {
return nil, pkgerrors.Wrap(err, "error applying node option")
}
}
if n.pingInterval == time.Duration(0) {
n.pingInterval = DefaultPingInterval
}
if n.statusInterval == time.Duration(0) {
n.statusInterval = DefaultStatusUpdateInterval
}
n.nodePingController = newNodePingController(n.p, n.pingInterval, n.pingTimeout)
return n, nil
}
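// Hypothetical usage sketch (illustration only, not part of the original file):
// construct a controller for a node object from a typed core/v1 node client and a
// coordination/v1 lease client, then run it until the context is cancelled. The
// clients, node name and intervals used here are assumptions made for illustration.
func exampleNewAndRunNodeController(ctx context.Context, nodes v1.NodeInterface, leases coordclientset.LeaseInterface) error {
	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "example-virtual-node"}}
	nc, err := NewNodeController(
		NaiveNodeProvider{},
		node,
		nodes,
		WithNodeEnableLeaseV1(leases, 0), // 0 means DefaultLeaseDuration is used
		WithNodeStatusUpdateInterval(90*time.Second),
		WithNodePingInterval(15*time.Second),
	)
	if err != nil {
		return err
	}
	// Run's return value is recorded by the controller and read via nc.Err() below.
	go nc.Run(ctx) //nolint:errcheck
	select {
	case <-nc.Ready():
		// The node is now registered; start whatever depends on it here.
	case <-nc.Done():
		return nc.Err()
	}
	<-nc.Done()
	return nc.Err()
}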
// NodeControllerOpt are the functional options used for configuring a node
type NodeControllerOpt func(*NodeController) error //nolint:revive
// WithNodeEnableLeaseV1 enables support for v1 leases.
// V1 leases share all the same properties as v1beta1 leases, except that they do not fall back the way
// the v1beta1 lease controller does if the API server does not support them. If the lease duration is not
// specified (0), then DefaultLeaseDuration will be used.
func WithNodeEnableLeaseV1(client coordclientset.LeaseInterface, leaseDurationSeconds int32) NodeControllerOpt {
if leaseDurationSeconds == 0 {
leaseDurationSeconds = DefaultLeaseDuration
}
interval := float64(leaseDurationSeconds) * DefaultRenewIntervalFraction
intervalDuration := time.Second * time.Duration(int(interval))
return WithNodeEnableLeaseV1WithRenewInterval(client, leaseDurationSeconds, intervalDuration)
}
// WithNodeEnableLeaseV1WithRenewInterval enables support for v1 leases, and sets a specific renew interval,
// as opposed to the standard multiplier specified by DefaultRenewIntervalFraction
func WithNodeEnableLeaseV1WithRenewInterval(client coordclientset.LeaseInterface, leaseDurationSeconds int32, interval time.Duration) NodeControllerOpt {
if client == nil {
panic("client is nil")
}
if leaseDurationSeconds == 0 {
leaseDurationSeconds = DefaultLeaseDuration
}
return func(n *NodeController) error {
if n.leaseController != nil {
return ErrConflictingLeaseControllerConfiguration
}
leaseController, err := newLeaseControllerWithRenewInterval(
&clock.RealClock{},
client,
leaseDurationSeconds,
interval,
n,
)
if err != nil {
return fmt.Errorf("Unable to configure lease controller: %w", err)
}
n.leaseController = leaseController
return nil
}
}
// WithNodePingTimeout limits the amount of time that the virtual kubelet will wait for the node provider to
// respond to the ping callback. If the provider does not return within this time, it is considered an error
// condition.
func WithNodePingTimeout(timeout time.Duration) NodeControllerOpt {
return func(n *NodeController) error {
n.pingTimeout = &timeout
return nil
}
}
// WithNodePingInterval sets the interval between checking for node statuses via Ping().
// If node leases are not supported (or not enabled), this is the frequency
// with which the node status will be updated in Kubernetes.
func WithNodePingInterval(d time.Duration) NodeControllerOpt {
return func(n *NodeController) error {
n.pingInterval = d
return nil
}
}
// WithNodeStatusUpdateInterval sets the interval for updating node status.
// This is only used when leases are supported and only for updating the actual
// node status, not the node lease.
// When node leases are not enabled (or are not supported on the cluster) this
// has no effect and node status is updated on the "ping" interval.
func WithNodeStatusUpdateInterval(d time.Duration) NodeControllerOpt {
return func(n *NodeController) error {
n.statusInterval = d
return nil
}
}
// WithNodeStatusUpdateErrorHandler adds an error handler for cases where there is an error
// when updating the node status.
// This allows the caller to have some control over how errors are dealt with when
// updating a node's status.
//
// The error passed to the handler will be the error received from kubernetes
// when updating node status.
func WithNodeStatusUpdateErrorHandler(h ErrorHandler) NodeControllerOpt {
return func(n *NodeController) error {
n.nodeStatusUpdateErrorHandler = h
return nil
}
}
// ErrorHandler is a function type used as a callback for handling errors.
// Returning a nil error means the error was handled and progress can continue
// (or a retry is possible).
type ErrorHandler func(context.Context, error) error
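// A hypothetical ErrorHandler sketch (illustration only, not part of the original
// file): if a status update failed because the node object was deleted out from
// under us, recreate it so the retried update can succeed. Wire it in with
// WithNodeStatusUpdateErrorHandler(exampleRecreateNodeErrorHandler(nodes, node)).
func exampleRecreateNodeErrorHandler(nodes v1.NodeInterface, node *corev1.Node) ErrorHandler {
	return func(ctx context.Context, err error) error {
		if !errors.IsNotFound(err) {
			// Anything other than "node is gone" is left for the controller to report.
			return err
		}
		_, createErr := nodes.Create(ctx, node, metav1.CreateOptions{})
		return createErr
	}
}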
// NodeController deals with creating and managing a node object in Kubernetes.
// It can register a node with Kubernetes and periodically update its status.
// NodeController manages a single node entity.
type NodeController struct { //nolint:revive
p NodeProvider
// serverNode must be updated each time the node is updated in the API server
serverNodeLock sync.Mutex
serverNode *corev1.Node
nodes v1.NodeInterface
leaseController *leaseController
pingInterval time.Duration
statusInterval time.Duration
chStatusUpdate chan *corev1.Node
nodeStatusUpdateErrorHandler ErrorHandler
// chReady is closed once the controller is ready to start the control loop
chReady chan struct{}
// chDone is closed once the control loop has exited
chDone chan struct{}
errMu sync.Mutex
err error
nodePingController *nodePingController
pingTimeout *time.Duration
group wait.Group
}
// The default intervals used for lease and status updates.
const (
DefaultPingInterval = 10 * time.Second
DefaultStatusUpdateInterval = 1 * time.Minute
)
// Run registers the node in kubernetes and starts loops for updating the node
// status in Kubernetes.
//
// The node status must be updated periodically in Kubernetes to keep the node
// active. Newer versions of Kubernetes support node leases, which are
// essentially lightweight pings. Older versions of Kubernetes require updating
// the node status periodically.
//
// If Kubernetes supports node leases, this will use leases with a much slower
// node status update (because some things still expect the node object to be updated
// periodically); otherwise it will only update the node status at the configured
// ping interval.
func (n *NodeController) Run(ctx context.Context) (retErr error) {
defer func() {
n.errMu.Lock()
n.err = retErr
n.errMu.Unlock()
close(n.chDone)
}()
n.chStatusUpdate = make(chan *corev1.Node, 1)
n.p.NotifyNodeStatus(ctx, func(node *corev1.Node) {
n.chStatusUpdate <- node
})
n.group.StartWithContext(ctx, n.nodePingController.Run)
n.serverNodeLock.Lock()
providerNode := n.serverNode.DeepCopy()
n.serverNodeLock.Unlock()
if err := n.ensureNode(ctx, providerNode); err != nil {
return err
}
if n.leaseController != nil {
log.G(ctx).WithField("leaseController", n.leaseController).Debug("Starting leasecontroller")
n.group.StartWithContext(ctx, n.leaseController.Run)
}
return n.controlLoop(ctx, providerNode)
}
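// Hypothetical shutdown sketch (illustration only): the Run/Done/Err contract means
// graceful teardown is just "cancel the context handed to Run, wait for the control
// loop to drain, then inspect the terminal error".
func exampleStopNodeController(cancel context.CancelFunc, nc *NodeController) error {
	cancel()    // stops the control loop, the ping controller and, if enabled, the lease controller
	<-nc.Done() // Run has returned and recorded its error by the time this unblocks
	return nc.Err()
}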
// Done signals to the caller when the controller is done and the control loop has exited.
//
// Call n.Err() to find out if there was an error.
func (n *NodeController) Done() <-chan struct{} {
return n.chDone
}
// Err returns any errors that have occurred that trigger the control loop to exit.
//
// Err only returns a non-nil error after `<-n.Done()` returns.
func (n *NodeController) Err() error {
n.errMu.Lock()
defer n.errMu.Unlock()
return n.err
}
func (n *NodeController) ensureNode(ctx context.Context, providerNode *corev1.Node) (err error) {
ctx, span := trace.StartSpan(ctx, "node.ensureNode")
defer span.End()
defer func() {
span.SetStatus(err)
}()
err = n.updateStatus(ctx, providerNode, true)
if err == nil || !errors.IsNotFound(err) {
return err
}
n.serverNodeLock.Lock()
serverNode := n.serverNode
n.serverNodeLock.Unlock()
node, err := n.nodes.Create(ctx, serverNode, metav1.CreateOptions{})
if err != nil {
return pkgerrors.Wrap(err, "error registering node with kubernetes")
}
n.serverNodeLock.Lock()
n.serverNode = node
n.serverNodeLock.Unlock()
// Bad things will happen if the node is deleted in k8s and recreated by someone else,
// since we rely on this metadata persisting.
providerNode.ObjectMeta.Name = node.Name
providerNode.ObjectMeta.Namespace = node.Namespace
providerNode.ObjectMeta.UID = node.UID
return nil
}
// Ready returns a channel that gets closed when the node is fully up and
// running. Note that if there is an error on startup this channel will never
// be closed.
func (n *NodeController) Ready() <-chan struct{} {
return n.chReady
}
func (n *NodeController) controlLoop(ctx context.Context, providerNode *corev1.Node) error {
defer n.group.Wait()
var sleepInterval time.Duration
if n.leaseController == nil {
log.G(ctx).WithField("pingInterval", n.pingInterval).Debug("lease controller is not enabled, updating node status in Kube API server at Ping Time Interval")
sleepInterval = n.pingInterval
} else {
log.G(ctx).WithField("statusInterval", n.statusInterval).Debug("lease controller in use, updating at statusInterval")
sleepInterval = n.statusInterval
}
loop := func() bool {
ctx, span := trace.StartSpan(ctx, "node.controlLoop.loop")
defer span.End()
var timer *time.Timer
ctx = span.WithField(ctx, "sleepTime", n.pingInterval)
timer = time.NewTimer(sleepInterval)
defer timer.Stop()
select {
case <-ctx.Done():
return true
case updated := <-n.chStatusUpdate:
log.G(ctx).Debug("Received node status update")
providerNode.Status = updated.Status
providerNode.ObjectMeta.Annotations = updated.Annotations
providerNode.ObjectMeta.Labels = updated.Labels
if err := n.updateStatus(ctx, providerNode, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
}
case <-timer.C:
if err := n.updateStatus(ctx, providerNode, false); err != nil {
log.G(ctx).WithError(err).Error("Error handling node status update")
}
}
return false
}
close(n.chReady)
for {
shouldTerminate := loop()
if shouldTerminate {
return nil
}
}
}
func (n *NodeController) updateStatus(ctx context.Context, providerNode *corev1.Node, skipErrorCb bool) (err error) {
ctx, span := trace.StartSpan(ctx, "node.updateStatus")
defer span.End()
defer func() {
span.SetStatus(err)
}()
if result, err := n.nodePingController.getResult(ctx); err != nil {
return err
} else if result.error != nil {
return fmt.Errorf("Not updating node status because node ping failed: %w", result.error)
}
updateNodeStatusHeartbeat(providerNode)
node, err := updateNodeStatus(ctx, n.nodes, providerNode)
if err != nil {
if skipErrorCb || n.nodeStatusUpdateErrorHandler == nil {
return err
}
if err := n.nodeStatusUpdateErrorHandler(ctx, err); err != nil {
return err
}
// This might have recreated the node, which may cause problems with our leases until a node update succeeds
node, err = updateNodeStatus(ctx, n.nodes, providerNode)
if err != nil {
return err
}
}
n.serverNodeLock.Lock()
n.serverNode = node
n.serverNodeLock.Unlock()
return nil
}
// Returns a copy of the server node object
func (n *NodeController) getServerNode(ctx context.Context) (*corev1.Node, error) {
n.serverNodeLock.Lock()
defer n.serverNodeLock.Unlock()
if n.serverNode == nil {
return nil, pkgerrors.New("Server node does not yet exist")
}
return n.serverNode.DeepCopy(), nil
}
// just so we don't have to allocate this on every get request
var emptyGetOptions = metav1.GetOptions{}
func prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode *corev1.Node) ([]byte, error) {
// We use these two values to calculate a patch. We use a three-way patch in order to avoid
// causing state regression server side. For example, let's consider the scenario:
/*
UML Source:
@startuml
participant VK
participant K8s
participant ExternalUpdater
note right of VK: Updates internal node conditions to [A, B]
VK->K8s: Patch Upsert [A, B]
note left of K8s: Node conditions are [A, B]
ExternalUpdater->K8s: Patch Upsert [C]
note left of K8s: Node Conditions are [A, B, C]
note right of VK: Updates internal node conditions to [A]
VK->K8s: Patch: delete B, upsert A\nThis is where things go wrong,\nbecause the patch is written to replace all node conditions\nit overwrites (drops) [C]
note left of K8s: Node Conditions are [A]\nNode condition C from ExternalUpdater is no longer present
@enduml
,--. ,---. ,---------------.
|VK| |K8s| |ExternalUpdater|
`+-' `-+-' `-------+-------'
| ,------------------------------------------!. | |
| |Updates internal node conditions to [A, B]|_\ | |
| `--------------------------------------------' | |
| Patch Upsert [A, B] | |
| -----------------------------------------------------------> |
| | |
| ,--------------------------!. | |
| |Node conditions are [A, B]|_\| |
| `----------------------------'| |
| | Patch Upsert [C] |
| | <-------------------
| | |
| ,-----------------------------!. | |
| |Node Conditions are [A, B, C]|_\| |
| `-------------------------------'| |
| ,---------------------------------------!. | |
| |Updates internal node conditions to [A]|_\ | |
| `-----------------------------------------' | |
| Patch: delete B, upsert A | |
| This is where things go wrong, | |
| because the patch is written to replace all node conditions| |
| it overwrites (drops) [C] | |
| -----------------------------------------------------------> |
| | |
,----------------------------------------------------------!. | |
|Node Conditions are [A] |_\| |
|Node condition C from ExternalUpdater is no longer present || |
`------------------------------------------------------------'+-. ,-------+-------.
|VK| |K8s| |ExternalUpdater|
`--' `---' `---------------'
*/
// In order to calculate that last patch (delete B, upsert A) without dropping C, we need to know that C was
// added by "someone else". So, we keep track of our last applied value and our current value. We then generate
// our patch based on the diff of these two and *not* server-side state.
oldVKStatus, ok1 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus]
oldVKObjectMeta, ok2 := apiServerNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta]
oldNode := corev1.Node{}
// Check if the annotations were missing, which means someone else probably created the node, or this is an
// upgrade. Either way, we consider ourselves as never having written the node object before, so oldNode is left
// empty. We will overwrite values if our new node conditions / status / objectmeta have them.
if ok1 && ok2 {
err := json.Unmarshal([]byte(oldVKObjectMeta), &oldNode.ObjectMeta)
if err != nil {
return nil, pkgerrors.Wrapf(err, "Cannot unmarshal old node object metadata (key: %q): %q", virtualKubeletLastNodeAppliedObjectMeta, oldVKObjectMeta)
}
err = json.Unmarshal([]byte(oldVKStatus), &oldNode.Status)
if err != nil {
return nil, pkgerrors.Wrapf(err, "Cannot unmarshal old node status (key: %q): %q", virtualKubeletLastNodeAppliedNodeStatus, oldVKStatus)
}
}
// newNode is the representation of the node the provider "wants"
newNode := corev1.Node{}
newNode.ObjectMeta = simplestObjectMetadata(&apiServerNode.ObjectMeta, &nodeFromProvider.ObjectMeta)
nodeFromProvider.Status.DeepCopyInto(&newNode.Status)
// virtualKubeletLastNodeAppliedObjectMeta must always be set before virtualKubeletLastNodeAppliedNodeStatus,
// otherwise we capture virtualKubeletLastNodeAppliedNodeStatus in virtualKubeletLastNodeAppliedObjectMeta,
// which is wrong
virtualKubeletLastNodeAppliedObjectMetaBytes, err := json.Marshal(newNode.ObjectMeta)
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal object meta from provider")
}
newNode.Annotations[virtualKubeletLastNodeAppliedObjectMeta] = string(virtualKubeletLastNodeAppliedObjectMetaBytes)
virtualKubeletLastNodeAppliedNodeStatusBytes, err := json.Marshal(newNode.Status)
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal node status from provider")
}
newNode.Annotations[virtualKubeletLastNodeAppliedNodeStatus] = string(virtualKubeletLastNodeAppliedNodeStatusBytes)
// Generate three way patch from oldNode -> newNode, without deleting the changes in api server
// Should we use the Kubernetes serialization / deserialization libraries here?
oldNodeBytes, err := json.Marshal(oldNode)
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal old node bytes")
}
newNodeBytes, err := json.Marshal(newNode)
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal new node bytes")
}
apiServerNodeBytes, err := json.Marshal(apiServerNode)
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot marshal api server node")
}
schema, err := strategicpatch.NewPatchMetaFromStruct(&corev1.Node{})
if err != nil {
return nil, pkgerrors.Wrap(err, "Cannot get patch schema from node")
}
return strategicpatch.CreateThreeWayMergePatch(oldNodeBytes, newNodeBytes, apiServerNodeBytes, schema, true)
}
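// exampleThreeWayPatch is a hypothetical sketch (illustration only, not part of the
// original file) of the property described above: with a last-applied state of
// conditions [A, B], a desired state of [A], and a live object where an external
// updater added C, the generated strategic three-way patch removes B but does not
// reference C, so C survives on the server. Marshal errors are elided for brevity.
func exampleThreeWayPatch() ([]byte, error) {
	cond := func(t string) corev1.NodeCondition {
		return corev1.NodeCondition{Type: corev1.NodeConditionType(t), Status: corev1.ConditionTrue}
	}
	oldNode := corev1.Node{Status: corev1.NodeStatus{Conditions: []corev1.NodeCondition{cond("A"), cond("B")}}}
	newNode := corev1.Node{Status: corev1.NodeStatus{Conditions: []corev1.NodeCondition{cond("A")}}}
	liveNode := corev1.Node{Status: corev1.NodeStatus{Conditions: []corev1.NodeCondition{cond("A"), cond("B"), cond("C")}}}
	oldBytes, _ := json.Marshal(oldNode)
	newBytes, _ := json.Marshal(newNode)
	liveBytes, _ := json.Marshal(liveNode)
	schema, err := strategicpatch.NewPatchMetaFromStruct(&corev1.Node{})
	if err != nil {
		return nil, err
	}
	return strategicpatch.CreateThreeWayMergePatch(oldBytes, newBytes, liveBytes, schema, true)
}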
// updateNodeStatus triggers an update to the node status in Kubernetes.
// It first fetches the current node details and then sets the status according
// to the passed in node object.
//
// If you use this function, it is up to you to synchronize this with other operations.
// This reduces the time to second-level precision.
func updateNodeStatus(ctx context.Context, nodes v1.NodeInterface, nodeFromProvider *corev1.Node) (_ *corev1.Node, retErr error) {
ctx, span := trace.StartSpan(ctx, "UpdateNodeStatus")
defer func() {
span.End()
span.SetStatus(retErr)
}()
var updatedNode *corev1.Node
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
apiServerNode, err := nodes.Get(ctx, nodeFromProvider.Name, emptyGetOptions)
if err != nil {
return err
}
ctx = addNodeAttributes(ctx, span, apiServerNode)
log.G(ctx).Debug("got node from api server")
patchBytes, err := prepareThreewayPatchBytesForNodeStatus(nodeFromProvider, apiServerNode)
if err != nil {
return pkgerrors.Wrap(err, "Cannot generate patch")
}
log.G(ctx).WithError(err).WithField("patch", string(patchBytes)).Debug("Generated three way patch")
updatedNode, err = nodes.Patch(ctx, nodeFromProvider.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
// We cannot wrap this error because the kubernetes error module doesn't understand wrapping
log.G(ctx).WithField("patch", string(patchBytes)).WithError(err).Warn("Failed to patch node status")
return err
}
return nil
})
if err != nil {
return nil, err
}
log.G(ctx).WithField("node.resourceVersion", updatedNode.ResourceVersion).
WithField("node.Status.Conditions", updatedNode.Status.Conditions).
Debug("updated node status in api server")
return updatedNode, nil
}
func updateNodeStatusHeartbeat(n *corev1.Node) {
now := metav1.NewTime(time.Now())
for i := range n.Status.Conditions {
n.Status.Conditions[i].LastHeartbeatTime = now
}
}
// NaiveNodeProvider is a basic node provider that only uses the passed in context
// on `Ping` to determine if the node is healthy.
type NaiveNodeProvider struct{}
// Ping just implements the NodeProvider interface.
// It returns the error from the passed in context only.
func (NaiveNodeProvider) Ping(ctx context.Context) error {
return ctx.Err()
}
// NotifyNodeStatus implements the NodeProvider interface.
//
// This NaiveNodeProvider does not support updating node status and so this
// function is a no-op.
func (n NaiveNodeProvider) NotifyNodeStatus(_ context.Context, _ func(*corev1.Node)) {
}
// NaiveNodeProviderV2 is like NaiveNodeProvider except it supports accepting node status updates.
// It must be used as a pointer and must be created with `NewNaiveNodeProvider`.
type NaiveNodeProviderV2 struct {
notify func(*corev1.Node)
updateReady chan struct{}
}
// Ping just implements the NodeProvider interface.
// It returns the error from the passed in context only.
func (*NaiveNodeProviderV2) Ping(ctx context.Context) error {
return ctx.Err()
}
// NotifyNodeStatus implements the NodeProvider interface.
//
// NaiveNodeProviderV2 does not support updating node status unless it was created with `NewNaiveNodeProvider`;
// otherwise this is a no-op.
func (n *NaiveNodeProviderV2) NotifyNodeStatus(_ context.Context, f func(*corev1.Node)) {
n.notify = f
// This is a little sloppy and assumes `NotifyNodeStatus` is only called once, which is indeed currently true.
// The reason a channel is preferred here is so we can use a context in `UpdateStatus` to cancel waiting for this.
close(n.updateReady)
}
// UpdateStatus sends a node status update to the node controller
func (n *NaiveNodeProviderV2) UpdateStatus(ctx context.Context, node *corev1.Node) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-n.updateReady:
}
n.notify(node)
return nil
}
// NewNaiveNodeProvider creates a new NaiveNodeProviderV2
// You must use this to create a NaiveNodeProviderV2 if you want to be able to send node status updates to the node
// controller.
func NewNaiveNodeProvider() *NaiveNodeProviderV2 {
return &NaiveNodeProviderV2{
updateReady: make(chan struct{}),
}
}
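// Hypothetical end-to-end sketch (illustration only, not part of the original file):
// run a controller backed by a NaiveNodeProviderV2 and push a status update through
// it once the controller is ready. The node client, node name and condition used
// here are assumptions made for illustration.
func exampleNaiveNodeProviderV2(ctx context.Context, nodes v1.NodeInterface) error {
	p := NewNaiveNodeProvider()
	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "example-virtual-node"}}
	nc, err := NewNodeController(p, node, nodes)
	if err != nil {
		return err
	}
	// Run's return value is recorded by the controller and read via nc.Err().
	go nc.Run(ctx) //nolint:errcheck
	select {
	case <-nc.Ready():
	case <-nc.Done():
		return nc.Err()
	}
	// Mark the node Ready; the controller patches this status into the API server.
	updated := node.DeepCopy()
	updated.Status.Conditions = []corev1.NodeCondition{{
		Type:               corev1.NodeReady,
		Status:             corev1.ConditionTrue,
		LastHeartbeatTime:  metav1.Now(),
		LastTransitionTime: metav1.Now(),
	}}
	return p.UpdateStatus(ctx, updated)
}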
type taintsStringer []corev1.Taint
func (t taintsStringer) String() string {
var s string
for _, taint := range t {
if s == "" {
s = taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
} else {
s += ", " + taint.Key + "=" + taint.Value + ":" + string(taint.Effect)
}
}
return s
}
func addNodeAttributes(ctx context.Context, span trace.Span, n *corev1.Node) context.Context {
return span.WithFields(ctx, log.Fields{
"node.UID": string(n.UID),
"node.name": n.Name,
"node.taints": taintsStringer(n.Spec.Taints),
})
}
// simplestObjectMetadata provides the simplest object metadata that matches the previous object: name, namespace,
// UID. It copies labels and annotations from the second object if defined, and exempts the last-applied patch
// annotations.
func simplestObjectMetadata(baseObjectMeta, objectMetaWithLabelsAndAnnotations *metav1.ObjectMeta) metav1.ObjectMeta {
ret := metav1.ObjectMeta{
Namespace: baseObjectMeta.Namespace,
Name: baseObjectMeta.Name,
UID: baseObjectMeta.UID,
Annotations: make(map[string]string),
}
if objectMetaWithLabelsAndAnnotations != nil {
if objectMetaWithLabelsAndAnnotations.Labels != nil {
ret.Labels = objectMetaWithLabelsAndAnnotations.Labels
} else {
ret.Labels = make(map[string]string)
}
if objectMetaWithLabelsAndAnnotations.Annotations != nil {
// We want to copy over all annotations except the special embedded ones.
for key := range objectMetaWithLabelsAndAnnotations.Annotations {
if key == virtualKubeletLastNodeAppliedNodeStatus || key == virtualKubeletLastNodeAppliedObjectMeta {
continue
}
ret.Annotations[key] = objectMetaWithLabelsAndAnnotations.Annotations[key]
}
}
}
return ret
}