From e7abf551b7e3f949167ffb7989ae0a33c71d8ac5 Mon Sep 17 00:00:00 2001 From: "Justin Terry (VM)" Date: Tue, 1 Aug 2017 14:02:49 -0700 Subject: [PATCH] Change to use mutex for sync Removes the per container cache for processes as they are all external pid's Changes to use a mutex instead of the wait group for ResizeConsole so we are confident of order. Puts back some of the containerID overloads because HCS always sends this even for external processes. --- service/gcs/bridge/bridge.go | 6 +- service/gcs/bridge/bridge_test.go | 2 - service/gcs/core/core.go | 6 +- service/gcs/core/gcs/gcs.go | 163 ++++++++------------------ service/gcs/core/gcs/gcs_test.go | 4 +- service/gcs/core/mockcore/mockcore.go | 8 +- service/gcs/runtime/runc/runc.go | 2 +- service/gcs/stdio/stdio.go | 61 ++++++---- 8 files changed, 98 insertions(+), 154 deletions(-) diff --git a/service/gcs/bridge/bridge.go b/service/gcs/bridge/bridge.go index 2f4eb004..b6c0ab6d 100644 --- a/service/gcs/bridge/bridge.go +++ b/service/gcs/bridge/bridge.go @@ -276,7 +276,7 @@ func (b *bridge) signalProcess(message []byte) (*prot.MessageResponseBase, error } response.ActivityID = request.ActivityID - if err := b.coreint.SignalProcess(request.ContainerID, int(request.ProcessID), request.Options); err != nil { + if err := b.coreint.SignalProcess(int(request.ProcessID), request.Options); err != nil { return response, err } @@ -348,7 +348,7 @@ func (b *bridge) waitOnProcess(message []byte, header *prot.MessageHeader) (*pro logrus.Error(errors.Wrapf(err, "failed to send process exit response \"%v\"", response)) } } - if err := b.coreint.RegisterProcessExitHook(request.ContainerID, int(request.ProcessID), exitHook); err != nil { + if err := b.coreint.RegisterProcessExitHook(int(request.ProcessID), exitHook); err != nil { return response, err } @@ -363,7 +363,7 @@ func (b *bridge) resizeConsole(message []byte) (*prot.MessageResponseBase, error } response.ActivityID = request.ActivityID - if err := b.coreint.ResizeConsole(request.ContainerID, int(request.ProcessID), request.Height, request.Width); err != nil { + if err := b.coreint.ResizeConsole(int(request.ProcessID), request.Height, request.Width); err != nil { return response, err } diff --git a/service/gcs/bridge/bridge_test.go b/service/gcs/bridge/bridge_test.go index c653bae5..56043310 100644 --- a/service/gcs/bridge/bridge_test.go +++ b/service/gcs/bridge/bridge_test.go @@ -614,7 +614,6 @@ var _ = Describe("Bridge", func() { AssertNoResponseErrors() AssertActivityIDCorrect() It("should receive the correct values", func() { - Expect(callArgs.ID).To(Equal(containerID)) Expect(callArgs.Pid).To(Equal(101)) Expect(callArgs.Height).To(Equal(uint16(30))) Expect(callArgs.Width).To(Equal(uint16(72))) @@ -634,7 +633,6 @@ var _ = Describe("Bridge", func() { AssertNoResponseErrors() AssertActivityIDCorrect() It("should receive the correct values", func() { - Expect(callArgs.ID).To(Equal("")) Expect(callArgs.Pid).To(Equal(102)) Expect(callArgs.Height).To(Equal(uint16(80))) Expect(callArgs.Width).To(Equal(uint16(80))) diff --git a/service/gcs/core/core.go b/service/gcs/core/core.go index fe97e2b6..8b99249e 100644 --- a/service/gcs/core/core.go +++ b/service/gcs/core/core.go @@ -16,11 +16,11 @@ type Core interface { CreateContainer(id string, info prot.VMHostedContainerSettings) error ExecProcess(id string, info prot.ProcessParameters, stdioSet *stdio.ConnectionSet) (pid int, err error) SignalContainer(id string, signal oslayer.Signal) error - SignalProcess(id string, pid int, options prot.SignalProcessOptions) error + SignalProcess(pid int, options prot.SignalProcessOptions) error ListProcesses(id string) ([]runtime.ContainerProcessState, error) RunExternalProcess(info prot.ProcessParameters, stdioSet *stdio.ConnectionSet) (pid int, err error) ModifySettings(id string, request prot.ResourceModificationRequestResponse) error RegisterContainerExitHook(id string, onExit func(oslayer.ProcessExitState)) error - RegisterProcessExitHook(id string, pid int, onExit func(oslayer.ProcessExitState)) error - ResizeConsole(id string, pid int, height, width uint16) error + RegisterProcessExitHook(pid int, onExit func(oslayer.ProcessExitState)) error + ResizeConsole(pid int, height, width uint16) error } diff --git a/service/gcs/core/gcs/gcs.go b/service/gcs/core/gcs/gcs.go index 84b5ebe6..ecae0810 100644 --- a/service/gcs/core/gcs/gcs.go +++ b/service/gcs/core/gcs/gcs.go @@ -37,20 +37,20 @@ type gcsCore struct { // ID to cache entry. containerCache map[string]*containerCacheEntry - externalProcessCacheMutex sync.RWMutex + processCacheMutex sync.RWMutex // externalProcessCache stores information about external processes which // persists between calls into the gcsCore. It is structured as a map from // pid to cache entry. - externalProcessCache map[int]*processCacheEntry + processCache map[int]*processCacheEntry } // NewGCSCore creates a new gcsCore struct initialized with the given Runtime. func NewGCSCore(rtime runtime.Runtime, os oslayer.OS) *gcsCore { return &gcsCore{ - Rtime: rtime, - OS: os, - containerCache: make(map[string]*containerCacheEntry), - externalProcessCache: make(map[int]*processCacheEntry), + Rtime: rtime, + OS: os, + containerCache: make(map[string]*containerCacheEntry), + processCache: make(map[int]*processCacheEntry), } } @@ -63,12 +63,7 @@ type containerCacheEntry struct { MappedDirectories map[uint32]prot.MappedDirectory NetworkAdapters []prot.NetworkAdapter container runtime.Container - - processCacheMutex sync.RWMutex - // processCache stores information about processes which persists between - // calls into the gcsCore. It is structured as a map from pid to cache - // entry. - processCache map[int]*processCacheEntry + hasRunInitProcess bool } func newContainerCacheEntry(id string) *containerCacheEntry { @@ -76,7 +71,6 @@ func newContainerCacheEntry(id string) *containerCacheEntry { ID: id, MappedVirtualDisks: make(map[uint8]prot.MappedVirtualDisk), MappedDirectories: make(map[uint32]prot.MappedDirectory), - processCache: make(map[int]*processCacheEntry), } } func (e *containerCacheEntry) AddExitHook(hook func(oslayer.ProcessExitState)) { @@ -116,13 +110,14 @@ func (e *containerCacheEntry) RemoveMappedDirectory(dir prot.MappedDirectory) { // processCacheEntry stores cached information for a single process. type processCacheEntry struct { - ExitStatus oslayer.ProcessExitState - ExitHooks []func(oslayer.ProcessExitState) - Tty *stdio.TtyRelay + ExitStatus oslayer.ProcessExitState + ExitHooks []func(oslayer.ProcessExitState) + Tty *stdio.TtyRelay + ContainerID string // If "" a host process otherwise a container process. } -func newProcessCacheEntry() *processCacheEntry { - return &processCacheEntry{} +func newProcessCacheEntry(containerID string) *processCacheEntry { + return &processCacheEntry{ContainerID: containerID} } func (e *processCacheEntry) AddExitHook(hook func(oslayer.ProcessExitState)) { e.ExitHooks = append(e.ExitHooks, hook) @@ -200,14 +195,11 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet if containerEntry == nil { return -1, errors.WithStack(gcserr.NewContainerDoesNotExistError(id)) } - processEntry := newProcessCacheEntry() + processEntry := newProcessCacheEntry(id) var p runtime.Process - - containerEntry.processCacheMutex.Lock() - isInitProcess := len(containerEntry.processCache) == 0 - containerEntry.processCacheMutex.Unlock() - if isInitProcess { + if !containerEntry.hasRunInitProcess { + containerEntry.hasRunInitProcess = true if err := c.writeConfigFile(id, params.OCISpecification); err != nil { return -1, err } @@ -244,12 +236,12 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet } c.containerCacheMutex.Unlock() - containerEntry.processCacheMutex.Lock() + c.processCacheMutex.Lock() processEntry.ExitStatus = state for _, hook := range processEntry.ExitHooks { hook(state) } - containerEntry.processCacheMutex.Unlock() + c.processCacheMutex.Unlock() c.containerCacheMutex.Lock() delete(c.containerCache, id) c.containerCacheMutex.Unlock() @@ -276,19 +268,19 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet } logrus.Infof("container process %d exited with exit status %d", p.Pid(), state.ExitCode()) - containerEntry.processCacheMutex.Lock() + c.processCacheMutex.Lock() processEntry.ExitStatus = state for _, hook := range processEntry.ExitHooks { hook(state) } - containerEntry.processCacheMutex.Unlock() + c.processCacheMutex.Unlock() if err := p.Delete(); err != nil { logrus.Error(err) } }() } - containerEntry.processCacheMutex.Lock() + c.processCacheMutex.Lock() // If a processCacheEntry with the given pid already exists in the cache, // this will overwrite it. This behavior is expected. Processes are kept in // the cache even after they exit, which allows for exit hooks registered @@ -301,8 +293,8 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet // apply to the old process no longer makes sense, so since the old // process's pid has been reused, its cache entry can also be reused. This // applies to external processes as well. - containerEntry.processCache[p.Pid()] = processEntry - containerEntry.processCacheMutex.Unlock() + c.processCache[p.Pid()] = processEntry + c.processCacheMutex.Unlock() return p.Pid(), nil } @@ -326,30 +318,13 @@ func (c *gcsCore) SignalContainer(id string, signal oslayer.Signal) error { } // SignalProcess sends the signal specified in options to the given process. -func (c *gcsCore) SignalProcess(id string, pid int, options prot.SignalProcessOptions) error { - if id == "" { - c.externalProcessCacheMutex.Lock() - if _, ok := c.externalProcessCache[pid]; !ok { - c.externalProcessCacheMutex.Unlock() - return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) - } - c.externalProcessCacheMutex.Unlock() - } else { - c.containerCacheMutex.Lock() - containerEntry := c.getContainer(id) - if containerEntry == nil { - c.containerCacheMutex.Unlock() - return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) - } - containerEntry.processCacheMutex.Lock() - if _, ok := containerEntry.processCache[pid]; !ok { - containerEntry.processCacheMutex.Unlock() - c.containerCacheMutex.Unlock() - return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) - } - containerEntry.processCacheMutex.Unlock() - c.containerCacheMutex.Unlock() +func (c *gcsCore) SignalProcess(pid int, options prot.SignalProcessOptions) error { + c.processCacheMutex.Lock() + if _, ok := c.processCache[pid]; !ok { + c.processCacheMutex.Unlock() + return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) } + c.processCacheMutex.Unlock() // Interpret signal value 0 as SIGKILL. // TODO: Remove this special casing when we are not worried about breaking @@ -448,7 +423,7 @@ func (c *gcsCore) RunExternalProcess(params prot.ProcessParameters, stdioSet *st relay.Start() } - processEntry := newProcessCacheEntry() + processEntry := newProcessCacheEntry("") processEntry.Tty = relay go func() { if err := cmd.Wait(); err != nil { @@ -467,18 +442,18 @@ func (c *gcsCore) RunExternalProcess(params prot.ProcessParameters, stdioSet *st // Run exit hooks for the process. state := cmd.ExitState() - c.externalProcessCacheMutex.Lock() + c.processCacheMutex.Lock() processEntry.ExitStatus = state for _, hook := range processEntry.ExitHooks { hook(state) } - c.externalProcessCacheMutex.Unlock() + c.processCacheMutex.Unlock() }() pid = cmd.Process().Pid() - c.externalProcessCacheMutex.Lock() - c.externalProcessCache[pid] = processEntry - c.externalProcessCacheMutex.Unlock() + c.processCacheMutex.Lock() + c.processCache[pid] = processEntry + c.processCacheMutex.Unlock() return pid, nil } @@ -562,30 +537,14 @@ func (c *gcsCore) RegisterContainerExitHook(id string, exitHook func(oslayer.Pro // process may have multiple exit hooks registered for it. // This function works for both processes that are running in a container, and // ones that are running externally to a container. -func (c *gcsCore) RegisterProcessExitHook(id string, pid int, exitHook func(oslayer.ProcessExitState)) error { - var ( - entry *processCacheEntry - ok bool - ) - - if id == "" { - c.externalProcessCacheMutex.Lock() - defer c.externalProcessCacheMutex.Unlock() - if entry, ok = c.externalProcessCache[pid]; !ok { - return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) - } - } else { - c.containerCacheMutex.Lock() - defer c.containerCacheMutex.Unlock() - containerEntry := c.getContainer(id) - if containerEntry == nil { - return errors.WithStack(gcserr.NewContainerDoesNotExistError(id)) - } - containerEntry.processCacheMutex.Lock() - defer containerEntry.processCacheMutex.Unlock() - if entry, ok = containerEntry.processCache[pid]; !ok { - return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) - } +func (c *gcsCore) RegisterProcessExitHook(pid int, exitHook func(oslayer.ProcessExitState)) error { + c.processCacheMutex.Lock() + defer c.processCacheMutex.Unlock() + + var entry *processCacheEntry + var ok bool + if entry, ok = c.processCache[pid]; !ok { + return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) } exitStatus := entry.ExitStatus @@ -599,35 +558,15 @@ func (c *gcsCore) RegisterProcessExitHook(id string, pid int, exitHook func(osla return nil } -func (c *gcsCore) ResizeConsole(id string, pid int, height, width uint16) error { - var ( - p *processCacheEntry - ok bool - ) - - if id == "" { - c.externalProcessCacheMutex.Lock() - if p, ok = c.externalProcessCache[pid]; !ok { - c.externalProcessCacheMutex.Unlock() - return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) - } - c.externalProcessCacheMutex.Unlock() - } else { - c.containerCacheMutex.Lock() - containerEntry := c.getContainer(id) - if containerEntry == nil { - c.containerCacheMutex.Unlock() - return errors.WithStack(gcserr.NewContainerDoesNotExistError(id)) - } - containerEntry.processCacheMutex.Lock() - if p, ok = containerEntry.processCache[pid]; !ok { - containerEntry.processCacheMutex.Unlock() - c.containerCacheMutex.Unlock() - return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) - } - containerEntry.processCacheMutex.Unlock() - c.containerCacheMutex.Unlock() +func (c *gcsCore) ResizeConsole(pid int, height, width uint16) error { + c.processCacheMutex.Lock() + var p *processCacheEntry + var ok bool + if p, ok = c.processCache[pid]; !ok { + c.processCacheMutex.Unlock() + return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) } + c.processCacheMutex.Unlock() if p.Tty == nil { return fmt.Errorf("pid: %d, is not a tty and cannot be resized", pid) diff --git a/service/gcs/core/gcs/gcs_test.go b/service/gcs/core/gcs/gcs_test.go index 03e58d40..9eb1fc12 100644 --- a/service/gcs/core/gcs/gcs_test.go +++ b/service/gcs/core/gcs/gcs_test.go @@ -854,7 +854,7 @@ var _ = Describe("GCS", func() { sigkillOptions = prot.SignalProcessOptions{Signal: int32(syscall.SIGKILL)} }) JustBeforeEach(func() { - err = coreint.SignalProcess(containerID, processID, sigkillOptions) + err = coreint.SignalProcess(processID, sigkillOptions) }) Context("the process has already been created", func() { BeforeEach(func() { @@ -1069,7 +1069,7 @@ var _ = Describe("GCS", func() { pid int ) JustBeforeEach(func() { - err = coreint.RegisterProcessExitHook(containerID, pid, func(oslayer.ProcessExitState) {}) + err = coreint.RegisterProcessExitHook(pid, func(oslayer.ProcessExitState) {}) }) Context("the container has already been created", func() { BeforeEach(func() { diff --git a/service/gcs/core/mockcore/mockcore.go b/service/gcs/core/mockcore/mockcore.go index fa155f8d..2cb7e136 100644 --- a/service/gcs/core/mockcore/mockcore.go +++ b/service/gcs/core/mockcore/mockcore.go @@ -67,7 +67,6 @@ type RegisterProcessExitHookCall struct { // ResizeConsoleCall captures the arguments of ResizeConsole type ResizeConsoleCall struct { - ID string Pid int Height uint16 Width uint16 @@ -115,7 +114,7 @@ func (c *MockCore) SignalContainer(id string, signal oslayer.Signal) error { } // SignalProcess captures its arguments and returns a nil error. -func (c *MockCore) SignalProcess(id string, pid int, options prot.SignalProcessOptions) error { +func (c *MockCore) SignalProcess(pid int, options prot.SignalProcessOptions) error { c.LastSignalProcess = SignalProcessCall{ Pid: pid, Options: options, @@ -168,7 +167,7 @@ func (c *MockCore) RegisterContainerExitHook(id string, exitHook func(oslayer.Pr // RegisterProcessExitHook captures its arguments, runs the given exit hook on // a process exit state with exit code 103, and returns a nil error. -func (c *MockCore) RegisterProcessExitHook(id string, pid int, exitHook func(oslayer.ProcessExitState)) error { +func (c *MockCore) RegisterProcessExitHook(pid int, exitHook func(oslayer.ProcessExitState)) error { c.LastRegisterProcessExitHook = RegisterProcessExitHookCall{ Pid: pid, ExitHook: exitHook, @@ -178,9 +177,8 @@ func (c *MockCore) RegisterProcessExitHook(id string, pid int, exitHook func(osl } // ResizeConsole captures its arguments and returns a nil error. -func (c *MockCore) ResizeConsole(id string, pid int, height, width uint16) error { +func (c *MockCore) ResizeConsole(pid int, height, width uint16) error { c.LastResizeConsole = ResizeConsoleCall{ - ID: id, Pid: pid, Height: height, Width: width, diff --git a/service/gcs/runtime/runc/runc.go b/service/gcs/runtime/runc/runc.go index b73834e2..6b454cb7 100644 --- a/service/gcs/runtime/runc/runc.go +++ b/service/gcs/runtime/runc/runc.go @@ -536,8 +536,8 @@ func (c *container) startProcess(tempProcessDir string, hasTerminal bool, stdioS } var relay *stdio.TtyRelay - var master *os.File if hasTerminal { + var master *os.File master, err = c.r.getMasterFromSocket(sockListener) if err != nil { cmd.Process.Kill() diff --git a/service/gcs/stdio/stdio.go b/service/gcs/stdio/stdio.go index 3cf8901c..d45fac01 100644 --- a/service/gcs/stdio/stdio.go +++ b/service/gcs/stdio/stdio.go @@ -4,14 +4,12 @@ import ( "io" "os" "sync" - "sync/atomic" "syscall" "unsafe" "github.com/Microsoft/opengcs/service/gcs/transport" "github.com/Sirupsen/logrus" "github.com/pkg/errors" - "golang.org/x/sys/unix" ) @@ -111,31 +109,22 @@ func (s *ConnectionSet) NewTtyRelay(pty *os.File) *TtyRelay { // TtyRelay relays IO between a set of stdio connections and a master PTY file. type TtyRelay struct { - closing int32 - wg sync.WaitGroup - s *ConnectionSet - pty *os.File + m sync.Mutex + closed bool + wg sync.WaitGroup + s *ConnectionSet + pty *os.File } // ResizeConsole sends the appropriate resize to a pTTY FD func (r *TtyRelay) ResizeConsole(height, width uint16) error { - type consoleSize struct { - Height uint16 - Width uint16 - x uint16 - y uint16 - } + r.m.Lock() + defer r.m.Unlock() - r.wg.Add(1) - defer r.wg.Done() - if atomic.LoadInt32(&r.closing) != 0 { + if r.closed { return errors.New("error resizing console pty is closed") } - - if _, _, err := syscall.Syscall(syscall.SYS_IOCTL, r.pty.Fd(), uintptr(unix.TIOCSWINSZ), uintptr(unsafe.Pointer(&consoleSize{Height: height, Width: width}))); err != 0 { - return err - } - return nil + return ResizeConsole(r.pty, height, width) } // Start starts the relay operation. The caller must call Wait to wait @@ -179,12 +168,32 @@ func (r *TtyRelay) Wait() { // Wait for all users of stdioSet and master to finish before closing them. r.wg.Wait() - // Given the expected use of wait we cannot increment closing before calling r.wg.Wait() - // or all calls to ResizeConsole would fail. However, by calling it after there is a very - // small window that ResizeConsole could still get an invalid Fd. We call wait again to - // enusure that no ResizeConsole call came in before actualling closing the pty. - atomic.StoreInt32(&r.closing, 1) - r.wg.Wait() + r.m.Lock() + defer r.m.Unlock() + r.pty.Close() + r.closed = true r.s.Close() + +} + +// ResizeConsole sends the appropriate resize to a pTTY FD +// Synchronization of pty should be handled in the callers context. +func ResizeConsole(pty *os.File, height, width uint16) error { + type consoleSize struct { + Height uint16 + Width uint16 + x uint16 + y uint16 + } + + if pty == nil { + return errors.New("error resizing console for nil pty fd") + } + + if _, _, err := syscall.Syscall(syscall.SYS_IOCTL, pty.Fd(), uintptr(unix.TIOCSWINSZ), uintptr(unsafe.Pointer(&consoleSize{Height: height, Width: width}))); err != 0 { + return err + } + + return nil }