/
container.go
158 lines (143 loc) · 4.22 KB
/
container.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/* Copyright (c) 2017, Samuel Karp. All rights reserved.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package main
import (
"bufio"
"context"
"io"
"strings"
"sync"
"unicode"
log "github.com/cihub/seelog"
docker "github.com/fsouza/go-dockerclient"
"github.com/pkg/errors"
)
// #cgo pkg-config: purple
// #include "account.h"
// #include "blist.h"
import "C"
type Container struct {
ctx context.Context
cancel context.CancelFunc
id string
name string
interactive bool // interactive containers have "attach", noninteractive have "exec"
account *Account
purpleBuddy *C.PurpleBuddy
lock sync.RWMutex
stdin io.Writer
stdout io.Reader
stderr io.Reader
}
// TODO figure out some decent behavior here; note that a PurpleBuddy should
// only be allocated on the event loop thread.
func (account *Account) NewContainer(dockerContainer *docker.Container) *Container {
name := dockerContainer.Name[1:] // strip the / that Docker puts as the first character
interactive := dockerContainer.Config.OpenStdin
containerCtx, cancel := context.WithCancel(account.ctx)
container := &Container{
ctx: containerCtx,
cancel: cancel,
id: dockerContainer.ID,
name: name,
interactive: interactive,
account: account,
}
account.lock.Lock()
account.containers[name] = container
account.lock.Unlock()
account.enqueueFunction(func() {
container.lock.Lock()
defer container.lock.Unlock()
container.purpleBuddy = account.addBuddyUnsafe(name, "containers", false)
})
return container
}
func (container *Container) Attach() {
container.lock.Lock()
defer container.lock.Unlock()
if container.interactive {
log.Debugf("attaching to interactive container %s", container.name)
// hook 'em up
stdinReader, stdinWriter := io.Pipe()
stdoutReader, stdoutWriter := io.Pipe()
stderrReader, stderrWriter := io.Pipe()
container.stdin = stdinWriter
container.stdout = stdoutReader
container.stderr = stderrReader
go container.monitor(stdinReader, stdoutWriter, stderrWriter)
go container.receive()
}
}
// monitor attaches to a container, hooking up stdin, stdout, and stderr
func (container *Container) monitor(stdin io.Reader, stdout io.Writer, stderr io.Writer) {
client, ok := container.account.getDockerClient()
if !ok {
return
}
err := client.AttachToContainer(docker.AttachToContainerOptions{
Container: container.id,
InputStream: stdin,
OutputStream: stdout,
ErrorStream: stderr,
Stdin: true,
Stdout: true,
Stderr: true,
Stream: true,
})
if err != nil {
log.Error(err)
}
log.Debugf("detached from container %s", container.name)
}
// receive reads stdout and stderr to send them to the account
func (container *Container) receive() {
stdout := bufio.NewReader(container.stdout)
stderr := bufio.NewReader(container.stderr)
stdoutChan := readStringChan(stdout)
stderrChan := readStringChan(stderr)
log.Debugf("receiving from container %s", container.name)
for {
select {
case <-container.ctx.Done():
return
case stdoutLine := <-stdoutChan:
container.account.ReceiveIM(container.name, stdoutLine)
case stderrLine := <-stderrChan:
container.account.ReceiveIM(container.name, stderrLine)
}
}
}
func readStringChan(reader *bufio.Reader) <-chan string {
channel := make(chan string)
go func() {
for {
line, err := reader.ReadString('\n')
if err != nil {
log.Trace("detaching from stream")
if err != io.EOF {
log.Error("stream ended unexpectedly")
}
close(channel)
}
channel <- strings.TrimRightFunc(line, unicode.IsSpace)
}
}()
return channel
}
// ToStdinAttached writes to the stdin of the attached container
func (container *Container) ToStdinAttached(content string) error {
if !container.interactive {
return errors.Errorf("stdin: container not interactive: %s", container.name)
}
toWrite := content + "\n"
written, err := container.stdin.Write([]byte(toWrite))
if written != len(toWrite) {
return errors.Errorf("stdin: could not write bytes: %s (%d written)", container.name, written)
}
return err
}