nodes.go
package node

import (
	"context"
	"path"
	"sort"
	"time"

	"github.com/pkg/errors"
	"github.com/rebuy-de/rebuy-go-sdk/v3/pkg/syncutil"
	"github.com/sirupsen/logrus"
	v1 "k8s.io/api/core/v1"
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/informers"
	informers_v1 "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

const (
	TaintEffectNoSchedule = v1.TaintEffectNoSchedule
	TaintEffectNoExecute  = v1.TaintEffectNoExecute
)

// Taint is an alias for the upstream taint type, so callers do not have to
// import the Kubernetes API packages themselves.
type Taint = v1.Taint

// Node is a reduced view on a Kubernetes node that contains only the fields
// this package needs.
type Node struct {
	InstanceID    string  `logfield:"instance-id,omitempty"`
	NodeName      string  `logfield:"node-name,omitempty"`
	Unschedulable bool    `logfield:"node-unschedulable"`
	Taints        []Taint `logfield:"node-taints"`
}

type Client interface {
	// Run starts the node informer. It keeps the node cache updated in the
	// background and blocks until the given context gets cancelled.
	Run(context.Context) error

	// List returns all nodes that are currently in the cache. The cache
	// gets updated in the background.
	List() []Node

	// SignalEmitter gets triggered every time the cache changes. See the
	// syncutil package for more information.
	SignalEmitter() *syncutil.SignalEmitter

	// Healthy indicates whether the background job is running correctly.
	Healthy() bool

	// Taint adds a taint with the given key and effect to the given node.
	Taint(context.Context, Node, string, v1.TaintEffect) error
}
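
// A minimal usage sketch of the Client interface (illustration only; it
// assumes a configured kubernetes.Interface named kube and a cancellable
// context ctx):
//
//	c := node.New(kube)
//	go c.Run(ctx)
//	for _, n := range c.List() {
//		fmt.Println(n.NodeName, n.Unschedulable)
//	}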

type client struct {
	kube    kubernetes.Interface
	emitter *syncutil.SignalEmitter
	factory informers.SharedInformerFactory
	nodes   informers_v1.NodeInformer
}

func New(kube kubernetes.Interface) Client {
	factory := informers.NewSharedInformerFactory(kube, 5*time.Second)
	nodes := factory.Core().V1().Nodes()
	c := &client{
		kube:    kube,
		emitter: new(syncutil.SignalEmitter),
		factory: factory,
		nodes:   nodes,
	}
	// Emit a signal on every cache change, so that SignalEmitter behaves as
	// documented. (Assumes syncutil.SignalEmitter exposes an Emit method.)
	nodes.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(interface{}) { c.emitter.Emit() },
		UpdateFunc: func(_, _ interface{}) { c.emitter.Emit() },
		DeleteFunc: func(interface{}) { c.emitter.Emit() },
	})
	return c
}

func (c *client) Healthy() bool {
	return c.nodes.Informer().HasSynced()
}

func (c *client) SignalEmitter() *syncutil.SignalEmitter {
	return c.emitter
}

func (c *client) List() []Node {
	result := []Node{}

	list, err := c.nodes.Lister().List(labels.Everything())
	if err != nil {
		logrus.WithError(err).Error("failed to list nodes from the informer cache")
		return nil
	}

	for _, node := range list {
		result = append(result, Node{
			InstanceID:    path.Base(node.Spec.ProviderID),
			NodeName:      node.ObjectMeta.Name,
			Unschedulable: node.Spec.Unschedulable,
			Taints:        node.Spec.Taints,
		})
	}

	// Sort by instance ID to get a deterministic order. The informer cache
	// is backed by a map, so the order would be flaky otherwise.
	sort.Slice(result, func(i, j int) bool {
		return result[i].InstanceID < result[j].InstanceID
	})

	return result
}

func (c *client) Run(ctx context.Context) error {
	// client-go provides a utility that recovers from panics in the
	// informer machinery and logs them properly.
	defer runtime.HandleCrash()
	c.nodes.Informer().Run(ctx.Done())
	return nil
}

func (c *client) Taint(ctx context.Context, node Node, key string, effect v1.TaintEffect) error {
	if node.NodeName == "" {
		return errors.Errorf("got node with empty name")
	}

	taint := v1.Taint{
		Key:    key,
		Value:  "Exists",
		Effect: effect,
	}

	// We do not read the node from the cache, because it should be as fresh
	// as possible. Using a fresh copy also makes sure that the append below
	// does not mutate the cached object.
	upstream, err := c.kube.CoreV1().Nodes().Get(ctx, node.NodeName, meta.GetOptions{})
	if err != nil {
		return errors.WithStack(err)
	}

	// The API server rejects duplicate taints, so do nothing if the taint
	// already exists.
	for _, existing := range upstream.Spec.Taints {
		if existing.Key == key && existing.Effect == effect {
			return nil
		}
	}

	upstream.Spec.Taints = append(upstream.Spec.Taints, taint)
	_, err = c.kube.CoreV1().Nodes().Update(ctx, upstream, meta.UpdateOptions{})
	return errors.WithStack(err)
}
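
// An example invocation (the taint key is hypothetical; c and ctx are from
// the sketch above, and n is a Node obtained from List):
//
//	err := c.Taint(ctx, n, "node-drainer/drain", node.TaintEffectNoSchedule)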