Skip to content

Commit

Permalink
Refactor libcontainerd to minimize c8d RPCs
Browse files Browse the repository at this point in the history
The containerd client is very chatty at the best of times. Because the
libcontainerd API is stateless and references containers and processes by
string ID for every method call, the implementation is essentially
forced to use the containerd client in a way which amplifies the number
of redundant RPCs invoked to perform any operation. The libcontainerd
remote implementation has to reload the containerd container, task
and/or process metadata for nearly every operation. This in turn
amplifies the number of context switches between dockerd and containerd
to perform any container operation or handle a containerd event,
increasing the load on the system which could otherwise be allocated to
workloads.

Overhaul the libcontainerd interface to reduce the impedance mismatch
with the containerd client so that the containerd client can be used
more efficiently. Split the API out into container, task and process
interfaces which the consumer is expected to retain so that
libcontainerd can retain state---especially the analogous containerd
client objects---without having to manage any state-store inside the
libcontainerd client.

Signed-off-by: Cory Snider <csnider@mirantis.com>
  • Loading branch information
corhere committed Aug 24, 2022
1 parent 57d2d6e commit 4bafaa0
Show file tree
Hide file tree
Showing 36 changed files with 1,156 additions and 1,111 deletions.
47 changes: 44 additions & 3 deletions container/container.go
Expand Up @@ -19,7 +19,6 @@ import (
mounttypes "github.com/docker/docker/api/types/mount"
swarmtypes "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/container/stream"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog"
"github.com/docker/docker/daemon/logger/local"
Expand All @@ -28,6 +27,7 @@ import (
"github.com/docker/docker/errdefs"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
"github.com/docker/docker/pkg/containerfs"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/ioutils"
Expand Down Expand Up @@ -86,7 +86,7 @@ type Container struct {
HasBeenManuallyRestarted bool `json:"-"` // used to distinguish restart caused by restart policy from the manual one
MountPoints map[string]*volumemounts.MountPoint
HostConfig *containertypes.HostConfig `json:"-"` // do not serialize the host config in the json, otherwise we'll make the container unportable
ExecCommands *exec.Store `json:"-"`
ExecCommands *ExecStore `json:"-"`
DependencyStore agentexec.DependencyGetter `json:"-"`
SecretReferences []*swarmtypes.SecretReference
ConfigReferences []*swarmtypes.ConfigReference
Expand Down Expand Up @@ -121,7 +121,7 @@ func NewBaseContainer(id, root string) *Container {
return &Container{
ID: id,
State: NewState(),
ExecCommands: exec.NewStore(),
ExecCommands: NewExecStore(),
Root: root,
MountPoints: make(map[string]*volumemounts.MountPoint),
StreamConfig: stream.NewConfig(),
Expand Down Expand Up @@ -752,6 +752,47 @@ func (container *Container) CreateDaemonEnvironment(tty bool, linkedEnv []string
return env
}

// RestoreTask reloads the containerd container handle for this container via
// client, then re-attaches to its task using container.InitializeStdio to wire
// up the IO. The resulting handles are stored on the container; no other
// container state is synced with containerd's state.
//
// The container lock is acquired for the duration of the call.
//
// Any error from client.LoadContainer is returned unchanged; as documented by
// the original author, that is an errdefs.NotFound error when the container
// does not exist in containerd. An error from AttachTask is returned unless it
// satisfies errdefs.IsNotFound: a missing task is not treated as a failure and
// yields a nil error.
func (container *Container) RestoreTask(ctx context.Context, client libcontainerdtypes.Client) error {
	container.Lock()
	defer container.Unlock()

	// The handle fields are assigned unconditionally, even on failure, so
	// that they always reflect the result of the most recent attempt.
	ctr, err := client.LoadContainer(ctx, container.ID)
	container.ctr = ctr
	if err != nil {
		return err
	}

	tsk, err := ctr.AttachTask(ctx, container.InitializeStdio)
	container.task = tsk
	if err == nil || errdefs.IsNotFound(err) {
		return nil
	}
	return err
}

// GetRunningTask returns the Task for the container after checking that the
// container is in the Running state.
//
// Errors:
//   - errdefs.Conflict if container.Running is false.
//   - errdefs.System (with a stack trace attached) if Running is true but no
//     Task is set, which indicates the container is in a bad state.
//
// The container lock must be held when calling this method.
func (container *Container) GetRunningTask() (libcontainerdtypes.Task, error) {
	if !container.Running {
		notRunning := fmt.Errorf("container %s is not running", container.ID)
		return nil, errdefs.Conflict(notRunning)
	}

	if tsk, ok := container.Task(); ok {
		return tsk, nil
	}

	// Running without a task handle should never happen; surface it loudly.
	badState := fmt.Errorf("container %s is in Running state but has no containerd Task set", container.ID)
	return nil, errdefs.System(errors.WithStack(badState))
}

type rio struct {
cio.IO

Expand Down
69 changes: 26 additions & 43 deletions daemon/exec/exec.go → container/exec.go
@@ -1,20 +1,20 @@
package exec // import "github.com/docker/docker/daemon/exec"
package container // import "github.com/docker/docker/container"

import (
"context"
"runtime"
"sync"

"github.com/containerd/containerd/cio"
"github.com/docker/docker/container/stream"
"github.com/docker/docker/libcontainerd/types"
"github.com/docker/docker/pkg/stringid"
"github.com/sirupsen/logrus"
)

// Config holds the configurations for execs. The Daemon keeps
// ExecConfig holds the configurations for execs. The Daemon keeps
// track of both running and finished execs so that they can be
// examined both during and after completion.
type Config struct {
type ExecConfig struct {
sync.Mutex
Started chan struct{}
StreamConfig *stream.Config
Expand All @@ -25,7 +25,7 @@ type Config struct {
OpenStderr bool
OpenStdout bool
CanRemove bool
ContainerID string
Container *Container
DetachKeys []byte
Entrypoint string
Args []string
Expand All @@ -34,39 +34,22 @@ type Config struct {
User string
WorkingDir string
Env []string
Pid int
Process types.Process
ConsoleSize *[2]uint
}

// NewConfig initializes the a new exec configuration
func NewConfig() *Config {
return &Config{
// NewExecConfig initializes a new exec configuration
func NewExecConfig(c *Container) *ExecConfig {
return &ExecConfig{
ID: stringid.GenerateRandomID(),
Container: c,
StreamConfig: stream.NewConfig(),
Started: make(chan struct{}),
}
}

type rio struct {
cio.IO

sc *stream.Config
}

func (i *rio) Close() error {
i.IO.Close()

return i.sc.CloseStreams()
}

func (i *rio) Wait() {
i.sc.Wait(context.Background())

i.IO.Wait()
}

// InitializeStdio is called by libcontainerd to connect the stdio.
func (c *Config) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
func (c *ExecConfig) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
c.StreamConfig.CopyToPipe(iop)

if c.StreamConfig.Stdin() == nil && !c.Tty && runtime.GOOS == "windows" {
Expand All @@ -81,32 +64,32 @@ func (c *Config) InitializeStdio(iop *cio.DirectIO) (cio.IO, error) {
}

// CloseStreams closes the stdio streams for the exec
func (c *Config) CloseStreams() error {
func (c *ExecConfig) CloseStreams() error {
return c.StreamConfig.CloseStreams()
}

// SetExitCode sets the exec config's exit code
func (c *Config) SetExitCode(code int) {
func (c *ExecConfig) SetExitCode(code int) {
c.ExitCode = &code
}

// Store keeps track of the exec configurations.
type Store struct {
byID map[string]*Config
// ExecStore keeps track of the exec configurations.
type ExecStore struct {
byID map[string]*ExecConfig
mu sync.RWMutex
}

// NewStore initializes a new exec store.
func NewStore() *Store {
return &Store{
byID: make(map[string]*Config),
// NewExecStore initializes a new exec store.
func NewExecStore() *ExecStore {
return &ExecStore{
byID: make(map[string]*ExecConfig),
}
}

// Commands returns the exec configurations in the store.
func (e *Store) Commands() map[string]*Config {
func (e *ExecStore) Commands() map[string]*ExecConfig {
e.mu.RLock()
byID := make(map[string]*Config, len(e.byID))
byID := make(map[string]*ExecConfig, len(e.byID))
for id, config := range e.byID {
byID[id] = config
}
Expand All @@ -115,29 +98,29 @@ func (e *Store) Commands() map[string]*Config {
}

// Add adds a new exec configuration to the store.
func (e *Store) Add(id string, Config *Config) {
func (e *ExecStore) Add(id string, Config *ExecConfig) {
e.mu.Lock()
e.byID[id] = Config
e.mu.Unlock()
}

// Get returns an exec configuration by its id.
func (e *Store) Get(id string) *Config {
func (e *ExecStore) Get(id string) *ExecConfig {
e.mu.RLock()
res := e.byID[id]
e.mu.RUnlock()
return res
}

// Delete removes an exec configuration from the store.
func (e *Store) Delete(id string, pid int) {
func (e *ExecStore) Delete(id string) {
e.mu.Lock()
delete(e.byID, id)
e.mu.Unlock()
}

// List returns the list of exec ids in the store.
func (e *Store) List() []string {
func (e *ExecStore) List() []string {
var IDs []string
e.mu.RLock()
for id := range e.byID {
Expand Down
37 changes: 35 additions & 2 deletions container/state.go
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/docker/docker/api/types"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
units "github.com/docker/go-units"
)

Expand Down Expand Up @@ -36,6 +37,14 @@ type State struct {

stopWaiters []chan<- StateStatus
removeOnlyWaiters []chan<- StateStatus

// The libcontainerd reference fields are unexported to force consumers
// to access them through the getter methods with multi-valued returns
// so that they can't forget to nil-check: the code won't compile unless
// the nil-check result is explicitly consumed or discarded.

ctr libcontainerdtypes.Container
task libcontainerdtypes.Task
}

// StateStatus is used to return container wait results.
Expand Down Expand Up @@ -260,7 +269,7 @@ func (s *State) SetExitCode(ec int) {
}

// SetRunning sets the state of the container to "running".
func (s *State) SetRunning(pid int, initial bool) {
func (s *State) SetRunning(ctr libcontainerdtypes.Container, tsk libcontainerdtypes.Task, initial bool) {
s.ErrorMsg = ""
s.Paused = false
s.Running = true
Expand All @@ -269,7 +278,13 @@ func (s *State) SetRunning(pid int, initial bool) {
s.Paused = false
}
s.ExitCodeValue = 0
s.Pid = pid
s.ctr = ctr
s.task = tsk
if tsk != nil {
s.Pid = int(tsk.Pid())
} else {
s.Pid = 0
}
s.OOMKilled = false
if initial {
s.StartedAt = time.Now().UTC()
Expand Down Expand Up @@ -404,3 +419,21 @@ func (s *State) notifyAndClear(waiters *[]chan<- StateStatus) {
}
*waiters = nil
}

// C8dContainer returns the libcontainerd Container reference stored on the
// state, together with ok, which is true only when that reference is non-nil.
//
// The container lock must be held when calling this method.
func (s *State) C8dContainer() (_ libcontainerdtypes.Container, ok bool) {
	ctr := s.ctr
	return ctr, ctr != nil
}

// Task returns the libcontainerd Task reference stored on the state, together
// with ok, which is true only when that reference is non-nil.
//
// The container lock must be held when calling this method.
//
// See also: (*Container).GetRunningTask().
func (s *State) Task() (_ libcontainerdtypes.Task, ok bool) {
	tsk := s.task
	return tsk, tsk != nil
}
16 changes: 12 additions & 4 deletions container/state_test.go
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/docker/docker/api/types"
libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
)

func TestIsValidHealthString(t *testing.T) {
Expand All @@ -28,6 +29,13 @@ func TestIsValidHealthString(t *testing.T) {
}
}

// mockTask is a test double for libcontainerdtypes.Task. It embeds the
// interface so that it satisfies it without implementing every method; only
// Pid is overridden. The embedded interface is left nil by the tests, so
// calling any other method would panic.
type mockTask struct {
	libcontainerdtypes.Task

	// pid is the value reported by Pid.
	pid uint32
}

// Pid returns the process ID configured on the mock.
func (m *mockTask) Pid() uint32 {
	return m.pid
}

func TestStateRunStop(t *testing.T) {
s := NewState()

Expand Down Expand Up @@ -60,7 +68,7 @@ func TestStateRunStop(t *testing.T) {

// Set the state to "Running".
s.Lock()
s.SetRunning(i, true)
s.SetRunning(nil, &mockTask{pid: uint32(i)}, true)
s.Unlock()

// Assert desired state.
Expand Down Expand Up @@ -125,7 +133,7 @@ func TestStateTimeoutWait(t *testing.T) {
s := NewState()

s.Lock()
s.SetRunning(0, true)
s.SetRunning(nil, nil, true)
s.Unlock()

// Start a wait with a timeout.
Expand Down Expand Up @@ -174,7 +182,7 @@ func TestCorrectStateWaitResultAfterRestart(t *testing.T) {
s := NewState()

s.Lock()
s.SetRunning(0, true)
s.SetRunning(nil, nil, true)
s.Unlock()

waitC := s.Wait(context.Background(), WaitConditionNotRunning)
Expand All @@ -185,7 +193,7 @@ func TestCorrectStateWaitResultAfterRestart(t *testing.T) {
s.Unlock()

s.Lock()
s.SetRunning(0, true)
s.SetRunning(nil, nil, true)
s.Unlock()

got := <-waitC
Expand Down
9 changes: 6 additions & 3 deletions daemon/checkpoint.go
Expand Up @@ -57,8 +57,11 @@ func (daemon *Daemon) CheckpointCreate(name string, config types.CheckpointCreat
return err
}

if !container.IsRunning() {
return fmt.Errorf("Container %s not running", name)
container.Lock()
tsk, err := container.GetRunningTask()
container.Unlock()
if err != nil {
return err
}

if !validCheckpointNamePattern.MatchString(config.CheckpointID) {
Expand All @@ -70,7 +73,7 @@ func (daemon *Daemon) CheckpointCreate(name string, config types.CheckpointCreat
return fmt.Errorf("cannot checkpoint container %s: %s", name, err)
}

err = daemon.containerd.CreateCheckpoint(context.Background(), container.ID, checkpointDir, config.Exit)
err = tsk.CreateCheckpoint(context.Background(), checkpointDir, config.Exit)
if err != nil {
os.RemoveAll(checkpointDir)
return fmt.Errorf("Cannot checkpoint container %s: %s", name, err)
Expand Down

0 comments on commit 4bafaa0

Please sign in to comment.