-
Notifications
You must be signed in to change notification settings - Fork 220
/
lease.go
99 lines (79 loc) · 2.87 KB
/
lease.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package machine
import (
"context"
"fmt"
"strings"
"github.com/sourcegraph/conc/pool"
"github.com/superfly/flyctl/api"
"github.com/superfly/flyctl/flaps"
"github.com/superfly/flyctl/iostreams"
)
const maxConcurrentLeases = 20
type releaseLeaseFunc func()
// AcquireAllLeases works to acquire/attach a lease for each active machine.
func AcquireAllLeases(ctx context.Context) ([]*api.Machine, releaseLeaseFunc, error) {
machines, err := ListActive(ctx)
if err != nil {
return nil, func() {}, err
}
return AcquireLeases(ctx, machines)
}
// AcquireLeases works to acquire/attach a lease for each machine specified.
func AcquireLeases(ctx context.Context, machines []*api.Machine) ([]*api.Machine, releaseLeaseFunc, error) {
acquirePool := pool.NewWithResults[*api.Machine]().
WithErrors().
WithMaxGoroutines(maxConcurrentLeases)
for _, m := range machines {
m := m
acquirePool.Go(func() (*api.Machine, error) {
m, _, err := AcquireLease(ctx, m)
return m, err
})
}
leaseHoldingMachines, err := acquirePool.Wait()
releaseFunc := func() {
p := pool.New()
for _, m := range leaseHoldingMachines {
p.Go(func() { releaseLease(ctx, m) })
}
p.Wait()
}
return leaseHoldingMachines, releaseFunc, err
}
func releaseLease(ctx context.Context, machine *api.Machine) {
if machine == nil || machine.LeaseNonce == "" {
return
}
io := iostreams.FromContext(ctx)
flapsClient := flaps.FromContext(ctx)
if err := flapsClient.ReleaseLease(ctx, machine.ID, machine.LeaseNonce); err != nil {
if !strings.Contains(err.Error(), "lease not found") {
fmt.Fprintf(io.Out, "failed to release lease for machine %s: %s", machine.ID, err.Error())
}
}
}
// AcquireLease works to acquire/attach a lease for the specified machine.
// WARNING: Make sure you defer the lease release process.
func AcquireLease(ctx context.Context, machine *api.Machine) (*api.Machine, releaseLeaseFunc, error) {
flapsClient := flaps.FromContext(ctx)
lease, err := flapsClient.AcquireLease(ctx, machine.ID, api.IntPointer(120))
if err != nil {
return nil, func() {}, fmt.Errorf("failed to obtain lease: %w", err)
}
releaseFunc := func() { releaseLease(ctx, machine) }
// Set lease nonce before we re-fetch the Machines latest configuration.
// This will ensure the lease can still be released in the event the upcoming GET fails.
machine.LeaseNonce = lease.Data.Nonce
// Return earlier if the lease's machine version matches the machine's version we have
if machine.InstanceID == lease.Data.Version {
return machine, releaseFunc, nil
}
// Re-query machine post-lease acquisition to ensure we are working against the latest configuration.
updatedMachine, err := flapsClient.Get(ctx, machine.ID)
if err != nil {
return machine, releaseFunc, err
}
updatedMachine.LeaseNonce = lease.Data.Nonce
releaseFunc = func() { releaseLease(ctx, updatedMachine) }
return updatedMachine, releaseFunc, nil
}