-
Notifications
You must be signed in to change notification settings - Fork 242
/
exec_hcs.go
522 lines (483 loc) · 14.8 KB
/
exec_hcs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
package main
import (
"context"
"strings"
"sync"
"time"
"github.com/Microsoft/hcsshim/internal/cmd"
"github.com/Microsoft/hcsshim/internal/cow"
"github.com/Microsoft/hcsshim/internal/guestrequest"
"github.com/Microsoft/hcsshim/internal/log"
"github.com/Microsoft/hcsshim/internal/signals"
"github.com/Microsoft/hcsshim/internal/uvm"
"github.com/Microsoft/hcsshim/osversion"
eventstypes "github.com/containerd/containerd/api/events"
containerd_v1_types "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/v2/task"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"golang.org/x/sys/windows"
)
const (
	// processStopTimeout bounds how long an exec waits, after being sent a
	// signal that is expected to end the process (SIGKILL, for example),
	// before it forcibly terminates the process without a signal.
	processStopTimeout = 5 * time.Second
)
// newHcsExec returns a shimExec that tracks the lifetime of `spec` inside the
// container `c`. Nothing is created in `c` here; the process is launched by
// the first call to `Start()`. When `id == tid` the exec is the task's init
// exec, and `Start()` additionally starts `c` itself before running the
// process described by `spec`.
//
// The returned exec begins in the created state with a placeholder exit
// status of 255. A background goroutine (`waitForContainerExit`) is launched
// immediately so the exec is moved to exited if `c` exits first.
func newHcsExec(
	ctx context.Context,
	events publisher,
	tid string,
	host *uvm.UtilityVM,
	c cow.Container,
	id, bundle string,
	isWCOW bool,
	spec *specs.Process,
	io cmd.UpstreamIO) shimExec {
	fields := logrus.Fields{
		"tid":    tid,
		"eid":    id, // Init exec ID is always same as Task ID
		"bundle": bundle,
		"wcow":   isWCOW,
	}
	log.G(ctx).WithFields(fields).Debug("newHcsExec")

	exec := &hcsExec{
		// Immutable identity / configuration.
		events: events,
		tid:    tid,
		id:     id,
		bundle: bundle,
		host:   host,
		c:      c,
		isWCOW: isWCOW,
		spec:   spec,
		io:     io,

		// Mutable state, guarded by `sl`.
		state:      shimExecStateCreated,
		exitStatus: 255, // By design for non-exited process status.

		// Signalling channels.
		processDone: make(chan struct{}),
		exited:      make(chan struct{}),
	}
	go exec.waitForContainerExit()
	return exec
}
// Compile-time check that *hcsExec implements shimExec. A typed nil pointer
// is used so no value is constructed at package init.
var _ shimExec = (*hcsExec)(nil)

