forked from moby/moby
-
Notifications
You must be signed in to change notification settings - Fork 0
/
streams.go
109 lines (91 loc) · 3.33 KB
/
streams.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package runconfig
import (
"fmt"
"io"
"io/ioutil"
"strings"
"sync"
"github.com/docker/docker/pkg/broadcaster"
"github.com/docker/docker/pkg/ioutils"
)
// StreamConfig holds information about I/O streams managed together.
//
// streamConfig.StdinPipe returns a WriteCloser which can be used to feed data
// to the standard input of the streamConfig's active process.
// streamConfig.StdoutPipe and streamConfig.StderrPipe each return a ReadCloser
// which can be used to retrieve the standard output (and error) generated
// by the container's active process. The output (and error) are actually
// copied and delivered to all StdoutPipe and StderrPipe consumers, using
// a kind of "broadcaster".
type StreamConfig struct {
sync.WaitGroup
stdout *broadcaster.Unbuffered
stderr *broadcaster.Unbuffered
stdin io.ReadCloser
stdinPipe io.WriteCloser
}
// NewStreamConfig creates a stream config and initializes
// the standard err and standard out to new unbuffered broadcasters.
func NewStreamConfig() *StreamConfig {
return &StreamConfig{
stderr: new(broadcaster.Unbuffered),
stdout: new(broadcaster.Unbuffered),
}
}
// Stdout returns the standard output in the configuration.
func (streamConfig *StreamConfig) Stdout() *broadcaster.Unbuffered {
return streamConfig.stdout
}
// Stderr returns the standard error in the configuration.
func (streamConfig *StreamConfig) Stderr() *broadcaster.Unbuffered {
return streamConfig.stderr
}
// Stdin returns the standard input in the configuration.
func (streamConfig *StreamConfig) Stdin() io.ReadCloser {
return streamConfig.stdin
}
// StdinPipe returns an input writer pipe as an io.WriteCloser.
func (streamConfig *StreamConfig) StdinPipe() io.WriteCloser {
return streamConfig.stdinPipe
}
// StdoutPipe creates a new io.ReadCloser with an empty bytes pipe.
// It adds this new out pipe to the Stdout broadcaster.
func (streamConfig *StreamConfig) StdoutPipe() io.ReadCloser {
bytesPipe := ioutils.NewBytesPipe(nil)
streamConfig.stdout.Add(bytesPipe)
return bytesPipe
}
// StderrPipe creates a new io.ReadCloser with an empty bytes pipe.
// It adds this new err pipe to the Stderr broadcaster.
func (streamConfig *StreamConfig) StderrPipe() io.ReadCloser {
bytesPipe := ioutils.NewBytesPipe(nil)
streamConfig.stderr.Add(bytesPipe)
return bytesPipe
}
// NewInputPipes creates new pipes for both standard inputs, Stdin and StdinPipe.
func (streamConfig *StreamConfig) NewInputPipes() {
streamConfig.stdin, streamConfig.stdinPipe = io.Pipe()
}
// NewNopInputPipe creates a new input pipe that will silently drop all messages in the input.
func (streamConfig *StreamConfig) NewNopInputPipe() {
streamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard)
}
// CloseStreams ensures that the configured streams are properly closed.
func (streamConfig *StreamConfig) CloseStreams() error {
var errors []string
if streamConfig.stdin != nil {
if err := streamConfig.stdin.Close(); err != nil {
errors = append(errors, fmt.Sprintf("error close stdin: %s", err))
}
}
if err := streamConfig.stdout.Clean(); err != nil {
errors = append(errors, fmt.Sprintf("error close stdout: %s", err))
}
if err := streamConfig.stderr.Clean(); err != nil {
errors = append(errors, fmt.Sprintf("error close stderr: %s", err))
}
if len(errors) > 0 {
return fmt.Errorf(strings.Join(errors, "\n"))
}
return nil
}