Skip to content
Permalink
Browse files

Merge pull request #36895 from dmcgowan/vendor-containerd-master

Update containerd to 1.1
  • Loading branch information...
thaJeztah committed Jun 6, 2018
2 parents 340fd5e + a000934 commit fd2f2a919e392b96de74795ae9af2dc5e510bc4c
Showing 481 changed files with 41,852 additions and 19,098 deletions.
@@ -4,7 +4,7 @@
# containerd is also pinned in vendor.conf. When updating the binary
# version you may also need to update the vendor version to pick up bug
# fixes or new APIs.
CONTAINERD_COMMIT=773c489c9c1b21a6d78b5c538cd395416ec50f88 # v1.0.3
CONTAINERD_COMMIT=395068d2b7256518259816ae19e45824b15da071 # v1.1.1-rc.0

install_containerd() {
echo "Install containerd version $CONTAINERD_COMMIT"
@@ -1,7 +1,7 @@
#!/bin/sh

# When updating RUNC_COMMIT, also update runc in vendor.conf accordingly
RUNC_COMMIT=4fc53a81fb7c994640722ac585fa9ca548971871
RUNC_COMMIT=69663f0bd4b60df09991c08812a60108003fa340

install_runc() {
# Do not build with ambient capabilities support
@@ -94,7 +94,7 @@ func testBuildWithSession(t *testing.T, client dclient.APIClient, daemonHost str
})
sess.Allow(fsProvider)

g, ctx := errgroup.WithContext(context.Background())
g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
return sess.Run(ctx, client.DialSession)
@@ -16,18 +16,17 @@ import (
"syscall"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/containerd/containerd"
"github.com/containerd/containerd/api/events"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
apievents "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/content"
containerderrors "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/linux/runctypes"
"github.com/containerd/typeurl"
@@ -295,7 +294,8 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
t, err = ctr.ctr.NewTask(ctx,
func(id string) (cio.IO, error) {
fifos := newFIFOSet(ctr.bundleDir, InitProcessName, withStdin, spec.Process.Terminal)
rio, err = c.createIO(fifos, id, InitProcessName, stdinCloseSync, attachStdio)

rio, err = c.createIO(fifos, id, InitProcessName, stdinCloseSync, attachStdio, spec.Process.Terminal)
return rio, err
},
func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
@@ -366,7 +366,7 @@ func (c *client) Exec(ctx context.Context, containerID, processID string, spec *
}()

p, err = t.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio)
rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio, spec.Terminal)
return rio, err
})
if err != nil {
@@ -645,8 +645,16 @@ func (c *client) getProcess(containerID, processID string) (containerd.Process,

// createIO creates the io to be used by a process
// This needs to get a pointer to interface as upon closure the process may not have yet been registered
func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio StdioCallback) (cio.IO, error) {
io, err := cio.NewDirectIO(context.Background(), fifos)
func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio StdioCallback, terminal bool) (cio.IO, error) {
var (
io *cio.DirectIO
err error
)
if terminal {
io, err = cio.NewDirectIOWithTerminal(context.Background(), fifos)
} else {
io, err = cio.NewDirectIO(context.Background(), fifos)
}
if err != nil {
return nil, err
}
@@ -729,137 +737,123 @@ func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {

func (c *client) processEventStream(ctx context.Context) {
var (
err error
eventStream eventsapi.Events_SubscribeClient
ev *eventsapi.Envelope
et EventType
ei EventInfo
ctr *container
err error
ev *events.Envelope
et EventType
ei EventInfo
ctr *container
)
defer func() {
if err != nil {
select {
case <-ctx.Done():
c.logger.WithError(ctx.Err()).
Info("stopping event stream following graceful shutdown")
default:
go c.processEventStream(ctx)
}
}
}()

eventStream, err = c.getRemote().EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
Filters: []string{
// Filter on both namespace *and* topic. To create an "and" filter,
// this must be a single, comma-separated string
"namespace==" + c.namespace + ",topic~=|^/tasks/|",
},
}, grpc.FailFast(false))
if err != nil {
return
}
// Filter on both namespace *and* topic. To create an "and" filter,
// this must be a single, comma-separated string
eventStream, errC := c.getRemote().EventService().Subscribe(ctx, "namespace=="+c.namespace+",topic~=|^/tasks/|")

c.logger.WithField("namespace", c.namespace).Debug("processing event stream")

var oomKilled bool
for {
ev, err = eventStream.Recv()
if err != nil {
errStatus, ok := status.FromError(err)
if !ok || errStatus.Code() != codes.Canceled {
c.logger.WithError(err).Error("failed to get event")
select {
case err = <-errC:
if err != nil {
errStatus, ok := status.FromError(err)
if !ok || errStatus.Code() != codes.Canceled {
c.logger.WithError(err).Error("failed to get event")
go c.processEventStream(ctx)
} else {
c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
}
}
return
}

if ev.Event == nil {
c.logger.WithField("event", ev).Warn("invalid event")
continue
}
case ev = <-eventStream:
if ev.Event == nil {
c.logger.WithField("event", ev).Warn("invalid event")
continue
}

v, err := typeurl.UnmarshalAny(ev.Event)
if err != nil {
c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
continue
}
v, err := typeurl.UnmarshalAny(ev.Event)
if err != nil {
c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
continue
}

c.logger.WithField("topic", ev.Topic).Debug("event")
c.logger.WithField("topic", ev.Topic).Debug("event")

switch t := v.(type) {
case *events.TaskCreate:
et = EventCreate
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ContainerID,
Pid: t.Pid,
}
case *events.TaskStart:
et = EventStart
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ContainerID,
Pid: t.Pid,
}
case *events.TaskExit:
et = EventExit
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ID,
Pid: t.Pid,
ExitCode: t.ExitStatus,
ExitedAt: t.ExitedAt,
}
case *events.TaskOOM:
et = EventOOM
ei = EventInfo{
ContainerID: t.ContainerID,
OOMKilled: true,
}
oomKilled = true
case *events.TaskExecAdded:
et = EventExecAdded
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ExecID,
}
case *events.TaskExecStarted:
et = EventExecStarted
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ExecID,
Pid: t.Pid,
}
case *events.TaskPaused:
et = EventPaused
ei = EventInfo{
ContainerID: t.ContainerID,
switch t := v.(type) {
case *apievents.TaskCreate:
et = EventCreate
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ContainerID,
Pid: t.Pid,
}
case *apievents.TaskStart:
et = EventStart
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ContainerID,
Pid: t.Pid,
}
case *apievents.TaskExit:
et = EventExit
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ID,
Pid: t.Pid,
ExitCode: t.ExitStatus,
ExitedAt: t.ExitedAt,
}
case *apievents.TaskOOM:
et = EventOOM
ei = EventInfo{
ContainerID: t.ContainerID,
OOMKilled: true,
}
oomKilled = true
case *apievents.TaskExecAdded:
et = EventExecAdded
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ExecID,
}
case *apievents.TaskExecStarted:
et = EventExecStarted
ei = EventInfo{
ContainerID: t.ContainerID,
ProcessID: t.ExecID,
Pid: t.Pid,
}
case *apievents.TaskPaused:
et = EventPaused
ei = EventInfo{
ContainerID: t.ContainerID,
}
case *apievents.TaskResumed:
et = EventResumed
ei = EventInfo{
ContainerID: t.ContainerID,
}
default:
c.logger.WithFields(logrus.Fields{
"topic": ev.Topic,
"type": reflect.TypeOf(t)},
).Info("ignoring event")
continue
}
case *events.TaskResumed:
et = EventResumed
ei = EventInfo{
ContainerID: t.ContainerID,

ctr = c.getContainer(ei.ContainerID)
if ctr == nil {
c.logger.WithField("container", ei.ContainerID).Warn("unknown container")
continue
}
default:
c.logger.WithFields(logrus.Fields{
"topic": ev.Topic,
"type": reflect.TypeOf(t)},
).Info("ignoring event")
continue
}

