/
cluster_swarm.go
405 lines (340 loc) · 11.7 KB
/
cluster_swarm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
package runner
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"reflect"
"time"
"github.com/testground/sdk-go/ptypes"
"github.com/testground/sdk-go/runtime"
"github.com/testground/testground/pkg/api"
"github.com/testground/testground/pkg/aws"
"github.com/testground/testground/pkg/conv"
"github.com/testground/testground/pkg/rpc"
"golang.org/x/sync/errgroup"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
)
// Compile-time assertion that ClusterSwarmRunner satisfies the api.Runner
// interface; the blank identifier discards the value, so this costs nothing
// at runtime.
var (
	_ api.Runner = &ClusterSwarmRunner{}
)
// ClusterSwarmRunnerConfig is the configuration object of this runner. Boolean
// values are expressed in a way that zero value (false) is the default setting.
type ClusterSwarmRunnerConfig struct {
	// LogLevel sets the log level in the test containers (default: not set).
	LogLevel string `toml:"log_level"`

	// Background avoids tailing the output of containers, and displaying it as
	// log messages (default: false, i.e. logs are tailed in the foreground).
	Background bool `toml:"background"`

	// DockerEndpoint is the URL of the docker swarm manager endpoint, e.g.
	// "tcp://manager:2376"
	DockerEndpoint string `toml:"docker_endpoint"`

	// DockerTLS indicates whether client TLS is enabled.
	DockerTLS bool `toml:"docker_tls"`

	// DockerTLSCACertPath is the path to the CA Certificate. Only used if
	// DockerTLS = true.
	DockerTLSCACertPath string `toml:"docker_tls_ca_cert_path"`

	// DockerTLSCertPath is the path to our client cert, signed by the CA. Only
	// used if DockerTLS = true.
	DockerTLSCertPath string `toml:"docker_tls_cert_path"`

	// DockerTLSKeyPath is our private key. Only used if DockerTLS = true.
	DockerTLSKeyPath string `toml:"docker_tls_key_path"`

	// KeepService keeps the service after all instances have finished and
	// all logs have been piped. Only used when running in foreground mode
	// (the default, since Background defaults to false).
	KeepService bool `toml:"keep_service"`
}
// ClusterSwarmRunner is a runner that creates a Docker service to launch as
// many replicated instances of a container as the run job indicates. It is
// stateless; all per-run state lives in Run's locals.
type ClusterSwarmRunner struct{}
// TODO runner option to keep containers alive instead of deleting them after
// the test has run.

