forked from cilium/cilium
-
Notifications
You must be signed in to change notification settings - Fork 0
/
instances.go
248 lines (211 loc) · 8.15 KB
/
instances.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
// Copyright 2017 Lyft, Inc.
package eni
import (
"context"
"time"
"github.com/sirupsen/logrus"
eniTypes "github.com/cilium/cilium/pkg/aws/eni/types"
"github.com/cilium/cilium/pkg/aws/types"
"github.com/cilium/cilium/pkg/ipam"
ipamTypes "github.com/cilium/cilium/pkg/ipam/types"
v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/lock"
)
// EC2API is the API surface used of the EC2 API
type EC2API interface {
GetInstances(ctx context.Context, vpcs ipamTypes.VirtualNetworkMap, subnets ipamTypes.SubnetMap) (*ipamTypes.InstanceMap, error)
GetSubnets(ctx context.Context) (ipamTypes.SubnetMap, error)
GetVpcs(ctx context.Context) (ipamTypes.VirtualNetworkMap, error)
GetSecurityGroups(ctx context.Context) (types.SecurityGroupMap, error)
GetDetachedNetworkInterfaces(ctx context.Context, tags ipamTypes.Tags, maxResults int32) ([]string, error)
CreateNetworkInterface(ctx context.Context, toAllocate int32, subnetID, desc string, groups []string, allocatePrefixes bool) (string, *eniTypes.ENI, error)
AttachNetworkInterface(ctx context.Context, index int32, instanceID, eniID string) (string, error)
DeleteNetworkInterface(ctx context.Context, eniID string) error
ModifyNetworkInterface(ctx context.Context, eniID, attachmentID string, deleteOnTermination bool) error
AssignPrivateIpAddresses(ctx context.Context, eniID string, addresses int32) error
UnassignPrivateIpAddresses(ctx context.Context, eniID string, addresses []string) error
AssignENIPrefixes(ctx context.Context, eniID string, prefixes int32) error
UnassignENIPrefixes(ctx context.Context, eniID string, prefixes []string) error
}
// InstancesManager maintains the list of instances. It must be kept up to date
// by calling resync() regularly.
type InstancesManager struct {
mutex lock.RWMutex
instances *ipamTypes.InstanceMap
subnets ipamTypes.SubnetMap
vpcs ipamTypes.VirtualNetworkMap
securityGroups types.SecurityGroupMap
api EC2API
}
// NewInstancesManager returns a new instances manager
func NewInstancesManager(api EC2API) *InstancesManager {
return &InstancesManager{
instances: ipamTypes.NewInstanceMap(),
api: api,
}
}
// CreateNode is called on discovery of a new node and returns the ENI node
// allocation implementation for the new node
func (m *InstancesManager) CreateNode(obj *v2.CiliumNode, n *ipam.Node) ipam.NodeOperations {
return NewNode(n, obj, m)
}
// HasInstance returns whether the instance is in instances
func (m *InstancesManager) HasInstance(instanceID string) bool {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.instances.Exists(instanceID)
}
// GetPoolQuota returns the number of available IPs in all IP pools
func (m *InstancesManager) GetPoolQuota() ipamTypes.PoolQuotaMap {
pool := ipamTypes.PoolQuotaMap{}
for subnetID, subnet := range m.GetSubnets(context.TODO()) {
pool[ipamTypes.PoolID(subnetID)] = ipamTypes.PoolQuota{
AvailabilityZone: subnet.AvailabilityZone,
AvailableIPs: subnet.AvailableAddresses,
}
}
return pool
}
// GetSubnet returns the subnet by subnet ID
//
// The returned subnet is immutable so it can be safely accessed
func (m *InstancesManager) GetSubnet(subnetID string) *ipamTypes.Subnet {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.subnets[subnetID]
}
// GetSubnets returns all the tracked subnets
//
// The returned subnetMap is immutable so it can be safely accessed
func (m *InstancesManager) GetSubnets(ctx context.Context) ipamTypes.SubnetMap {
m.mutex.RLock()
defer m.mutex.RUnlock()
subnetsCopy := make(ipamTypes.SubnetMap)
for k, v := range m.subnets {
subnetsCopy[k] = v
}
return subnetsCopy
}
// FindSubnetByIDs returns the subnet with the most addresses matching VPC ID,
// availability zone within a provided list of subnet ids
//
// The returned subnet is immutable so it can be safely accessed
func (m *InstancesManager) FindSubnetByIDs(vpcID, availabilityZone string, subnetIDs []string) (bestSubnet *ipamTypes.Subnet) {
m.mutex.RLock()
defer m.mutex.RUnlock()
for _, s := range m.subnets {
if s.VirtualNetworkID == vpcID && s.AvailabilityZone == availabilityZone {
for _, subnetID := range subnetIDs {
if s.ID == subnetID {
if bestSubnet == nil || bestSubnet.AvailableAddresses < s.AvailableAddresses {
bestSubnet = s
}
continue
}
}
}
}
return
}
// FindSubnetByTags returns the subnet with the most addresses matching VPC ID,
// availability zone and all required tags
//
// The returned subnet is immutable so it can be safely accessed
func (m *InstancesManager) FindSubnetByTags(vpcID, availabilityZone string, required ipamTypes.Tags) (bestSubnet *ipamTypes.Subnet) {
m.mutex.RLock()
defer m.mutex.RUnlock()
for _, s := range m.subnets {
if s.VirtualNetworkID == vpcID && s.AvailabilityZone == availabilityZone && s.Tags.Match(required) {
if bestSubnet == nil || bestSubnet.AvailableAddresses < s.AvailableAddresses {
bestSubnet = s
}
}
}
return
}
// FindSecurityGroupByTags returns the security groups matching VPC ID and all required tags
//
// The returned security groups slice is immutable so it can be safely accessed
func (m *InstancesManager) FindSecurityGroupByTags(vpcID string, required ipamTypes.Tags) []*types.SecurityGroup {
m.mutex.RLock()
defer m.mutex.RUnlock()
securityGroups := []*types.SecurityGroup{}
for _, securityGroup := range m.securityGroups {
if securityGroup.VpcID == vpcID && securityGroup.Tags.Match(required) {
securityGroups = append(securityGroups, securityGroup)
}
}
return securityGroups
}
// Resync fetches the list of EC2 instances and subnets and updates the local
// cache in the instanceManager. It returns the time when the resync has
// started or time.Time{} if it did not complete.
func (m *InstancesManager) Resync(ctx context.Context) time.Time {
resyncStart := time.Now()
vpcs, err := m.api.GetVpcs(ctx)
if err != nil {
log.WithError(err).Warning("Unable to synchronize EC2 VPC list")
return time.Time{}
}
subnets, err := m.api.GetSubnets(ctx)
if err != nil {
log.WithError(err).Warning("Unable to retrieve EC2 subnets list")
return time.Time{}
}
securityGroups, err := m.api.GetSecurityGroups(ctx)
if err != nil {
log.WithError(err).Warning("Unable to retrieve EC2 security group list")
return time.Time{}
}
instances, err := m.api.GetInstances(ctx, vpcs, subnets)
if err != nil {
log.WithError(err).Warning("Unable to synchronize EC2 interface list")
return time.Time{}
}
log.WithFields(logrus.Fields{
"numInstances": instances.NumInstances(),
"numVPCs": len(vpcs),
"numSubnets": len(subnets),
"numSecurityGroups": len(securityGroups),
}).Info("Synchronized ENI information")
m.mutex.Lock()
m.instances = instances
m.subnets = subnets
m.vpcs = vpcs
m.securityGroups = securityGroups
m.mutex.Unlock()
return resyncStart
}
// UpdateENI updates the ENI definition of an ENI for a particular instance. If
// the ENI is already known, the definition is updated, otherwise the ENI is
// added to the instance.
func (m *InstancesManager) UpdateENI(instanceID string, eni *eniTypes.ENI) {
m.mutex.Lock()
eniRevision := ipamTypes.InterfaceRevision{Resource: eni}
m.instances.Update(instanceID, eniRevision)
m.mutex.Unlock()
}
// ForeachInstance will iterate over each interface for a particular instance inside `instances`
// and call `fn`.
// This function is read-locked for the entire execution.
func (m *InstancesManager) ForeachInstance(instanceID string, fn ipamTypes.InterfaceIterator) {
// This is a safety net in case the InstanceID is not known for some
// reason. If we don't know the instanceID, we also can't derive the
// list of ENIs attached to this instance. Without this,
// ForeachInstance() would return the ENIs of all instances.
if instanceID == "" {
log.Error("BUG: Inconsistent CiliumNode state. The InstanceID is not known")
return
}
m.mutex.RLock()
defer m.mutex.RUnlock()
m.instances.ForeachInterface(instanceID, fn)
}
// DeleteInstance delete instance from m.instances
func (m *InstancesManager) DeleteInstance(instanceID string) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.instances.Delete(instanceID)
}