-
Notifications
You must be signed in to change notification settings - Fork 24
/
gkeclusterprovider.go
537 lines (446 loc) · 15.9 KB
/
gkeclusterprovider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
package provider
import (
"context"
"fmt"
"strings"
"time"
"github.com/kubecost/cluster-turndown/pkg/async"
"github.com/kubecost/cluster-turndown/pkg/cluster/helper"
"github.com/kubecost/cluster-turndown/pkg/file"
"github.com/kubecost/cluster-turndown/pkg/logging"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
container "google.golang.org/genproto/googleapis/container/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
gke "cloud.google.com/go/container/apiv1"
)
const (
	// LabelGKENodePool is the well-known node label GKE applies to every
	// node, carrying the name of the node pool the node belongs to.
	LabelGKENodePool = "cloud.google.com/gke-nodepool"
	// GKECredsEnvVar is the environment variable Google client libraries
	// read to locate application credentials.
	GKECredsEnvVar = "GOOGLE_APPLICATION_CREDENTIALS"
	// GKEAuthServiceAccount is the mounted path of the service-account key
	// file required to authenticate against the GKE API (checked in
	// newGKEClusterManager before creating a client).
	GKEAuthServiceAccount = "/var/keys/service-key.json"
	// Defaults applied when creating a node pool without explicit values.
	GKEDefaultMachineType = "n1-standard-1"
	GKEDefaultDiskType = "pd-standard"
	GKEDefaultDiskSizeGB = 100
)
//--------------------------------------------------------------------------
// Default GKE Values
//--------------------------------------------------------------------------

// GetGKEDefaultOAuthScopes returns the default OAuth scopes granted to the
// nodes of a newly created node pool.
func GetGKEDefaultOAuthScopes() []string {
	const prefix = "https://www.googleapis.com/auth/"
	suffixes := []string{
		"cloud-platform",
		"devstorage.read_only",
		"logging.write",
		"monitoring",
		"servicecontrol",
		"service.management.readonly",
		"trace.append",
	}
	scopes := make([]string, 0, len(suffixes))
	for _, suffix := range suffixes {
		scopes = append(scopes, prefix+suffix)
	}
	return scopes
}
// GetGKEDefaultMetadata returns the default GCE metadata entries applied to
// the nodes of a newly created node pool.
func GetGKEDefaultMetadata() map[string]string {
	metadata := make(map[string]string, 1)
	metadata["disable-legacy-endpoints"] = "true"
	return metadata
}
// GetGKEDefaultNodeManagement returns the default node management settings
// (auto-upgrade and auto-repair both enabled) used when creating a new
// node pool.
func GetGKEDefaultNodeManagement() *container.NodeManagement {
	management := new(container.NodeManagement)
	management.AutoUpgrade = true
	management.AutoRepair = true
	return management
}
//--------------------------------------------------------------------------
// GKE NodePool Implementation
//--------------------------------------------------------------------------

// GKENodePool is the GKE implementation of NodePool. It carries the
// identifiers needed to address the pool via the GKE API plus the pool's
// sizing and autoscaling configuration as read from the API.
type GKENodePool struct {
	name string // node pool name (unique within the cluster)
	project string // GCP project ID owning the cluster
	zone string // zone of the cluster master
	clusterID string // cluster the pool belongs to
	min int32 // minimum node count (equals count when autoscaling is off)
	max int32 // maximum node count (equals count when autoscaling is off)
	count int32 // initial node count reported by the API
	autoscaling bool // whether GKE autoscaling is enabled for this pool
	machineType string // GCE machine type of the pool's nodes
	tags map[string]string // node labels configured on the pool
}
// Name returns the node pool's name.
func (np *GKENodePool) Name() string { return np.name }

// Project returns the GCP project ID owning the cluster.
func (np *GKENodePool) Project() string { return np.project }

// Zone returns the zone of the cluster master.
func (np *GKENodePool) Zone() string { return np.zone }

// ClusterID returns the identifier of the cluster the pool belongs to.
func (np *GKENodePool) ClusterID() string { return np.clusterID }

// MinNodes returns the minimum node count for the pool.
func (np *GKENodePool) MinNodes() int32 { return np.min }

// MaxNodes returns the maximum node count for the pool.
func (np *GKENodePool) MaxNodes() int32 { return np.max }

// NodeCount returns the pool's initial node count.
func (np *GKENodePool) NodeCount() int32 { return np.count }

// AutoScaling reports whether autoscaling is enabled for the pool.
func (np *GKENodePool) AutoScaling() bool { return np.autoscaling }

