Skip to content
This repository has been archived by the owner on Jul 28, 2021. It is now read-only.

Commit

Permalink
Change to use mutex for sync
Browse files Browse the repository at this point in the history
Removes the per container cache for processes as they are all external pid's
Changes to use a mutex instead of the wait group for ResizeConsole so we are confident of order.
Puts back some of the containerID overloads because HCS always sends this even for external processes.
  • Loading branch information
jterry75 committed Aug 2, 2017
1 parent b055df3 commit 37ae80c
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 176 deletions.
6 changes: 3 additions & 3 deletions service/gcs/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (b *bridge) signalProcess(message []byte) (*prot.MessageResponseBase, error
}
response.ActivityID = request.ActivityID

if err := b.coreint.SignalProcess(request.ContainerID, int(request.ProcessID), request.Options); err != nil {
if err := b.coreint.SignalProcess(int(request.ProcessID), request.Options); err != nil {
return response, err
}

Expand Down Expand Up @@ -348,7 +348,7 @@ func (b *bridge) waitOnProcess(message []byte, header *prot.MessageHeader) (*pro
logrus.Error(errors.Wrapf(err, "failed to send process exit response \"%v\"", response))
}
}
if err := b.coreint.RegisterProcessExitHook(request.ContainerID, int(request.ProcessID), exitHook); err != nil {
if err := b.coreint.RegisterProcessExitHook(int(request.ProcessID), exitHook); err != nil {
return response, err
}

Expand All @@ -363,7 +363,7 @@ func (b *bridge) resizeConsole(message []byte) (*prot.MessageResponseBase, error
}
response.ActivityID = request.ActivityID

if err := b.coreint.ResizeConsole(request.ContainerID, int(request.ProcessID), request.Height, request.Width); err != nil {
if err := b.coreint.ResizeConsole(int(request.ProcessID), request.Height, request.Width); err != nil {
return response, err
}

Expand Down
21 changes: 0 additions & 21 deletions service/gcs/bridge/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,32 +614,11 @@ var _ = Describe("Bridge", func() {
AssertNoResponseErrors()
AssertActivityIDCorrect()
It("should receive the correct values", func() {
Expect(callArgs.ID).To(Equal(containerID))
Expect(callArgs.Pid).To(Equal(101))
Expect(callArgs.Height).To(Equal(uint16(30)))
Expect(callArgs.Width).To(Equal(uint16(72)))
})
})
Context("the message is normal ASCII no containerid", func() {
BeforeEach(func() {
message = prot.ContainerResizeConsole{
MessageBase: &prot.MessageBase{
ActivityID: activityID,
},
ProcessID: 102,
Height: 80,
Width: 80,
}
})
AssertNoResponseErrors()
AssertActivityIDCorrect()
It("should receive the correct values", func() {
Expect(callArgs.ID).To(Equal(""))
Expect(callArgs.Pid).To(Equal(102))
Expect(callArgs.Height).To(Equal(uint16(80)))
Expect(callArgs.Width).To(Equal(uint16(80)))
})
})
})

