forked from jetstack/tarmak
/
configuration.go
130 lines (103 loc) · 2.91 KB
/
configuration.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright Jetstack Ltd. See LICENSE for details.
package cluster
import (
"bytes"
"crypto/md5"
"encoding/hex"
"fmt"
"time"
wingv1alpha1 "github.com/jetstack/tarmak/pkg/apis/wing/v1alpha1"
)
const (
retries = 100
)
// This upload the puppet.tar.gz to the cluster, warning there is some duplication as terraform is also uploading this puppet.tar.gz
func (c *Cluster) UploadConfiguration() error {
buffer := new(bytes.Buffer)
// get puppet config
err := c.Environment().Tarmak().Puppet().TarGz(buffer)
if err != nil {
return err
}
// build reader from config
reader := bytes.NewReader(buffer.Bytes())
hasher := md5.New()
hasher.Write(buffer.Bytes())
return c.Environment().Provider().UploadConfiguration(
c,
reader,
hex.EncodeToString(hasher.Sum(nil)),
)
}
// This enforces a reapply of the puppet.tar.gz on every instance in the cluster
func (c *Cluster) ReapplyConfiguration() error {
c.log.Infof("making sure all instances apply the latest manifest")
// connect to wing
client, err := c.wingInstanceClient()
if err != nil {
return fmt.Errorf("failed to connect to wing API on bastion: %s", err)
}
// list instances
instances, err := c.listInstances()
if err != nil {
return fmt.Errorf("failed to list instances: %s", err)
}
for pos, _ := range instances {
instance := instances[pos]
if instance.Spec == nil {
instance.Spec = &wingv1alpha1.InstanceSpec{}
}
instance.Spec.Converge = &wingv1alpha1.InstanceSpecManifest{}
if _, err := client.Update(instance); err != nil {
c.log.Warnf("error updating instance %s in wing API: %s", instance.Name, err)
}
}
// TODO: solve this on the API server side
time.Sleep(time.Second * 5)
return nil
}
// This waits until all instances have congverged successfully
func (c *Cluster) WaitForConvergance() error {
c.log.Debugf("making sure all instances have converged using puppet")
retries := retries
for {
instances, err := c.listInstances()
if err != nil {
return fmt.Errorf("failed to list instances: %s", err)
}
instanceByState := make(map[wingv1alpha1.InstanceManifestState][]*wingv1alpha1.Instance)
for pos, _ := range instances {
instance := instances[pos]
// index by instance convergance state
if instance.Status == nil || instance.Status.Converge == nil || instance.Status.Converge.State == "" {
continue
}
state := instance.Status.Converge.State
if _, ok := instanceByState[state]; !ok {
instanceByState[state] = []*wingv1alpha1.Instance{}
}
instanceByState[state] = append(
instanceByState[state],
instance,
)
}
err = c.checkAllInstancesConverged(instanceByState)
if err == nil {
c.log.Info("all instances converged")
return nil
} else {
c.log.Debug(err)
}
select {
case <-c.ctx.Done():
return c.ctx.Err()
default:
}
retries--
if retries == 0 {
break
}
time.Sleep(time.Second * 5)
}
return fmt.Errorf("instances failed to converge in time")
}