-
Notifications
You must be signed in to change notification settings - Fork 250
/
ip_pool.go
188 lines (166 loc) · 6.52 KB
/
ip_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package steps
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/sirupsen/logrus"
coreapi "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/openshift/ci-tools/pkg/api"
"github.com/openshift/ci-tools/pkg/junit"
"github.com/openshift/ci-tools/pkg/lease"
"github.com/openshift/ci-tools/pkg/results"
)
// NoLeaseClientForIPErr is the error returned by ipPoolStep.Validate when the
// step was constructed without a lease client.
var NoLeaseClientForIPErr = errors.New("step needs access to an IP pool, but no lease client provided")
// ipPoolStep wraps another step and acquires/releases chunks of IPs.
type ipPoolStep struct {
// client is the lease client used to acquire and release IP pool leases.
// Validate rejects the step when it is nil.
client *lease.Client
// secretClient is used to read the shared-dir secret while checking for
// leases that the test reports as unused.
secretClient ctrlruntimeclient.Client
// ipPoolLease holds the lease configuration and the names of the leases
// currently held by this step.
ipPoolLease stepLease
// wrapped is the step executed while the leases are held; most methods
// simply delegate to it.
wrapped api.Step
// params is consulted for api.DefaultLeaseEnv to determine the region
// suffix of the resource type to lease.
params api.Parameters
// namespace returns the namespace in which the shared-dir secret is looked up.
namespace func() string
}
// IPPoolStep builds a step that wraps the given step and manages the IP pool
// lease described by lease around its execution.
func IPPoolStep(client *lease.Client, secretClient ctrlruntimeclient.Client, lease api.StepLease, wrapped api.Step, params api.Parameters, namespace func() string) api.Step {
	return &ipPoolStep{
		client:       client,
		secretClient: secretClient,
		ipPoolLease:  stepLease{StepLease: lease},
		wrapped:      wrapped,
		params:       params,
		namespace:    namespace,
	}
}
// Inputs reports the inputs of the wrapped step unchanged.
func (s *ipPoolStep) Inputs() (api.InputDefinition, error) {
	inputs, err := s.wrapped.Inputs()
	return inputs, err
}
// Validate returns NoLeaseClientForIPErr when no lease client was provided,
// and nil otherwise.
func (s *ipPoolStep) Validate() error {
	if s.client != nil {
		return nil
	}
	return NoLeaseClientForIPErr
}
// Name returns the name of the wrapped step.
func (s *ipPoolStep) Name() string {
	return s.wrapped.Name()
}
// Description returns the description of the wrapped step.
func (s *ipPoolStep) Description() string {
	return s.wrapped.Description()
}
// Requires returns the step links required by the wrapped step.
func (s *ipPoolStep) Requires() []api.StepLink {
	return s.wrapped.Requires()
}
// Creates returns the step links created by the wrapped step.
func (s *ipPoolStep) Creates() []api.StepLink {
	return s.wrapped.Creates()
}
// Objects returns the objects reported by the wrapped step.
func (s *ipPoolStep) Objects() []ctrlruntimeclient.Object {
	return s.wrapped.Objects()
}
// Provides returns the parameters of the wrapped step (or a fresh map when the
// wrapped step provides none) with one extra entry: keyed by the IP pool
// lease's Env name, it yields the number of leases currently held, formatted
// as a decimal string. The count is read when the parameter is evaluated, not
// when Provides is called.
func (s *ipPoolStep) Provides() api.ParameterMap {
	provided := s.wrapped.Provides()
	if provided == nil {
		provided = api.ParameterMap{}
	}
	ipLease := &s.ipPoolLease
	// Disable unparam lint as we need to conform to this interface, but there will never be an error
	//nolint:unparam
	provided[ipLease.Env] = func() (string, error) {
		held := len(ipLease.resources)
		return strconv.Itoa(held), nil
	}
	return provided
}
// SubTests forwards the sub-tests of the wrapped step when it implements
// SubtestReporter; otherwise it returns nil.
func (s *ipPoolStep) SubTests() []*junit.TestCase {
	reporter, ok := s.wrapped.(SubtestReporter)
	if !ok {
		return nil
	}
	return reporter.SubTests()
}
// Run executes the step with the real one-minute polling interval and tags
// any resulting error with the "utilizing_ip_pool" reason.
func (s *ipPoolStep) Run(ctx context.Context) error {
	reason := results.ForReason("utilizing_ip_pool")
	return reason.ForError(s.run(ctx, time.Minute))
}
// run acquires IP pool leases for the wrapped step when any are immediately
// available, executes the wrapped step, and finally releases the leases that
// are still held. While the wrapped step runs, a background goroutine may
// release leases early if the test reports them as unused.
//
// The resource type that is leased is the configured one suffixed with the
// region read from api.DefaultLeaseEnv. When no leases of that type are
// available the wrapped step still runs, just without any IP pool leases.
//
// minute is provided as an argument to assist with unit testing
func (s *ipPoolStep) run(ctx context.Context, minute time.Duration) error {
	l := &s.ipPoolLease
	region, err := s.params.Get(api.DefaultLeaseEnv)
	if err != nil || region == "" {
		return results.ForReason("acquiring_ip_pool_lease").WithError(err).Errorf("failed to determine region to acquire lease for %s", l.ResourceType)
	}
	l.ResourceType = fmt.Sprintf("%s-%s", l.ResourceType, region)
	logrus.Infof("Acquiring IP Pool leases for test %s: %v", s.Name(), l.ResourceType)
	client := *s.client
	ctx, cancel := context.WithCancel(ctx)
	// cancel is also handed to the lease client below (presumably so it can
	// abort the test if a lease is lost). Deferring it here as well makes sure
	// the derived context is always cleaned up and that the background checker
	// goroutine stops once this step is done instead of polling for up to 15
	// more intervals. Calling cancel more than once is harmless.
	defer cancel()
	names, err := client.AcquireIfAvailableImmediately(l.ResourceType, l.Count, cancel)
	switch {
	case err == nil:
		logrus.Infof("Acquired %d ip pool lease(s) for %s: %v", l.Count, l.ResourceType, names)
		s.ipPoolLease.resources = names
	case errors.Is(err, lease.ErrNotFound):
		// Not being able to get a lease right away is not fatal.
		logrus.Infof("no leases of type: %s available", l.ResourceType)
	default:
		return results.ForReason("acquiring_ip_pool_lease").WithError(err).Errorf("failed to acquire lease for %s: %v", l.ResourceType, err)
	}
	// Buffered so that the checker goroutine, which sends at most once, can
	// never block forever if it finishes after the non-blocking receive below
	// has already run.
	remainingResources := make(chan []string, 1)
	if len(names) > 0 {
		go checkAndReleaseUnusedLeases(ctx, s.namespace(), s.wrapped.Name(), names, s.secretClient, s.client, minute, remainingResources)
	}
	wrappedErr := results.ForReason("executing_test").ForError(s.wrapped.Run(ctx))
	logrus.Infof("Releasing ip pool leases for test %s", s.Name())
	// If the checker already released some leases, only the remainder must be
	// released here; otherwise everything that was acquired is released.
	// NOTE(review): a checker that is in the middle of releasing leases at this
	// exact moment is not waited for, so a lease could be released twice.
	select {
	case s.ipPoolLease.resources = <-remainingResources:
		logrus.Debugf("resources left to release after unused have already been released %v", s.ipPoolLease.resources)
	default:
		logrus.Debug("no unused resources were released, releasing all")
	}
	releaseErr := results.ForReason("releasing_ip_pool_lease").ForError(releaseLeases(client, *l))
	return aggregateWrappedErrorAndReleaseError(wrappedErr, releaseErr)
}
// UnusedIpCount is the key in the shared-dir secret's data whose value is read
// as the number of ip-pool leases that are unused and can be released early.
const UnusedIpCount = "UNUSED_IP_COUNT"
// checkAndReleaseUnusedLeases periodically checks for a positive value in the
// UNUSED_IP_COUNT data in the shared secret which signals that there are leases that can be released.
// If/when this is discovered it will release that number of leases, and stop checking.
//
// The secret is looked up by testName in namespace once per minute interval.
// Checking stops without releasing anything when ctx is done, when 15 intervals
// have elapsed, or when the requested count is negative or larger than the
// number of leased resources. A missing, empty or non-numeric value is skipped
// and checked again on the next interval.
//
// minute is provided as an argument to assist with unit testing.
// The remainingResources channel stores the names of the resources that haven't been released if applicable.
// At most one value is sent on it; the send blocks until it can be delivered,
// so callers should either be receiving or provide a buffered channel.
func checkAndReleaseUnusedLeases(ctx context.Context, namespace, testName string, resources []string, secretClient ctrlruntimeclient.Client, leaseClient *lease.Client, minute time.Duration, remainingResources chan<- []string) {
	waitUntil := time.After(minute * 15)
	sharedDirKey := types.NamespacedName{
		Namespace: namespace,
		Name:      testName, // This is the name of the shared-dir secret
	}
	ticker := time.NewTicker(minute)
	defer ticker.Stop()
	for {
		// Wait for the next tick, but react to cancellation and the overall
		// timeout right away instead of only on the following tick.
		select {
		case <-ctx.Done():
			return
		case <-waitUntil:
			logrus.Debugf("timeout to check for unused ip-pool leases has passed, no longer waiting")
			return
		case <-ticker.C:
		}
		logrus.Debugf("checking for unused ip-pool leases to release")
		sharedDirSecret := coreapi.Secret{}
		if err := secretClient.Get(ctx, sharedDirKey, &sharedDirSecret); err != nil {
			logrus.WithError(err).Warn("could not get shared dir secret")
			continue
		}
		rawCount := strings.TrimSpace(string(sharedDirSecret.Data[UnusedIpCount]))
		if rawCount == "" {
			continue
		}
		count, err := strconv.Atoi(rawCount)
		if err != nil {
			// Do not act on a value that could not be parsed; it may be
			// corrected before the next check.
			logrus.WithError(err).Warnf("cannot convert %s contents to int", UnusedIpCount)
			continue
		}
		logrus.Infof("there are %d unused ip-pool addresses to release", count)
		if count < 0 {
			// A negative count would make the slice expression below panic.
			logrus.Warnf("requested to release a negative number (%d) of ip-pool leases; ignoring request", count)
			return
		}
		if count > len(resources) {
			logrus.Warnf("requested to release %d ip-pool leases, but only %d have been leased; ignoring request", count, len(resources))
			return
		}
		client := *leaseClient
		for _, name := range resources[:count] {
			logrus.Infof("releasing unused ip-pool lease: %s", name)
			// A failed release is only logged so the remaining leases are still attempted.
			if err := client.Release(name); err != nil {
				logrus.WithError(err).Warnf("cannot release ip-pool lease %s", name)
			}
		}
		remainingResources <- resources[count:]
		return
	}
}