Describe("calling modifySettings", func() {
Expand Down
6 changes: 3 additions & 3 deletions service/gcs/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ type Core interface {
CreateContainer(id string, info prot.VMHostedContainerSettings) error
ExecProcess(id string, info prot.ProcessParameters, stdioSet *stdio.ConnectionSet) (pid int, err error)
SignalContainer(id string, signal oslayer.Signal) error
SignalProcess(id string, pid int, options prot.SignalProcessOptions) error
SignalProcess(pid int, options prot.SignalProcessOptions) error
ListProcesses(id string) ([]runtime.ContainerProcessState, error)
RunExternalProcess(info prot.ProcessParameters, stdioSet *stdio.ConnectionSet) (pid int, err error)
ModifySettings(id string, request prot.ResourceModificationRequestResponse) error
RegisterContainerExitHook(id string, onExit func(oslayer.ProcessExitState)) error
RegisterProcessExitHook(id string, pid int, onExit func(oslayer.ProcessExitState)) error
ResizeConsole(id string, pid int, height, width uint16) error
RegisterProcessExitHook(pid int, onExit func(oslayer.ProcessExitState)) error
ResizeConsole(pid int, height, width uint16) error
}
167 changes: 55 additions & 112 deletions service/gcs/core/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@ type gcsCore struct {
// ID to cache entry.
containerCache map[string]*containerCacheEntry

externalProcessCacheMutex sync.RWMutex
processCacheMutex sync.RWMutex
// externalProcessCache stores information about external processes which
// persists between calls into the gcsCore. It is structured as a map from
// pid to cache entry.
externalProcessCache map[int]*processCacheEntry
processCache map[int]*processCacheEntry
}

// NewGCSCore creates a new gcsCore struct initialized with the given Runtime.
func NewGCSCore(rtime runtime.Runtime, os oslayer.OS) *gcsCore {
return &gcsCore{
Rtime: rtime,
OS: os,
containerCache: make(map[string]*containerCacheEntry),
externalProcessCache: make(map[int]*processCacheEntry),
Rtime: rtime,
OS: os,
containerCache: make(map[string]*containerCacheEntry),
processCache: make(map[int]*processCacheEntry),
}
}

Expand All @@ -63,20 +63,14 @@ type containerCacheEntry struct {
MappedDirectories map[uint32]prot.MappedDirectory
NetworkAdapters []prot.NetworkAdapter
container runtime.Container

processCacheMutex sync.RWMutex
// processCache stores information about processes which persists between
// calls into the gcsCore. It is structured as a map from pid to cache
// entry.
processCache map[int]*processCacheEntry
hasRunInitProcess bool
}

func newContainerCacheEntry(id string) *containerCacheEntry {
return &containerCacheEntry{
ID: id,
MappedVirtualDisks: make(map[uint8]prot.MappedVirtualDisk),
MappedDirectories: make(map[uint32]prot.MappedDirectory),
processCache: make(map[int]*processCacheEntry),
}
}
func (e *containerCacheEntry) AddExitHook(hook func(oslayer.ProcessExitState)) {
Expand Down Expand Up @@ -116,13 +110,14 @@ func (e *containerCacheEntry) RemoveMappedDirectory(dir prot.MappedDirectory) {

// processCacheEntry stores cached information for a single process.
type processCacheEntry struct {
ExitStatus oslayer.ProcessExitState
ExitHooks []func(oslayer.ProcessExitState)
Tty *stdio.TtyRelay
ExitStatus oslayer.ProcessExitState
ExitHooks []func(oslayer.ProcessExitState)
Tty *stdio.TtyRelay
ContainerID string // If "" a host process otherwise a container process.
}

func newProcessCacheEntry() *processCacheEntry {
return &processCacheEntry{}
func newProcessCacheEntry(containerID string) *processCacheEntry {
return &processCacheEntry{ContainerID: containerID}
}
func (e *processCacheEntry) AddExitHook(hook func(oslayer.ProcessExitState)) {
e.ExitHooks = append(e.ExitHooks, hook)
Expand Down Expand Up @@ -200,14 +195,11 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet
if containerEntry == nil {
return -1, errors.WithStack(gcserr.NewContainerDoesNotExistError(id))
}
processEntry := newProcessCacheEntry()
processEntry := newProcessCacheEntry(id)

var p runtime.Process

containerEntry.processCacheMutex.Lock()
isInitProcess := len(containerEntry.processCache) == 0
containerEntry.processCacheMutex.Unlock()
if isInitProcess {
if !containerEntry.hasRunInitProcess {
containerEntry.hasRunInitProcess = true
if err := c.writeConfigFile(id, params.OCISpecification); err != nil {
return -1, err
}
Expand Down Expand Up @@ -244,13 +236,17 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet
}
c.containerCacheMutex.Unlock()

containerEntry.processCacheMutex.Lock()
c.processCacheMutex.Lock()
processEntry.ExitStatus = state
for _, hook := range processEntry.ExitHooks {
hook(state)
}
containerEntry.processCacheMutex.Unlock()
c.processCacheMutex.Unlock()
c.containerCacheMutex.Lock()
containerEntry.ExitStatus = state
for _, hook := range containerEntry.ExitHooks {
hook(state)
}
delete(c.containerCache, id)
c.containerCacheMutex.Unlock()
}()
Expand All @@ -276,19 +272,19 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet
}
logrus.Infof("container process %d exited with exit status %d", p.Pid(), state.ExitCode())

containerEntry.processCacheMutex.Lock()
c.processCacheMutex.Lock()
processEntry.ExitStatus = state
for _, hook := range processEntry.ExitHooks {
hook(state)
}
containerEntry.processCacheMutex.Unlock()
c.processCacheMutex.Unlock()
if err := p.Delete(); err != nil {
logrus.Error(err)
}
}()
}

containerEntry.processCacheMutex.Lock()
c.processCacheMutex.Lock()
// If a processCacheEntry with the given pid already exists in the cache,
// this will overwrite it. This behavior is expected. Processes are kept in
// the cache even after they exit, which allows for exit hooks registered
Expand All @@ -301,8 +297,8 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet
// apply to the old process no longer makes sense, so since the old
// process's pid has been reused, its cache entry can also be reused. This
// applies to external processes as well.
containerEntry.processCache[p.Pid()] = processEntry
containerEntry.processCacheMutex.Unlock()
c.processCache[p.Pid()] = processEntry
c.processCacheMutex.Unlock()
return p.Pid(), nil
}

Expand All @@ -326,30 +322,13 @@ func (c *gcsCore) SignalContainer(id string, signal oslayer.Signal) error {
}

// SignalProcess sends the signal specified in options to the given process.
func (c *gcsCore) SignalProcess(id string, pid int, options prot.SignalProcessOptions) error {
if id == "" {
c.externalProcessCacheMutex.Lock()
if _, ok := c.externalProcessCache[pid]; !ok {
c.externalProcessCacheMutex.Unlock()
return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid))
}
c.externalProcessCacheMutex.Unlock()
} else {
c.containerCacheMutex.Lock()
containerEntry := c.getContainer(id)
if containerEntry == nil {
c.containerCacheMutex.Unlock()
return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid))
}
containerEntry.processCacheMutex.Lock()
if _, ok := containerEntry.processCache[pid]; !ok {
containerEntry.processCacheMutex.Unlock()
c.containerCacheMutex.Unlock()
return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid))
}
containerEntry.processCacheMutex.Unlock()
c.containerCacheMutex.Unlock()
func (c *gcsCore) SignalProcess(pid int, options prot.SignalProcessOptions) error {
c.processCacheMutex.Lock()
if _, ok := c.processCache[pid]; !ok {
c.processCacheMutex.Unlock()
return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid))
}
c.processCacheMutex.Unlock()