// hcsExec is a shimExec whose process is launched in the container `c` via a
// `cmd.Cmd` on `Start`.
type hcsExec struct {
	// events is used to publish the task/exec start and exit events.
	events publisher
	// tid is the task id of the container hosting this process.
	//
	// This MUST be treated as read only in the lifetime of the exec.
	tid string
	// host is the hosting VM for `c`. If `host==nil` this exec MUST be a
	// process isolated WCOW exec.
	//
	// This MUST be treated as read only in the lifetime of the exec.
	host *uvm.UtilityVM
	// c is the hosting container for this exec.
	//
	// This MUST be treated as read only in the lifetime of the exec.
	c cow.Container
	// id is the id of this process.
	//
	// This MUST be treated as read only in the lifetime of the exec.
	id string
	// bundle is the on disk path to the folder containing the `process.json`
	// describing this process. If `id==tid` the process is described in the
	// `config.json`.
	//
	// This MUST be treated as read only in the lifetime of the exec.
	bundle string
	// isWCOW is set to `true` when this process is part of a Windows OCI spec.
	//
	// This MUST be treated as read only in the lifetime of the exec.
	isWCOW bool
	// spec is the OCI Process spec that was passed in at create time. This is
	// stored because we don't actually create the process until the call to
	// `Start`.
	//
	// This MUST be treated as read only in the lifetime of the exec.
	spec *specs.Process
	// io is the upstream io connections used for copying between the upstream
	// io and the downstream io. The upstream IO MUST already be connected at
	// create time in order to be valid.
	//
	// This MUST be treated as read only in the lifetime of the exec.
	io cmd.UpstreamIO
	// processDone is closed (exactly once, through processDoneOnce) when the
	// process exits or when the exec leaves the created state without a
	// process. It unblocks `waitForContainerExit`.
	processDone     chan struct{}
	processDoneOnce sync.Once

	// sl is the state lock that MUST be held to safely read/write any of the
	// following members.
	sl sync.Mutex
	// state is the exec's lifecycle state (created, running or exited).
	state shimExecState
	// pid is the process id; zero until a successful `Start`.
	pid int
	// exitStatus is 255 until the exec exits, then the recorded exit status.
	exitStatus uint32
	// exitedAt is the time the exec transitioned to exited.
	exitedAt time.Time
	// p is the running command; nil until a successful `Start`.
	p *cmd.Cmd

	// exited is a wait block which waits async for the process to exit. It
	// is closed exactly once, through exitedOnce.
	exited     chan struct{}
	exitedOnce sync.Once
}
// ID returns the exec's id. The field is read-only for the exec's lifetime,
// so no lock is taken.
func (he *hcsExec) ID() string { return he.id }
// Pid returns the exec's process id, or 0 if the process has not been
// started. The value is read under `he.sl`.
func (he *hcsExec) Pid() int {
	he.sl.Lock()
	pid := he.pid
	he.sl.Unlock()
	return pid
}
// State returns a snapshot of the exec's lifecycle state, read under `he.sl`.
func (he *hcsExec) State() shimExecState {
	he.sl.Lock()
	current := he.state
	he.sl.Unlock()
	return current
}
// Status builds a containerd StateResponse describing this exec. It holds
// `he.sl` while reading the mutable fields.
//
// The exec state maps to containerd status as follows: created -> Created,
// running -> Running, exited -> Stopped. Any other state leaves Status at its
// zero value.
func (he *hcsExec) Status() *task.StateResponse {
	he.sl.Lock()
	defer he.sl.Unlock()

	resp := &task.StateResponse{
		ID:         he.tid,
		ExecID:     he.id,
		Bundle:     he.bundle,
		Pid:        uint32(he.pid),
		Stdin:      he.io.StdinPath(),
		Stdout:     he.io.StdoutPath(),
		Stderr:     he.io.StderrPath(),
		Terminal:   he.io.Terminal(),
		ExitStatus: he.exitStatus,
		ExitedAt:   he.exitedAt,
	}
	switch he.state {
	case shimExecStateCreated:
		resp.Status = containerd_v1_types.StatusCreated
	case shimExecStateRunning:
		resp.Status = containerd_v1_types.StatusRunning
	case shimExecStateExited:
		resp.Status = containerd_v1_types.StatusStopped
	}
	return resp
}
// startInternal creates the exec's process in `he.c` and moves the exec from
// the created to the running state. `he.sl` is held for the whole call.
//
// When `initializeContainer` is true (the init exec), `he.c` is started
// first. If any later step fails, the container is terminated and closed
// again on a best-effort basis, and cleanup failures are logged.
//
// If the exec is not in the created state, an invalid-state error is
// returned and nothing changes. On any other error the exec is moved to the
// exited state with status 1 via `exitFromCreatedL`, which releases waiters.
//
// On success the TaskExecStarted event (true exec) or TaskStart event (init
// exec) is published before `waitForExit` is launched in the background. This
// ordering guarantees the exit event can never precede the start event.
func (he *hcsExec) startInternal(ctx context.Context, initializeContainer bool) (err error) {
	he.sl.Lock()
	defer he.sl.Unlock()
	if he.state != shimExecStateCreated {
		return newExecInvalidStateError(he.tid, he.id, he.state, "start")
	}
	defer func() {
		if err != nil {
			he.exitFromCreatedL(ctx, 1)
		}
	}()
	if initializeContainer {
		if err = he.c.Start(ctx); err != nil {
			return err
		}
		defer func() {
			if err != nil {
				// Best-effort teardown of the container we just started. The
				// original start error is what the caller receives; cleanup
				// failures are only logged.
				if terr := he.c.Terminate(ctx); terr != nil {
					log.G(ctx).WithError(terr).Warn("failed to terminate container after failed start")
				}
				if cerr := he.c.Close(); cerr != nil {
					log.G(ctx).WithError(cerr).Warn("failed to close container after failed start")
				}
			}
		}()
	}
	// Named `proc` rather than `cmd` to avoid shadowing the `cmd` package.
	proc := &cmd.Cmd{
		Host:   he.c,
		Stdin:  he.io.Stdin(),
		Stdout: he.io.Stdout(),
		Stderr: he.io.Stderr(),
		Log: log.G(ctx).WithFields(logrus.Fields{
			"tid": he.tid,
			"eid": he.id,
		}),
		CopyAfterExitTimeout: time.Second * 1,
	}
	if he.isWCOW || he.id != he.tid {
		// An LCOW init exec passes the process as part of the container
		// config. The spec is therefore only passed here for WCOW execs or
		// for a true (non-init) exec.
		proc.Spec = he.spec
	}
	if err = proc.Start(); err != nil {
		return err
	}
	he.p = proc
	// Assign the PID and transition the state.
	he.pid = he.p.Process.Pid()
	he.state = shimExecStateRunning
	// Publish the task/exec start event. This MUST happen before waitForExit to
	// avoid publishing the exit previous to the start.
	if he.id != he.tid {
		he.events.publishEvent(
			ctx,
			runtime.TaskExecStartedEventTopic,
			&eventstypes.TaskExecStarted{
				ContainerID: he.tid,
				ExecID:      he.id,
				Pid:         uint32(he.pid),
			})
	} else {
		he.events.publishEvent(
			ctx,
			runtime.TaskStartEventTopic,
			&eventstypes.TaskStart{
				ContainerID: he.tid,
				Pid:         uint32(he.pid),
			})
	}
	// wait in the background for the exit.
	go he.waitForExit()
	return nil
}
// Start launches the exec's process. For the init exec (`he.id == he.tid`)
// the hosting container is started first.
func (he *hcsExec) Start(ctx context.Context) (err error) {
	isInit := he.id == he.tid
	return he.startInternal(ctx, isInit)
}
// Kill delivers `signal` to the exec according to its current state. It holds
// `he.sl` for the whole call.
//
//   - created: no process exists yet. The exec is moved straight to exited
//     with status 1, and nil is returned.
//   - running: the signal is validated for WCOW or LCOW. If signal support
//     is available and validation produced options, the signal is delivered
//     through the process's Signal. Otherwise the process is killed outright.
//     ErrFailedPrecondition is returned for an invalid signal, and
//     ErrNotFound if delivery reports the process was not found.
//   - exited: ErrNotFound is returned.
func (he *hcsExec) Kill(ctx context.Context, signal uint32) error {
	he.sl.Lock()
	defer he.sl.Unlock()

	switch he.state {
	case shimExecStateCreated:
		he.exitFromCreatedL(ctx, 1)
		return nil
	case shimExecStateExited:
		return errors.Wrapf(errdefs.ErrNotFound, "exec: '%s' in task: '%s' not found", he.id, he.tid)
	case shimExecStateRunning:
		// Handled below.
	default:
		return newExecInvalidStateError(he.tid, he.id, he.state, "kill")
	}

	// Signal support requires an RS5+ host. When the exec lives in a UVM
	// (`he.host != nil`), the UVM must also report support.
	supported := osversion.Get().Build >= osversion.RS5 &&
		(he.host == nil || he.host.SignalProcessSupported())

	var (
		options interface{}
		err     error
	)
	// Only store non-nil option pointers in `options`. A typed nil pointer in
	// an interface would compare non-nil below.
	if he.isWCOW {
		var wopt *guestrequest.SignalProcessOptionsWCOW
		if wopt, err = signals.ValidateWCOW(int(signal), supported); wopt != nil {
			options = wopt
		}
	} else {
		var lopt *guestrequest.SignalProcessOptionsLCOW
		if lopt, err = signals.ValidateLCOW(int(signal), supported); lopt != nil {
			options = lopt
		}
	}
	if err != nil {
		return errors.Wrapf(errdefs.ErrFailedPrecondition, "signal %d: %v", signal, err)
	}

	var delivered bool
	if !supported || options == nil {
		// Legacy path before signal support, or a WCOW signal that can only
		// be honoured by terminating the process.
		delivered, err = he.p.Process.Kill(ctx)
	} else {
		delivered, err = he.p.Process.Signal(ctx, options)
	}
	switch {
	case err != nil:
		return err
	case !delivered:
		return errors.Wrapf(errdefs.ErrNotFound, "exec: '%s' in task: '%s' not found", he.id, he.tid)
	}
	return nil
}
// ResizePty resizes the exec's console to `width` x `height`.
//
// It returns ErrFailedPrecondition if the exec was not created with a
// terminal. It is a no-op (nil) unless the process is running. Dimensions are
// narrowed to uint16 for ResizeConsole. `he.sl` is held throughout.
func (he *hcsExec) ResizePty(ctx context.Context, width, height uint32) error {
	he.sl.Lock()
	defer he.sl.Unlock()

	if !he.io.Terminal() {
		return errors.Wrapf(errdefs.ErrFailedPrecondition, "exec: '%s' in task: '%s' is not a tty", he.id, he.tid)
	}
	if he.state != shimExecStateRunning {
		// Nothing to resize before start or after exit.
		return nil
	}
	return he.p.Process.ResizeConsole(ctx, uint16(width), uint16(height))
}
// CloseIO closes the upstream stdin connection and always returns nil. The
// `stdin` argument is not consulted; stdin is closed unconditionally.
//
// Closing the upstream connection is expected to end the stdin copy set up in
// `Start()`, which in turn closes the process's stdin. Per the original
// design, repeated calls are safe.
func (he *hcsExec) CloseIO(ctx context.Context, stdin bool) error {
	he.io.CloseStdin(ctx)
	return nil
}
// Wait blocks until `he.exited` is closed (the exec has reached its exited
// state) and then returns the exec's final Status.
func (he *hcsExec) Wait() *task.StateResponse {
	<-he.exited
	final := he.Status()
	return final
}
// ForceExit drives the exec toward the exited state while holding `he.sl`.
//
//   - created: the exec is moved to exited immediately with `status`.
//   - running: the process is killed. `he.waitForExit` then records the
//     process's own exit code, so `status` is not used in this case.
//   - exited: nothing happens.
func (he *hcsExec) ForceExit(ctx context.Context, status int) {
	he.sl.Lock()
	defer he.sl.Unlock()

	switch he.state {
	case shimExecStateCreated:
		he.exitFromCreatedL(ctx, status)
	case shimExecStateRunning:
		// Kill the process to unblock `he.waitForExit`
		he.p.Process.Kill(ctx)
	}
}
// exitFromCreatedL moves the exec to the exited state without a process ever
// having run. The caller MUST hold `he.sl` for the duration of the call.
//
// The call is idempotent: if the exec is already in `shimExecStateExited`,
// nothing changes and nothing is logged.
//
// The transition performs these steps:
//
//  1. Close `he.processDone` (once) to unblock the goroutine
//     `he.waitForContainerExit()`.
//
//  2. Set `he.state`, `he.exitStatus` (to `status`) and `he.exitedAt`.
//
//  3. Release any upstream IO resources that were never used in a copy.
//
//  4. Close `he.exited` (once) to unblock any waiters who might have called
//     `Create`/`Wait`/`Start`, which is a valid pattern.
//
// The async `TaskExit` event is NOT published, because the matching
// `TaskStart`/`TaskExecStarted` event was never sent.
func (he *hcsExec) exitFromCreatedL(ctx context.Context, status int) {
	if he.state == shimExecStateExited {
		// Already exited (possibly gracefully); don't log a forced exit.
		return
	}
	log.G(ctx).WithField("status", status).Debug("hcsExec::exitFromCreatedL")

	// Unblock the container exit goroutine.
	he.processDoneOnce.Do(func() { close(he.processDone) })

	// Record the exit.
	he.state, he.exitStatus, he.exitedAt = shimExecStateExited, uint32(status), time.Now()

	// Release all upstream IO connections (if any).
	he.io.Close(ctx)

	// Free any waiters.
	he.exitedOnce.Do(func() { close(he.exited) })
}
// waitForExit waits for `he.p` to exit. This MUST only be called after a
// successful call to `Start` (which sets `he.p`) and MUST not be called more
// than once.
//
// This MUST be called via a goroutine.
//
// In the case of an exit from a running process the following is done:
//
// 1. Wait for `he.p` to exit.
//
// 2. Close `he.processDone` (once) to unblock the goroutine
// `he.waitForContainerExit()` (if still running). This is done early to avoid
// the container-exit path also attempting to kill the process. That race is
// still possible, but it is safe and handled.
//
// 3. Capture the process exit code and set `he.state`, `he.exitStatus` and
// `he.exitedAt` to the exited values. While `he.sl` is still held, snapshot
// the fields needed for the exit event so they are never read unlocked.
//
// 4. Wait for all IO to complete and release any upstream IO connections.
//
// 5. For a true (non-init) exec, send the async `TaskExit` to upstream
// listeners. For the init exec this is handled in task teardown.
//
// 6. Close `he.exited` (once) to unblock any waiters who might have called
// `Create`/`Wait`/`Start`, which is a valid pattern.
func (he *hcsExec) waitForExit() {
	ctx, span := trace.StartSpan(context.Background(), "hcsExec::waitForExit")
	defer span.End()
	span.AddAttributes(
		trace.StringAttribute("tid", he.tid),
		trace.StringAttribute("eid", he.id))

	if err := he.p.Process.Wait(); err != nil {
		log.G(ctx).WithError(err).Error("failed process Wait")
	}

	// Issue the process cancellation to unblock the container wait as early as
	// possible.
	he.processDoneOnce.Do(func() { close(he.processDone) })

	code, err := he.p.Process.ExitCode()
	if err != nil {
		log.G(ctx).WithError(err).Error("failed to get ExitCode")
	} else {
		log.G(ctx).WithField("exitCode", code).Debug("exited")
	}

	he.sl.Lock()
	he.state = shimExecStateExited
	he.exitStatus = uint32(code)
	he.exitedAt = time.Now()
	// `sl` guards these fields, so copy them while it is held for use in the
	// exit event below.
	pid, exitStatus, exitedAt := he.pid, he.exitStatus, he.exitedAt
	he.sl.Unlock()

	// Wait for all IO copies to complete and free the resources. The Wait
	// error is intentionally ignored here, as in the original flow.
	he.p.Wait()
	he.io.Close(ctx)

	// Only send the `runtime.TaskExitEventTopic` notification if this is a true
	// exec. For the `init` exec this is handled in task teardown.
	if he.tid != he.id {
		// We had a valid process so send the exited notification.
		he.events.publishEvent(
			ctx,
			runtime.TaskExitEventTopic,
			&eventstypes.TaskExit{
				ContainerID: he.tid,
				ID:          he.id,
				Pid:         uint32(pid),
				ExitStatus:  exitStatus,
				ExitedAt:    exitedAt,
			})
	}

	// Free any waiters.
	he.exitedOnce.Do(func() {
		close(he.exited)
	})
}
// waitForContainerExit races the exit of the hosting container `he.c` against
// `he.processDone`.
//
// If the process side finishes first (the normal case), nothing is done;
// `he.waitForExit` or `exitFromCreatedL` releases the waiters. If the
// container exits first, the exec is forced toward exited under `he.sl`:
//
//   - created: the exec is moved to exited with status 1.
//   - running: the process is killed so `he.waitForExit` can complete.
//
// This MUST be called via a goroutine at exec create.
func (he *hcsExec) waitForContainerExit() {
	ctx, span := trace.StartSpan(context.Background(), "hcsExec::waitForContainerExit")
	defer span.End()
	span.AddAttributes(
		trace.StringAttribute("tid", he.tid),
		trace.StringAttribute("eid", he.id))

	containerExited := make(chan struct{})
	go func() {
		he.c.Wait()
		close(containerExited)
	}()

	select {
	case <-he.processDone:
		// Process finished first: nothing to force.
		return
	case <-containerExited:
	}

	// Container exited first: force the exec to exited and clean up.
	he.sl.Lock()
	defer he.sl.Unlock()
	switch he.state {
	case shimExecStateCreated:
		he.exitFromCreatedL(ctx, 1)
	case shimExecStateRunning:
		// Kill the process to unblock `he.waitForExit`.
		he.p.Process.Kill(ctx)
	}
}
// escapeArgs builds a Windows-style command line. Each argument is escaped
// with windows.EscapeArg, and the results are joined by single spaces. An
// empty `args` yields the empty string.
func escapeArgs(args []string) string {
	var line strings.Builder
	for i, arg := range args {
		if i > 0 {
			line.WriteByte(' ')
		}
		line.WriteString(windows.EscapeArg(arg))
	}
	return line.String()
}