Skip to content

Commit

Permalink
Merge pull request #1601 from kiashok/memLeakShim
Browse files Browse the repository at this point in the history
Remove blocking on container exit for every new exec created
  • Loading branch information
kiashok committed Dec 12, 2022
2 parents 9782dee + 5fc00c5 commit 0b8319a
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 26 deletions.
8 changes: 2 additions & 6 deletions cmd/containerd-shim-runhcs-v1/exec_hcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,13 +493,9 @@ func (he *hcsExec) waitForContainerExit() {
trace.StringAttribute("tid", he.tid),
trace.StringAttribute("eid", he.id))

cexit := make(chan struct{})
go func() {
_ = he.c.Wait()
close(cexit)
}()
// wait for container or process to exit and clean up resources
select {
case <-cexit:
case <-he.c.WaitChannel():
// Container exited first. We need to force the process into the exited
// state and cleanup any resources
he.sl.Lock()
Expand Down
6 changes: 6 additions & 0 deletions internal/cow/cow.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ type Container interface {
// container to be terminated by some error condition (including calling
// Close).
Wait() error
// WaitChannel returns a channel that is closed once the container has shut down or terminated.
WaitChannel() <-chan struct{}
// WaitError returns the container termination error.
// This function should only be called after the channel in WaitChannel()
// is closed. Otherwise it is not thread safe.
WaitError() error
// Modify sends a request to modify container resources
Modify(ctx context.Context, config interface{}) error
}
50 changes: 34 additions & 16 deletions internal/gcs/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type Container struct {
notifyCh chan struct{}
closeCh chan struct{}
closeOnce sync.Once
// waitBlock is the channel used to wait for container shutdown or termination
waitBlock chan struct{}
// waitError indicates the container termination error if any
waitError error
}

var _ cow.Container = &Container{}
Expand All @@ -39,10 +43,11 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf
span.AddAttributes(trace.StringAttribute("cid", cid))

c := &Container{
gc: gc,
id: cid,
notifyCh: make(chan struct{}),
closeCh: make(chan struct{}),
gc: gc,
id: cid,
notifyCh: make(chan struct{}),
closeCh: make(chan struct{}),
waitBlock: make(chan struct{}),
}
err = gc.requestNotify(cid, c.notifyCh)
if err != nil {
Expand All @@ -65,10 +70,11 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf
// container that is already running inside the UVM (after cloning).
func (gc *GuestConnection) CloneContainer(ctx context.Context, cid string) (_ *Container, err error) {
c := &Container{
gc: gc,
id: cid,
notifyCh: make(chan struct{}),
closeCh: make(chan struct{}),
gc: gc,
id: cid,
notifyCh: make(chan struct{}),
closeCh: make(chan struct{}),
waitBlock: make(chan struct{}),
}
err = gc.requestNotify(cid, c.notifyCh)
if err != nil {
Expand All @@ -95,6 +101,8 @@ func (c *Container) Close() error {
_, span := oc.StartSpan(context.Background(), "gcs::Container::Close")
defer span.End()
span.AddAttributes(trace.StringAttribute("cid", c.id))

close(c.closeCh)
})
return nil
}
Expand Down Expand Up @@ -224,23 +232,33 @@ func (c *Container) Terminate(ctx context.Context) (err error) {
return c.shutdown(ctx, rpcShutdownForced)
}

// WaitChannel returns the receive-only view of waitBlock, the channel that
// waitBackground closes once the container's notify channel or close channel
// has fired. Callers can select on it instead of blocking in Wait.
func (c *Container) WaitChannel() <-chan struct{} {
return c.waitBlock
}

// WaitError returns the error stored in waitError by waitBackground, or nil
// if none was stored. The field is read without any locking here, so it
// should only be called after the channel returned by WaitChannel is closed.
func (c *Container) WaitError() error {
return c.waitError
}

// Wait waits for the container to terminate (or Close to be called, or the
// guest connection to terminate) and returns the resulting error, if any.
//
// NOTE(review): this listing appears to be a rendered diff with removed and
// added lines interleaved. As written, both select cases return, so the
// receive on WaitChannel and the WaitError call below the select are
// unreachable; confirm against the actual file which body is live.
func (c *Container) Wait() error {
select {
case <-c.notifyCh:
return nil
case <-c.closeCh:
return errors.New("container closed")
}
<-c.WaitChannel()
return c.WaitError()
}

// waitBackground blocks until the container's notify channel or close channel
// fires, stores a "container closed" error in waitError in the latter case,
// and then closes waitBlock so that receivers on that channel are released.
// The outcome is logged and recorded on the trace span.
//
// NOTE(review): this listing appears to be a rendered diff with removed and
// added lines interleaved; `err := c.Wait()` and
// `oc.SetSpanStatus(span, err)` look like the removed counterparts of the
// select and of `oc.SetSpanStatus(span, c.waitError)`. Confirm against the
// actual file.
func (c *Container) waitBackground() {
ctx, span := oc.StartSpan(context.Background(), "gcs::Container::waitBackground")
defer span.End()
span.AddAttributes(trace.StringAttribute("cid", c.id))

err := c.Wait()
select {
case <-c.notifyCh:
case <-c.closeCh:
// Only the close path records an error; the notify path leaves waitError unset.
c.waitError = errors.New("container closed")
}
// Closing waitBlock after waitError is written is what makes the error
// visible to anyone who reads it after receiving from the channel.
close(c.waitBlock)

log.G(ctx).Debug("container exited")
oc.SetSpanStatus(span, err)
oc.SetSpanStatus(span, c.waitError)
}
12 changes: 10 additions & 2 deletions internal/hcs/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,19 @@ func (computeSystem *System) waitBackground() {
oc.SetSpanStatus(span, err)
}

// WaitChannel returns the receive-only view of waitBlock, the channel that
// Wait blocks on. Presumably it is closed when the compute system shuts down
// or terminates; the close is not visible in this section — TODO confirm.
func (computeSystem *System) WaitChannel() <-chan struct{} {
return computeSystem.waitBlock
}

// WaitError returns the value of the waitError field. The field is read
// without any locking here, so it should only be called after the channel
// returned by WaitChannel is closed.
func (computeSystem *System) WaitError() error {
return computeSystem.waitError
}

// Wait synchronously waits for the compute system to shutdown or terminate. If
// the compute system has already exited returns the previous error (if any).
//
// NOTE(review): this listing appears to be a rendered diff with removed and
// added lines interleaved. As written, the second receive/return pair is
// unreachable because the first return always executes; confirm against the
// actual file which pair is live.
func (computeSystem *System) Wait() error {
<-computeSystem.waitBlock
return computeSystem.waitError
<-computeSystem.WaitChannel()
return computeSystem.WaitError()
}

// stopped returns true if the compute system stopped.
Expand Down
12 changes: 10 additions & 2 deletions internal/jobcontainers/jobcontainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,11 +626,19 @@ func (c *JobContainer) Terminate(ctx context.Context) error {
return nil
}

// WaitChannel returns the receive-only view of waitBlock, the channel that
// Wait blocks on. Presumably it is closed when the job container shuts down
// or terminates; the close is not visible in this section — TODO confirm.
func (c *JobContainer) WaitChannel() <-chan struct{} {
return c.waitBlock
}

// WaitError returns the value of the waitError field. The field is read
// without any locking here, so it should only be called after the channel
// returned by WaitChannel is closed.
func (c *JobContainer) WaitError() error {
return c.waitError
}

// Wait synchronously waits for the container to shutdown or terminate. If
// the container has already exited returns the previous error (if any).
//
// NOTE(review): this listing appears to be a rendered diff with removed and
// added lines interleaved. As written, the second receive/return pair is
// unreachable because the first return always executes; confirm against the
// actual file which pair is live.
func (c *JobContainer) Wait() error {
<-c.waitBlock
return c.waitError
<-c.WaitChannel()
return c.WaitError()
}

func (c *JobContainer) waitBackground(ctx context.Context) {
Expand Down

0 comments on commit 0b8319a

Please sign in to comment.