// Interpret signal value 0 as SIGKILL.
// TODO: Remove this special casing when we are not worried about breaking
Expand Down Expand Up @@ -448,7 +427,7 @@ func (c *gcsCore) RunExternalProcess(params prot.ProcessParameters, stdioSet *st
relay.Start()
}

processEntry := newProcessCacheEntry()
processEntry := newProcessCacheEntry("")
processEntry.Tty = relay
go func() {
if err := cmd.Wait(); err != nil {
Expand All @@ -467,18 +446,18 @@ func (c *gcsCore) RunExternalProcess(params prot.ProcessParameters, stdioSet *st

// Run exit hooks for the process.
state := cmd.ExitState()
c.externalProcessCacheMutex.Lock()
c.processCacheMutex.Lock()
processEntry.ExitStatus = state
for _, hook := range processEntry.ExitHooks {
hook(state)
}
c.externalProcessCacheMutex.Unlock()
c.processCacheMutex.Unlock()
}()

pid = cmd.Process().Pid()
c.externalProcessCacheMutex.Lock()
c.externalProcessCache[pid] = processEntry
c.externalProcessCacheMutex.Unlock()
c.processCacheMutex.Lock()
c.processCache[pid] = processEntry
c.processCacheMutex.Unlock()
return pid, nil
}

Expand Down Expand Up @@ -562,30 +541,14 @@ func (c *gcsCore) RegisterContainerExitHook(id string, exitHook func(oslayer.Pro
// process may have multiple exit hooks registered for it.
// This function works for both processes that are running in a container, and
// ones that are running externally to a container.
func (c *gcsCore) RegisterProcessExitHook(id string, pid int, exitHook func(oslayer.ProcessExitState)) error {
var (
entry *processCacheEntry
ok bool
)

if id == "" {
c.externalProcessCacheMutex.Lock()
defer c.externalProcessCacheMutex.Unlock()
if entry, ok = c.externalProcessCache[pid]; !ok {
return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid))
}
} else {
c.containerCacheMutex.Lock()
defer c.containerCacheMutex.Unlock()
containerEntry := c.getContainer(id)
if containerEntry == nil {
return errors.WithStack(gcserr.NewContainerDoesNotExistError(id))
}
containerEntry.processCacheMutex.Lock()
defer containerEntry.processCacheMutex.Unlock()
if entry, ok = containerEntry.processCache[pid]; !ok {
return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid))
}
func (c *gcsCore) RegisterProcessExitHook(pid int, exitHook func(oslayer.ProcessExitState)) error {
c.processCacheMutex.Lock()
defer c.processCacheMutex.Unlock()

var entry *processCacheEntry
var ok bool
if entry, ok = c.processCache[pid]; !ok {
return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid))
}

