forked from containerd/containerd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
194 lines (170 loc) · 4.8 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
// +build !windows
package shim
import (
"context"
"fmt"
"io"
"net"
"os"
"os/exec"
"strings"
"time"
"golang.org/x/sys/unix"
"github.com/Sirupsen/logrus"
"github.com/pkg/errors"
shim "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/reaper"
"github.com/containerd/containerd/sys"
"google.golang.org/grpc"
)
// ClientOpt is a function that produces the shim client used by New. Given a
// context and a Config it returns the ShimClient, an io.Closer for whatever
// resource backs that client (nil when there is nothing to close), and an
// error.
type ClientOpt func(context.Context, Config) (shim.ShimClient, io.Closer, error)
// WithStart returns a ClientOpt that executes a new shim process and then
// connects to it.
//
// binary is the shim executable to run, address is forwarded to the shim via
// its --address flag, and debug enables the shim's --debug flag and wires its
// stdout/stderr to this process (see newCommand).
//
// The returned ClientOpt creates the abstract unix socket named by
// config.Address, hands its file descriptor to the shim as an extra file,
// starts the shim through the default reaper, adjusts its OOM score and
// finally dials the socket with WithConnect. If any step after the shim has
// been started fails, the shim process is killed so that it is not leaked.
func WithStart(binary, address string, debug bool) ClientOpt {
	return func(ctx context.Context, config Config) (_ shim.ShimClient, _ io.Closer, err error) {
		socket, err := newSocket(config)
		if err != nil {
			return nil, nil, err
		}
		// The shim inherits its own copy of the descriptor, so the parent's
		// listener and file can be released when this function returns.
		defer socket.Close()
		f, err := socket.File()
		if err != nil {
			return nil, nil, errors.Wrapf(err, "failed to get fd for socket %s", config.Address)
		}
		defer f.Close()

		cmd := newCommand(binary, address, debug, config, f)
		if err = reaper.Default.Start(cmd); err != nil {
			return nil, nil, errors.Wrapf(err, "failed to start shim")
		}
		// From here on a shim process exists. Should a later step fail, the
		// caller gets no client and therefore has no way to stop it, so kill
		// it here (best effort; the kill error is intentionally ignored).
		defer func() {
			if err != nil {
				_ = cmd.Process.Kill()
			}
		}()

		log.G(ctx).WithFields(logrus.Fields{
			"pid":     cmd.Process.Pid,
			"address": config.Address,
			"debug":   debug,
		}).Infof("shim %s started", binary)

		if err = sys.SetOOMScore(cmd.Process.Pid, sys.OOMScoreMaxKillable); err != nil {
			return nil, nil, errors.Wrap(err, "failed to set OOM Score on shim")
		}
		return WithConnect(ctx, config)
	}
}
// newCommand assembles, without starting, the exec.Cmd that launches the shim
// binary.
//
// The shim is invoked with --namespace config.Namespace and --address address,
// plus --debug when debug is set. Its working directory is config.Path and the
// given socket file is appended to the command's extra files. When debug is
// set, the shim's stdout and stderr are connected to this process's own.
func newCommand(binary, address string, debug bool, config Config, socket *os.File) *exec.Cmd {
	shimArgs := make([]string, 0, 5)
	shimArgs = append(shimArgs, "--namespace", config.Namespace, "--address", address)
	if debug {
		shimArgs = append(shimArgs, "--debug")
	}

	shimCmd := exec.Command(binary, shimArgs...)
	shimCmd.Dir = config.Path
	// The process attributes make sure the shim can be re-parented to system
	// init and is cloned in a new mount namespace, because the
	// overlay/filesystems will be mounted by the shim.
	shimCmd.SysProcAttr = &atter
	shimCmd.ExtraFiles = append(shimCmd.ExtraFiles, socket)
	if debug {
		shimCmd.Stdout, shimCmd.Stderr = os.Stdout, os.Stderr
	}
	return shimCmd
}
// newSocket listens on the abstract unix socket named by config.Address and
// returns the listener. Addresses longer than 106 bytes are rejected before
// any attempt to listen.
func newSocket(config Config) (*net.UnixListener, error) {
	addr := config.Address
	if len(addr) > 106 {
		return nil, errors.Errorf("%q: unix socket path too long (limit 106)", addr)
	}
	// A leading NUL byte selects the abstract socket namespace.
	listener, err := net.Listen("unix", "\x00"+addr)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to listen to abstract unix socket %q", addr)
	}
	return listener.(*net.UnixListener), nil
}
// connect dials a gRPC client connection to address, using d to establish the
// underlying network connection. The dial is configured with WithBlock,
// WithInsecure, a 100 second WithTimeout and FailOnNonTempDialError(true).
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) {
	target := dialAddress(address)
	clientConn, err := grpc.Dial(target,
		grpc.WithBlock(),
		grpc.WithInsecure(),
		grpc.WithTimeout(100*time.Second),
		grpc.WithDialer(d),
		grpc.FailOnNonTempDialError(true),
	)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to dial %q", address)
	}
	return clientConn, nil
}
// dialer connects to the unix socket named by address, waiting at most
// timeout. A leading "unix://" scheme on address is stripped first.
func dialer(address string, timeout time.Duration) (net.Conn, error) {
	socketPath := strings.TrimPrefix(address, "unix://")
	return net.DialTimeout("unix", socketPath, timeout)
}
// annonDialer connects to the abstract unix socket named by address, waiting
// at most timeout. A leading "unix://" scheme on address is stripped and a NUL
// byte is prepended to select the abstract namespace.
func annonDialer(address string, timeout time.Duration) (net.Conn, error) {
	name := strings.TrimPrefix(address, "unix://")
	return net.DialTimeout("unix", "\x00"+name, timeout)
}
// dialAddress returns address prefixed with the "unix://" scheme.
func dialAddress(address string) string {
	target := fmt.Sprintf("unix://%s", address)
	return target
}
// WithConnect connects to an existing shim listening on the abstract unix
// socket named by config.Address. The returned io.Closer is the underlying
// gRPC connection.
func WithConnect(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) {
	grpcConn, dialErr := connect(config.Address, annonDialer)
	if dialErr != nil {
		return nil, nil, dialErr
	}
	client := shim.NewShimClient(grpcConn)
	return client, grpcConn, nil
}
// WithLocal uses an in-process shim: it builds a service from config.Path and
// config.Namespace and wraps it with NewLocal. There is nothing to close, so
// the returned io.Closer is nil.
func WithLocal(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) {
	svc, err := NewService(config.Path, config.Namespace, "")
	if err != nil {
		return nil, nil, err
	}
	local := NewLocal(svc)
	return local, nil, nil
}
// Config holds the settings used to start or connect to a shim.
type Config struct {
	// Address names the abstract unix socket on which the shim is reached.
	Address string
	// Path is used as the working directory of a started shim process and is
	// passed to NewService for an in-process shim.
	Path string
	// Namespace is passed to a started shim via its --namespace flag and to
	// NewService for an in-process shim.
	Namespace string
}
// New returns a new shim client. It invokes opt with ctx and config and wraps
// the resulting ShimClient and closer in a Client; any error from opt is
// returned unchanged.
func New(ctx context.Context, config Config, opt ClientOpt) (*Client, error) {
	shimClient, closer, err := opt(ctx, config)
	if err != nil {
		return nil, err
	}
	client := &Client{ShimClient: shimClient, c: closer}
	return client, nil
}
// Client embeds a shim.ShimClient and keeps the io.Closer handed back by the
// ClientOpt that created it, so the backing resource can be released with
// Close.
type Client struct {
	shim.ShimClient
	// c is the closer supplied by the ClientOpt; it may be nil.
	c io.Closer
}
// IsAlive reports whether the shim answers a ShimInfo request. It returns
// (true, nil) on success, (false, nil) when the request fails with
// grpc.ErrServerStopped, and (false, err) for any other error.
func (c *Client) IsAlive(ctx context.Context) (bool, error) {
	_, err := c.ShimInfo(ctx, empty)
	switch err {
	case nil:
		return true, nil
	case grpc.ErrServerStopped:
		return false, nil
	default:
		return false, err
	}
}
// KillShim kills the shim forcefully by sending SIGKILL to the pid reported
// by ShimInfo. If that pid is the current process, nothing is sent and nil is
// returned.
func (c *Client) KillShim(ctx context.Context) error {
	info, err := c.ShimInfo(ctx, empty)
	if err != nil {
		return err
	}
	// make sure we don't kill ourselves if we are running a local shim
	if shimPid := int(info.ShimPid); shimPid != os.Getpid() {
		return unix.Kill(shimPid, unix.SIGKILL)
	}
	return nil
}
// Close closes the closer the client was created with, if any, and returns
// its error. A client without a closer returns nil.
func (c *Client) Close() error {
	if closer := c.c; closer != nil {
		return closer.Close()
	}
	return nil
}