/
ec2.go
255 lines (212 loc) · 7.01 KB
/
ec2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
// Package ec2 provides an interface to EC2 instances, which are polled from the
// API. It maintains a local cache that is refreshed periodically.
package ec2
import (
"context"
"sort"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/pkg/errors"
"github.com/rebuy-de/rebuy-go-sdk/v3/pkg/logutil"
"github.com/rebuy-de/rebuy-go-sdk/v3/pkg/syncutil"
)
// EC2 instance state names, as stored in Instance.State. The values are the
// state names reported by the EC2 API for an instance.
const (
	// InstanceStatePending is the state of an instance that is being launched.
	InstanceStatePending = "pending"
	// InstanceStateRunning is the state of a launched, running instance.
	InstanceStateRunning = "running"
	// InstanceStateTerminated is the state of an instance that has been
	// terminated.
	InstanceStateTerminated = "terminated"
	// InstanceStateShuttingDown is the state of an instance that is being
	// terminated.
	InstanceStateShuttingDown = "shutting-down"
	// InstanceStateStopping is the state of an instance that is being stopped.
	InstanceStateStopping = "stopping"
	// InstanceStateStopped is the state of an instance that has been stopped.
	InstanceStateStopped = "stopped"
)

// Instance is the instance-related data that is retrieved via API.
type Instance struct {
	// InstanceID is the EC2 instance ID; it is also the key of the cache.
	InstanceID string `logfield:"instance-id,omitempty"`
	// InstanceName is the value of the instance's "Name" tag.
	InstanceName string `logfield:"instance-name"`
	// NodeName is taken from the instance's private DNS name.
	NodeName string `logfield:"node-name,omitempty"`
	// InstanceType is the EC2 instance type.
	InstanceType string `logfield:"instance-type"`
	// AvailabilityZone is the availability zone the instance is placed in.
	AvailabilityZone string `logfield:"availability-zone"`
	// InstanceLifecycle is the EC2 instance lifecycle as reported by the API.
	InstanceLifecycle string `logfield:"ec2-instance-lifecycle"`
	// State is one of the InstanceState* constants.
	State string `logfield:"ec2-instance-state"`
	// LaunchTime is the launch time reported by the API.
	LaunchTime time.Time `logfield:"ec2-launch-time"`
	// TerminationTime is a best-effort value for shutting-down/terminated
	// instances, parsed from the state transition reason. It is nil when the
	// reason could not be parsed and must only be used for display purposes.
	TerminationTime *time.Time `logfield:"ec2-termination-time,omitempty"`
}

// IsRunning reports whether the instance is in the running state.
func (i Instance) IsRunning() bool {
	return i.State == InstanceStateRunning
}