exitStatus := entry.ExitStatus
Expand All @@ -599,35 +562,15 @@ func (c *gcsCore) RegisterProcessExitHook(id string, pid int, exitHook func(osla
return nil
}

func (c *gcsCore) ResizeConsole(id string, pid int, height, width uint16) error {
var (
p *processCacheEntry
ok bool
)

if id == "" {
c.externalProcessCacheMutex.Lock()
if p, ok = c.externalProcessCache[pid]; !ok {
c.externalProcessCacheMutex.Unlock()
return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid))
}
c.externalProcessCacheMutex.Unlock()
} else {
c.containerCacheMutex.Lock()
containerEntry := c.getContainer(id)
if containerEntry == nil {
c.containerCacheMutex.Unlock()
return errors.WithStack(gcserr.NewContainerDoesNotExistError(id))
}
containerEntry.processCacheMutex.Lock()
if p, ok = containerEntry.processCache[pid]; !ok {
containerEntry.processCacheMutex.Unlock()
c.containerCacheMutex.Unlock()
return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid))
}
containerEntry.processCacheMutex.Unlock()
c.containerCacheMutex.Unlock()
func (c *gcsCore) ResizeConsole(pid int, height, width uint16) error {
c.processCacheMutex.Lock()
var p *processCacheEntry
var ok bool
if p, ok = c.processCache[pid]; !ok {
c.processCacheMutex.Unlock()
return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid))
}
c.processCacheMutex.Unlock()

if p.Tty == nil {
return fmt.Errorf("pid: %d, is not a tty and cannot be resized", pid)
Expand Down
4 changes: 2 additions & 2 deletions service/gcs/core/gcs/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ var _ = Describe("GCS", func() {
sigkillOptions = prot.SignalProcessOptions{Signal: int32(syscall.SIGKILL)}
})
JustBeforeEach(func() {
err = coreint.SignalProcess(containerID, processID, sigkillOptions)
err = coreint.SignalProcess(processID, sigkillOptions)
})
Context("the process has already been created", func() {
BeforeEach(func() {
Expand Down Expand Up @@ -1069,7 +1069,7 @@ var _ = Describe("GCS", func() {
pid int
)
JustBeforeEach(func() {
err = coreint.RegisterProcessExitHook(containerID, pid, func(oslayer.ProcessExitState) {})
err = coreint.RegisterProcessExitHook(pid, func(oslayer.ProcessExitState) {})
})
Context("the container has already been created", func() {
BeforeEach(func() {
Expand Down

0 comments on commit 37ae80c

Please sign in to comment.