// MachineType returns the GCE machine type of the pool's nodes.
func (np *GKENodePool) MachineType() string { return np.machineType }

// Tags returns the node labels configured on the pool.
func (np *GKENodePool) Tags() map[string]string { return np.tags }

// IsMaster is always false: GKE masters are managed by Google and are never
// part of a user-visible node pool.
func (np *GKENodePool) IsMaster() bool { return false }
//--------------------------------------------------------------------------
// GKE ClusterProvider Implementation
//--------------------------------------------------------------------------

// GKEClusterProvider is the ClusterProvider implementation for GKE. It talks
// to the cluster both through the kubernetes API (to enumerate nodes) and
// through the GKE cluster-manager gRPC API (to manage node pools).
type GKEClusterProvider struct {
	kubernetes kubernetes.Interface // client for the kubernetes API
	clusterManager *gke.ClusterManagerClient // GKE cluster-manager gRPC client
	metadata *GKEMetaData // project/zone/cluster identity from GCE metadata
	log logging.NamedLogger
}
// NewGKEClusterProvider creates a new GKEClusterProvider instance as the
// ClusterProvider. It fails when the GKE cluster manager client cannot be
// created (e.g. the service account key file is missing).
func NewGKEClusterProvider(kubernetes kubernetes.Interface) (ClusterProvider, error) {
	cm, err := newGKEClusterManager()
	if err != nil {
		return nil, err
	}
	provider := &GKEClusterProvider{
		kubernetes:     kubernetes,
		clusterManager: cm,
		metadata:       NewGKEMetaData(),
		log:            logging.NamedLogger("GKEClusterProvider"),
	}
	return provider, nil
}
// IsNodePool determines if there is a node pool with the name or not by
// querying the GKE API for the fully-qualified node pool resource.
func (p *GKEClusterProvider) IsNodePool(name string) bool {
	request := &container.GetNodePoolRequest{
		Name: p.toNodePoolResourceByName(name),
	}
	pool, err := p.clusterManager.GetNodePool(context.TODO(), request)
	// The pool exists only when the lookup succeeded and returned a result.
	return err == nil && pool != nil
}
// GetNodePoolName returns the name of the NodePool a specific kubernetes
// node belongs to, as read from the node's GKE node pool label.
func (p *GKEClusterProvider) GetNodePoolName(node *v1.Node) string {
	_, _, poolName := p.projectInfoFor(node)
	return poolName
}
// GetNodesFor returns the kubernetes Node instances that belong to the
// provided NodePool, matched case-insensitively by pool name.
func (p *GKEClusterProvider) GetNodesFor(np NodePool) ([]*v1.Node, error) {
	nodeList, err := p.kubernetes.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	target := np.Name()
	matched := []*v1.Node{}
	for i := range nodeList.Items {
		node := helper.NodePtr(nodeList.Items[i])
		if _, _, pool := p.projectInfoFor(node); strings.EqualFold(pool, target) {
			matched = append(matched, node)
		}
	}
	return matched, nil
}
// GetNodePools loads all of the provider NodePools in the current cluster
// (as identified by the GCE metadata) and returns them.
func (p *GKEClusterProvider) GetNodePools() ([]NodePool, error) {
	var (
		projectID = p.metadata.GetProjectID()
		zone      = p.metadata.GetMasterZone()
		clusterID = p.metadata.GetClusterID()
	)
	p.log.Log("Loading node pools for: [ProjectID: %s, Zone: %s, ClusterID: %s]", projectID, zone, clusterID)

	resp, err := p.clusterManager.ListNodePools(context.TODO(), &container.ListNodePoolsRequest{
		Parent: p.getClusterResourcePath(),
	})
	if err != nil {
		return nil, err
	}

	apiPools := resp.GetNodePools()
	pools := make([]NodePool, 0, len(apiPools))
	for _, np := range apiPools {
		count := np.GetInitialNodeCount()
		// Without autoscaling, min and max are pinned to the node count.
		minCount, maxCount := count, count
		scaling := np.Autoscaling.GetEnabled()
		if scaling {
			minCount = np.Autoscaling.GetMinNodeCount()
			maxCount = np.Autoscaling.GetMaxNodeCount()
		}
		labels := np.GetConfig().GetLabels()
		if labels == nil {
			labels = map[string]string{}
		}
		pools = append(pools, &GKENodePool{
			name:        np.GetName(),
			project:     projectID,
			clusterID:   clusterID,
			zone:        zone,
			min:         minCount,
			max:         maxCount,
			count:       count,
			autoscaling: scaling,
			machineType: np.GetConfig().GetMachineType(),
			tags:        labels,
		})
	}
	return pools, nil
}
// CreateNodePool creates a new node pool with the provided specs, retrying
// every 30 seconds while the GKE API reports a retriable condition (e.g. an
// operation already in the queue), then blocks until the requested number of
// nodes have registered with kubernetes (up to 20 minutes).
//
// Empty diskType, non-positive diskSizeGB, and nil labels fall back to
// provider defaults.
func (p *GKEClusterProvider) CreateNodePool(c context.Context, name, machineType string, nodeCount int32, diskType string, diskSizeGB int32, labels map[string]string) error {
	// Apply defaults for any optional parameters left unset.
	if diskType == "" {
		diskType = GKEDefaultDiskType
	}
	if diskSizeGB <= 0 {
		diskSizeGB = GKEDefaultDiskSizeGB
	}
	if labels == nil {
		labels = map[string]string{}
	}

	pool := &container.NodePool{
		Name: name,
		Config: &container.NodeConfig{
			MachineType: machineType,
			DiskType:    diskType,
			DiskSizeGb:  diskSizeGB,
			Labels:      labels,
			OauthScopes: GetGKEDefaultOAuthScopes(),
			Metadata:    GetGKEDefaultMetadata(),
		},
		InitialNodeCount: nodeCount,
		Management:       GetGKEDefaultNodeManagement(),
	}
	request := &container.CreateNodePoolRequest{
		Parent:   p.getClusterResourcePath(),
		NodePool: pool,
	}

	ctx, cancel := context.WithCancel(c)
	defer cancel()
	for {
		_, err := p.clusterManager.CreateNodePool(ctx, request)
		if err == nil {
			p.log.Log("Created NodePool Successfully: %s. Waiting for nodes to become available...", name)
			break
		}
		// Non-retriable failures are surfaced immediately.
		if !isRetriableError(err) {
			return err
		}
		p.log.Log("NodePool operation already in queue, retrying...")
		select {
		case <-time.After(30 * time.Second):
		case <-ctx.Done():
			return fmt.Errorf("NodePool Creation Cancelled")
		}
	}
	return helper.WaitUntilNodesCreated(p.kubernetes, LabelGKENodePool, name, int(nodeCount), 5*time.Second, 20*time.Minute)
}
// CreateAutoScalingNodePool creates a new autoscaling node pool bounded by
// [minNodes, maxNodes], starting at nodeCount nodes. Creation is retried
// every 30 seconds while the GKE API reports a retriable condition, then
// this blocks until at least one node has registered with kubernetes (up to
// 20 minutes).
//
// Empty diskType, non-positive diskSizeGB, and nil labels fall back to
// provider defaults.
func (p *GKEClusterProvider) CreateAutoScalingNodePool(c context.Context, name, machineType string, minNodes, nodeCount, maxNodes int32, diskType string, diskSizeGB int32, labels map[string]string) error {
	// Apply defaults for any optional parameters left unset.
	if diskType == "" {
		diskType = GKEDefaultDiskType
	}
	if diskSizeGB <= 0 {
		diskSizeGB = GKEDefaultDiskSizeGB
	}
	if labels == nil {
		labels = map[string]string{}
	}

	pool := &container.NodePool{
		Name: name,
		Config: &container.NodeConfig{
			MachineType: machineType,
			DiskType:    diskType,
			DiskSizeGb:  diskSizeGB,
			Labels:      labels,
			OauthScopes: GetGKEDefaultOAuthScopes(),
			Metadata:    GetGKEDefaultMetadata(),
		},
		InitialNodeCount: nodeCount,
		Management:       GetGKEDefaultNodeManagement(),
		Autoscaling: &container.NodePoolAutoscaling{
			Enabled:      true,
			MinNodeCount: minNodes,
			MaxNodeCount: maxNodes,
		},
	}
	request := &container.CreateNodePoolRequest{
		Parent:   p.getClusterResourcePath(),
		NodePool: pool,
	}

	ctx, cancel := context.WithCancel(c)
	defer cancel()
	for {
		_, err := p.clusterManager.CreateNodePool(ctx, request)
		if err == nil {
			p.log.Log("Created NodePool Successfully: %s. Waiting for nodes to become available...", name)
			break
		}
		// Non-retriable failures are surfaced immediately.
		if !isRetriableError(err) {
			return err
		}
		p.log.Log("NodePool operation already in queue, retrying...")
		select {
		case <-time.After(30 * time.Second):
		case <-ctx.Done():
			return fmt.Errorf("NodePool Creation Cancelled")
		}
	}
	// With autoscaling the eventual size is flexible; wait for at least a
	// single node to become available.
	return helper.WaitUntilNodesCreated(p.kubernetes, LabelGKENodePool, name, 1, 5*time.Second, 20*time.Minute)
}
// UpdateNodePoolSize updates the number of nodes in a NodePool, retrying
// every 30 seconds while the GKE API reports a retriable condition. It
// returns nil once the resize request has been accepted, or the first
// non-retriable error.
func (p *GKEClusterProvider) UpdateNodePoolSize(c context.Context, nodePool NodePool, size int32) error {
	if nodePool == nil {
		return fmt.Errorf("Provided nodePool was nil.")
	}
	request := &container.SetNodePoolSizeRequest{
		Name:      p.toNodePoolResource(nodePool),
		NodeCount: size,
	}
	ctx, cancel := context.WithCancel(c)
	defer cancel()
	for {
		_, err := p.clusterManager.SetNodePoolSize(ctx, request)
		if err == nil {
			// BUGFIX: previously this logged request.NodePoolId, which is
			// never populated (only request.Name is set above), so the log
			// always printed an empty pool name.
			p.log.Log("Resized NodePool Successfully: %s", nodePool.Name())
			return nil
		}
		// If the error represents a temporary state where we can retry, log and continue
		if isRetriableError(err) {
			p.log.Log("NodePool operation already in queue, retrying...")
		} else {
			return err
		}
		select {
		case <-time.After(30 * time.Second):
		case <-ctx.Done():
			return fmt.Errorf("NodePool Resize Cancelled")
		}
	}
}
// UpdateNodePoolSizes updates the number of nodes in multiple NodePool
// instances concurrently. It waits for all resizes to finish (or 30 minutes,
// whichever comes first) and returns the first error any resize produced.
func (p *GKEClusterProvider) UpdateNodePoolSizes(c context.Context, nodePools []NodePool, size int32) error {
	if len(nodePools) == 0 {
		return nil
	}
	ctx, cancel := context.WithCancel(c)
	defer cancel()

	// Buffered to len(nodePools) so workers can always send without blocking,
	// even if we return early on timeout.
	errCh := make(chan error, len(nodePools))
	waitChannel := async.NewWaitChannel()
	waitChannel.Add(len(nodePools))
	for _, np := range nodePools {
		go func(nodePool NodePool) {
			defer waitChannel.Done()
			// BUGFIX: errors from individual resizes were previously
			// discarded, so this method returned nil even when every
			// resize failed. Collect them and surface the first one.
			if err := p.UpdateNodePoolSize(ctx, nodePool, size); err != nil {
				errCh <- err
			}
		}(np)
	}
	select {
	case <-waitChannel.Wait():
		select {
		case err := <-errCh:
			return err
		default:
			return nil
		}
	case <-time.After(30 * time.Minute):
		return fmt.Errorf("Resize Requests timed out after 30 minutes.")
	}
}
// DeleteNodePool deletes the NodePool, retrying every 30 seconds while the
// GKE API reports a retriable condition. Returns nil once the deletion has
// been accepted, or the first non-retriable error.
func (p *GKEClusterProvider) DeleteNodePool(c context.Context, nodePool NodePool) error {
	if nodePool == nil {
		return fmt.Errorf("Provided nodePool was nil.")
	}
	request := &container.DeleteNodePoolRequest{
		Name: p.toNodePoolResource(nodePool),
	}
	ctx, cancel := context.WithCancel(c)
	defer cancel()
	for {
		_, err := p.clusterManager.DeleteNodePool(ctx, request)
		if err == nil {
			p.log.Log("Deleted NodePool Successfully: %s", nodePool.Name())
			return nil
		}
		// Non-retriable failures are surfaced immediately.
		if !isRetriableError(err) {
			return err
		}
		p.log.Log("NodePool operation already in queue, retrying...")
		select {
		case <-time.After(30 * time.Second):
		case <-ctx.Done():
			return fmt.Errorf("NodePool Deletion Cancelled")
		}
	}
}
// CreateOrUpdateTags creates or updates the tags for NodePool instances.
// On GKE this always fails: node pool labels cannot be modified after
// creation (see note below), so callers must treat labels as immutable.
func (p *GKEClusterProvider) CreateOrUpdateTags(c context.Context, nodePool NodePool, updateNodes bool, tags map[string]string) error {
	// NOTE: In GKE, it's not possible to update the node pool labels. This is because it propagates the labels
	// NOTE: via kubelet, which are set at creation. We could update all of the nodes, but that doesn't seem
	// NOTE: quite right, as any new nodes will not include the labels. Depending on how important this
	// NOTE: specific functionality is, we may have to recreate a NodePool, which currently seems very wasteful.
	return fmt.Errorf("GKE does not support modifying labels after a node pool has been created.")
}
// DeleteTags deletes the tags by key on a NodePool instance.
// On GKE this always fails: node pool labels cannot be modified after
// creation (see note below), so callers must treat labels as immutable.
func (p *GKEClusterProvider) DeleteTags(c context.Context, nodePool NodePool, keys []string) error {
	// NOTE: In GKE, it's not possible to update the node pool labels. This is because it propagates the labels
	// NOTE: via kubelet, which are set at creation. We could update all of the nodes, but that doesn't seem
	// NOTE: quite right, as any new nodes will not include the labels. Depending on how important this
	// NOTE: specific functionality is, we may have to recreate a NodePool, which currently seems very wasteful.
	return fmt.Errorf("GKE does not support modifying labels after a node pool has been created.")
}
// projectInfoFor extracts the GCP project, zone, and node pool name for a
// kubernetes node. The pool name comes from the GKE node pool label; project
// and zone are parsed from the node's provider ID, which has the form
// "gce://<project>/<zone>/<instance>". Project and zone are empty when the
// provider ID cannot be parsed.
func (p *GKEClusterProvider) projectInfoFor(node *v1.Node) (project string, zone string, nodePool string) {
	nodePool = node.Labels[LabelGKENodePool]
	// BUGFIX: the previous implementation sliced ProviderID[6:]
	// unconditionally, which panics when the provider ID is shorter than
	// 6 bytes (e.g. empty on a node that has not been initialized yet).
	nodeProviderID := strings.TrimPrefix(node.Spec.ProviderID, "gce://")
	props := strings.Split(nodeProviderID, "/")
	if len(props) < 2 {
		project = ""
		zone = ""
		return
	}
	project = props[0]
	zone = props[1]
	return
}
// toNodePoolResourceByName returns the fully qualified GKE resource path for
// the named node pool in the current cluster, e.g.
// "projects/P/locations/Z/clusters/C/nodePools/NAME".
func (p *GKEClusterProvider) toNodePoolResourceByName(name string) string {
	return p.getClusterResourcePath() + "/nodePools/" + name
}
// toNodePoolResource returns the fully qualified GKE resource path for the
// given NodePool, e.g. "projects/P/locations/Z/clusters/C/nodePools/N".
func (p *GKEClusterProvider) toNodePoolResource(nodePool NodePool) string {
	parts := []string{
		"projects", nodePool.Project(),
		"locations", nodePool.Zone(),
		"clusters", nodePool.ClusterID(),
		"nodePools", nodePool.Name(),
	}
	return strings.Join(parts, "/")
}
// getClusterResourcePath returns the fully qualified GKE resource path for
// the current cluster, e.g. "projects/P/locations/Z/clusters/C", using the
// identity read from GCE metadata.
func (p *GKEClusterProvider) getClusterResourcePath() string {
	parts := []string{
		"projects", p.metadata.GetProjectID(),
		"locations", p.metadata.GetMasterZone(),
		"clusters", p.metadata.GetClusterID(),
	}
	return strings.Join(parts, "/")
}
// newGKEClusterManager creates a new GKE cluster manager client used to
// issue gRPC commands against the Container Engine API. It fails fast when
// the mounted service account key file is missing, since the client would
// be unable to authenticate.
func newGKEClusterManager() (*gke.ClusterManagerClient, error) {
	if !file.FileExists(GKEAuthServiceAccount) {
		// BUGFIX: corrected garbled error message ("Failed to located ...").
		return nil, fmt.Errorf("failed to locate service account file: %s", GKEAuthServiceAccount)
	}
	return gke.NewClusterManagerClient(context.Background())
}
// isRetriableError reports whether err is a gRPC Status error whose code
// represents a temporary condition worth retrying (FailedPrecondition,
// Unavailable, or DeadlineExceeded).
func isRetriableError(err error) bool {
	st, ok := status.FromError(err)
	if !ok {
		return false
	}
	return isStatusCode(st, codes.FailedPrecondition, codes.Unavailable, codes.DeadlineExceeded)
}
// isStatusCode reports whether the Status carries any of the given codes.
// The variadic parameter is named "targets" to avoid shadowing the imported
// codes package inside the body.
func isStatusCode(s *status.Status, targets ...codes.Code) bool {
	actual := s.Code()
	for _, target := range targets {
		if actual == target {
			return true
		}
	}
	return false
}