// Run schedules the test run on a Docker Swarm cluster: it creates a
// dedicated overlay data network, launches one replicated swarm service per
// test group, and — unless background mode is requested — tails the combined
// service logs until every task has reached a terminal state, removing the
// services (and the data network) afterwards unless the user asked to keep
// them.
func (*ClusterSwarmRunner) Run(ctx context.Context, input *api.RunInput, ow *rpc.OutputWriter) (*api.RunOutput, error) {
	var (
		log = ow.With("runner", "cluster:swarm", "run_id", input.RunID)
		cfg = *input.RunnerConfig.(*ClusterSwarmRunnerConfig)
	)

	// global timeout of 1 minute for the scheduling.
	// NOTE(review): this ctx also bounds the task-monitoring loop below;
	// confirm whether long-running tests need a longer-lived context there.
	ctx, cancelFn := context.WithTimeout(ctx, 1*time.Minute)
	defer cancelFn()

	// parent is the naming prefix for everything this run creates.
	parent := fmt.Sprintf("tg-%s-%s-%s", input.TestPlan, input.TestCase, input.RunID)

	// Build a runenv template; group-specific fields are filled in per group.
	template := runtime.RunParams{
		TestPlan:           input.TestPlan,
		TestCase:           input.TestCase,
		TestRun:            input.RunID,
		TestInstanceCount:  input.TotalInstances,
		TestDisableMetrics: input.DisableMetrics,
		TestSidecar:        true,
	}

	// Create a docker client, optionally with client TLS.
	var opts []client.Opt
	if cfg.DockerTLS {
		opts = append(opts, client.WithTLSClientConfig(cfg.DockerTLSCACertPath, cfg.DockerTLSCertPath, cfg.DockerTLSKeyPath))
	}
	opts = append(opts, client.WithHost(cfg.DockerEndpoint), client.WithAPIVersionNegotiation())
	cli, err := client.NewClientWithOpts(opts...)
	if err != nil {
		return nil, err
	}

	// first check if redis is running.
	svcs, err := cli.ServiceList(ctx, types.ServiceListOptions{
		Filters: filters.NewArgs(filters.Arg("name", "testground-redis")),
	})
	if err != nil {
		return nil, err
	} else if len(svcs) == 0 {
		return nil, fmt.Errorf("testground-redis service doesn't exist in the swarm cluster; aborting")
	}

	// We can't create a network for every testplan on the same range,
	// so we check how many networks we have and decide based on this number.
	networks, err := cli.NetworkList(ctx, types.NetworkListOptions{
		Filters: filters.NewArgs(filters.Arg("label", "testground.name=default")),
	})
	if err != nil {
		return nil, err
	}
	subnet, gateway, err := nextDataNetwork(len(networks))
	if err != nil {
		return nil, err
	}
	template.TestSubnet = &ptypes.IPNet{IPNet: *subnet}

	// Create the data network.
	log.Infow("creating data network", "parent", parent, "subnet", subnet)

	networkSpec := types.NetworkCreate{
		Driver:         "overlay",
		CheckDuplicate: true,
		// EnableIPv6: true, // TODO(steb): this breaks.
		Internal:   true,
		Attachable: true,
		Scope:      "swarm",
		IPAM: &network.IPAM{
			Driver: "default",
			Config: []network.IPAMConfig{{
				Subnet:  subnet.String(),
				Gateway: gateway,
			}},
		},
		Labels: map[string]string{
			"testground.plan":     input.TestPlan,
			"testground.testcase": input.TestCase,
			"testground.run_id":   input.RunID,
			"testground.name":     "default", // default name. TODO: allow multiple networks.
		},
	}

	networkResp, err := cli.NetworkCreate(ctx, parent+"-default", networkSpec)
	if err != nil {
		return nil, err
	}
	networkID := networkResp.ID
	log.Infow("network created successfully", "id", networkID)

	defer func() {
		if cfg.KeepService || cfg.Background {
			log.Info("skipping removing the data network due to user request")
			// if we are keeping the service, we must also keep the network.
			return
		}
		// The network may still have endpoints attached while services wind
		// down, so retry the removal a few times before giving up.
		err = retry(5, 1*time.Second, func() error {
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			return cli.NetworkRemove(ctx, networkID)
		})
		if err != nil {
			log.Errorw("couldn't remove network", "network", networkID, "err", err)
		}
	}()

	ow.Infof("fetching an authorization token from AWS ECR")

	// Get an authorization token from AWS ECR so swarm nodes can pull images.
	auth, err := aws.ECR.GetAuthToken(input.EnvConfig.AWS)
	if err != nil {
		return nil, err
	}

	ow.Infof("fetched an authorization token from AWS ECR")

	// services maps each created swarm service ID to its instance count.
	services := make(map[string]int, len(input.Groups))

	for _, g := range input.Groups {
		// Copy the template and fill in the group-specific fields.
		runenv := template
		runenv.TestGroupID = g.ID
		runenv.TestGroupInstanceCount = g.Instances
		runenv.TestInstanceParams = g.Parameters
		runenv.TestCaptureProfiles = g.Profiles

		// Serialize the runenv into env variables to pass to docker.
		env := conv.ToOptionsSlice(runenv.ToEnvVars())

		// Set the log level if provided in cfg.
		if cfg.LogLevel != "" {
			env = append(env, "LOG_LEVEL="+cfg.LogLevel)
		}

		// Create the service.
		log.Infow("creating service", "parent", parent, "group", g.ID, "image", g.ArtifactPath, "replicas", g.Instances)

		cnt := uint64(runenv.TestGroupInstanceCount)
		serviceSpec := swarm.ServiceSpec{
			Networks: []swarm.NetworkAttachmentConfig{
				{Target: "control"},
				{Target: networkID},
			},
			Mode: swarm.ServiceMode{
				Replicated: &swarm.ReplicatedService{
					Replicas: &cnt,
				},
			},
			// BUGFIX: include the group ID in the service name; using the bare
			// parent name made service creation fail for runs with more than
			// one group (duplicate service name in the swarm).
			Annotations: swarm.Annotations{Name: fmt.Sprintf("%s-%s", parent, g.ID)},
			TaskTemplate: swarm.TaskSpec{
				ContainerSpec: &swarm.ContainerSpec{
					Image: g.ArtifactPath,
					Env:   env,
					Labels: map[string]string{
						"testground.plan":     input.TestPlan,
						"testground.testcase": input.TestCase,
						"testground.run_id":   input.RunID,
						"testground.groupid":  g.ID,
					},
				},
				RestartPolicy: &swarm.RestartPolicy{
					Condition: swarm.RestartPolicyConditionNone,
				},
				Resources: &swarm.ResourceRequirements{
					// BUGFIX: a reservation must not exceed the limit; the
					// original values (reserve 60MiB, limit 30MiB) were
					// swapped.
					Reservations: &swarm.Resources{
						MemoryBytes: 30 * 1024 * 1024,
					},
					Limits: &swarm.Resources{
						MemoryBytes: 60 * 1024 * 1024,
					},
				},
				Placement: &swarm.Placement{
					MaxReplicas: 10000,
					Constraints: []string{
						"node.labels.TGRole==worker",
					},
				},
			},
		}

		scopts := types.ServiceCreateOptions{
			QueryRegistry: true,
			// the registry auth will be propagated to all docker swarm nodes so
			// they can fetch the image properly.
			EncodedRegistryAuth: aws.ECR.EncodeAuthToken(auth),
		}

		ow.Infow("creating the service on docker swarm", "parent", parent, "group", g.ID, "image", g.ArtifactPath, "replicas", g.Instances)

		// Now create the docker swarm service.
		serviceResp, err := cli.ServiceCreate(ctx, serviceSpec, scopts)
		if err != nil {
			return nil, err
		}

		ow.Infow("service created successfully", "id", serviceResp.ID)
		services[serviceResp.ID] = g.Instances
	}

	// If we are running in background mode, return immediately.
	if cfg.Background {
		return &api.RunOutput{RunID: input.RunID}, nil
	}

	// Docker multiplexes STDOUT and STDERR streams inside the single IO stream
	// returned by ServiceLogs. We need to use docker functions to separate
	// those strands, and because we don't care about treating STDOUT and STDERR
	// separately, we consolidate them into the same io.Writer.
	rpipe, wpipe := io.Pipe()

	// Tail all services until all instances are done, then remove the service
	// if the flag has been set.
	errgrp, ctx := errgroup.WithContext(ctx)
	for service, count := range services {
		rc, err := cli.ServiceLogs(context.Background(), service, types.ContainerLogsOptions{
			ShowStdout: true,
			ShowStderr: true,
			Since:      "2019-01-01T00:00:00",
			Follow:     true,
		})
		if err != nil {
			return nil, fmt.Errorf("failed while tailing logs: %w", err)
		}

		// Pipe logs to the merging pipe.
		errgrp.Go(func(rc io.ReadCloser) func() error {
			return func() error {
				// StdCopy reads until EOF, which is triggered by closing rc
				// when the service has finished (below).
				_, err := stdcopy.StdCopy(wpipe, wpipe, rc)
				return err
			}
		}(rc))

		// This goroutine monitors the state of tasks every two seconds. When
		// all tasks are shutdown, we are done here. We close the logs
		// io.ReadCloser, which in turn signals that the runner is now
		// finished.
		errgrp.Go(func(service string, count int, rc io.ReadCloser) func() error {
			return func() error {
				tick := time.NewTicker(2 * time.Second)
				defer tick.Stop()
				defer rc.Close()

				for range tick.C {
					var finished int
					tasks, err := cli.TaskList(ctx, types.TaskListOptions{
						Filters: filters.NewArgs(filters.Arg("service", service)),
					})
					if err != nil {
						return err
					}

					// Tally task states; a task in any terminal state counts
					// as finished.
					status := make(map[swarm.TaskState]uint64, count)
					for _, t := range tasks {
						s := t.Status.State
						switch status[s]++; s {
						case swarm.TaskStateShutdown, swarm.TaskStateComplete, swarm.TaskStateFailed, swarm.TaskStateRejected:
							finished++
						}
					}
					ow.Infow("task status", "service", service, "status", status)

					if finished == count {
						break
					}
				}
				return nil
			}
		}(service, count, rc))
	}

	// BUGFIX: this waiter was previously started inside the loop above,
	// spawning one waiter per service and racing errgrp.Wait() against the
	// errgrp.Go() calls of later iterations. Start it exactly once, after all
	// goroutines have been registered; it closes the write end of the pipe so
	// the scanner below terminates.
	go func() {
		err := errgrp.Wait()
		_ = wpipe.CloseWithError(err)
	}()

	// Relay the merged log stream to stdout, line by line.
	scanner := bufio.NewScanner(rpipe)
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
	// BUGFIX: surface scanner failures instead of silently dropping them.
	if err := scanner.Err(); err != nil {
		log.Errorw("error while relaying service logs", "err", err)
	}

	if !cfg.KeepService {
		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
		defer cancel()
		for service := range services {
			ow.Infow("removing service", "service", service)
			if err := cli.ServiceRemove(ctx, service); err == nil {
				ow.Infow("service removed", "service", service)
			} else {
				// BUGFIX: %w is only meaningful inside fmt.Errorf; use %s for
				// plain formatting here.
				ow.Errorf("removing the service failed: %s", err)
			}
		}
	} else {
		log.Info("skipping removing the service due to user request")
	}

	return &api.RunOutput{RunID: input.RunID}, nil
}
// CollectOutputs is part of the api.Runner interface; it is not implemented
// for the cluster:swarm runner and always returns an error.
func (*ClusterSwarmRunner) CollectOutputs(ctx context.Context, input *api.CollectionInput, ow *rpc.OutputWriter) error {
	return errors.New("unimplemented")
}
// ID returns the canonical identifier of this runner.
func (*ClusterSwarmRunner) ID() string {
	const id = "cluster:swarm"
	return id
}
// ConfigType returns the reflect.Type of this runner's configuration struct,
// used by the engine to decode TOML configuration into it.
func (*ClusterSwarmRunner) ConfigType() reflect.Type {
	var cfg ClusterSwarmRunnerConfig
	return reflect.TypeOf(cfg)
}
// CompatibleBuilders lists the builders whose artifacts this runner accepts.
func (*ClusterSwarmRunner) CompatibleBuilders() []string {
	builders := []string{"docker:go"}
	return builders
}
func retry(attempts int, sleep time.Duration, f func() error) (err error) {
for i := 0; ; i++ {
err = f()
if err == nil {
return
}
if i >= (attempts - 1) {
break
}
time.Sleep(sleep)
}
return fmt.Errorf("after %d attempts, last error: %s", attempts, err)
}