// Copyright 2017 GRAIL, Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package local
//go:generate stringer -type=execState
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/engine-api/client"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/container"
"github.com/docker/engine-api/types/network"
"github.com/grailbio/base/digest"
"github.com/grailbio/reflow"
"github.com/grailbio/reflow/errors"
"github.com/grailbio/reflow/log"
"github.com/grailbio/reflow/repository/file"
)
// Exec directory layout:
// <exec>/arg/n/m...
// <exec>/out
// <exec>/manifest.json -- contains final output, only after things are done.
const (
inspectPath = "inspect.json"
manifestPath = "manifest.json"
objectPath = "obj"
)
var dockerUser = fmt.Sprintf("%d:%d", os.Getuid(), os.Getgid())
// dockerExec is a (local) exec attached to a local executor, from which it
// is given its own subdirectory in which to operate. dockerExec is responsible
// for the lifecycle of an exec through an executor. It maintains a state
// machine (driven by dockerExec.Go) to see the exec through to completion.
// Before every state change, the exec saves its state to manifestPath, so
// that it is always recoverable from the previously saved state.
type dockerExec struct {
// The Executor that owns this exec.
Executor *Executor
// The (possibly nil) Logger that logs exec's actions, for external consumption.
Log *log.Logger
id digest.Digest
client *client.Client
repo *file.Repository
staging file.Repository
stdout *log.Logger
stderr *log.Logger
mu sync.Mutex
cond *sync.Cond
// Manifest stores the serializable state of the exec.
Manifest
err error
}
// newDockerExec creates a new dockerExec with parent executor x.
func newDockerExec(id digest.Digest, x *Executor, cfg reflow.ExecConfig, stdout, stderr *log.Logger) *dockerExec {
e := &dockerExec{
Executor: x,
// Fill in from executor:
Log: x.Log.Tee(nil, fmt.Sprintf("%s: ", id)),
repo: x.FileRepository,
id: id,
client: x.Client,
stdout: stdout,
stderr: stderr,
}
e.staging.Root = e.path(objectsDir)
e.staging.Log = e.Log
e.Config = cfg
e.Manifest.Type = execDocker
e.Manifest.Created = time.Now()
e.cond = sync.NewCond(&e.mu)
return e
}
// TODO(marius): checksum the manifest file (and other state) to identify
// partial writes. (This is likely not a problem in this case since the JSON
// struct would be incomplete.)
// This could also be made more resilient by creating a backup file
// before saving the new state.
func (e *dockerExec) save(state execState) error {
if err := os.MkdirAll(e.path(), 0777); err != nil {
return err
}
path := e.path(manifestPath)
f, err := os.Create(path)
if err != nil {
return err
}
manifest := e.Manifest
manifest.State = state
if err := json.NewEncoder(f).Encode(manifest); err != nil {
os.Remove(path)
f.Close()
return err
}
f.Close()
return nil
}
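// The following is an illustrative sketch only (hypothetical, not part of the
// original source): a minimal counterpart to save that reads the manifest
// back to recover the last saved state, assuming the manifest was written
// completely (see the TODO above about detecting partial writes).
func (e *dockerExec) exampleRestore() (execState, error) {
	f, err := os.Open(e.path(manifestPath))
	if err != nil {
		return execUnstarted, err
	}
	defer f.Close()
	// Decode the previously saved manifest and adopt its state.
	var m Manifest
	if err := json.NewDecoder(f).Decode(&m); err != nil {
		return execUnstarted, err
	}
	e.Manifest = m
	return m.State, nil
}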
// containerName returns the name of the container for
// this exec. It is uniquely determined by the exec's ID and directory.
func (e *dockerExec) containerName() string {
pathHex := reflow.Digester.FromString(e.path()).Short()
return fmt.Sprintf("reflow-%s-%s-%s", e.Executor.ID, e.id.Hex(), pathHex)
}
// create sets up the exec's filesystem layout and instantiates its
// container; the container is not started. The arguments are
// materialized to the 'arg' directory in the exec's run directory,
// and are passed into the container as /arg. The output object is
// placed in 'obj': the run directory is bound into the container as
// '/return', and $out is set to /return/obj. This arrangement permits
// 'obj' to be either a file or a directory.
//
// We use Docker's host networking mode. In the future we'd like to
// disable networking altogether (except for special execs like those
// associated with interns and externs).
func (e *dockerExec) create(ctx context.Context) (execState, error) {
if _, err := e.client.ContainerInspect(ctx, e.containerName()); err == nil {
return execCreated, nil
} else if !client.IsErrContainerNotFound(err) {
return execInit, errors.E("ContainerInspect", e.containerName(), kind(err), err)
}
// TODO: it might be worthwhile doing image pulling as a separate state.
if err := e.Executor.ensureImage(ctx, e.Config.Image); err != nil {
return execInit, fmt.Errorf("failed to pull image %s: %s", e.Config.Image, err)
}
// Map the products to input arguments and volume bindings for
// the container. Currently we map the whole repository (named by
// the digest) and then include the cut in the arguments passed to
// the job.
args := make([]interface{}, len(e.Config.Args))
for i, iv := range e.Config.Args {
if iv.Out {
which := strconv.Itoa(iv.Index)
args[i] = path.Join("/return", which)
} else {
flat := iv.Fileset.Flatten()
argv := make([]string, len(flat))
for j, jv := range flat {
argPath := fmt.Sprintf("arg/%d/%d", i, j)
binds := map[string]digest.Digest{}
for path, file := range jv.Map {
binds[path] = file.ID
}
if err := e.repo.Materialize(e.path(argPath), binds); err != nil {
return execInit, err
}
argv[j] = "/" + argPath
}
args[i] = strings.Join(argv, " ")
}
}
// Set up temporary directory.
os.MkdirAll(e.path("tmp"), 0777)
os.MkdirAll(e.path("return"), 0777)
hostConfig := &container.HostConfig{
Binds: []string{
e.hostPath("arg") + ":/arg",
e.hostPath("tmp") + ":/tmp",
e.hostPath("return") + ":/return",
},
NetworkMode: container.NetworkMode("host"),
}
/* TODO: introduce strict mode for this
if mem := e.Config.Resources.Memory; mem > 0 {
hostConfig.Resources.Memory = int64(mem)
}
*/
env := []string{
"tmp=/tmp",
"TMPDIR=/tmp",
"HOME=/tmp",
}
if outputs := e.Config.OutputIsDir; outputs != nil {
for i, isdir := range outputs {
if isdir {
os.MkdirAll(e.path("return", strconv.Itoa(i)), 0777)
}
}
} else {
env = append(env, "out=/return/default")
}
// TODO(marius): this is a hack for Earl to use the AWS tool.
if e.Config.NeedAWSCreds {
creds, err := e.Executor.AWSCreds.Get()
if err != nil {
// We mark this as temporary, because most of the time it is.
// TODO(marius): can we get better error classification from
// the AWS SDK?
return execInit, errors.E("run", e.ID, errors.Temporary, err)
}
// TODO(marius): region?
env = append(env, "AWS_ACCESS_KEY_ID="+creds.AccessKeyID)
env = append(env, "AWS_SECRET_ACCESS_KEY="+creds.SecretAccessKey)
env = append(env, "AWS_SESSION_TOKEN="+creds.SessionToken)
}
config := &container.Config{
Image: e.Config.Image,
// We use a login shell here as many Docker images are configured
// with /root/.profile, etc.
Cmd: []string{"/bin/bash", "-e", "-l", "-o", "pipefail", "-c", fmt.Sprintf(e.Config.Cmd, args...)},
Env: env,
Labels: map[string]string{"reflow-id": e.id.Hex()},
User: dockerUser,
}
networkingConfig := &network.NetworkingConfig{}
if _, err := e.client.ContainerCreate(ctx, config, hostConfig, networkingConfig, e.containerName()); err != nil {
return execInit, errors.E(
"ContainerCreate",
kind(err),
e.containerName(),
fmt.Sprint(config), fmt.Sprint(hostConfig), fmt.Sprint(networkingConfig),
err,
)
}
return execCreated, nil
}
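// scanLines demultiplexes the stdcopy-framed Docker log stream in input and
// writes each line of it to output.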
func scanLines(input io.ReadCloser, output *log.Logger) error {
r, w := io.Pipe()
go func() {
stdcopy.StdCopy(w, w, input)
w.Close()
}()
s := bufio.NewScanner(r)
for s.Scan() {
output.Print(s.Text())
}
return s.Err()
}
// start starts the container that's been set up by exec.create.
func (e *dockerExec) start(ctx context.Context) (execState, error) {
if err := e.client.ContainerStart(ctx, e.containerName(), types.ContainerStartOptions{}); err != nil {
return execCreated, errors.E("ContainerStart", e.containerName(), kind(err), err)
}
var err error
e.Docker, err = e.client.ContainerInspect(ctx, e.containerName())
if err != nil {
e.Log.Errorf("error inspecting container %q: %v", e.containerName(), err)
}
if e.stdout != nil {
rcStdout, err := e.client.ContainerLogs(ctx, e.containerName(),
types.ContainerLogsOptions{ShowStdout: true, Follow: true})
if err != nil {
e.Log.Errorf("docker.containerlogs %q: %v", e.containerName(), err)
} else {
go func() {
err := scanLines(rcStdout, e.stdout)
if err != nil {
log.Errorf("scanlines stdout: %v", err)
}
rcStdout.Close()
}()
}
}
if e.stderr != nil {
rcStderr, err := e.client.ContainerLogs(ctx, e.containerName(),
types.ContainerLogsOptions{ShowStderr: true, Follow: true})
if err != nil {
e.Log.Errorf("docker.containerlogs %q: %v", e.containerName(), err)
} else {
go func() {
err := scanLines(rcStderr, e.stderr)
if err != nil {
log.Errorf("scanlines stderr: %v", err)
}
rcStderr.Close()
}()
}
}
return execRunning, nil
}
// wait waits for the container to complete and performs teardown:
// - save log files to the exec directory;
// - inspect the docker container and save its output to exec.Manifest.Docker;
// - install the results into the repository;
// - remove (de-link) the argument directory.
func (e *dockerExec) wait(ctx context.Context) (state execState, err error) {
// We start profiling here. Note that if the executor is restarted,
// and thus reattaches to the container, it will lose samples.
profc := make(chan stats)
profctx, cancelprof := context.WithCancel(ctx)
go func() {
stats, err := e.profile(profctx)
if err != nil {
e.Log.Errorf("profile: %v", err)
}
profc <- stats
}()
code, err := e.client.ContainerWait(ctx, e.containerName())
if err != nil {
return execInit, errors.E("ContainerWait", e.containerName(), kind(err), err)
}
// Best-effort writing of log files.
rc, err := e.client.ContainerLogs(
ctx, e.containerName(),
types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true})
if err == nil {
// TODO: these should be put into the repository.
stderr, err := os.Create(e.path("stderr"))
if err != nil {
e.Log.Errorf("failed to stderr log file %q: %s", e.path("stderr"), err)
stderr = nil
}
stdout, err := os.Create(e.path("stdout"))
if err != nil {
e.Log.Errorf("failed to stdout log file %q: %s", e.path("stdout"), err)
stdout = nil
}
_, err = stdcopy.StdCopy(stdout, stderr, rc)
if err != nil {
e.Log.Errorf("failed to copy stdout and stderr logs: %s", err)
}
rc.Close()
if stderr != nil {
stderr.Close()
}
if stdout != nil {
stdout.Close()
}
}
e.Docker, err = e.client.ContainerInspect(ctx, e.containerName())
if err != nil {
return execInit, errors.E("ContainerInspect", e.containerName(), kind(err), err)
}
// Docker can report inconsistent exit codes between a ContainerWait and
// a ContainerInspect call. If either of these calls returns a non-zero exit
// code, we use that as the exit status.
if code == 0 && e.Docker.State.ExitCode != 0 {
code = e.Docker.State.ExitCode
}
// Retrieve the profile before we clean up the results.
cancelprof()
e.Manifest.Stats = <-profc
finishedAt, err := time.Parse(time.RFC3339Nano, e.Docker.State.FinishedAt)
if err != nil {
return execInit, errors.E(errors.Invalid, errors.Errorf("parsing docker time %s: %v", e.Docker.State.FinishedAt, err))
}
// The Docker daemon does not reliably report the container's exit
// status correctly, and, what's worse, ContainerWait can return
// successfully while the container is still running. This appears
// to happen during system shutdown (e.g., the Docker daemon is
// killed before Reflow) and also on system restart (Docker daemon
// restores container state from disk).
//
// We are not currently able to distinguish between a system restart
// and a successful exit.
//
// This appears to be fixed in Docker/Moby 1.13, but we are not yet
// ready to adopt this. See:
// https://github.com/moby/moby/issues/31262
//
// TODO(marius): either upgrade to Docker/Moby 1.13, or else add
// some sort of epoch detection (Docker isn't helpful here either,
// but system start time might be a good proxy.)
switch {
// ContainerWait returns while the container is in running state
// (explicitly, or without a finish time). This happens during
// system shutdown.
case e.Docker.State.Running || finishedAt.IsZero():
return execInit, errors.E(
"exec", e.id, errors.Temporary,
errors.New("container returned in running state; docker daemon likely shutting down"))
// The remaining appear to be true completions.
case code == 137 || e.Docker.State.OOMKilled:
e.Manifest.Result.Err = errors.Recover(errors.E("exec", e.id, errors.Temporary, errors.New("killed by the OOM killer")))
case code == 0:
if err := e.install(ctx); err != nil {
return execInit, err
}
default:
e.Manifest.Result.Err = errors.Recover(errors.E("exec", e.id, errors.Errorf("exited with code %d", code)))
}
// Clean up args. TODO(marius): replace these with symlinks to sha256s also?
if err := os.RemoveAll(e.path("arg")); err != nil {
e.Log.Errorf("failed to remove arg path: %v", err)
}
if err := os.RemoveAll(e.path("tmp")); err != nil {
e.Log.Errorf("failed to remove tmpdir: %v", err)
}
return execComplete, nil
}
// profile profiles the container and returns a profile when its
// context is cancelled or when the container stops.
func (e *dockerExec) profile(ctx context.Context) (stats, error) {
// Sample disk usage every minute.
// TODO(marius): perform a final disk usage check before returning
const diskPeriod = time.Minute
var (
lastDiskTime time.Time
stats = make(stats)
paths = map[string]string{"tmp": e.path("tmp"), "disk": e.path("return")}
)
resp, err := e.client.ContainerStats(ctx, e.containerName(), true /*stream*/)
if err != nil {
return nil, errors.E("ContainerStats", kind(err), err)
}
defer resp.Close()
dec := json.NewDecoder(resp)
gauges := make(reflow.Gauges)
for {
var v types.StatsJSON
if err := dec.Decode(&v); err != nil {
if err == io.EOF {
return stats, nil
}
dec = json.NewDecoder(io.MultiReader(dec.Buffered(), resp))
select {
case <-time.After(100 * time.Millisecond):
continue
case <-ctx.Done():
return stats, nil
}
}
var (
deltaCPU = float64(v.CPUStats.CPUUsage.TotalUsage - v.PreCPUStats.CPUUsage.TotalUsage)
deltaSys = float64(v.CPUStats.SystemUsage - v.PreCPUStats.SystemUsage)
// TODO(marius): switch to stats.CPUStats.OnlineCPUs once we update the
// Docker client.
ncpu = float64(len(v.CPUStats.CPUUsage.PercpuUsage))
)
if deltaSys > 0 {
// We compute the CPU time here by looking at the proportion of
// this container's CPU time to total system time. This is normalized
// and so needs to be multiplied by the number of CPUs to get a
// portable load number.
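// For example (illustrative numbers only): with ncpu = 4 and a container
// that consumed half of the total system CPU time over the sampling
// interval, load = 4 * 0.5 = 2.0, i.e. two cores' worth of work.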
load := ncpu * deltaCPU / deltaSys
stats.Observe("cpu", load)
gauges["cpu"] = load
}
// We exclude page cache memory since this is not counted towards
// your limits.
mem := float64(v.MemoryStats.Usage - v.MemoryStats.Stats["cache"])
stats.Observe("mem", mem)
gauges["mem"] = mem
if time.Since(lastDiskTime) >= diskPeriod {
for k, path := range paths {
n, err := du(path)
if err != nil {
e.Log.Errorf("du %s: %v", path, err)
continue
}
gauges[k] = float64(n)
stats.Observe(k, float64(n))
}
lastDiskTime = time.Now()
}
e.Manifest.Gauges = gauges.Snapshot()
}
}
// Go runs the exec's state machine. It resumes from the saved state
// when possible; if no state exists, it begins from execUnstarted,
// and immediately transitions to execInit.
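// The normal progression is execUnstarted -> execInit -> execCreated ->
// execRunning -> execComplete; after each successful transition the new
// state is persisted via save.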
func (e *dockerExec) Go(ctx context.Context) {
os.MkdirAll(e.path(), 0777)
/*
if f, err := os.OpenFile(e.path("log"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600); err != nil {
e.printf("failed to open local log file: %v", err)
} else {
e.logger = log.New(f, "", log.Llongfile|log.LstdFlags)
defer func() {
e.logger = nil
f.Close()
}()
}
*/
for state, err := e.getState(); err == nil && state != execComplete; e.setState(state, err) {
switch state {
case execUnstarted:
state = execInit
case execInit:
state, err = e.create(ctx)
case execCreated:
state, err = e.start(ctx)
case execRunning:
state, err = e.wait(ctx)
default:
panic("bug")
}
if err == nil {
err = e.save(state)
}
if state == execComplete {
if err := e.client.ContainerRemove(context.Background(), e.containerName(), types.ContainerRemoveOptions{}); err != nil {
e.Log.Errorf("failed to remove container %s: %s", e.containerName(), err)
}
}
}
}
// Logs returns the stdout and/or stderr log files. Logs returns live
// logs from the Docker daemon if the exec is still running;
// otherwise the saved logs are returned.
//
// Note that this is a bit racy (e.g., we could switch states between
// the state check and acting on that state here); but we don't worry
// too much about it, as this is used for diagnostics purposes, and
// can easily be retried.
func (e *dockerExec) Logs(ctx context.Context, stdout, stderr, follow bool) (io.ReadCloser, error) {
state, err := e.getState()
if err != nil {
return nil, err
}
if !stdout && !stderr {
return nil, errors.Errorf("logs %v %v %v: must specify at least one of stdout, stderr", e.id, stdout, stderr)
}
switch state {
case execUnstarted, execInit, execCreated:
return nil, errors.Errorf("logs %v %v %v: exec not yet started", e.id, stdout, stderr)
case execRunning:
// Note that this is technically racy (we may be competing with the completion
// routine), but since this is for user interaction, it's probably not a big deal.
opts := types.ContainerLogsOptions{ShowStdout: stdout, ShowStderr: stderr, Follow: follow}
rc, err := e.client.ContainerLogs(ctx, e.containerName(), opts)
if err != nil {
return nil, errors.E("ContainerLogs", e.containerName, fmt.Sprint(opts), kind(err), err)
}
r, w := io.Pipe()
go func() {
stdcopy.StdCopy(w, w, rc)
w.Close()
}()
return newAllCloser(r, rc), nil
case execComplete:
// This doesn't really make sense for materialized logs. When
// querying a live Docker container, we get interleaved log lines;
// here we simply concatenate stderr to stdout. Since we cannot
// stay true to this interface, we should perhaps permit only one
// log file to be retrieved at a time.
var files []*os.File
if stdout {
file, err := os.Open(e.path("stdout"))
if err != nil {
return nil, err
}
files = append(files, file)
}
if stderr {
file, err := os.Open(e.path("stderr"))
if err != nil {
return nil, err
}
files = append(files, file)
}
readers := make([]io.Reader, len(files))
closers := make([]io.Closer, len(files))
for i, f := range files {
readers[i] = f
closers[i] = f
}
return newAllCloser(io.MultiReader(readers...), closers...), nil
}
panic("bug")
}
func (e *dockerExec) Shell(ctx context.Context) (io.ReadWriteCloser, error) {
state, err := e.getState()
if err != nil {
return nil, err
}
switch state {
case execRunning:
c := types.ExecConfig{
Cmd: []string{"/bin/bash"},
AttachStdin: true,
AttachStdout: true,
AttachStderr: true,
Tty: true,
DetachKeys: "ctrl-p,ctrl-q",
}
response, err := e.client.ContainerExecCreate(ctx, e.containerName(), c)
if err != nil {
return nil, err
}
conn, err := e.client.ContainerExecAttach(ctx, response.ID, c)
if err != nil {
return nil, err
}
return conn.Conn, nil
default:
return nil, errors.New("cannot shell into a non-running exec")
}
}
// Inspect returns the current state of the exec.
func (e *dockerExec) Inspect(ctx context.Context) (reflow.ExecInspect, error) {
inspect := reflow.ExecInspect{
Created: e.Manifest.Created,
Config: e.Config,
Docker: e.Docker,
Profile: e.Manifest.Stats.Profile(),
Gauges: e.Manifest.Gauges,
}
state, err := e.getState()
if err != nil {
inspect.Error = errors.Recover(err)
}
switch state {
case execUnstarted, execInit:
inspect.State = "initializing"
inspect.Status = "the exec is still initializing"
case execCreated:
inspect.State = "created"
inspect.Status = "the exec container was created"
case execRunning:
top, err := e.client.ContainerTop(ctx, e.containerName(), []string{"auwx"})
if err != nil {
e.Log.Errorf("top %s: %v", e.containerName(), err)
} else {
var i int
for ; i < len(top.Titles); i++ {
if top.Titles[i] == "COMMAND" {
break
}
}
if i != len(top.Titles) {
inspect.Commands = make([]string, len(top.Processes))
for j, proc := range top.Processes {
inspect.Commands[j] = proc[i]
}
}
}
inspect.State = "running"
inspect.Status = "the exec container is running"
case execComplete:
inspect.State = "complete"
inspect.Status = "the exec container has completed"
}
return inspect, nil
}
// Result returns the result computed by the exec.
func (e *dockerExec) Result(ctx context.Context) (reflow.Result, error) {
state, err := e.getState()
if err != nil {
return reflow.Result{}, err
}
if state != execComplete {
return reflow.Result{}, errors.Errorf("result %v: exec not complete", e.id)
}
return e.Manifest.Result, nil
}
func (e *dockerExec) Promote(ctx context.Context) error {
return e.repo.Vacuum(ctx, &e.staging)
}
// Kill kills the exec's container and removes it entirely.
func (e *dockerExec) Kill(ctx context.Context) error {
e.client.ContainerKill(ctx, e.containerName(), "KILL")
if err := e.Wait(ctx); err != nil {
return err
}
return os.RemoveAll(e.path())
}
// WaitUntil returns when the object state reaches at least min, or
// an error occurs.
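// For example, WaitUntil(execRunning) returns once the exec's container has
// been started (or once an error has been recorded).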
func (e *dockerExec) WaitUntil(min execState) error {
e.mu.Lock()
for e.State < min && e.err == nil {
e.cond.Wait()
}
e.mu.Unlock()
return e.err
}
// Wait waits until the exec reaches completion.
func (e *dockerExec) Wait(ctx context.Context) error {
return e.WaitUntil(execComplete)
}
// URI returns a URI for this exec based on its executor's URI.
func (e *dockerExec) URI() string { return e.Executor.URI() + "/" + e.id.Hex() }
// ID returns this exec's ID.
func (e *dockerExec) ID() digest.Digest { return e.id }
// path constructs a path in the exec's directory.
func (e *dockerExec) path(elems ...string) string {
return e.Executor.execPath(e.id, elems...)
}
// hostPath constructs a host path in the exec's directory.
func (e *dockerExec) hostPath(elems ...string) string {
return e.Executor.execHostPath(e.id, elems...)
}
// setState sets the current state and error. It broadcasts
// on the exec's condition variable to wake up all waiters.
func (e *dockerExec) setState(state execState, err error) {
e.mu.Lock()
e.State = state
e.err = err
e.cond.Broadcast()
e.mu.Unlock()
}
// getState returns the current state of the exec.
func (e *dockerExec) getState() (execState, error) {
e.mu.Lock()
defer e.mu.Unlock()
return e.State, e.err
}
// install installs the exec's result object into the repository.
// install removes the original copy of each object, replacing it
// with a symlink to the digest of that object; this is to aid with
// debugging.
func (e *dockerExec) install(ctx context.Context) error {
if e.Manifest.Result.Fileset.Map != nil || e.Manifest.Result.Fileset.List != nil {
return nil
}
if outputs := e.Config.OutputIsDir; outputs != nil {
e.Manifest.Result.Fileset.List = make([]reflow.Fileset, len(outputs))
for i := range outputs {
var err error
e.Manifest.Result.Fileset.List[i], err =
e.Executor.install(ctx, e.path("return", strconv.Itoa(i)), true, &e.staging)
if err != nil {
return err
}
}
return nil
}
var err error
e.Manifest.Result.Fileset, err = e.Executor.install(ctx, e.path("return", "default"), true, &e.staging)
return err
}
// allCloser defines an io.ReadCloser over a single reader
// and multiple closers.
type allCloser struct {
io.Reader
closers []io.Closer
}
func newAllCloser(r io.Reader, closers ...io.Closer) io.ReadCloser {
return &allCloser{r, closers}
}
func (c *allCloser) Close() error {
var err error
for _, c := range c.closers {
if e := c.Close(); e != nil {
err = e
}
}
return err
}
// kind returns the errors.Kind of a Docker error.
func kind(err error) errors.Kind {
switch {
case client.IsErrNotFound(err):
return errors.NotExist
case client.IsErrUnauthorized(err):
return errors.NotAllowed
default:
// Liberally pick unavailable as the default error, so that lower
// layers can retry errors that may be fruitfully retried.
// This is always safe to do, but may cause extra work.
return errors.Unavailable
}
}
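// exampleRunExec is an illustrative sketch (hypothetical, not part of the
// original source) of how the exec's lifecycle methods compose: Go drives
// the state machine, Wait blocks until completion, and Result returns the
// recorded outcome.
func exampleRunExec(ctx context.Context, e *dockerExec) (reflow.Result, error) {
	go e.Go(ctx) // drive the state machine through to execComplete
	if err := e.Wait(ctx); err != nil {
		return reflow.Result{}, err
	}
	return e.Result(ctx)
}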