/
async-actor.go
228 lines (200 loc) · 6.77 KB
/
async-actor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package service
import (
"errors"
"fmt"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/headscale"
"go.uber.org/zap"
ipamer "github.com/metal-stack/go-ipam"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/datastore"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/ipam"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
"github.com/metal-stack/metal-lib/bus"
"github.com/metal-stack/metal-lib/pkg/tag"
)
// asyncActor bundles the datastore, the IPAM and the bus functions that are
// needed to free machines and to release their networks and IPs.
type asyncActor struct {
// log is the logger used by all methods of the actor.
log *zap.SugaredLogger
// IPAMer is embedded so that the actor can call the IPAM directly,
// e.g. ReleaseIP when an IP is released.
ipam.IPAMer
// RethinkStore is embedded so that the actor can call datastore methods
// such as FindIPByID, UpdateIP, DeleteIP, SearchIPs and UpdateMachine directly.
*datastore.RethinkStore
// machineNetworkReleaser is the bus function registered under the name
// "releaseMachineNetworks"; calling it triggers releaseMachineNetworks via the bus.
machineNetworkReleaser bus.Func
// ipReleaser is the bus function registered under the name "releaseIP";
// calling it triggers releaseIP via the bus.
ipReleaser bus.Func
}
// newAsyncActor builds an asyncActor on top of the given logger, datastore and
// IPAM and registers its two release handlers ("releaseMachineNetworks" and
// "releaseIP") at the given bus endpoints.
//
// It returns an error if one of the bus functions cannot be created.
func newAsyncActor(l *zap.SugaredLogger, ep *bus.Endpoints, ds *datastore.RethinkStore, ip ipam.IPAMer) (*asyncActor, error) {
	// the actor has to exist first because its methods are handed to the bus as handlers
	a := &asyncActor{log: l, IPAMer: ip, RethinkStore: ds}

	_, networkReleaser, err := ep.Function("releaseMachineNetworks", a.releaseMachineNetworks)
	if err != nil {
		return nil, fmt.Errorf("cannot create async bus function for machine releasing: %w", err)
	}
	a.machineNetworkReleaser = networkReleaser

	_, ipReleaser, err := ep.Function("releaseIP", a.releaseIP)
	if err != nil {
		return nil, fmt.Errorf("cannot create bus function for ip releasing: %w", err)
	}
	a.ipReleaser = ipReleaser

	return a, nil
}
// freeMachine frees the given machine. In order it
//
//  1. refuses to continue if the machine is in the locked state,
//  2. deletes the machine from headscale if a client is given and the machine
//     has an allocation (failures are only logged),
//  3. calls deleteVRFSwitches and publishDeleteEvent for the machine,
//  4. triggers the asynchronous release of the machine networks via the bus
//     (failures are only logged),
//  5. clears allocation, tags and the pre-allocated flag and stores the machine.
//
// An error is returned if the machine is locked or if deleteVRFSwitches,
// publishDeleteEvent or UpdateMachine fails. The given machine is modified in place.
func (a *asyncActor) freeMachine(pub bus.Publisher, m *metal.Machine, headscaleClient *headscale.HeadscaleClient, logger *zap.SugaredLogger) error {
	if m.State.Value == metal.LockedState {
		return errors.New("machine is locked")
	}

	// without an allocation there is no project to hand over to headscale;
	// dereferencing m.Allocation would panic for an unallocated machine.
	if headscaleClient != nil && m.Allocation != nil {
		// always call DeleteMachine, in case machine is not registered it will return nil
		if err := headscaleClient.DeleteMachine(m.ID, m.Allocation.Project); err != nil {
			// logger is a sugared logger: use key/value pairs instead of zap fields
			logger.Errorw("unable to delete Node entry from headscale DB", "machineID", m.ID, "error", err)
		}
	}

	if err := deleteVRFSwitches(a.RethinkStore, m, a.log.Desugar()); err != nil {
		return err
	}

	if err := publishDeleteEvent(pub, m, a.log.Desugar()); err != nil {
		return err
	}

	// call the releaser async; this has to happen before the allocation is
	// cleared below because releaseMachineNetworks reads the machine networks
	// from the allocation.
	if err := a.machineNetworkReleaser(m); err != nil {
		// log error, but what should we do here? we already called
		// deleteVRFSwitches and publishDeleteEvent, so should we return
		// an error or "fall through"?
		a.log.Errorw("cannot call async machine cleanup", "error", err)
	}

	// keep a copy of the previous state for the update call
	old := *m

	m.Allocation = nil
	m.Tags = nil
	m.PreAllocated = false

	if err := a.UpdateMachine(&old, m); err != nil {
		return err
	}

	a.log.Infow("freed machine", "machineID", m.ID)
	return nil
}
// releaseMachineNetworks disassociates the machine from all IPs of its machine
// networks, releases the ASN of the machine if it was acquired from the ASN
// pool and finally disassociates all IPs that are still tagged with the
// machine id. It is a no-op for machines without an allocation.
//
// The first error of a datastore lookup, a disassociation or the ASN release
// aborts the function and is returned.
func (a *asyncActor) releaseMachineNetworks(machine *metal.Machine) error {
	alloc := machine.Allocation
	if alloc == nil {
		return nil
	}

	var pooledASN uint32
	for _, nw := range alloc.MachineNetworks {
		if nw.IPs == nil {
			continue
		}

		for _, ipID := range nw.IPs {
			ip, err := a.FindIPByID(ipID)
			switch {
			case err == nil:
				if err := a.disassociateIP(ip, machine); err != nil {
					return err
				}
			case metal.IsNotFound(err):
				// the ip is already gone; skipping it is required, otherwise
				// releasing the remaining ip addresses would always fail
				// after the first ip was released
			default:
				return err
			}
		}

		// all machineNetworks have the same ASN, it must only be released once.
		// in the old way the asn was in the range of 4200000000 + offset from the last two octets
		// of the private ip, but only asns acquired from the ASNIntegerPool (4210000000 and above)
		// must be released.
		if nw.ASN >= ASNBase {
			pooledASN = nw.ASN
		}
	}

	if pooledASN >= ASNBase {
		if err := releaseASN(a.RethinkStore, pooledASN); err != nil {
			return err
		}
	}

	// it can happen that an IP gets properly allocated for a machine but
	// the machine was not added to the machine network. We call these
	// IPs "dangling" and look them up by their machine tag.
	var dangling metal.IPs
	query := datastore.IPSearchQuery{
		Tags: []string{metal.IpTag(tag.MachineID, machine.ID)},
	}
	if err := a.SearchIPs(&query, &dangling); err != nil {
		return err
	}

	for idx := range dangling {
		ip := dangling[idx]
		if err := a.disassociateIP(&ip, machine); err != nil {
			return err
		}
	}

	return nil
}
// disassociateIP removes the machine from the given IP and stores the changed
// IP. If the IP is not static and no other machine is left on it, the release
// of the IP is triggered asynchronously through the bus.
//
// Only a failing IP update is returned as an error; a failure to schedule the
// async release is logged and swallowed.
func (a *asyncActor) disassociateIP(ip *metal.IP, machine *metal.Machine) error {
	machineID := machine.GetID()

	// ips that were associated with the machine for allocation but do not
	// carry the association anymore are ignored
	if !ip.HasMachineId(machineID) {
		return nil
	}

	// store a copy of the ip without the machine
	updated := *ip
	updated.RemoveMachineId(machineID)
	if err := a.UpdateIP(ip, &updated); err != nil {
		return err
	}

	// static ips and ips that are still associated to other machines
	// should not be released automatically
	if ip.Type == metal.Static || len(updated.GetMachineIds()) > 0 {
		return nil
	}

	// at this point the machine is removed from the IP, so the release of the
	// IP must not fail: a later run will never get here again because the check
	// at the top returns once the IP does not contain the machine anymore.
	// so we fork a new job to delete the IP. if this fails .... well then ("houston we have a problem")
	// we do not report the error to the caller, because this whole function cannot be re-done.
	a.log.Infow("async release IP", "ip", *ip)
	if releaseErr := a.ipReleaser(*ip); releaseErr != nil {
		// what should we do here? this error shows a problem with the nsq-bus system
		a.log.Errorw("cannot call ip releaser", "error", releaseErr)
	}

	return nil
}
// releaseIP releases the given IP. This function only checks if the given IP is connected
// to a machine and only deletes the IP if this is not the case. It is the duty of the caller
// to implement more validations: when a machine is deleted the caller has to check if the IP
// is static or not. If only an IP is freed, the caller has to check if the IP has machine scope.
//
// The IP is first deleted from the datastore (if it still exists there) and
// then released in the ipam. An IP that is already missing in the datastore or
// in the ipam is not treated as an error. Any other error is returned.
func (a *asyncActor) releaseIP(ip metal.IP) error {
	a.log.Infow("release IP", "ip", ip)

	dbip, err := a.FindIPByID(ip.IPAddress)
	if err != nil && !metal.IsNotFound(err) {
		// some unknown error, we will let nsq resend the command
		a.log.Errorw("cannot find IP", "ip", ip, "error", err)
		return err
	}

	if err == nil {
		// if someone calls freeMachine for the same machine multiple times at the same
		// moment it can happen that we already deleted and released the IP in the ipam
		// so make sure that this IP is not already connected to a new machine
		if len(dbip.GetMachineIds()) > 0 {
			a.log.Infow("do not delete IP, it is connected to a machine", "ip", ip)
			return nil
		}

		// the ip is in our database and is not connected to a machine so cleanup
		if err := a.DeleteIP(&ip); err != nil {
			a.log.Errorw("cannot delete IP in datastore", "ip", ip, "error", err)
			return err
		}
	}

	// now the IP should not exist any more in our datastore
	// so cleanup the ipam
	if err := a.ReleaseIP(ip); err != nil {
		if errors.Is(err, ipamer.ErrNotFound) {
			return nil
		}
		// format the address only: %q applied to the whole struct yields
		// garbled output for its non-string fields
		return fmt.Errorf("cannot release IP %q: %w", ip.IPAddress, err)
	}

	return nil
}