forked from hughker/wercker
/
session.go
338 lines (303 loc) · 9.39 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
// Copyright 2016 Wercker Holding BV
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package core
import (
"bytes"
"fmt"
"io"
"strings"
"time"
"github.com/pborman/uuid"
"github.com/wercker/wercker/util"
"golang.org/x/net/context"
)
// Receiver adapts a string channel to io.Writer: every Write call is
// forwarded to the channel as one string message.
type Receiver struct {
	queue chan string
}

// NewReceiver wraps queue in a Receiver so it can be handed to code that
// expects an io.Writer.
func NewReceiver(queue chan string) *Receiver {
	r := new(Receiver)
	r.queue = queue
	return r
}

// Write publishes p on the queue as a single string and reports len(p)
// bytes written. It blocks until the message has been accepted by the
// channel and never returns an error.
func (r *Receiver) Write(p []byte) (int, error) {
	chunk := bytes.NewBuffer(p)
	written := chunk.Len()
	r.queue <- chunk.String()
	return written, nil
}
// Sender adapts a string channel to io.Reader: each message received on
// the channel is handed out through Read.
type Sender struct {
	queue chan string
	// pending holds the part of the current message that did not fit into
	// the caller's buffer on a previous Read.
	pending string
}

// NewSender gives us a new channel-based io.Reader
func NewSender(queue chan string) *Sender {
	return &Sender{queue: queue}
}

