/
stream.go
101 lines (86 loc) · 2.2 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package runtime
import (
"context"
"fmt"
"io"
"github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/stdcopy"
log "github.com/hashicorp/go-hclog"
"github.com/wabenet/dodo-core/pkg/ioutil"
"github.com/wabenet/dodo-core/pkg/plugin"
)
// ContainerStream bundles the attached connection of a container with the
// local streams that CopyInput and CopyOutput copy from and to.
type ContainerStream struct {
	// hasTTY selects how CopyOutput copies the output: a plain copy when
	// true, stdcopy.StdCopy otherwise.
	hasTTY bool

	// hijack is the attached connection handed out by the Docker client.
	hijack types.HijackedResponse

	// stdin is the source of the data CopyInput sends to the container.
	stdin io.Reader

	// stdout and stderr receive the container output in CopyOutput.
	stdout, stderr io.Writer
}
// closeWriter is satisfied by connections that offer a CloseWrite method.
// CopyInput prefers it over a full Close when the connection provides it.
type closeWriter interface{ CloseWrite() error }
// AttachContainer attaches to the standard streams of the container with the
// given id and pairs the resulting connection with the readers and writers
// in stream.
//
// The container is inspected first to find out whether it was created with a
// TTY; CopyOutput uses that flag to decide how the output is copied. The
// attach request asks for stdin, stdout and stderr as a live stream and sets
// Logs, which per the Docker API also requests the container's earlier
// output.
//
// Besides the stream, a cancel function is returned. It cancels the context
// given to the cancelable reader that wraps stream.Stdin. It does not close
// the attached connection.
//
// An error is returned if stream is nil, if no client is available, if the
// inspect result carries no configuration, or if inspecting or attaching to
// the container fails. All validation happens before the attach call, so no
// connection is left open on an error path.
func (c *ContainerRuntime) AttachContainer(
	ctx context.Context, id string, stream *plugin.StreamConfig,
) (*ContainerStream, func(), error) {
	// stream is dereferenced below; reject nil up front instead of
	// panicking after the connection has already been opened.
	if stream == nil {
		return nil, nil, fmt.Errorf("could not attach to container %q: no stream configuration given", id)
	}

	client, err := c.ensureClient()
	if err != nil {
		return nil, nil, err
	}

	config, err := client.ContainerInspect(ctx, id)
	if err != nil {
		return nil, nil, fmt.Errorf("could not inspect container: %w", err)
	}

	// Config is a pointer in the inspect result; without it the TTY mode is
	// unknown and the output could not be copied correctly.
	if config.Config == nil {
		return nil, nil, fmt.Errorf("could not inspect container %q: no configuration returned", id)
	}

	attach, err := client.ContainerAttach(
		ctx,
		id,
		types.ContainerAttachOptions{
			Stream: true,
			Stdin:  true,
			Stdout: true,
			Stderr: true,
			Logs:   true,
		},
	)
	if err != nil {
		return nil, nil, fmt.Errorf("could not attach to container: %w", err)
	}

	// The input reader gets its own context, independent of ctx, so that it
	// is only stopped through the returned cancel function.
	inContext, cancel := context.WithCancel(context.Background())
	inReader := ioutil.NewCancelableReader(inContext, stream.Stdin)

	return &ContainerStream{
		hasTTY: config.Config.Tty,
		hijack: attach,
		stdin:  inReader,
		stdout: stream.Stdout,
		stderr: stream.Stderr,
	}, cancel, nil
}
// CopyOutput copies everything read from the attached connection to the
// local output writers.
//
// With a TTY the data is copied unchanged to stdout. Without one the data is
// passed through stdcopy.StdCopy, which is given both stdout and stderr.
//
// A copy failure is logged as a warning and not reported to the caller; the
// returned error is always nil.
func (s *ContainerStream) CopyOutput() error {
	var copyErr error

	if s.hasTTY {
		_, copyErr = io.Copy(s.stdout, s.hijack.Reader)
	} else {
		_, copyErr = stdcopy.StdCopy(s.stdout, s.stderr, s.hijack.Reader)
	}

	if copyErr != nil {
		log.L().Warn("could not copy container output", "error", copyErr)
	}

	return nil
}
// CopyInput copies the local stdin reader into the attached connection until
// the copy ends.
//
// When the copy is done, the connection's CloseWrite method is called if it
// has one; otherwise the connection is closed entirely.
//
// Failures are only logged (copy errors as errors, close errors as
// warnings); the returned error is always nil.
func (s *ContainerStream) CopyInput() error {
	defer func() {
		var closeErr error

		if halfCloser, ok := s.hijack.Conn.(closeWriter); ok {
			closeErr = halfCloser.CloseWrite()
		} else {
			closeErr = s.hijack.Conn.Close()
		}

		if closeErr != nil {
			log.L().Warn("could not close streaming connection", "error", closeErr)
		}
	}()

	_, copyErr := io.Copy(s.hijack.Conn, s.stdin)
	if copyErr != nil {
		log.L().Error("could not copy container input", "error", copyErr)
	}

	return nil
}