Skip to content

Commit

Permalink
Merge pull request #43564 from corhere/libcontainerd-overhaul
Browse files Browse the repository at this point in the history
Refactor libcontainerd to minimize containerd RPCs
  • Loading branch information
tianon committed Aug 25, 2022
2 parents 5f698d1 + a09f8db commit 0ec426a
Show file tree
Hide file tree
Showing 40 changed files with 1,262 additions and 1,198 deletions.
3 changes: 2 additions & 1 deletion api/swagger.yaml
Expand Up @@ -4650,7 +4650,8 @@ definitions:
example: false
OOMKilled:
description: |
Whether this container has been killed because it ran out of memory.
Whether a process within this container has been killed because it ran
out of memory since the container was last started.
type: "boolean"
example: false
Dead:
Expand Down
50 changes: 44 additions & 6 deletions container/container.go
Expand Up @@ -19,7 +19,6 @@ import (
mounttypes "github.com/docker/docker/api/types/mount"
swarmtypes "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/container/stream"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog"
"github.com/docker/docker/daemon/logger/local"
Expand All @@ -28,6 +27,7 @@ import (
"github.com/docker/docker/errdefs"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
"github.com/docker/docker/pkg/containerfs"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/ioutils"
Expand All @@ -53,9 +53,6 @@ type ExitStatus struct {
// The exit code with which the container exited.
ExitCode int

// Whether the container encountered an OOM.
OOMKilled bool

// Time at which the container died
ExitedAt time.Time
}
Expand Down Expand Up @@ -89,7 +86,7 @@ type Container struct {
HasBeenManuallyRestarted bool `json:"-"` // used to distinguish restart caused by restart policy from the manual one
MountPoints map[string]*volumemounts.MountPoint
HostConfig *containertypes.HostConfig `json:"-"` // do not serialize the host config in the json, otherwise we'll make the container unportable
ExecCommands *exec.Store `json:"-"`
ExecCommands *ExecStore `json:"-"`
DependencyStore agentexec.DependencyGetter `json:"-"`
SecretReferences []*swarmtypes.SecretReference
ConfigReferences []*swarmtypes.ConfigReference
Expand Down Expand Up @@ -124,7 +121,7 @@ func NewBaseContainer(id, root string) *Container {
return &Container{
ID: id,
State: NewState(),
ExecCommands: exec.NewStore(),
ExecCommands: NewExecStore(),
Root: root,
MountPoints: make(map[string]*volumemounts.MountPoint),
StreamConfig: stream.NewConfig(),
Expand Down Expand Up @@ -755,6 +752,47 @@ func (container *Container) CreateDaemonEnvironment(tty bool, linkedEnv []string
return env
}

// RestoreTask re-acquires the libcontainerd handles for this container: it
// loads the containerd container by ID and then attaches to its task, wiring
// the task's stdio through container.InitializeStdio. The container's recorded
// state is left as-is; nothing here reconciles it with containerd.
//
// The container lock is taken for the duration of the call.
//
// Any error from loading the container is returned unchanged (an
// errdefs.NotFound error when containerd does not know the container). A task
// that cannot be found is not a failure: in that case nil is returned. Every
// other attach error is returned to the caller.
func (container *Container) RestoreTask(ctx context.Context, client libcontainerdtypes.Client) error {
	container.Lock()
	defer container.Unlock()

	// Store the result before checking the error so the handle always
	// reflects what the client returned.
	ctr, loadErr := client.LoadContainer(ctx, container.ID)
	container.ctr = ctr
	if loadErr != nil {
		return loadErr
	}

	tsk, attachErr := ctr.AttachTask(ctx, container.InitializeStdio)
	container.task = tsk
	if attachErr == nil || errdefs.IsNotFound(attachErr) {
		// A missing task is tolerated; only the container must exist.
		return nil
	}
	return attachErr
}

// GetRunningTask returns the libcontainerd Task of a running container.
//
// Errors:
//   - errdefs.Conflict when the container is not in the Running state.
//   - errdefs.System (with a stack trace attached) when the container claims
//     to be Running but no Task reference is set, i.e. the state is corrupt.
//
// The container lock must be held when calling this method.
func (container *Container) GetRunningTask() (libcontainerdtypes.Task, error) {
	if container.Running {
		if tsk, ok := container.Task(); ok {
			return tsk, nil
		}
		// Running without a task handle should never happen; surface it
		// as a system error rather than a caller mistake.
		return nil, errdefs.System(errors.WithStack(fmt.Errorf("container %s is in Running state but has no containerd Task set", container.ID)))
	}
	return nil, errdefs.Conflict(fmt.Errorf("container %s is not running", container.ID))
}

type rio struct {
cio.IO

Expand Down
91 changes: 37 additions & 54 deletions daemon/exec/exec.go → container/exec.go
@@ -1,20 +1,20 @@
package exec // import "github.com/docker/docker/daemon/exec"
package container // import "github.com/docker/docker/container"

import (
"context"
"runtime"
"sync"

"github.com/containerd/containerd/cio"
"github.com/docker/docker/container/stream"
"github.com/docker/docker/libcontainerd/types"
"github.com/docker/docker/pkg/stringid"
"github.com/sirupsen/logrus"
)

// Config holds the configurations for execs. The Daemon keeps
// ExecConfig holds the configurations for execs. The Daemon keeps
// track of both running and finished execs so that they can be
// examined both during and after completion.
type Config struct {
type ExecConfig struct {
sync.Mutex
Started chan struct{}
StreamConfig *stream.Config
Expand All @@ -25,7 +25,7 @@ type Config struct {
OpenStderr bool
OpenStdout bool
CanRemove bool
ContainerID string
Container *Container
DetachKeys []byte
Entrypoint string
Args []string
Expand All @@ -34,39 +34,22 @@ type Config struct {
User string
WorkingDir string
Env []string
Pid int
Process types.Process
ConsoleSize *[2]uint
}

// NewConfig initializes a new exec configuration.
func NewConfig() *Config {
return &Config{
// NewExecConfig initializes a new exec configuration.
func NewExecConfig(c *Container) *ExecConfig {
return &ExecConfig{
ID: stringid.GenerateRandomID(),
Container: c,
StreamConfig: stream.NewConfig(),
Started: make(chan struct{}),
}
}

// rio pairs an embedded containerd cio.IO with a stream.Config so that both
// can be closed and waited on together through a single value.
type rio struct {
cio.IO

// sc is the stream configuration associated with the embedded IO.
sc *stream.Config
}

// Close closes the embedded cio.IO and then closes the streams of sc.
// Only the result of sc.CloseStreams is returned; whatever the embedded
// IO's Close returns is discarded.
func (i *rio) Close() error {
i.IO.Close()

return i.sc.CloseStreams()
}

// Wait first calls sc.Wait with a background context (so no caller-supplied
// cancellation or deadline applies) and then calls Wait on the embedded
// cio.IO.
func (i *rio) Wait() {
i.sc.Wait(context.Background())

i.IO.Wait()
}

// InitializeStdio is called by libcontainerd to connect the stdio.
func (c *Config) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
func (c *ExecConfig) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
c.StreamConfig.CopyToPipe(iop)

if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
Expand All @@ -81,68 +64,68 @@ func (c *Config) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
}

// CloseStreams closes the stdio streams for the exec
func (c *Config) CloseStreams() error {
func (c *ExecConfig) CloseStreams() error {
return c.StreamConfig.CloseStreams()
}

// SetExitCode sets the exec config's exit code
func (c *Config) SetExitCode(code int) {
func (c *ExecConfig) SetExitCode(code int) {
c.ExitCode = &code
}

// Store keeps track of the exec configurations.
type Store struct {
byID map[string]*Config
sync.RWMutex
// ExecStore keeps track of the exec configurations.
type ExecStore struct {
byID map[string]*ExecConfig
mu sync.RWMutex
}

// NewStore initializes a new exec store.
func NewStore() *Store {
return &Store{
byID: make(map[string]*Config),
// NewExecStore initializes a new exec store.
func NewExecStore() *ExecStore {
return &ExecStore{
byID: make(map[string]*ExecConfig),
}
}

// Commands returns the exec configurations in the store.
func (e *Store) Commands() map[string]*Config {
e.RLock()
byID := make(map[string]*Config, len(e.byID))
func (e *ExecStore) Commands() map[string]*ExecConfig {
e.mu.RLock()
byID := make(map[string]*ExecConfig, len(e.byID))
for id, config := range e.byID {
byID[id] = config
}
e.RUnlock()
e.mu.RUnlock()
return byID
}

// Add adds a new exec configuration to the store.
func (e *Store) Add(id string, Config *Config) {
e.Lock()
func (e *ExecStore) Add(id string, Config *ExecConfig) {
e.mu.Lock()
e.byID[id] = Config
e.Unlock()
e.mu.Unlock()
}

// Get returns an exec configuration by its id.
func (e *Store) Get(id string) *Config {
e.RLock()
func (e *ExecStore) Get(id string) *ExecConfig {
e.mu.RLock()
res := e.byID[id]
e.RUnlock()
e.mu.RUnlock()
return res
}

// Delete removes an exec configuration from the store.
func (e *Store) Delete(id string, pid int) {
e.Lock()
func (e *ExecStore) Delete(id string) {
e.mu.Lock()
delete(e.byID, id)
e.Unlock()
e.mu.Unlock()
}

// List returns the list of exec ids in the store.
func (e *Store) List() []string {
func (e *ExecStore) List() []string {
var IDs []string
e.RLock()
e.mu.RLock()
for id := range e.byID {
IDs = append(IDs, id)
}
e.RUnlock()
e.mu.RUnlock()
return IDs
}
40 changes: 36 additions & 4 deletions container/state.go
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/docker/docker/api/types"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
units "github.com/docker/go-units"
)

Expand Down Expand Up @@ -36,6 +37,14 @@ type State struct {

stopWaiters []chan<- StateStatus
removeOnlyWaiters []chan<- StateStatus

// The libcontainerd reference fields are unexported to force consumers
// to access them through the getter methods with multi-valued returns
// so that they can't forget to nil-check: the code won't compile unless
// the nil-check result is explicitly consumed or discarded.

ctr libcontainerdtypes.Container
task libcontainerdtypes.Task
}

// StateStatus is used to return container wait results.
Expand Down Expand Up @@ -260,7 +269,7 @@ func (s *State) SetExitCode(ec int) {
}

// SetRunning sets the state of the container to "running".
func (s *State) SetRunning(pid int, initial bool) {
func (s *State) SetRunning(ctr libcontainerdtypes.Container, tsk libcontainerdtypes.Task, initial bool) {
s.ErrorMsg = ""
s.Paused = false
s.Running = true
Expand All @@ -269,7 +278,14 @@ func (s *State) SetRunning(pid int, initial bool) {
s.Paused = false
}
s.ExitCodeValue = 0
s.Pid = pid
s.ctr = ctr
s.task = tsk
if tsk != nil {
s.Pid = int(tsk.Pid())
} else {
s.Pid = 0
}
s.OOMKilled = false
if initial {
s.StartedAt = time.Now().UTC()
}
Expand All @@ -287,7 +303,6 @@ func (s *State) SetStopped(exitStatus *ExitStatus) {
s.FinishedAt = exitStatus.ExitedAt
}
s.ExitCodeValue = exitStatus.ExitCode
s.OOMKilled = exitStatus.OOMKilled

s.notifyAndClear(&s.stopWaiters)
}
Expand All @@ -303,7 +318,6 @@ func (s *State) SetRestarting(exitStatus *ExitStatus) {
s.Pid = 0
s.FinishedAt = time.Now().UTC()
s.ExitCodeValue = exitStatus.ExitCode
s.OOMKilled = exitStatus.OOMKilled

s.notifyAndClear(&s.stopWaiters)
}
Expand Down Expand Up @@ -405,3 +419,21 @@ func (s *State) notifyAndClear(waiters *[]chan<- StateStatus) {
}
*waiters = nil
}

// C8dContainer hands back the libcontainerd Container reference held in the
// state, together with a flag telling the caller whether that reference is
// set. When ok is false the returned Container is nil and must not be used.
//
// The container lock must be held when calling this method.
func (s *State) C8dContainer() (_ libcontainerdtypes.Container, ok bool) {
	if s.ctr == nil {
		return nil, false
	}
	return s.ctr, true
}

// Task hands back the libcontainerd Task reference held in the state,
// together with a flag telling the caller whether that reference is set.
// When ok is false the returned Task is nil and must not be used.
//
// The container lock must be held when calling this method.
//
// See also: (*Container).GetRunningTask().
func (s *State) Task() (_ libcontainerdtypes.Task, ok bool) {
	tsk := s.task
	return tsk, tsk != nil
}

0 comments on commit 0ec426a

Please sign in to comment.