// Read copies the next bytes of queued input into p and returns how many
// bytes were copied.
//
// If a message is longer than p the remainder is kept and returned by
// subsequent calls instead of being dropped. A zero-length p returns
// immediately without consuming anything. When no data is pending Read
// blocks until a message arrives; once the queue has been closed and
// drained it returns io.EOF.
//
// Read keeps state between calls and is not safe for concurrent use.
func (s *Sender) Read(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	// Skip over empty messages so we never report (0, nil), which the
	// io.Reader contract discourages.
	for s.pending == "" {
		msg, ok := <-s.queue
		if !ok {
			return 0, io.EOF
		}
		s.pending = msg
	}
	n := copy(p, s.pending)
	s.pending = s.pending[n:]
	return n, nil
}
// Transport interface for talking to containervisors
//
// Session.Attach calls Attach with the runner context, a reader that yields
// the commands queued by Send, and one writer passed for both writer
// arguments. The context it returns is used as the session context.
// NOTE(review): the two writers are presumably stdout and stderr — confirm
// against the Transport implementations.
type Transport interface {
Attach(context.Context, io.Reader, io.Writer, io.Writer) (context.Context, error)
}
// Session is our way to interact with the docker container
type Session struct {
// options supplies CommandTimeout and NoResponseTimeout, which SendChecked
// interprets as milliseconds.
options *PipelineOptions
// transport is what Attach connects the send/recv queues to.
transport Transport
// logsHidden is the Hidden value used for emitted logs unless a call
// forces them hidden; toggled by HideLogs and ShowLogs.
logsHidden bool
// send carries newline-terminated commands written by Send; created in Attach.
send chan string
// recv carries output chunks read by SendChecked; created in Attach.
recv chan string
// exit is not referenced by the code visible in this file section.
exit chan int
// logger is used to report sends attempted on a finished session.
logger *util.LogEntry
}
// NewSession builds a Session for the given options and transport. Logs
// start out visible, and the send/recv queues stay nil until Attach is
// called.
func NewSession(options *PipelineOptions, transport Transport) *Session {
	session := &Session{options: options, transport: transport}
	session.logsHidden = false
	session.logger = util.RootLogger().WithField("Logger", "Session")
	return session
}
// Transport returns the transport this session was created with, as an
// untyped interface{} value.
func (s *Session) Transport() interface{} {
return s.transport
}
// Recv returns the channel on which output chunks are delivered. The
// channel is created by Attach, so this is nil until Attach has been called.
func (s *Session) Recv() chan string {
return s.recv
}
// Attach creates fresh send and receive queues for the session and hands
// them to the transport: the send queue as the input reader and the receive
// queue as the writer for both output arguments.
// The context returned by the transport is passed straight back so callers
// can observe cancels caused by errors and closed connections.
func (s *Session) Attach(runnerCtx context.Context) (context.Context, error) {
	s.recv = make(chan string)
	s.send = make(chan string)

	input := NewSender(s.send)
	output := NewReceiver(s.recv)

	// The transport's context doubles as the session context everywhere.
	return s.transport.Attach(runnerCtx, input, output, output)
}
// HideLogs makes subsequent Logs events emitted by this session carry
// Hidden set to true.
func (s *Session) HideLogs() {
s.logsHidden = true
}
// ShowLogs makes subsequent Logs events emitted by this session carry
// Hidden set to false, unless a call explicitly forces them hidden.
func (s *Session) ShowLogs() {
s.logsHidden = false
}
// Send writes each command, terminated by a newline, to the session's input
// queue and emits a "stdin" Logs event for every command that was accepted.
//
// The events are marked Hidden when forceHidden is true or when logs are
// currently hidden on the session (see HideLogs).
//
// Send returns the error from EmitterFromContext if no emitter is available,
// or sessionCtx.Err() if the session finishes before all commands have been
// handed over. It returns nil once every command has been queued.
func (s *Session) Send(sessionCtx context.Context, forceHidden bool, commands ...string) error {
	e, err := EmitterFromContext(sessionCtx)
	if err != nil {
		return err
	}

	// Fail fast if the session is already over. A select with a default
	// clause only takes the default when no other case is ready, so Done is
	// checked deterministically without having to sleep.
	select {
	case <-sessionCtx.Done():
		s.logger.Errorln("Session finished before sending commands:", commands)
		return sessionCtx.Err()
	default:
	}

	for _, c := range commands {
		command := c + "\n"
		select {
		case <-sessionCtx.Done():
			s.logger.Errorln("Session finished before sending command:", command)
			return sessionCtx.Err()
		case s.send <- command:
			e.Emit(Logs, &LogsArgs{
				Hidden: forceHidden || s.logsHidden,
				Stream: "stdin",
				Logs:   command,
			})
		}
	}
	return nil
}
// randomSentinel produces the unique marker that SendChecked echoes after a
// command to detect its completion. It is a variable holding a function
// rather than a plain function declaration.
var randomSentinel = func() string {
	id := uuid.NewRandom()
	return id.String()
}
// CommandResult exists so that we can make a channel of them
type CommandResult struct {
// exit is the exit code parsed from the sentinel line, or -1 when the
// command ended through an error or timeout instead.
exit int
// recv holds the output lines collected before the command completed.
recv []string
// err is nil only when the sentinel reported exit code 0.
err error
}
// checkLine reports whether line is the sentinel echo "<sentinel> <code>"
// and, if so, returns the exit code it carries.
//
// When the line is not a sentinel echo it returns false and -999.
func checkLine(line, sentinel string) (bool, int) {
	// Cheap rejection for the vast majority of output lines.
	if !strings.HasPrefix(line, sentinel) {
		return false, -999
	}

	var token string
	var exit int
	if _, err := fmt.Sscanf(line, "%s %d\n", &token, &exit); err != nil {
		return false, -999
	}

	// The prefix test alone would accept "<sentinel>junk 3"; the first
	// token has to be exactly the sentinel.
	if token != sentinel {
		return false, -999
	}
	return true, exit
}
// smartSplitLines separates a trailing sentinel line from the output that
// precedes it in the same chunk, so the sentinel can be examined as a line
// of its own.
//
// Only the last newline-terminated line of the chunk is considered. If it
// is a sentinel line the result is the preceding text (when there is any)
// as one element followed by the sentinel line; text after the sentinel
// line is not included. In every other case the chunk is returned
// unchanged as the single element.
func smartSplitLines(line, sentinel string) []string {
	pieces := strings.Split(line, "\n")
	last := len(pieces) - 1

	// Without any newline there is nothing to split off.
	if last < 1 {
		return []string{line}
	}

	// With a trailing newline the final piece is empty, so the last real
	// line sits just before it. Put its newline back before checking.
	candidate := fmt.Sprintf("%s\n", pieces[last-1])
	if found, _ := checkLine(candidate, sentinel); !found {
		return []string{line}
	}

	result := []string{}
	if last > 1 {
		// Everything ahead of the sentinel goes out as one chunk.
		leading := strings.Join(pieces[:last-1], "\n")
		result = append(result, fmt.Sprintf("%s\n", leading))
	}
	return append(result, candidate)
}
// SendChecked sends commands, waits for them to complete and returns the
// exit status and output
//
// After the commands it sends `echo <sentinel> $?` and reads the session
// output until one of the following happens.
//
// Ways to know a command is done:
//   [x] We received the sentinel echo
//   [x] The container has exited and we've exhausted the incoming data
//   [x] The session has closed and we've exhausted the incoming data
//   [x] The command has timed out
// Ways for a command to be successful:
//   [x] We received the sentinel echo with exit code 0
//
// It returns the exit code (-1 when no sentinel was seen), the output lines
// collected before completion, and an error that is nil only for exit code
// 0. If the commands cannot be sent at all it returns -1, an empty slice
// and the send error.
func (s *Session) SendChecked(sessionCtx context.Context, commands ...string) (int, []string, error) {
	e, err := EmitterFromContext(sessionCtx)
	if err != nil {
		return -1, []string{}, err
	}

	// NOTE(review): recv is appended to by the reader goroutine and read by
	// the waiter goroutine; only the sentinel path is ordered by a channel
	// handoff. Left as in the original, worth guarding separately.
	recv := []string{}
	sentinel := randomSentinel()

	// Keep the cancel func and always call it, otherwise the timeout's
	// resources stay alive until the deadline even after we have returned.
	sendCtx, cancel := context.WithTimeout(sessionCtx, time.Duration(s.options.CommandTimeout)*time.Millisecond)
	defer cancel()

	// done is closed when we return so that none of the helper goroutines
	// below can outlive this call blocked on a channel nobody services.
	done := make(chan struct{})
	defer close(done)

	// Buffered so the waiter can always deliver its single result and exit.
	commandComplete := make(chan CommandResult, 1)

	// Signal channel to tell the reader to stop reading, this lets us
	// keep it reading for a small amount of time after we know something
	// has gone wrong, otherwise it misses some error messages.
	stopReading := make(chan struct{}, 1)
	// One token is enough to stop the single reader, so never block on a
	// second one.
	signalStop := func() {
		select {
		case stopReading <- struct{}{}:
		default:
		}
	}

	// This is our main waiter, it will get an exit code, an error or a
	// timeout and then complete the command. It uses its own error
	// variables: the caller goroutine assigns to err concurrently.
	exitChan := make(chan int)
	errChan := make(chan error)
	go func() {
		select {
		// We got an exit code because we got our sentinel, let's skiddaddle
		case exit := <-exitChan:
			var cmdErr error
			if exit != 0 {
				cmdErr = fmt.Errorf("Command exited with exit code: %d", exit)
			}
			commandComplete <- CommandResult{exit: exit, recv: recv, err: cmdErr}
		case cmdErr := <-errChan:
			commandComplete <- CommandResult{exit: -1, recv: recv, err: cmdErr}
		case <-sendCtx.Done():
			// We timed out or something closed, try to read in the rest of
			// the data over the next 100 milliseconds and then return
			<-time.After(time.Duration(100) * time.Millisecond)
			signalStop()
			commandComplete <- CommandResult{exit: -1, recv: recv, err: sendCtx.Err()}
		case <-done:
			// The caller already returned (sending failed); nothing to report.
		}
	}()

	// If we don't get a response in a certain amount of time, timeout
	noResponseTimeout := make(chan struct{})
	go func() {
		for {
			select {
			case <-noResponseTimeout:
				// Output arrived, start a fresh timer on the next iteration.
				continue
			case <-time.After(time.Duration(s.options.NoResponseTimeout) * time.Millisecond):
				signalStop()
				select {
				case errChan <- fmt.Errorf("Command timed out after no response"):
				case <-done:
				}
				return
			case <-done:
				return
			}
		}
	}()

	// Read in data until we get our sentinel or are asked to stop
	go func() {
		for {
			select {
			case line := <-s.recv:
				// If we found a line reset the NoResponseTimeout timer
				select {
				case noResponseTimeout <- struct{}{}:
				case <-done:
					return
				}
				lines := smartSplitLines(line, sentinel)
				for _, subline := range lines {
					// If we found the exit code, we're done
					foundExit, exit := checkLine(subline, sentinel)
					if foundExit {
						e.Emit(Logs, &LogsArgs{
							Hidden: true,
							Logs:   subline,
						})
						select {
						case exitChan <- exit:
						case <-done:
						}
						return
					}
					e.Emit(Logs, &LogsArgs{
						Hidden: s.logsHidden,
						Logs:   subline,
					})
					recv = append(recv, subline)
				}
			case <-stopReading:
				return
			case <-done:
				return
			}
		}
	}()

	err = s.Send(sessionCtx, false, commands...)
	if err != nil {
		return -1, []string{}, err
	}
	err = s.Send(sessionCtx, true, fmt.Sprintf("echo %s $?", sentinel))
	if err != nil {
		return -1, []string{}, err
	}

	r := <-commandComplete
	// Pretty up the error messages
	if r.err == context.DeadlineExceeded {
		r.err = fmt.Errorf("Command timed out")
	} else if r.err == context.Canceled {
		r.err = fmt.Errorf("Command cancelled due to error")
	}
	return r.exit, r.recv, r.err
}