forked from virtual-kubelet/virtual-kubelet
/
node_ping_controller.go
114 lines (96 loc) · 3.41 KB
/
node_ping_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package node
import (
"context"
"time"
"github.com/virtual-kubelet/virtual-kubelet/internal/lock"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"golang.org/x/sync/singleflight"
"k8s.io/apimachinery/pkg/util/wait"
)
// nodePingController is responsible for node pinging behaviour
type nodePingController struct {
	// nodeProvider is the provider whose Ping method is invoked on each cycle.
	nodeProvider NodeProvider
	// pingInterval is the delay between successive pings; must be non-zero
	// (enforced by newNodePingController).
	pingInterval time.Duration
	// pingTimeout, when non-nil, bounds how long a single ping may take.
	// nil means a ping is bounded only by the controller's context.
	pingTimeout *time.Duration
	// cond publishes the most recent *pingResult to subscribers (see getResult).
	cond lock.MonitorVariable
}
// pingResult encapsulates the result of the last ping. It is the time the ping was started, and the error.
// If there is a timeout, the error will be context.DeadlineExceeded
type pingResult struct {
	// time is when the ping attempt began. It is left as the zero Time when
	// the surrounding context ended before the ping completed (see the
	// ctx.Done branch in Run).
	time time.Time
	// error is the outcome of the ping; nil means the ping succeeded.
	error error
}
// newNodePingController creates a new node ping controller. pingInterval must
// be non-zero. Optionally, a timeout may be specified on how long to wait for
// the provider to respond; passing a non-nil zero timeout is a programmer
// error and panics.
func newNodePingController(node NodeProvider, pingInterval time.Duration, timeout *time.Duration) *nodePingController {
	// Validate constructor invariants up front; both are caller bugs, not
	// runtime conditions, hence panic rather than an error return.
	switch {
	case pingInterval == 0:
		panic("Node ping interval is 0")
	case timeout != nil && *timeout == 0:
		panic("Node ping timeout is 0")
	}

	npc := &nodePingController{
		nodeProvider: node,
		pingInterval: pingInterval,
		pingTimeout:  timeout,
		cond:         lock.NewMonitorVariable(),
	}
	return npc
}
// Run runs the controller until context is cancelled. The first ping happens
// synchronously before the periodic loop starts, so subscribers waiting in
// getResult are unblocked as soon as possible. Each cycle's outcome is
// published through npc.cond.
func (npc *nodePingController) Run(ctx context.Context) {
	// Single, constant singleflight key: there is only ever one logical
	// "ping this node" operation to coalesce.
	const key = "key"
	sf := &singleflight.Group{}

	// 1. If the node is "stuck" and not responding to pings, we want to set the status
	// to that the node provider has timed out responding to pings
	// 2. We want it so that the context is cancelled, and whatever the node might have
	// been stuck on uses context so it might be unstuck
	// 3. We want to retry pinging the node, but we do not ever want more than one
	// ping in flight at a time.

	// mkContextFunc derives the per-attempt context: plain cancellation by
	// default, or a deadline-bound context when a ping timeout is configured.
	mkContextFunc := context.WithCancel

	if npc.pingTimeout != nil {
		mkContextFunc = func(ctx2 context.Context) (context.Context, context.CancelFunc) {
			return context.WithTimeout(ctx2, *npc.pingTimeout)
		}
	}

	checkFunc := func(ctx context.Context) {
		ctx, cancel := mkContextFunc(ctx)
		defer cancel()
		ctx, span := trace.StartSpan(ctx, "node.pingLoop")
		defer span.End()
		// DoChan coalesces attempts: if a previous ping is still in flight,
		// this returns the in-flight call's channel instead of starting a
		// second ping (requirement 3 above).
		// NOTE(review): the closure captures THIS invocation's ctx; when a
		// later cycle is coalesced onto an earlier in-flight call, the ping
		// runs under the earlier cycle's (possibly already expired) context —
		// presumably intentional, confirm against upstream virtual-kubelet.
		doChan := sf.DoChan(key, func() (interface{}, error) {
			now := time.Now()
			ctx, span := trace.StartSpan(ctx, "node.pingNode")
			defer span.End()
			err := npc.nodeProvider.Ping(ctx)
			span.SetStatus(err)
			return now, err
		})

		var pingResult pingResult
		// Either the per-attempt context ends first (cancellation or
		// configured timeout; pingResult.time stays zero) or the ping
		// completes and we record its start time and error.
		select {
		case <-ctx.Done():
			pingResult.error = ctx.Err()
			log.G(ctx).WithError(pingResult.error).Warn("Failed to ping node due to context cancellation")
		case result := <-doChan:
			pingResult.error = result.Err
			pingResult.time = result.Val.(time.Time)
		}

		// Publish before closing the span so subscribers observe the result
		// of this cycle.
		npc.cond.Set(&pingResult)
		span.SetStatus(pingResult.error)
	}

	// Run the first check manually
	checkFunc(ctx)

	wait.UntilWithContext(ctx, checkFunc, npc.pingInterval)
}
// getResult returns the most recent ping result. It is non-blocking once a
// first result has been published; until then it waits for the first ping to
// complete (successfully or not). If ctx is cancelled while waiting, it
// returns immediately with ctx's error.
func (npc *nodePingController) getResult(ctx context.Context) (*pingResult, error) {
	sub := npc.cond.Subscribe()
	select {
	case <-sub.NewValueReady():
		return sub.Value().Value.(*pingResult), nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}