-
Notifications
You must be signed in to change notification settings - Fork 88
/
node_join.go
335 lines (292 loc) · 9.38 KB
/
node_join.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
package embeddedcluster
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/replicatedhq/kots/pkg/embeddedcluster/types"
"github.com/replicatedhq/kots/pkg/util"
corev1 "k8s.io/api/core/v1"
kuberneteserrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// joinTokenEntry caches a single generated join token for one node role.
// Mut serializes generation/refresh of the token for that role, and
// Creation records when Token was generated so callers can expire the cache.
type joinTokenEntry struct {
	Token    string
	Creation *time.Time
	Mut      sync.Mutex
}
// joinTokenMap caches join tokens keyed by node role; joinTokenMapMut guards
// access to the map itself (each entry carries its own mutex for generation).
var joinTokenMapMut = sync.Mutex{}
var joinTokenMap = map[string]*joinTokenEntry{}
// GenerateAddNodeToken will generate the embedded cluster node add token for a
// node with the specified role. Tokens are cached for 1 hour after first
// generation; the underlying k0s token itself is created with a 12 hour
// expiry (see runAddNodeCommandPod).
func GenerateAddNodeToken(ctx context.Context, client kubernetes.Interface, nodeRole string) (string, error) {
	// get (or lazily create) the joinToken cache entry for this node role
	joinTokenMapMut.Lock()
	joinToken, ok := joinTokenMap[nodeRole]
	if !ok {
		joinToken = &joinTokenEntry{}
		joinTokenMap[nodeRole] = joinToken
	}
	joinTokenMapMut.Unlock()

	// serialize generation per role; the map mutex is released first so tokens
	// for other roles can be generated concurrently
	joinToken.Mut.Lock()
	defer joinToken.Mut.Unlock()

	// if the cached token is less than an hour old, reuse it
	if joinToken.Creation != nil && time.Since(*joinToken.Creation) < time.Hour {
		return joinToken.Token, nil
	}

	newToken, err := runAddNodeCommandPod(ctx, client, nodeRole)
	if err != nil {
		return "", fmt.Errorf("failed to run add node command pod: %w", err)
	}

	now := time.Now()
	joinToken.Token = newToken
	joinToken.Creation = &now
	return newToken, nil
}
// runAddNodeCommandPod runs a short-lived pod on a controller node that
// executes `k0s token create` and returns the token printed to the pod's
// logs. The pod mounts the host's k0s binary and state directories so the
// token is generated against the running cluster. The generated token has a
// 12 hour expiry.
func runAddNodeCommandPod(ctx context.Context, client kubernetes.Interface, nodeRole string) (string, error) {
	// pod name is derived from the role; '+' is not valid in a pod name
	podName := "k0s-token-generator-"
	suffix := strings.ReplaceAll(nodeRole, "+", "-")
	podName += suffix

	// cleanup the pod if it already exists from a previous run
	err := client.CoreV1().Pods("kube-system").Delete(ctx, podName, metav1.DeleteOptions{})
	if err != nil {
		if !kuberneteserrors.IsNotFound(err) {
			return "", fmt.Errorf("failed to delete pod: %w", err)
		}
	}

	hostPathFile := corev1.HostPathFile
	hostPathDir := corev1.HostPathDirectory
	_, err = client.CoreV1().Pods("kube-system").Create(ctx, &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      podName,
			Namespace: "kube-system",
			Labels: map[string]string{
				"replicated.app/embedded-cluster": "true",
			},
		},
		Spec: corev1.PodSpec{
			RestartPolicy: corev1.RestartPolicyOnFailure,
			HostNetwork:   true,
			Volumes: []corev1.Volume{
				{
					Name: "bin",
					VolumeSource: corev1.VolumeSource{
						HostPath: &corev1.HostPathVolumeSource{
							Path: "/usr/local/bin/k0s",
							Type: &hostPathFile,
						},
					},
				},
				{
					Name: "lib",
					VolumeSource: corev1.VolumeSource{
						HostPath: &corev1.HostPathVolumeSource{
							Path: "/var/lib/k0s",
							Type: &hostPathDir,
						},
					},
				},
				{
					Name: "etc",
					VolumeSource: corev1.VolumeSource{
						HostPath: &corev1.HostPathVolumeSource{
							Path: "/etc/k0s",
							Type: &hostPathDir,
						},
					},
				},
				{
					Name: "run",
					VolumeSource: corev1.VolumeSource{
						HostPath: &corev1.HostPathVolumeSource{
							Path: "/run/k0s",
							Type: &hostPathDir,
						},
					},
				},
			},
			// the token must be generated on a controller node
			Affinity: &corev1.Affinity{
				NodeAffinity: &corev1.NodeAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
						NodeSelectorTerms: []corev1.NodeSelectorTerm{
							{
								MatchExpressions: []corev1.NodeSelectorRequirement{
									{
										Key:      "node.k0sproject.io/role",
										Operator: corev1.NodeSelectorOpIn,
										Values: []string{
											"control-plane",
										},
									},
								},
							},
						},
					},
				},
			},
			Containers: []corev1.Container{
				{
					Name:    "k0s-token-generator",
					Image:   "ubuntu:jammy", // this will not work on airgap, but it needs to be debian based at the moment
					Command: []string{"/mnt/k0s"},
					Args: []string{
						"token",
						"create",
						"--expiry",
						"12h",
						"--role",
						nodeRole,
					},
					VolumeMounts: []corev1.VolumeMount{
						{
							Name:      "bin",
							MountPath: "/mnt/k0s",
						},
						{
							Name:      "lib",
							MountPath: "/var/lib/k0s",
						},
						{
							Name:      "etc",
							MountPath: "/etc/k0s",
						},
						{
							Name:      "run",
							MountPath: "/run/k0s",
						},
					},
				},
			},
		},
	}, metav1.CreateOptions{})
	if err != nil {
		return "", fmt.Errorf("failed to create pod: %w", err)
	}

	// wait for the pod to complete, polling once per second and respecting
	// context cancellation
	for {
		pod, err := client.CoreV1().Pods("kube-system").Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			return "", fmt.Errorf("failed to get pod: %w", err)
		}
		if pod.Status.Phase == corev1.PodSucceeded {
			break
		}
		if pod.Status.Phase == corev1.PodFailed {
			return "", fmt.Errorf("pod failed")
		}
		select {
		case <-ctx.Done():
			return "", fmt.Errorf("context done waiting for pod: %w", ctx.Err())
		case <-time.After(time.Second):
		}
	}

	// get the logs from the completed pod
	podLogs, err := client.CoreV1().Pods("kube-system").GetLogs(podName, &corev1.PodLogOptions{}).DoRaw(ctx)
	if err != nil {
		return "", fmt.Errorf("failed to get pod logs: %w", err)
	}

	// delete the completed pod
	err = client.CoreV1().Pods("kube-system").Delete(ctx, podName, metav1.DeleteOptions{})
	if err != nil {
		return "", fmt.Errorf("failed to delete pod: %w", err)
	}

	// the logs are just a join token, which needs to be added to other things to get a join command
	return string(podLogs), nil
}
// GenerateAddNodeCommand returns the command a user should run to add a node
// with the provided token; the command has the form
// 'sudo ./<binary> node join ip:port token'.
func GenerateAddNodeCommand(ctx context.Context, client kubernetes.Interface, token string) (string, error) {
	// the embedded cluster binary name is recorded in the cluster configmap
	cm, err := ReadConfigMap(client)
	if err != nil {
		return "", fmt.Errorf("failed to read configmap: %w", err)
	}
	binaryName := cm.Data["embedded-binary-name"]

	// the join endpoint lives on a controller node
	controllerIP, err := getControllerNodeIP(ctx, client)
	if err != nil {
		return "", fmt.Errorf("failed to get controller node IP: %w", err)
	}

	// it is served on the admin console's node port
	adminPort, err := getAdminConsolePort(ctx, client)
	if err != nil {
		return "", fmt.Errorf("failed to get admin console port: %w", err)
	}

	return fmt.Sprintf("sudo ./%s node join %s:%d %s", binaryName, controllerIP, adminPort, token), nil
}
// GenerateK0sJoinCommand returns the k0s node join command, without the token
// but with all other required flags (including node labels generated from the
// provided roles).
func GenerateK0sJoinCommand(ctx context.Context, client kubernetes.Interface, roles []string) (string, error) {
	controllerRoleName, err := ControllerRoleName(ctx)
	if err != nil {
		return "", fmt.Errorf("failed to get controller role name: %w", err)
	}

	// a node holding the controller role joins as a k0s controller;
	// everything else joins as a worker
	k0sRole := "worker"
	for _, r := range roles {
		if r == controllerRoleName {
			k0sRole = "controller"
			break
		}
	}

	args := []string{"/usr/local/bin/k0s", "install", k0sRole}
	if k0sRole == "controller" {
		// embedded cluster controllers also run workloads
		args = append(args, "--enable-worker")
	}

	labels, err := getRolesNodeLabels(ctx, roles)
	if err != nil {
		return "", fmt.Errorf("failed to get role labels: %w", err)
	}
	args = append(args, "--labels", labels)

	return strings.Join(args, " "), nil
}
// getAdminConsolePort returns the node port of the port named "http" on the
// 'admin-console' service.
func getAdminConsolePort(ctx context.Context, client kubernetes.Interface) (int32, error) {
	svc, err := client.CoreV1().Services(util.PodNamespace).Get(ctx, "admin-console", metav1.GetOptions{})
	if err != nil {
		return -1, fmt.Errorf("failed to get admin-console service: %w", err)
	}
	for _, p := range svc.Spec.Ports {
		if p.Name != "http" {
			continue
		}
		return p.NodePort, nil
	}
	return -1, fmt.Errorf("did not find port 'http' in service 'admin-console'")
}
// getControllerNodeIP returns the internal IP of the first control-plane node
// that reports a Ready condition.
func getControllerNodeIP(ctx context.Context, client kubernetes.Interface) (string, error) {
	nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return "", fmt.Errorf("failed to list nodes: %w", err)
	}

	for _, node := range nodes.Items {
		// skip nodes not labeled as control-plane (a missing label reads as "")
		if node.Labels["node-role.kubernetes.io/control-plane"] != "true" {
			continue
		}
		for _, cond := range node.Status.Conditions {
			if cond.Type != corev1.NodeReady || cond.Status != corev1.ConditionTrue {
				continue
			}
			for _, addr := range node.Status.Addresses {
				if addr.Type == corev1.NodeInternalIP {
					return addr.Address, nil
				}
			}
		}
	}

	return "", fmt.Errorf("failed to find healthy controller node")
}
// getRolesNodeLabels builds the full comma-separated node label string for a
// node with the given roles: the role-list labels plus any labels configured
// for the individual roles.
func getRolesNodeLabels(ctx context.Context, roles []string) (string, error) {
	listLabels := getRoleListLabels(roles)

	roleLabels, err := getRoleNodeLabels(ctx, roles)
	if err != nil {
		return "", fmt.Errorf("failed to get node labels for roles %v: %w", roles, err)
	}

	return strings.Join(append(listLabels, roleLabels...), ","), nil
}
// getRoleListLabels returns the labels needed to identify the roles of this
// node in the future: one label recording the total number of roles, and then
// deterministically-named labels storing the (labelified) role names by index.
func getRoleListLabels(roles []string) []string {
	// pre-size: one "total" label plus one label per role
	labels := make([]string, 0, len(roles)+1)
	labels = append(labels, fmt.Sprintf("%s=total-%d", types.EMBEDDED_CLUSTER_ROLE_LABEL, len(roles)))
	for idx, role := range roles {
		labels = append(labels, fmt.Sprintf("%s-%d=%s", types.EMBEDDED_CLUSTER_ROLE_LABEL, idx, labelify(role)))
	}
	return labels
}