forked from containerd/cri
/
container_execsync.go
152 lines (136 loc) · 4.98 KB
/
container_execsync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/task"
prototypes "github.com/gogo/protobuf/types"
"github.com/golang/glog"
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (retRes *runtime.ExecSyncResponse, retErr error) {
glog.V(2).Infof("ExecSync for %q with command %+v and timeout %d (s)", r.GetContainerId(), r.GetCmd(), r.GetTimeout())
defer func() {
if retErr == nil {
glog.V(2).Infof("ExecSync for %q returns with exit code %d", r.GetContainerId(), retRes.GetExitCode())
glog.V(4).Infof("ExecSync for %q outputs - stdout: %q, stderr: %q", r.GetContainerId(),
retRes.GetStdout(), retRes.GetStderr())
}
}()
// Get container config from container store.
meta, err := c.containerStore.Get(r.GetContainerId())
if err != nil {
return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err)
}
id := meta.ID
if meta.State() != runtime.ContainerState_CONTAINER_RUNNING {
return nil, fmt.Errorf("container %q is in %s state", id, criContainerStateToString(meta.State()))
}
// TODO(random-liu): Replace the following logic with containerd client and add unit test.
// Prepare streaming pipes.
execDir, err := ioutil.TempDir(getContainerRootDir(c.rootDir, id), "exec")
if err != nil {
return nil, fmt.Errorf("failed to create exec streaming directory: %v", err)
}
defer func() {
if err = c.os.RemoveAll(execDir); err != nil {
glog.Errorf("Failed to remove exec streaming directory %q: %v", execDir, err)
}
}()
_, stdout, stderr := getStreamingPipes(execDir)
_, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, "", stdout, stderr)
if err != nil {
return nil, fmt.Errorf("failed to prepare streaming pipes: %v", err)
}
defer stdoutPipe.Close()
defer stderrPipe.Close()
// Start redirecting exec output.
stdoutBuf, stderrBuf := new(bytes.Buffer), new(bytes.Buffer)
go io.Copy(stdoutBuf, stdoutPipe) // nolint: errcheck
go io.Copy(stderrBuf, stderrPipe) // nolint: errcheck
// Get containerd event client first, so that we won't miss any events.
// TODO(random-liu): Handle this in event handler. Create an events client for
// each exec introduces unnecessary overhead.
cancellable, cancel := context.WithCancel(ctx)
events, err := c.taskService.Events(cancellable, &execution.EventsRequest{})
if err != nil {
return nil, fmt.Errorf("failed to get containerd event: %v", err)
}
spec := &meta.Spec.Process
spec.Args = r.GetCmd()
rawSpec, err := json.Marshal(spec)
if err != nil {
return nil, fmt.Errorf("failed to marshal oci spec %+v: %v", spec, err)
}
resp, err := c.taskService.Exec(ctx, &execution.ExecRequest{
ContainerID: id,
Terminal: false,
Stdout: stdout,
Stderr: stderr,
Spec: &prototypes.Any{
TypeUrl: runtimespec.Version,
Value: rawSpec,
},
})
if err != nil {
return nil, fmt.Errorf("failed to exec in container %q: %v", id, err)
}
exitCode, err := waitContainerExec(cancel, events, id, resp.Pid, r.GetTimeout())
if err != nil {
return nil, fmt.Errorf("failed to wait for exec in container %q to finish: %v", id, err)
}
// TODO(random-liu): Make sure stdout/stderr are drained.
return &runtime.ExecSyncResponse{
Stdout: stdoutBuf.Bytes(),
Stderr: stderrBuf.Bytes(),
ExitCode: int32(exitCode),
}, nil
}
// waitContainerExec waits for container exec to finish and returns the exit code.
func waitContainerExec(cancel context.CancelFunc, events execution.Tasks_EventsClient, id string,
pid uint32, timeout int64) (uint32, error) {
// TODO(random-liu): [P1] Support ExecSync timeout.
// TODO(random-liu): Delete process after containerd upgrade.
defer func() {
// Stop events and drain the event channel. grpc-go#188
cancel()
for {
_, err := events.Recv()
if err != nil {
break
}
}
}()
for {
e, err := events.Recv()
if err != nil {
// Return non-zero exit code just in case.
return unknownExitCode, err
}
if e.Type != task.Event_EXIT {
continue
}
if e.ID == id && e.Pid == pid {
return e.ExitStatus, nil
}
}
}