ctr = c.getContainer(ei.ContainerID)
if ctr == nil {
c.logger.WithField("container", ei.ContainerID).Warn("unknown container")
continue
}
if oomKilled {
ctr.setOOMKilled(true)
oomKilled = false
}
ei.OOMKilled = ctr.getOOMKilled()

if oomKilled {
ctr.setOOMKilled(true)
oomKilled = false
c.processEvent(ctr, et, ei)
}
ei.OOMKilled = ctr.getOOMKilled()

c.processEvent(ctr, et, ei)
}
}

@@ -18,7 +18,7 @@ import (

"github.com/BurntSushi/toml"
"github.com/containerd/containerd"
"github.com/containerd/containerd/server"
"github.com/containerd/containerd/services/server"
"github.com/docker/docker/pkg/system"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -6,6 +6,7 @@ import (
"syscall"
"time"

"github.com/containerd/containerd/defaults"
"github.com/docker/docker/pkg/system"
)

@@ -18,6 +19,12 @@ func (r *remote) setDefaults() {
if r.GRPC.Address == "" {
r.GRPC.Address = filepath.Join(r.stateDir, sockFile)
}
if r.GRPC.MaxRecvMsgSize == 0 {
r.GRPC.MaxRecvMsgSize = defaults.DefaultMaxRecvMsgSize
}
if r.GRPC.MaxSendMsgSize == 0 {
r.GRPC.MaxSendMsgSize = defaults.DefaultMaxSendMsgSize
}
if r.Debug.Address == "" {
r.Debug.Address = filepath.Join(r.stateDir, debugSockFile)
}
Oops, something went wrong.

0 comments on commit fd2f2a9

Please sign in to comment.
You can’t perform that action at this time.