Add containerd-shim plumbing for job containers #962

Merged · 1 commit · Jun 26, 2021
52 changes: 46 additions & 6 deletions cmd/containerd-shim-runhcs-v1/pod.go
@@ -83,6 +83,11 @@ func createPod(ctx context.Context, events publisher, req *task.CreateTaskReques
owner := filepath.Base(os.Args[0])
isWCOW := oci.IsWCOW(s)

p := pod{
events: events,
id: req.ID,
}

var parent *uvm.UtilityVM
if oci.IsIsolated(s) {
// Create the UVM parent
@@ -125,22 +130,41 @@ func createPod(ctx context.Context, events publisher, req *task.CreateTaskReques
parent.Close()
return nil, err
}
} else if oci.IsJobContainer(s) {
// If we're making a job container, fake a task (i.e. reuse the wcowPodSandbox logic)
p.sandboxTask = newWcowPodSandboxTask(ctx, events, req.ID, req.Bundle, parent, "")
if err := events.publishEvent(
ctx,
runtime.TaskCreateEventTopic,
&eventstypes.TaskCreate{
ContainerID: req.ID,
Bundle: req.Bundle,
Rootfs: req.Rootfs,
IO: &eventstypes.TaskIO{
Stdin: req.Stdin,
Stdout: req.Stdout,
Stderr: req.Stderr,
Terminal: req.Terminal,
},
Checkpoint: "",
Pid: 0,
}); err != nil {
return nil, err
}
p.jobContainer = true
return &p, nil
} else if !isWCOW {
return nil, errors.Wrap(errdefs.ErrFailedPrecondition, "oci spec does not contain WCOW or LCOW spec")
}

defer func() {
// clean up the uvm if we fail any further operations
if err != nil && parent != nil {
parent.Close()
}
}()

p := pod{
events: events,
id: req.ID,
host: parent,
}

p.host = parent
if parent != nil {
cid := req.ID
if id, ok := s.Annotations[oci.AnnotationNcproxyContainerID]; ok {
@@ -232,6 +256,11 @@ type pod struct {
// It MUST be treated as read only in the lifetime of the pod.
host *uvm.UtilityVM

// jobContainer specifies whether this pod is for WCOW job containers only.
//
// It MUST be treated as read only in the lifetime of the pod.
jobContainer bool

workloadTasks sync.Map
}

@@ -263,6 +292,17 @@ func (p *pod) CreateTask(ctx context.Context, req *task.CreateTaskRequest, s *sp
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "task with id: '%s' already exists in pod: '%s'", req.ID, p.id)
}

if p.jobContainer {
// This is a short circuit to make sure that all containers in a pod will have
// the same IP address/be added to the same compartment.
//
// OS work is needed to support this scenario, so for now we block it.
if !oci.IsJobContainer(s) {
return nil, errors.New("cannot create a normal process isolated container if the pod sandbox is a job container")
}
}

ct, sid, err := oci.GetSandboxTypeAndID(s.Annotations)
if err != nil {
return nil, err
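For orientation, the new branch in createPod keys off oci.IsJobContainer. A minimal sketch of what such a check amounts to is below; the annotation key is an assumption for illustration, and the authoritative helper lives in internal/oci.

```go
// Hedged sketch: job containers are requested through an OCI annotation on the spec.
package oci

import specs "github.com/opencontainers/runtime-spec/specs-go"

// annotationHostProcessContainer is the assumed annotation key marking a job (host process) container.
const annotationHostProcessContainer = "microsoft.com/hostprocess-container"

// IsJobContainer reports whether the spec asks for a Windows job container rather than a
// process- or hypervisor-isolated one.
func IsJobContainer(s *specs.Spec) bool {
	return s != nil && s.Annotations[annotationHostProcessContainer] == "true"
}
```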
52 changes: 37 additions & 15 deletions cmd/containerd-shim-runhcs-v1/task_hcs.go
@@ -29,6 +29,7 @@ import (
"github.com/Microsoft/hcsshim/internal/hcs/schema1"
hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2"
"github.com/Microsoft/hcsshim/internal/hcsoci"
"github.com/Microsoft/hcsshim/internal/jobcontainers"
"github.com/Microsoft/hcsshim/internal/log"
"github.com/Microsoft/hcsshim/internal/oci"
"github.com/Microsoft/hcsshim/internal/processorinfo"
@@ -113,6 +114,39 @@ func newHcsStandaloneTask(ctx context.Context, events publisher, req *task.Creat
return shim, nil
}

// createContainer returns either a process/hypervisor-isolated container or a job container,
// based on what is set in the OCI spec.
func createContainer(ctx context.Context, id, owner, netNS string, s *specs.Spec, parent *uvm.UtilityVM, shimOpts *runhcsopts.Options) (cow.Container, *resources.Resources, error) {
var (
err error
container cow.Container
resources *resources.Resources
)

if oci.IsJobContainer(s) {
container, resources, err = jobcontainers.Create(ctx, id, s)
if err != nil {
return nil, nil, err
}
} else {
opts := &hcsoci.CreateOptions{
ID: id,
Owner: owner,
Spec: s,
HostingSystem: parent,
NetworkNamespace: netNS,
}
if shimOpts != nil {
opts.ScaleCPULimitsToSandbox = shimOpts.ScaleCpuLimitsToSandbox
}
container, resources, err = hcsoci.CreateContainer(ctx, opts)
if err != nil {
return nil, nil, err
}
}
return container, resources, nil
}

// newHcsTask creates a container within `parent` and its init exec process in
// the `shimExecCreated` state and returns the task that tracks its lifetime.
//
Expand Down Expand Up @@ -152,19 +186,7 @@ func newHcsTask(
shimOpts = v.(*runhcsopts.Options)
}

opts := hcsoci.CreateOptions{
ID: req.ID,
Owner: owner,
Spec: s,
HostingSystem: parent,
NetworkNamespace: netNS,
}

if shimOpts != nil {
opts.ScaleCPULimitsToSandbox = shimOpts.ScaleCpuLimitsToSandbox
}

system, resources, err := hcsoci.CreateContainer(ctx, &opts)
container, resources, err := createContainer(ctx, req.ID, owner, netNS, s, parent, shimOpts)
if err != nil {
return nil, err
}
@@ -173,7 +195,7 @@
events: events,
id: req.ID,
isWCOW: oci.IsWCOW(s),
c: system,
c: container,
cr: resources,
ownsHost: ownsParent,
host: parent,
@@ -186,7 +208,7 @@
events,
req.ID,
parent,
system,
container,
req.ID,
req.Bundle,
ht.isWCOW,
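With newHcsTask now calling the createContainer helper, callers no longer care which backend produced the container. A stand-alone sketch of that dispatch shape, using hypothetical stand-in types rather than the real hcsshim APIs:

```go
// Simplified illustration: one creation entry point, two backends.
package main

import (
	"context"
	"fmt"
)

// container stands in for the cow.Container abstraction both backends satisfy.
type container interface {
	Start(ctx context.Context) error
}

type jobContainer struct{}

func (jobContainer) Start(context.Context) error { return nil }

type hcsContainer struct{}

func (hcsContainer) Start(context.Context) error { return nil }

// create mirrors the shape of createContainer: the caller gets back the same
// interface regardless of which path was taken.
func create(isJobContainer bool) (container, error) {
	if isJobContainer {
		return jobContainer{}, nil // job (host process) container path
	}
	return hcsContainer{}, nil // process- or hypervisor-isolated path
}

func main() {
	c, err := create(true)
	fmt.Printf("%T %v\n", c, err)
}
```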
3 changes: 3 additions & 0 deletions cmd/containerd-shim-runhcs-v1/task_wcow_podsandbox.go
@@ -284,6 +284,9 @@ func (wpst *wcowPodSandboxTask) Share(ctx context.Context, req *shimdiag.ShareRe

func (wpst *wcowPodSandboxTask) Stats(ctx context.Context) (*stats.Statistics, error) {
stats := &stats.Statistics{}
if wpst.host == nil {
return stats, nil
}
vmStats, err := wpst.host.Stats(ctx)
if err != nil && !isStatsNotFound(err) {
return nil, err
4 changes: 2 additions & 2 deletions internal/hcsoci/resources_lcow.go
@@ -42,12 +42,12 @@ func allocateLinuxResources(ctx context.Context, coi *createOptionsInternal, r *
containerRootInUVM := r.ContainerRootInUVM()
if coi.Spec.Windows != nil && len(coi.Spec.Windows.LayerFolders) > 0 {
log.G(ctx).Debug("hcsshim::allocateLinuxResources mounting storage")
rootPath, err := layers.MountContainerLayers(ctx, coi.Spec.Windows.LayerFolders, containerRootInUVM, coi.HostingSystem)
rootPath, err := layers.MountContainerLayers(ctx, coi.Spec.Windows.LayerFolders, containerRootInUVM, "", coi.HostingSystem)
if err != nil {
return errors.Wrap(err, "failed to mount container storage")
}
coi.Spec.Root.Path = rootPath
layers := layers.NewImageLayers(coi.HostingSystem, containerRootInUVM, coi.Spec.Windows.LayerFolders, isSandbox)
layers := layers.NewImageLayers(coi.HostingSystem, containerRootInUVM, coi.Spec.Windows.LayerFolders, "", isSandbox)
r.SetLayers(layers)
} else if coi.Spec.Root.Path != "" {
// This is the "Plan 9" root filesystem.
4 changes: 2 additions & 2 deletions internal/hcsoci/resources_wcow.go
@@ -52,12 +52,12 @@ func allocateWindowsResources(ctx context.Context, coi *createOptionsInternal, r
if coi.Spec.Root.Path == "" && (coi.HostingSystem != nil || coi.Spec.Windows.HyperV == nil) {
log.G(ctx).Debug("hcsshim::allocateWindowsResources mounting storage")
containerRootInUVM := r.ContainerRootInUVM()
containerRootPath, err := layers.MountContainerLayers(ctx, coi.Spec.Windows.LayerFolders, containerRootInUVM, coi.HostingSystem)
containerRootPath, err := layers.MountContainerLayers(ctx, coi.Spec.Windows.LayerFolders, containerRootInUVM, "", coi.HostingSystem)
if err != nil {
return errors.Wrap(err, "failed to mount container storage")
}
coi.Spec.Root.Path = containerRootPath
layers := layers.NewImageLayers(coi.HostingSystem, containerRootInUVM, coi.Spec.Windows.LayerFolders, isSandbox)
layers := layers.NewImageLayers(coi.HostingSystem, containerRootInUVM, coi.Spec.Windows.LayerFolders, "", isSandbox)
r.SetLayers(layers)
}

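Both call sites above gain an extra empty-string argument. A hedged sketch of the internal/layers signatures these changes imply — parameter names and the ImageLayers fields are assumptions, and the bodies are stubbed — with job containers later passing a real sandbox mount path in that slot:

```go
// Hedged sketch of the updated layers helpers as implied by the call sites in this diff.
package layers

import (
	"context"

	"github.com/Microsoft/hcsshim/internal/uvm"
)

// ImageLayers tracks the layer mounts for a container so they can be released later.
type ImageLayers struct {
	vm                 *uvm.UtilityVM
	containerRootInUVM string
	layers             []string
	volumeMountPath    string
	isSandbox          bool
}

// MountContainerLayers mounts the container's layer folders; volumeMountPath is "" unless
// the caller wants the resulting volume mounted at a specific host path (job containers).
func MountContainerLayers(ctx context.Context, layerFolders []string, guestRoot, volumeMountPath string, vm *uvm.UtilityVM) (string, error) {
	panic("sketch only")
}

// NewImageLayers records the mounted layers for later cleanup.
func NewImageLayers(vm *uvm.UtilityVM, containerRootInUVM string, layerFolders []string, volumeMountPath string, isSandbox bool) *ImageLayers {
	return &ImageLayers{vm: vm, containerRootInUVM: containerRootInUVM, layers: layerFolders, volumeMountPath: volumeMountPath, isSandbox: isSandbox}
}
```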
90 changes: 26 additions & 64 deletions internal/jobcontainers/jobcontainer.go
@@ -21,10 +21,10 @@ import (
"github.com/Microsoft/hcsshim/internal/layers"
"github.com/Microsoft/hcsshim/internal/log"
"github.com/Microsoft/hcsshim/internal/queue"
"github.com/Microsoft/hcsshim/internal/resources"
"github.com/Microsoft/hcsshim/internal/winapi"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/windows"
)

Expand Down Expand Up @@ -66,7 +66,6 @@ type JobContainer struct {
spec *specs.Spec // OCI spec used to create the container
job *jobobject.JobObject // Object representing the job object the container owns
sandboxMount string // Path to where the sandbox is mounted on the host
m sync.Mutex
closedWaitOnce sync.Once
init initProc
startTimestamp time.Time
@@ -89,33 +88,21 @@ func newJobContainer(id string, s *specs.Spec) *JobContainer {
}

// Create creates a new JobContainer from `s`.
func Create(ctx context.Context, id string, s *specs.Spec) (_ cow.Container, err error) {
func Create(ctx context.Context, id string, s *specs.Spec) (_ cow.Container, _ *resources.Resources, err error) {
log.G(ctx).WithField("id", id).Debug("Creating job container")

if s == nil {
return nil, errors.New("Spec must be supplied")
return nil, nil, errors.New("Spec must be supplied")
}

if id == "" {
g, err := guid.NewV4()
if err != nil {
return nil, err
return nil, nil, err
}
id = g.String()
}

if err := mountLayers(ctx, s); err != nil {
return nil, errors.Wrap(err, "failed to mount container layers")
}

volumeGUIDRegex := `^\\\\\?\\(Volume)\{{0,1}[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}(\}){0,1}\}(|\\)$`
if matched, err := regexp.MatchString(volumeGUIDRegex, s.Root.Path); !matched || err != nil {
return nil, fmt.Errorf(`invalid container spec - Root.Path '%s' must be a volume GUID path in the format '\\?\Volume{GUID}\'`, s.Root.Path)
}
if s.Root.Path[len(s.Root.Path)-1] != '\\' {
s.Root.Path += `\` // Be nice to clients and make sure well-formed for back-compat
}

container := newJobContainer(id, s)

// Create the job object all processes will run in.
@@ -125,52 +112,50 @@ func Create(ctx context.Context, id string, s *specs.Spec) (_ cow.Container, err
}
job, err := jobobject.Create(ctx, options)
if err != nil {
return nil, errors.Wrap(err, "failed to create job object")
return nil, nil, errors.Wrap(err, "failed to create job object")
}

// Parity with how we handle process isolated containers. We set the same flag which
// behaves the same way for a silo.
if err := job.SetTerminateOnLastHandleClose(); err != nil {
return nil, errors.Wrap(err, "failed to set terminate on last handle close on job container")
return nil, nil, errors.Wrap(err, "failed to set terminate on last handle close on job container")
}
container.job = job

var path string
r := resources.NewContainerResources(id)
defer func() {
if err != nil {
container.Close()
if path != "" {
_ = removeSandboxMountPoint(ctx, path)
}
_ = resources.ReleaseResources(ctx, r, nil, true)
}
}()

limits, err := specToLimits(ctx, id, s)
if err != nil {
return nil, errors.Wrap(err, "failed to convert OCI spec to job object limits")
sandboxPath := fmt.Sprintf(sandboxMountFormat, id)
if err := mountLayers(ctx, s, sandboxPath); err != nil {
return nil, nil, errors.Wrap(err, "failed to mount container layers")
}
container.sandboxMount = sandboxPath

// Set resource limits on the job object based off of oci spec.
if err := job.SetResourceLimits(limits); err != nil {
return nil, errors.Wrap(err, "failed to set resource limits")
layers := layers.NewImageLayers(nil, "", s.Windows.LayerFolders, sandboxPath, false)
r.SetLayers(layers)

volumeGUIDRegex := `^\\\\\?\\(Volume)\{{0,1}[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}(\}){0,1}\}(|\\)$`
if matched, err := regexp.MatchString(volumeGUIDRegex, s.Root.Path); !matched || err != nil {
return nil, nil, fmt.Errorf(`invalid container spec - Root.Path '%s' must be a volume GUID path in the format '\\?\Volume{GUID}\'`, s.Root.Path)
}

// Setup directory sandbox volume will be mounted
sandboxPath := fmt.Sprintf(sandboxMountFormat, id)
if _, err := os.Stat(sandboxPath); os.IsNotExist(err) {
if err := os.MkdirAll(sandboxPath, 0777); err != nil {
return nil, errors.Wrap(err, "failed to create mounted folder")
}
limits, err := specToLimits(ctx, id, s)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to convert OCI spec to job object limits")
}
path = sandboxPath

if err := mountSandboxVolume(ctx, path, s.Root.Path); err != nil {
return nil, errors.Wrap(err, "failed to bind payload directory on host")
// Set resource limits on the job object based off of oci spec.
if err := job.SetResourceLimits(limits); err != nil {
return nil, nil, errors.Wrap(err, "failed to set resource limits")
}

container.sandboxMount = path
go container.waitBackground(ctx)
return container, nil
return container, r, nil
}
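The Root.Path validation in Create is easy to exercise on its own. A small stand-alone check using the same regex; the GUID in the sample path is made up:

```go
// Demonstrates the volume GUID path check performed by jobcontainers.Create.
package main

import (
	"fmt"
	"regexp"
)

const volumeGUIDRegex = `^\\\\\?\\(Volume)\{{0,1}[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}(\}){0,1}\}(|\\)$`

func main() {
	// A well-formed volume GUID path (GUID invented for illustration).
	path := `\\?\Volume{4c1b02c1-d990-42d0-99b6-4a6d0fb2a1a8}\`
	matched, err := regexp.MatchString(volumeGUIDRegex, path)
	fmt.Println(matched, err) // true <nil>
}
```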

// CreateProcess creates a process on the host, starts it, adds it to the containers
@@ -283,29 +268,6 @@ func (c *JobContainer) Modify(ctx context.Context, config interface{}) (err erro
return errors.New("modify not supported for job containers")
}

// Release unmounts all of the container layers. Safe to call multiple times, if no storage
// is mounted this call will just return nil.
func (c *JobContainer) Release(ctx context.Context) error {
c.m.Lock()
defer c.m.Unlock()

log.G(ctx).WithFields(logrus.Fields{
"id": c.id,
"path": c.sandboxMount,
}).Warn("removing sandbox volume mount")

if c.sandboxMount != "" {
if err := removeSandboxMountPoint(ctx, c.sandboxMount); err != nil {
return errors.Wrap(err, "failed to remove sandbox volume mount path")
}
if err := layers.UnmountContainerLayers(ctx, c.spec.Windows.LayerFolders, "", nil, layers.UnmountOperationAll); err != nil {
return errors.Wrap(err, "failed to unmount container layers")
}
c.sandboxMount = ""
}
return nil
}

// Start starts the container. There's nothing to "start" for job containers, so this just
// sets the start timestamp.
func (c *JobContainer) Start(ctx context.Context) error {
@@ -484,7 +446,7 @@ func (c *JobContainer) waitBackground(ctx context.Context) {
// them to exit.
<-c.init.proc.waitBlock

ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := c.Shutdown(ctx); err != nil {
_ = c.Terminate(ctx)
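waitBackground above bounds the graceful shutdown at five seconds and falls back to Terminate if it fails. A stand-alone sketch of that pattern with a placeholder container type:

```go
// Graceful-shutdown-with-timeout pattern, mirroring the 5s bound used in this PR.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type stubContainer struct{}

// Shutdown always fails here so the fallback path is exercised.
func (stubContainer) Shutdown(ctx context.Context) error  { return errors.New("not responding") }
func (stubContainer) Terminate(ctx context.Context) error { return nil }

func main() {
	c := stubContainer{}

	// Give the container a bounded window to shut down cleanly.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := c.Shutdown(ctx); err != nil {
		// Graceful shutdown failed or timed out; force termination instead.
		_ = c.Terminate(context.Background())
		fmt.Println("terminated after failed shutdown:", err)
	}
}
```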