-
Notifications
You must be signed in to change notification settings - Fork 221
/
machine_set.go
149 lines (131 loc) · 3.95 KB
/
machine_set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package machine
import (
"context"
"errors"
"fmt"
"slices"
"sync"
"time"
fly "github.com/superfly/fly-go"
"github.com/superfly/fly-go/flaps"
"github.com/superfly/flyctl/internal/tracing"
"github.com/superfly/flyctl/iostreams"
"github.com/superfly/flyctl/terminal"
)
type MachineSet interface {
AcquireLeases(context.Context, time.Duration) error
ReleaseLeases(context.Context) error
RemoveMachines(ctx context.Context, machines []LeasableMachine) error
StartBackgroundLeaseRefresh(context.Context, time.Duration, time.Duration)
IsEmpty() bool
GetMachines() []LeasableMachine
}
type machineSet struct {
machines []LeasableMachine
}
func NewMachineSet(flapsClient *flaps.Client, io *iostreams.IOStreams, machines []*fly.Machine) *machineSet {
leaseMachines := make([]LeasableMachine, 0)
for _, m := range machines {
leaseMachines = append(leaseMachines, NewLeasableMachine(flapsClient, io, m))
}
return &machineSet{
machines: leaseMachines,
}
}
func (ms *machineSet) IsEmpty() bool {
return len(ms.machines) == 0
}
func (ms *machineSet) GetMachines() []LeasableMachine {
return ms.machines
}
func (ms *machineSet) AcquireLeases(ctx context.Context, duration time.Duration) error {
if len(ms.machines) == 0 {
return nil
}
results := make(chan error, len(ms.machines))
var wg sync.WaitGroup
for _, m := range ms.machines {
wg.Add(1)
go func(m LeasableMachine) {
defer wg.Done()
results <- m.AcquireLease(ctx, duration)
}(m)
}
go func() {
wg.Wait()
close(results)
}()
hadError := false
for err := range results {
if err != nil {
hadError = true
terminal.Warnf("failed to acquire lease: %v\n", err)
}
}
if hadError {
if err := ms.ReleaseLeases(ctx); err != nil {
terminal.Warnf("error releasing machine leases: %v\n", err)
}
return fmt.Errorf("error acquiring leases on all machines")
}
return nil
}
func (ms *machineSet) RemoveMachines(ctx context.Context, machines []LeasableMachine) error {
// Rewrite machines array to exclude the ones we just released.
tempMachines := ms.machines[:0]
// Compute the intersection between all of the machines on machineSet with the machines we want to remove.
for _, oldMach := range ms.machines {
if !slices.ContainsFunc(machines, func(m LeasableMachine) bool { return oldMach.Machine().ID == m.Machine().ID }) {
tempMachines = append(tempMachines, oldMach)
}
}
ms.machines = tempMachines
subset := machineSet{machines: machines}
return subset.ReleaseLeases(ctx)
}
func (ms *machineSet) ReleaseLeases(ctx context.Context) error {
if len(ms.machines) == 0 {
return nil
}
// when context is canceled, take 500ms to attempt to release the leases
contextWasAlreadyCanceled := errors.Is(ctx.Err(), context.Canceled)
if contextWasAlreadyCanceled {
var cancel context.CancelFunc
cancelTimeout := 500 * time.Millisecond
ctx, cancel = context.WithTimeout(ctx, cancelTimeout)
terminal.Infof("detected canceled context and allowing %s to release machine leases\n", cancelTimeout)
defer cancel()
}
results := make(chan error, len(ms.machines))
var wg sync.WaitGroup
for _, m := range ms.machines {
wg.Add(1)
go func(m LeasableMachine) {
defer wg.Done()
results <- m.ReleaseLease(ctx)
}(m)
}
go func() {
wg.Wait()
close(results)
}()
hadError := false
for err := range results {
contextTimedOutOrCanceled := errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)
if err != nil && (!contextWasAlreadyCanceled || !contextTimedOutOrCanceled) {
hadError = true
terminal.Warnf("failed to release lease: %v\n", err)
}
}
if hadError {
return fmt.Errorf("error releasing leases on machines")
}
return nil
}
func (ms *machineSet) StartBackgroundLeaseRefresh(ctx context.Context, leaseDuration time.Duration, delayBetween time.Duration) {
ctx, span := tracing.GetTracer().Start(ctx, "start_background_lease_refresh")
defer span.End()
for _, m := range ms.machines {
m.StartBackgroundLeaseRefresh(ctx, leaseDuration, delayBetween)
}
}