// Changed reports whether i differs from old in a field that is relevant for
// change notifications. Currently only State is compared, so e.g. a renamed
// instance does not count as changed.
func (i Instance) Changed(old Instance) bool {
	return i.State != old.State
}
// Client gives access to EC2 instance data that is kept in an in-memory
// cache and refreshed by a background poller.
type Client interface {
	// Run executes the EC2 API poller, refreshing the instance cache
	// periodically.
	Run(ctx context.Context) error

	// Healthy indicates whether the background poller is currently
	// working correctly.
	Healthy() bool

	// List returns all EC2 instances that are currently in the cache. The
	// cache itself is updated in the background by Run.
	List() []Instance

	// SignalEmitter returns the emitter that is triggered every time the
	// cache changes. See the syncutil package for more information.
	SignalEmitter() *syncutil.SignalEmitter
}
// store is the Client implementation returned by New.
//
// NOTE(review): cache and failureCount are written by Run/runOnce and read by
// List and Healthy without any synchronization. If those methods are called
// from other goroutines while Run is active (the Client docs describe the
// cache as updated in the background), this is a data race that
// `go test -race` would report; consider guarding both fields with a
// sync.RWMutex.
type store struct {
	// api is the EC2 client built from the config passed to New.
	api *ec2.Client
	// refresh is how long Run waits after each poll.
	refresh time.Duration
	// cache holds the latest successful poll result, keyed by instance ID.
	// runOnce replaces the whole map instead of mutating it in place.
	cache map[string]Instance
	// emitter is triggered after a poll that added, changed or removed an
	// instance.
	emitter *syncutil.SignalEmitter
	// failureCount is the number of consecutive failed polls; Run resets it
	// to 0 after a successful poll.
	failureCount int
}
// New returns a Client backed by the EC2 API described by conf, which polls
// every refresh interval. No request is made until Run is called; see the
// Client interface for details.
func New(conf *aws.Config, refresh time.Duration) Client {
	s := &store{refresh: refresh}
	s.api = ec2.NewFromConfig(*conf)
	s.emitter = new(syncutil.SignalEmitter)
	return s
}
// Healthy reports true unless the most recent poll(s) failed, i.e. while
// there is no streak of consecutive failures. Because the counter starts at
// zero, it also reports true before the first poll has finished.
func (s *store) Healthy() bool {
	failing := s.failureCount != 0
	return !failing
}
// SignalEmitter exposes the store's emitter, which runOnce triggers after a
// poll that changed the cached instance set.
func (s *store) SignalEmitter() *syncutil.SignalEmitter {
	emitter := s.emitter
	return emitter
}
// List returns the cached instances as a newly allocated slice, ordered by
// launch time. LaunchTime only has second precision, so ties are common;
// they are broken by instance ID to keep the output deterministic even
// though the cache is a map.
func (s *store) List() []Instance {
	cache := s.cache
	instances := make([]Instance, 0, len(cache))
	for _, inst := range cache {
		instances = append(instances, inst)
	}

	// Instance IDs are unique (they are the cache keys), so this composite
	// key defines a total order and no stable sort is needed.
	sort.Slice(instances, func(a, b int) bool {
		left, right := &instances[a], &instances[b]
		if !left.LaunchTime.Equal(right.LaunchTime) {
			return left.LaunchTime.Before(right.LaunchTime)
		}
		return left.InstanceID < right.InstanceID
	})

	return instances
}
// Run polls the EC2 API until ctx is cancelled, refreshing the instance cache
// on each iteration and waiting the configured refresh interval in between.
// It always returns nil.
//
// Consecutive failed polls are counted in failureCount (reported through
// Healthy); a successful poll resets the counter.
func (s *store) Run(ctx context.Context) error {
	for ctx.Err() == nil {
		if err := s.runOnce(ctx); err != nil {
			// Count first, so the logged number includes this failure.
			s.failureCount++
			logutil.Get(ctx).
				WithError(errors.WithStack(err)).
				Errorf("main loop run failed %d times in a row", s.failureCount)
		} else {
			s.failureCount = 0
		}

		// Wait for the next poll, but return as soon as ctx is cancelled
		// instead of blocking shutdown for up to a full refresh interval.
		timer := time.NewTimer(s.refresh)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil
		case <-timer.C:
		}
	}
	return nil
}
// runOnce performs a single poll of the EC2 API and refreshes the cache.
//
// Every instance that is new, changed (see Instance.Changed) or gone compared
// to the current cache is logged at debug level. The cache is then replaced
// by the freshly fetched map, and the emitter fires only if at least one such
// difference was found. If fetching fails, the cache is left untouched and
// the wrapped error is returned.
func (s *store) runOnce(ctx context.Context) error {
	ctx = logutil.Start(ctx, "update")

	fresh, err := s.fetchInstances(ctx)
	if err != nil {
		return errors.Wrap(err, "fetching instances failed")
	}

	dirty := false

	// Pass 1: instances that appeared or whose relevant fields changed.
	for _, current := range fresh {
		previous, known := s.cache[current.InstanceID]
		switch {
		case !known:
			logutil.Get(ctx).
				WithFields(logutil.FromStruct(current)).
				Debugf("add new instance to cache")
			dirty = true
		case current.Changed(previous):
			logutil.Get(ctx).
				WithFields(logutil.FromStruct(current)).
				Debugf("cached instance changed")
			dirty = true
		}
	}

	// Pass 2: cached instances that are missing from the fresh result.
	for _, cached := range s.cache {
		if _, present := fresh[cached.InstanceID]; present {
			continue
		}
		logutil.Get(ctx).
			WithFields(logutil.FromStruct(cached)).
			Debugf("cached instance was removed")
		dirty = true
	}

	// Swap in the new snapshot rather than mutating the old map, so a reader
	// that already holds the previous map keeps a consistent view.
	// NOTE(review): the assignment itself is not synchronized with List;
	// `go test -race` would flag concurrent List calls.
	s.cache = fresh

	// Signal subscribers only once the cache holds the new data.
	if dirty {
		s.emitter.Emit()
	}
	return nil
}
// fetchInstances pages through DescribeInstances (without filters) and
// returns every instance it reports, keyed by instance ID.
//
// Instances without an ID are skipped and logged as errors. For shutting-down
// and terminated instances, the termination time is parsed on a best-effort
// basis from the state transition reason; a parse failure is logged as a
// warning and leaves TerminationTime nil. Any API error aborts the whole
// fetch and is returned with a stack trace.
func (s *store) fetchInstances(ctx context.Context) (map[string]Instance, error) {
	params := &ec2.DescribeInstancesInput{}
	instances := map[string]Instance{}

	for {
		resp, err := s.api.DescribeInstances(ctx, params)
		if err != nil {
			return nil, errors.WithStack(err)
		}

		for _, reservation := range resp.Reservations {
			for _, dto := range reservation.Instances {
				id := aws.ToString(dto.InstanceId)
				if id == "" {
					// No idea how this could happen. If it happens anyway,
					// we at least skip the item and log it, so the alerting
					// gets triggered if it happens more often.
					logutil.Get(ctx).WithField("instance-dto", dto).Error("got instance with empty instance ID")
					continue
				}

				// State and Placement are pointers in the SDK model; guard
				// them so a sparse API response cannot panic the poller.
				state := ""
				if dto.State != nil {
					state = string(dto.State.Name)
				}
				availabilityZone := ""
				if dto.Placement != nil {
					availabilityZone = aws.ToString(dto.Placement.AvailabilityZone)
				}

				instance := Instance{
					InstanceID:        id,
					NodeName:          aws.ToString(dto.PrivateDnsName),
					State:             state,
					InstanceType:      string(dto.InstanceType),
					InstanceName:      ec2tag(&dto, "Name"),
					AvailabilityZone:  availabilityZone,
					InstanceLifecycle: string(dto.InstanceLifecycle),
					LaunchTime:        aws.ToTime(dto.LaunchTime),
				}

				if instance.State == InstanceStateTerminated || instance.State == InstanceStateShuttingDown {
					// Parsing the termination date from the
					// StateTransitionReason is not very reliable, since it is
					// not standardized and we do tolerate other reasons. This
					// is fine, since we use it only for display purposes.
					// If we need a reliable value, we would need to get it
					// from CloudTrail.
					reason := aws.ToString(dto.StateTransitionReason)
					text := strings.TrimPrefix(reason, "User initiated")
					text = strings.TrimPrefix(text, "Service initiated")
					text = strings.TrimSpace(text)

					terminationTime, parseErr := time.Parse("(2006-01-02 15:04:05 MST)", text)
					if parseErr != nil {
						// Log the reason text itself; the raw field is a
						// *string, which text formatters print as an address.
						logutil.Get(ctx).
							WithField("state-transition-reason", reason).
							WithError(errors.WithStack(parseErr)).
							Warn("failed to parse state transition reason")
					} else {
						instance.TerminationTime = &terminationTime
					}
				}

				instances[id] = instance
			}
		}

		// Treat an empty token like a missing one, so we never re-send an
		// empty NextToken and risk paging from the start again.
		if aws.ToString(resp.NextToken) == "" {
			break
		}
		params = &ec2.DescribeInstancesInput{
			NextToken: resp.NextToken,
		}
	}

	return instances, nil
}