From 7556251ab64e43d93f81283c8e7ce0e4ba7d7ddc Mon Sep 17 00:00:00 2001 From: "Justin Terry (VM)" Date: Mon, 31 Jul 2017 12:31:11 -0700 Subject: [PATCH 1/5] Adds console resize support. Resolves: #76 --- service/gcs/bridge/bridge.go | 10 +-- service/gcs/core/core.go | 5 +- service/gcs/core/gcs/gcs.go | 142 +++++++++++++++++++++---------- service/gcs/runtime/runc/runc.go | 19 +++-- service/gcs/runtime/runtime.go | 2 + service/gcs/stdio/tty.go | 18 ++++ 6 files changed, 141 insertions(+), 55 deletions(-) diff --git a/service/gcs/bridge/bridge.go b/service/gcs/bridge/bridge.go index 40a7ac16..a02b786f 100644 --- a/service/gcs/bridge/bridge.go +++ b/service/gcs/bridge/bridge.go @@ -276,7 +276,7 @@ func (b *bridge) signalProcess(message []byte) (*prot.MessageResponseBase, error } response.ActivityID = request.ActivityID - if err := b.coreint.SignalProcess(int(request.ProcessID), request.Options); err != nil { + if err := b.coreint.SignalProcess(request.ContainerID, int(request.ProcessID), request.Options); err != nil { return response, err } @@ -348,15 +348,13 @@ func (b *bridge) waitOnProcess(message []byte, header *prot.MessageHeader) (*pro logrus.Error(errors.Wrapf(err, "failed to send process exit response \"%v\"", response)) } } - if err := b.coreint.RegisterProcessExitHook(int(request.ProcessID), exitHook); err != nil { + if err := b.coreint.RegisterProcessExitHook(request.ContainerID, int(request.ProcessID), exitHook); err != nil { return response, err } return response, nil } -// resizeConsole is currently a nop until the functionality is implemented. -// TODO: Tests still need to be written when it's no longer a nop. func (b *bridge) resizeConsole(message []byte) (*prot.MessageResponseBase, error) { response := newResponseBase() var request prot.ContainerResizeConsole @@ -365,7 +363,9 @@ func (b *bridge) resizeConsole(message []byte) (*prot.MessageResponseBase, error } response.ActivityID = request.ActivityID - // NOP + if err := b.coreint.ResizeConsole(request.ContainerID, int(request.ProcessID), request.Height, request.Width); err != nil { + return response, err + } return response, nil } diff --git a/service/gcs/core/core.go b/service/gcs/core/core.go index 2ebf58df..fe97e2b6 100644 --- a/service/gcs/core/core.go +++ b/service/gcs/core/core.go @@ -16,10 +16,11 @@ type Core interface { CreateContainer(id string, info prot.VMHostedContainerSettings) error ExecProcess(id string, info prot.ProcessParameters, stdioSet *stdio.ConnectionSet) (pid int, err error) SignalContainer(id string, signal oslayer.Signal) error - SignalProcess(pid int, options prot.SignalProcessOptions) error + SignalProcess(id string, pid int, options prot.SignalProcessOptions) error ListProcesses(id string) ([]runtime.ContainerProcessState, error) RunExternalProcess(info prot.ProcessParameters, stdioSet *stdio.ConnectionSet) (pid int, err error) ModifySettings(id string, request prot.ResourceModificationRequestResponse) error RegisterContainerExitHook(id string, onExit func(oslayer.ProcessExitState)) error - RegisterProcessExitHook(pid int, onExit func(oslayer.ProcessExitState)) error + RegisterProcessExitHook(id string, pid int, onExit func(oslayer.ProcessExitState)) error + ResizeConsole(id string, pid int, height, width uint16) error } diff --git a/service/gcs/core/gcs/gcs.go b/service/gcs/core/gcs/gcs.go index 7812b27e..18ff789d 100644 --- a/service/gcs/core/gcs/gcs.go +++ b/service/gcs/core/gcs/gcs.go @@ -37,12 +37,6 @@ type gcsCore struct { // ID to cache entry. containerCache map[string]*containerCacheEntry - processCacheMutex sync.RWMutex - // processCache stores information about processes which persists between - // calls into the gcsCore. It is structured as a map from pid to cache - // entry. - processCache map[int]*processCacheEntry - externalProcessCacheMutex sync.RWMutex // externalProcessCache stores information about external processes which // persists between calls into the gcsCore. It is structured as a map from @@ -56,7 +50,6 @@ func NewGCSCore(rtime runtime.Runtime, os oslayer.OS) *gcsCore { Rtime: rtime, OS: os, containerCache: make(map[string]*containerCacheEntry), - processCache: make(map[int]*processCacheEntry), externalProcessCache: make(map[int]*processCacheEntry), } } @@ -65,12 +58,17 @@ func NewGCSCore(rtime runtime.Runtime, os oslayer.OS) *gcsCore { type containerCacheEntry struct { ID string ExitStatus oslayer.ProcessExitState - Processes []int ExitHooks []func(oslayer.ProcessExitState) MappedVirtualDisks map[uint8]prot.MappedVirtualDisk MappedDirectories map[uint32]prot.MappedDirectory NetworkAdapters []prot.NetworkAdapter container runtime.Container + + processCacheMutex sync.RWMutex + // processCache stores information about processes which persists between + // calls into the gcsCore. It is structured as a map from pid to cache + // entry. + processCache map[int]*processCacheEntry } func newContainerCacheEntry(id string) *containerCacheEntry { @@ -78,14 +76,12 @@ func newContainerCacheEntry(id string) *containerCacheEntry { ID: id, MappedVirtualDisks: make(map[uint8]prot.MappedVirtualDisk), MappedDirectories: make(map[uint32]prot.MappedDirectory), + processCache: make(map[int]*processCacheEntry), } } func (e *containerCacheEntry) AddExitHook(hook func(oslayer.ProcessExitState)) { e.ExitHooks = append(e.ExitHooks, hook) } -func (e *containerCacheEntry) AddProcess(pid int) { - e.Processes = append(e.Processes, pid) -} func (e *containerCacheEntry) AddNetworkAdapter(adapter prot.NetworkAdapter) { e.NetworkAdapters = append(e.NetworkAdapters, adapter) } @@ -122,6 +118,7 @@ func (e *containerCacheEntry) RemoveMappedDirectory(dir prot.MappedDirectory) { type processCacheEntry struct { ExitStatus oslayer.ProcessExitState ExitHooks []func(oslayer.ProcessExitState) + Master *os.File } func newProcessCacheEntry() *processCacheEntry { @@ -207,7 +204,9 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet var p runtime.Process - isInitProcess := len(containerEntry.Processes) == 0 + containerEntry.processCacheMutex.Lock() + isInitProcess := len(containerEntry.processCache) == 0 + containerEntry.processCacheMutex.Unlock() if isInitProcess { if err := c.writeConfigFile(id, params.OCISpecification); err != nil { return -1, err @@ -220,6 +219,7 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet containerEntry.container = container p = container + processEntry.Master = p.Console() // Configure network adapters in the namespace. for _, adapter := range containerEntry.NetworkAdapters { @@ -244,17 +244,13 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet } c.containerCacheMutex.Unlock() - c.processCacheMutex.Lock() + containerEntry.processCacheMutex.Lock() processEntry.ExitStatus = state for _, hook := range processEntry.ExitHooks { hook(state) } - c.processCacheMutex.Unlock() + containerEntry.processCacheMutex.Unlock() c.containerCacheMutex.Lock() - containerEntry.ExitStatus = state - for _, hook := range containerEntry.ExitHooks { - hook(state) - } delete(c.containerCache, id) c.containerCacheMutex.Unlock() }() @@ -271,6 +267,7 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet if err != nil { return -1, err } + processEntry.Master = p.Console() go func() { state, err := p.Wait() @@ -279,19 +276,19 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet } logrus.Infof("container process %d exited with exit status %d", p.Pid(), state.ExitCode()) - c.processCacheMutex.Lock() + containerEntry.processCacheMutex.Lock() processEntry.ExitStatus = state for _, hook := range processEntry.ExitHooks { hook(state) } - c.processCacheMutex.Unlock() + containerEntry.processCacheMutex.Unlock() if err := p.Delete(); err != nil { logrus.Error(err) } }() } - c.processCacheMutex.Lock() + containerEntry.processCacheMutex.Lock() // If a processCacheEntry with the given pid already exists in the cache, // this will overwrite it. This behavior is expected. Processes are kept in // the cache even after they exit, which allows for exit hooks registered @@ -304,9 +301,8 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet // apply to the old process no longer makes sense, so since the old // process's pid has been reused, its cache entry can also be reused. This // applies to external processes as well. - c.processCache[p.Pid()] = processEntry - c.processCacheMutex.Unlock() - containerEntry.AddProcess(p.Pid()) + containerEntry.processCache[p.Pid()] = processEntry + containerEntry.processCacheMutex.Unlock() return p.Pid(), nil } @@ -330,18 +326,30 @@ func (c *gcsCore) SignalContainer(id string, signal oslayer.Signal) error { } // SignalProcess sends the signal specified in options to the given process. -func (c *gcsCore) SignalProcess(pid int, options prot.SignalProcessOptions) error { - c.processCacheMutex.Lock() - c.externalProcessCacheMutex.Lock() - if _, ok := c.processCache[pid]; !ok { +func (c *gcsCore) SignalProcess(id string, pid int, options prot.SignalProcessOptions) error { + if id == "" { + c.externalProcessCacheMutex.Lock() if _, ok := c.externalProcessCache[pid]; !ok { - c.processCacheMutex.Unlock() c.externalProcessCacheMutex.Unlock() return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) } + c.externalProcessCacheMutex.Unlock() + } else { + c.containerCacheMutex.Lock() + containerEntry := c.getContainer(id) + if containerEntry == nil { + c.containerCacheMutex.Unlock() + return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) + } + containerEntry.processCacheMutex.Lock() + if _, ok := containerEntry.processCache[pid]; !ok { + containerEntry.processCacheMutex.Unlock() + c.containerCacheMutex.Unlock() + return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) + } + containerEntry.processCacheMutex.Unlock() + c.containerCacheMutex.Unlock() } - c.processCacheMutex.Unlock() - c.externalProcessCacheMutex.Unlock() // Interpret signal value 0 as SIGKILL. // TODO: Remove this special casing when we are not worried about breaking @@ -395,11 +403,11 @@ func (c *gcsCore) RunExternalProcess(params prot.ProcessParameters, stdioSet *st cmd.SetEnv(ociProcess.Env) var relay *stdio.TtyRelay + var master *os.File if params.EmulateConsole { // Allocate a console for the process. var ( consolePath string - master *os.File ) master, consolePath, err = stdio.NewConsole() if err != nil { @@ -441,6 +449,7 @@ func (c *gcsCore) RunExternalProcess(params prot.ProcessParameters, stdioSet *st } processEntry := newProcessCacheEntry() + processEntry.Master = master go func() { if err := cmd.Wait(); err != nil { // TODO: When cmd is a shell, and last command in the shell @@ -553,18 +562,28 @@ func (c *gcsCore) RegisterContainerExitHook(id string, exitHook func(oslayer.Pro // process may have multiple exit hooks registered for it. // This function works for both processes that are running in a container, and // ones that are running externally to a container. -func (c *gcsCore) RegisterProcessExitHook(pid int, exitHook func(oslayer.ProcessExitState)) error { - c.processCacheMutex.Lock() - defer c.processCacheMutex.Unlock() - c.externalProcessCacheMutex.Lock() - defer c.externalProcessCacheMutex.Unlock() +func (c *gcsCore) RegisterProcessExitHook(id string, pid int, exitHook func(oslayer.ProcessExitState)) error { + var ( + entry *processCacheEntry + ok bool + ) - var entry *processCacheEntry - var ok bool - entry, ok = c.processCache[pid] - if !ok { - entry, ok = c.externalProcessCache[pid] - if !ok { + if id == "" { + c.externalProcessCacheMutex.Lock() + defer c.externalProcessCacheMutex.Unlock() + if entry, ok = c.externalProcessCache[pid]; !ok { + return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) + } + } else { + c.containerCacheMutex.Lock() + defer c.containerCacheMutex.Unlock() + containerEntry := c.getContainer(id) + if containerEntry == nil { + return errors.WithStack(gcserr.NewContainerDoesNotExistError(id)) + } + containerEntry.processCacheMutex.Lock() + defer containerEntry.processCacheMutex.Unlock() + if entry, ok = containerEntry.processCache[pid]; !ok { return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) } } @@ -580,6 +599,43 @@ func (c *gcsCore) RegisterProcessExitHook(pid int, exitHook func(oslayer.Process return nil } +func (c *gcsCore) ResizeConsole(id string, pid int, height, width uint16) error { + var ( + p *processCacheEntry + ok bool + ) + + if id == "" { + c.externalProcessCacheMutex.Lock() + if p, ok = c.externalProcessCache[pid]; !ok { + c.externalProcessCacheMutex.Unlock() + return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) + } + c.externalProcessCacheMutex.Unlock() + } else { + c.containerCacheMutex.Lock() + containerEntry := c.getContainer(id) + if containerEntry == nil { + c.containerCacheMutex.Unlock() + return errors.WithStack(gcserr.NewContainerDoesNotExistError(id)) + } + containerEntry.processCacheMutex.Lock() + if p, ok = containerEntry.processCache[pid]; !ok { + containerEntry.processCacheMutex.Unlock() + c.containerCacheMutex.Unlock() + return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) + } + containerEntry.processCacheMutex.Unlock() + c.containerCacheMutex.Unlock() + } + + if err := stdio.ResizeConsole(p.Master, height, width); err != nil { + return errors.Wrapf(err, "failed to resize console for pid: %d", pid) + } + + return nil +} + // setupMappedVirtualDisks is a helper function which calls into the functions // in storage.go to set up a set of mapped virtual disks for a given container. // It then adds them to the container's cache entry. diff --git a/service/gcs/runtime/runc/runc.go b/service/gcs/runtime/runc/runc.go index 441f5390..2aa6d08e 100644 --- a/service/gcs/runtime/runc/runc.go +++ b/service/gcs/runtime/runc/runc.go @@ -48,16 +48,25 @@ func (c *container) Pid() int { return c.init.Pid() } +func (c *container) Console() *os.File { + return c.init.master +} + type process struct { - c *container - pid int - relay *stdio.TtyRelay + c *container + pid int + relay *stdio.TtyRelay + master *os.File } func (p *process) Pid() int { return p.pid } +func (p *process) Console() *os.File { + return p.master +} + // NewRuntime instantiates a new runcRuntime struct. func NewRuntime() (*runcRuntime, error) { rtime := &runcRuntime{} @@ -528,8 +537,8 @@ func (c *container) startProcess(tempProcessDir string, hasTerminal bool, stdioS } var relay *stdio.TtyRelay + var master *os.File if hasTerminal { - var master *os.File master, err = c.r.getMasterFromSocket(sockListener) if err != nil { cmd.Process.Kill() @@ -561,5 +570,5 @@ func (c *container) startProcess(tempProcessDir string, hasTerminal bool, stdioS if relay != nil { relay.Start() } - return &process{c: c, pid: pid, relay: relay}, nil + return &process{c: c, pid: pid, relay: relay, master: master}, nil } diff --git a/service/gcs/runtime/runtime.go b/service/gcs/runtime/runtime.go index 9c4806a4..0ff7501d 100644 --- a/service/gcs/runtime/runtime.go +++ b/service/gcs/runtime/runtime.go @@ -4,6 +4,7 @@ package runtime import ( "io" + "os" "github.com/Microsoft/opengcs/service/gcs/oslayer" "github.com/Microsoft/opengcs/service/gcs/stdio" @@ -43,6 +44,7 @@ type Process interface { Wait() (oslayer.ProcessExitState, error) Pid() int Delete() error + Console() *os.File } // Container is an interface to manipulate container state. diff --git a/service/gcs/stdio/tty.go b/service/gcs/stdio/tty.go index d2cdafd9..7524de9f 100644 --- a/service/gcs/stdio/tty.go +++ b/service/gcs/stdio/tty.go @@ -7,8 +7,17 @@ import ( "unsafe" "github.com/pkg/errors" + + "golang.org/x/sys/unix" ) +type consoleSize struct { + Height uint16 + Width uint16 + x uint16 + y uint16 +} + // NewConsole allocates a new console and returns the File for its master and // path for its slave. func NewConsole() (*os.File, string, error) { @@ -33,6 +42,15 @@ func NewConsole() (*os.File, string, error) { return master, console, nil } +// ResizeConsole sends the appropriate resize to a pTTY FD +func ResizeConsole(master *os.File, height, width uint16) error { + if master == nil { + return errors.New("failed to resize console with null master fd") + } + + return ioctl(master.Fd(), uintptr(unix.TIOCSWINSZ), uintptr(unsafe.Pointer(&consoleSize{Height: height, Width: width}))) +} + func ioctl(fd uintptr, flag, data uintptr) error { if _, _, err := syscall.Syscall(syscall.SYS_IOCTL, fd, flag, data); err != 0 { return err From 5691c4e2f6c98a5e14aa732f334aa7a798c39e05 Mon Sep 17 00:00:00 2001 From: "Justin Terry (VM)" Date: Tue, 1 Aug 2017 10:15:58 -0700 Subject: [PATCH 2/5] Adding console resize bridge tests. --- service/gcs/bridge/bridge_test.go | 31 ++++++++++++++++--- service/gcs/core/gcs/gcs_test.go | 4 +-- service/gcs/core/mockcore/mockcore.go | 25 +++++++++++++-- .../gcs/runtime/mockruntime/mockruntime.go | 5 +++ 4 files changed, 56 insertions(+), 9 deletions(-) diff --git a/service/gcs/bridge/bridge_test.go b/service/gcs/bridge/bridge_test.go index f975167d..c653bae5 100644 --- a/service/gcs/bridge/bridge_test.go +++ b/service/gcs/bridge/bridge_test.go @@ -587,7 +587,7 @@ var _ = Describe("Bridge", func() { Describe("calling resizeConsole", func() { var ( response prot.MessageResponseBase - //callArgs mockcore.ResizeConsoleCall + callArgs mockcore.ResizeConsoleCall ) BeforeEach(func() { messageType = prot.ComputeSystemResizeConsoleV1 @@ -597,7 +597,7 @@ var _ = Describe("Bridge", func() { err := json.Unmarshal([]byte(responseString), &response) Expect(err).NotTo(HaveOccurred()) responseBase = &response - //callArgs = coreint.LastResizeConsole + callArgs = coreint.LastResizeConsole }) Context("the message is normal ASCII", func() { BeforeEach(func() { @@ -613,10 +613,31 @@ var _ = Describe("Bridge", func() { }) AssertNoResponseErrors() AssertActivityIDCorrect() - // TODO: Add tests on callArgs when resizing the console is - // implemented. It("should receive the correct values", func() { - //e.g. Expect(callArgs.Pid).To(Equal(101)) + Expect(callArgs.ID).To(Equal(containerID)) + Expect(callArgs.Pid).To(Equal(101)) + Expect(callArgs.Height).To(Equal(uint16(30))) + Expect(callArgs.Width).To(Equal(uint16(72))) + }) + }) + Context("the message is normal ASCII no containerid", func() { + BeforeEach(func() { + message = prot.ContainerResizeConsole{ + MessageBase: &prot.MessageBase{ + ActivityID: activityID, + }, + ProcessID: 102, + Height: 80, + Width: 80, + } + }) + AssertNoResponseErrors() + AssertActivityIDCorrect() + It("should receive the correct values", func() { + Expect(callArgs.ID).To(Equal("")) + Expect(callArgs.Pid).To(Equal(102)) + Expect(callArgs.Height).To(Equal(uint16(80))) + Expect(callArgs.Width).To(Equal(uint16(80))) }) }) }) diff --git a/service/gcs/core/gcs/gcs_test.go b/service/gcs/core/gcs/gcs_test.go index 9eb1fc12..03e58d40 100644 --- a/service/gcs/core/gcs/gcs_test.go +++ b/service/gcs/core/gcs/gcs_test.go @@ -854,7 +854,7 @@ var _ = Describe("GCS", func() { sigkillOptions = prot.SignalProcessOptions{Signal: int32(syscall.SIGKILL)} }) JustBeforeEach(func() { - err = coreint.SignalProcess(processID, sigkillOptions) + err = coreint.SignalProcess(containerID, processID, sigkillOptions) }) Context("the process has already been created", func() { BeforeEach(func() { @@ -1069,7 +1069,7 @@ var _ = Describe("GCS", func() { pid int ) JustBeforeEach(func() { - err = coreint.RegisterProcessExitHook(pid, func(oslayer.ProcessExitState) {}) + err = coreint.RegisterProcessExitHook(containerID, pid, func(oslayer.ProcessExitState) {}) }) Context("the container has already been created", func() { BeforeEach(func() { diff --git a/service/gcs/core/mockcore/mockcore.go b/service/gcs/core/mockcore/mockcore.go index 876e12db..fa155f8d 100644 --- a/service/gcs/core/mockcore/mockcore.go +++ b/service/gcs/core/mockcore/mockcore.go @@ -65,6 +65,14 @@ type RegisterProcessExitHookCall struct { ExitHook func(oslayer.ProcessExitState) } +// ResizeConsoleCall captures the arguments of ResizeConsole +type ResizeConsoleCall struct { + ID string + Pid int + Height uint16 + Width uint16 +} + // MockCore serves as an argument capture mechanism which implements the Core // interface. Arguments passed to one of its methods are stored to be queried // later. @@ -78,6 +86,7 @@ type MockCore struct { LastModifySettings ModifySettingsCall LastRegisterContainerExitHook RegisterContainerExitHookCall LastRegisterProcessExitHook RegisterProcessExitHookCall + LastResizeConsole ResizeConsoleCall } // CreateContainer captures its arguments and returns a nil error. @@ -106,7 +115,7 @@ func (c *MockCore) SignalContainer(id string, signal oslayer.Signal) error { } // SignalProcess captures its arguments and returns a nil error. -func (c *MockCore) SignalProcess(pid int, options prot.SignalProcessOptions) error { +func (c *MockCore) SignalProcess(id string, pid int, options prot.SignalProcessOptions) error { c.LastSignalProcess = SignalProcessCall{ Pid: pid, Options: options, @@ -159,7 +168,7 @@ func (c *MockCore) RegisterContainerExitHook(id string, exitHook func(oslayer.Pr // RegisterProcessExitHook captures its arguments, runs the given exit hook on // a process exit state with exit code 103, and returns a nil error. -func (c *MockCore) RegisterProcessExitHook(pid int, exitHook func(oslayer.ProcessExitState)) error { +func (c *MockCore) RegisterProcessExitHook(id string, pid int, exitHook func(oslayer.ProcessExitState)) error { c.LastRegisterProcessExitHook = RegisterProcessExitHookCall{ Pid: pid, ExitHook: exitHook, @@ -167,3 +176,15 @@ func (c *MockCore) RegisterProcessExitHook(pid int, exitHook func(oslayer.Proces exitHook(mockos.NewProcessExitState(103)) return nil } + +// ResizeConsole captures its arguments and returns a nil error. +func (c *MockCore) ResizeConsole(id string, pid int, height, width uint16) error { + c.LastResizeConsole = ResizeConsoleCall{ + ID: id, + Pid: pid, + Height: height, + Width: width, + } + + return nil +} diff --git a/service/gcs/runtime/mockruntime/mockruntime.go b/service/gcs/runtime/mockruntime/mockruntime.go index 278949b7..a4170bc5 100644 --- a/service/gcs/runtime/mockruntime/mockruntime.go +++ b/service/gcs/runtime/mockruntime/mockruntime.go @@ -2,6 +2,7 @@ package mockruntime import ( + "os" "sync" "github.com/Microsoft/opengcs/service/gcs/oslayer" @@ -46,6 +47,10 @@ func (c *container) Pid() int { return 101 } +func (c *container) Console() *os.File { + return nil +} + func (c *container) ExecProcess(process oci.Process, stdioSet *stdio.ConnectionSet) (p runtime.Process, err error) { return c, nil } From ab71b3f2c7a694d520be64d2f7262c8b8df6e91d Mon Sep 17 00:00:00 2001 From: "Justin Terry (VM)" Date: Tue, 1 Aug 2017 11:10:35 -0700 Subject: [PATCH 3/5] Synchronizing ResizeConsole --- service/gcs/core/gcs/gcs.go | 16 +++---- .../gcs/runtime/mockruntime/mockruntime.go | 3 +- service/gcs/runtime/runc/runc.go | 17 ++++---- service/gcs/runtime/runtime.go | 3 +- service/gcs/stdio/stdio.go | 42 +++++++++++++++++-- service/gcs/stdio/tty.go | 18 -------- 6 files changed, 56 insertions(+), 43 deletions(-) diff --git a/service/gcs/core/gcs/gcs.go b/service/gcs/core/gcs/gcs.go index 18ff789d..eded1cca 100644 --- a/service/gcs/core/gcs/gcs.go +++ b/service/gcs/core/gcs/gcs.go @@ -118,7 +118,7 @@ func (e *containerCacheEntry) RemoveMappedDirectory(dir prot.MappedDirectory) { type processCacheEntry struct { ExitStatus oslayer.ProcessExitState ExitHooks []func(oslayer.ProcessExitState) - Master *os.File + Tty *stdio.TtyRelay } func newProcessCacheEntry() *processCacheEntry { @@ -219,7 +219,7 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet containerEntry.container = container p = container - processEntry.Master = p.Console() + processEntry.Tty = p.Tty() // Configure network adapters in the namespace. for _, adapter := range containerEntry.NetworkAdapters { @@ -267,7 +267,7 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet if err != nil { return -1, err } - processEntry.Master = p.Console() + processEntry.Tty = p.Tty() go func() { state, err := p.Wait() @@ -403,10 +403,10 @@ func (c *gcsCore) RunExternalProcess(params prot.ProcessParameters, stdioSet *st cmd.SetEnv(ociProcess.Env) var relay *stdio.TtyRelay - var master *os.File if params.EmulateConsole { // Allocate a console for the process. var ( + master *os.File consolePath string ) master, consolePath, err = stdio.NewConsole() @@ -449,7 +449,7 @@ func (c *gcsCore) RunExternalProcess(params prot.ProcessParameters, stdioSet *st } processEntry := newProcessCacheEntry() - processEntry.Master = master + processEntry.Tty = relay go func() { if err := cmd.Wait(); err != nil { // TODO: When cmd is a shell, and last command in the shell @@ -629,11 +629,11 @@ func (c *gcsCore) ResizeConsole(id string, pid int, height, width uint16) error c.containerCacheMutex.Unlock() } - if err := stdio.ResizeConsole(p.Master, height, width); err != nil { - return errors.Wrapf(err, "failed to resize console for pid: %d", pid) + if p.Tty == nil { + return fmt.Errorf("pid: %d, is not a tty and cannot be resized", pid) } - return nil + return p.Tty.ResizeConsole(height, width) } // setupMappedVirtualDisks is a helper function which calls into the functions diff --git a/service/gcs/runtime/mockruntime/mockruntime.go b/service/gcs/runtime/mockruntime/mockruntime.go index a4170bc5..7d2f8a6f 100644 --- a/service/gcs/runtime/mockruntime/mockruntime.go +++ b/service/gcs/runtime/mockruntime/mockruntime.go @@ -2,7 +2,6 @@ package mockruntime import ( - "os" "sync" "github.com/Microsoft/opengcs/service/gcs/oslayer" @@ -47,7 +46,7 @@ func (c *container) Pid() int { return 101 } -func (c *container) Console() *os.File { +func (c *container) Tty() *stdio.TtyRelay { return nil } diff --git a/service/gcs/runtime/runc/runc.go b/service/gcs/runtime/runc/runc.go index 2aa6d08e..b73834e2 100644 --- a/service/gcs/runtime/runc/runc.go +++ b/service/gcs/runtime/runc/runc.go @@ -48,23 +48,22 @@ func (c *container) Pid() int { return c.init.Pid() } -func (c *container) Console() *os.File { - return c.init.master +func (c *container) Tty() *stdio.TtyRelay { + return c.init.relay } type process struct { - c *container - pid int - relay *stdio.TtyRelay - master *os.File + c *container + pid int + relay *stdio.TtyRelay } func (p *process) Pid() int { return p.pid } -func (p *process) Console() *os.File { - return p.master +func (p *process) Tty() *stdio.TtyRelay { + return p.relay } // NewRuntime instantiates a new runcRuntime struct. @@ -570,5 +569,5 @@ func (c *container) startProcess(tempProcessDir string, hasTerminal bool, stdioS if relay != nil { relay.Start() } - return &process{c: c, pid: pid, relay: relay, master: master}, nil + return &process{c: c, pid: pid, relay: relay}, nil } diff --git a/service/gcs/runtime/runtime.go b/service/gcs/runtime/runtime.go index 0ff7501d..13cb9cff 100644 --- a/service/gcs/runtime/runtime.go +++ b/service/gcs/runtime/runtime.go @@ -4,7 +4,6 @@ package runtime import ( "io" - "os" "github.com/Microsoft/opengcs/service/gcs/oslayer" "github.com/Microsoft/opengcs/service/gcs/stdio" @@ -44,7 +43,7 @@ type Process interface { Wait() (oslayer.ProcessExitState, error) Pid() int Delete() error - Console() *os.File + Tty() *stdio.TtyRelay } // Container is an interface to manipulate container state. diff --git a/service/gcs/stdio/stdio.go b/service/gcs/stdio/stdio.go index b2b2faa2..c78935ed 100644 --- a/service/gcs/stdio/stdio.go +++ b/service/gcs/stdio/stdio.go @@ -4,10 +4,15 @@ import ( "io" "os" "sync" + "sync/atomic" + "syscall" + "unsafe" "github.com/Microsoft/opengcs/service/gcs/transport" "github.com/sirupsen/logrus" "github.com/pkg/errors" + + "golang.org/x/sys/unix" ) // ConnectionSet is a structure defining the readers and writers the Core @@ -100,15 +105,37 @@ func (s *ConnectionSet) Files() (_ *FileSet, err error) { } // NewTtyRelay returns a new TTY relay for a given master PTY file. -func (s *ConnectionSet) NewTtyRelay(pty io.ReadWriteCloser) *TtyRelay { +func (s *ConnectionSet) NewTtyRelay(pty *os.File) *TtyRelay { return &TtyRelay{s: s, pty: pty} } // TtyRelay relays IO between a set of stdio connections and a master PTY file. type TtyRelay struct { - wg sync.WaitGroup - s *ConnectionSet - pty io.ReadWriteCloser + closing int32 + wg sync.WaitGroup + s *ConnectionSet + pty *os.File +} + +// ResizeConsole sends the appropriate resize to a pTTY FD +func (r *TtyRelay) ResizeConsole(height, width uint16) error { + type consoleSize struct { + Height uint16 + Width uint16 + x uint16 + y uint16 + } + + r.wg.Add(1) + defer r.wg.Done() + if atomic.LoadInt32(&r.closing) != 0 { + return errors.New("error resizing console pty is closed") + } + + if _, _, err := syscall.Syscall(syscall.SYS_IOCTL, r.pty.Fd(), uintptr(unix.TIOCSWINSZ), uintptr(unsafe.Pointer(&consoleSize{Height: height, Width: width}))); err != 0 { + return err + } + return nil } // Start starts the relay operation. The caller must call Wait to wait @@ -151,6 +178,13 @@ func (r *TtyRelay) Wait() { // Wait for all users of stdioSet and master to finish before closing them. r.wg.Wait() + + // Given the expected use of wait we cannot increment closing before calling r.wg.Wait() + // or all calls to ResizeConsole would fail. However, by calling it after there is a very + // small window that ResizeConsole could still get an invalid Fd. We call wait again to + // enusure that no ResizeConsole call came in before actualling closing the pty. + atomic.StoreInt32(&r.closing, 1) + r.wg.Wait() r.pty.Close() r.s.Close() } diff --git a/service/gcs/stdio/tty.go b/service/gcs/stdio/tty.go index 7524de9f..d2cdafd9 100644 --- a/service/gcs/stdio/tty.go +++ b/service/gcs/stdio/tty.go @@ -7,17 +7,8 @@ import ( "unsafe" "github.com/pkg/errors" - - "golang.org/x/sys/unix" ) -type consoleSize struct { - Height uint16 - Width uint16 - x uint16 - y uint16 -} - // NewConsole allocates a new console and returns the File for its master and // path for its slave. func NewConsole() (*os.File, string, error) { @@ -42,15 +33,6 @@ func NewConsole() (*os.File, string, error) { return master, console, nil } -// ResizeConsole sends the appropriate resize to a pTTY FD -func ResizeConsole(master *os.File, height, width uint16) error { - if master == nil { - return errors.New("failed to resize console with null master fd") - } - - return ioctl(master.Fd(), uintptr(unix.TIOCSWINSZ), uintptr(unsafe.Pointer(&consoleSize{Height: height, Width: width}))) -} - func ioctl(fd uintptr, flag, data uintptr) error { if _, _, err := syscall.Syscall(syscall.SYS_IOCTL, fd, flag, data); err != 0 { return err From b5e25bab13b9b613de9262f8d3beaf385d0e00bb Mon Sep 17 00:00:00 2001 From: "Justin Terry (VM)" Date: Tue, 1 Aug 2017 14:02:49 -0700 Subject: [PATCH 4/5] Change to use mutex for sync Removes the per container cache for processes as they are all external pid's Changes to use a mutex instead of the wait group for ResizeConsole so we are confident of order. Puts back some of the containerID overloads because HCS always sends this even for external processes. --- service/gcs/bridge/bridge.go | 6 +- service/gcs/bridge/bridge_test.go | 21 ---- service/gcs/core/core.go | 6 +- service/gcs/core/gcs/gcs.go | 167 +++++++++----------------- service/gcs/core/gcs/gcs_test.go | 4 +- service/gcs/core/mockcore/mockcore.go | 8 +- service/gcs/runtime/runc/runc.go | 2 +- service/gcs/stdio/stdio.go | 42 ++----- service/gcs/stdio/tty.go | 14 +++ 9 files changed, 94 insertions(+), 176 deletions(-) diff --git a/service/gcs/bridge/bridge.go b/service/gcs/bridge/bridge.go index a02b786f..9b08733e 100644 --- a/service/gcs/bridge/bridge.go +++ b/service/gcs/bridge/bridge.go @@ -276,7 +276,7 @@ func (b *bridge) signalProcess(message []byte) (*prot.MessageResponseBase, error } response.ActivityID = request.ActivityID - if err := b.coreint.SignalProcess(request.ContainerID, int(request.ProcessID), request.Options); err != nil { + if err := b.coreint.SignalProcess(int(request.ProcessID), request.Options); err != nil { return response, err } @@ -348,7 +348,7 @@ func (b *bridge) waitOnProcess(message []byte, header *prot.MessageHeader) (*pro logrus.Error(errors.Wrapf(err, "failed to send process exit response \"%v\"", response)) } } - if err := b.coreint.RegisterProcessExitHook(request.ContainerID, int(request.ProcessID), exitHook); err != nil { + if err := b.coreint.RegisterProcessExitHook(int(request.ProcessID), exitHook); err != nil { return response, err } @@ -363,7 +363,7 @@ func (b *bridge) resizeConsole(message []byte) (*prot.MessageResponseBase, error } response.ActivityID = request.ActivityID - if err := b.coreint.ResizeConsole(request.ContainerID, int(request.ProcessID), request.Height, request.Width); err != nil { + if err := b.coreint.ResizeConsole(int(request.ProcessID), request.Height, request.Width); err != nil { return response, err } diff --git a/service/gcs/bridge/bridge_test.go b/service/gcs/bridge/bridge_test.go index c653bae5..04e55fec 100644 --- a/service/gcs/bridge/bridge_test.go +++ b/service/gcs/bridge/bridge_test.go @@ -614,32 +614,11 @@ var _ = Describe("Bridge", func() { AssertNoResponseErrors() AssertActivityIDCorrect() It("should receive the correct values", func() { - Expect(callArgs.ID).To(Equal(containerID)) Expect(callArgs.Pid).To(Equal(101)) Expect(callArgs.Height).To(Equal(uint16(30))) Expect(callArgs.Width).To(Equal(uint16(72))) }) }) - Context("the message is normal ASCII no containerid", func() { - BeforeEach(func() { - message = prot.ContainerResizeConsole{ - MessageBase: &prot.MessageBase{ - ActivityID: activityID, - }, - ProcessID: 102, - Height: 80, - Width: 80, - } - }) - AssertNoResponseErrors() - AssertActivityIDCorrect() - It("should receive the correct values", func() { - Expect(callArgs.ID).To(Equal("")) - Expect(callArgs.Pid).To(Equal(102)) - Expect(callArgs.Height).To(Equal(uint16(80))) - Expect(callArgs.Width).To(Equal(uint16(80))) - }) - }) }) Describe("calling modifySettings", func() { diff --git a/service/gcs/core/core.go b/service/gcs/core/core.go index fe97e2b6..8b99249e 100644 --- a/service/gcs/core/core.go +++ b/service/gcs/core/core.go @@ -16,11 +16,11 @@ type Core interface { CreateContainer(id string, info prot.VMHostedContainerSettings) error ExecProcess(id string, info prot.ProcessParameters, stdioSet *stdio.ConnectionSet) (pid int, err error) SignalContainer(id string, signal oslayer.Signal) error - SignalProcess(id string, pid int, options prot.SignalProcessOptions) error + SignalProcess(pid int, options prot.SignalProcessOptions) error ListProcesses(id string) ([]runtime.ContainerProcessState, error) RunExternalProcess(info prot.ProcessParameters, stdioSet *stdio.ConnectionSet) (pid int, err error) ModifySettings(id string, request prot.ResourceModificationRequestResponse) error RegisterContainerExitHook(id string, onExit func(oslayer.ProcessExitState)) error - RegisterProcessExitHook(id string, pid int, onExit func(oslayer.ProcessExitState)) error - ResizeConsole(id string, pid int, height, width uint16) error + RegisterProcessExitHook(pid int, onExit func(oslayer.ProcessExitState)) error + ResizeConsole(pid int, height, width uint16) error } diff --git a/service/gcs/core/gcs/gcs.go b/service/gcs/core/gcs/gcs.go index eded1cca..29c8bb54 100644 --- a/service/gcs/core/gcs/gcs.go +++ b/service/gcs/core/gcs/gcs.go @@ -37,20 +37,20 @@ type gcsCore struct { // ID to cache entry. containerCache map[string]*containerCacheEntry - externalProcessCacheMutex sync.RWMutex + processCacheMutex sync.RWMutex // externalProcessCache stores information about external processes which // persists between calls into the gcsCore. It is structured as a map from // pid to cache entry. - externalProcessCache map[int]*processCacheEntry + processCache map[int]*processCacheEntry } // NewGCSCore creates a new gcsCore struct initialized with the given Runtime. func NewGCSCore(rtime runtime.Runtime, os oslayer.OS) *gcsCore { return &gcsCore{ - Rtime: rtime, - OS: os, - containerCache: make(map[string]*containerCacheEntry), - externalProcessCache: make(map[int]*processCacheEntry), + Rtime: rtime, + OS: os, + containerCache: make(map[string]*containerCacheEntry), + processCache: make(map[int]*processCacheEntry), } } @@ -63,12 +63,7 @@ type containerCacheEntry struct { MappedDirectories map[uint32]prot.MappedDirectory NetworkAdapters []prot.NetworkAdapter container runtime.Container - - processCacheMutex sync.RWMutex - // processCache stores information about processes which persists between - // calls into the gcsCore. It is structured as a map from pid to cache - // entry. - processCache map[int]*processCacheEntry + hasRunInitProcess bool } func newContainerCacheEntry(id string) *containerCacheEntry { @@ -76,7 +71,6 @@ func newContainerCacheEntry(id string) *containerCacheEntry { ID: id, MappedVirtualDisks: make(map[uint8]prot.MappedVirtualDisk), MappedDirectories: make(map[uint32]prot.MappedDirectory), - processCache: make(map[int]*processCacheEntry), } } func (e *containerCacheEntry) AddExitHook(hook func(oslayer.ProcessExitState)) { @@ -116,13 +110,14 @@ func (e *containerCacheEntry) RemoveMappedDirectory(dir prot.MappedDirectory) { // processCacheEntry stores cached information for a single process. type processCacheEntry struct { - ExitStatus oslayer.ProcessExitState - ExitHooks []func(oslayer.ProcessExitState) - Tty *stdio.TtyRelay + ExitStatus oslayer.ProcessExitState + ExitHooks []func(oslayer.ProcessExitState) + Tty *stdio.TtyRelay + ContainerID string // If "" a host process otherwise a container process. } -func newProcessCacheEntry() *processCacheEntry { - return &processCacheEntry{} +func newProcessCacheEntry(containerID string) *processCacheEntry { + return &processCacheEntry{ContainerID: containerID} } func (e *processCacheEntry) AddExitHook(hook func(oslayer.ProcessExitState)) { e.ExitHooks = append(e.ExitHooks, hook) @@ -200,14 +195,11 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet if containerEntry == nil { return -1, errors.WithStack(gcserr.NewContainerDoesNotExistError(id)) } - processEntry := newProcessCacheEntry() + processEntry := newProcessCacheEntry(id) var p runtime.Process - - containerEntry.processCacheMutex.Lock() - isInitProcess := len(containerEntry.processCache) == 0 - containerEntry.processCacheMutex.Unlock() - if isInitProcess { + if !containerEntry.hasRunInitProcess { + containerEntry.hasRunInitProcess = true if err := c.writeConfigFile(id, params.OCISpecification); err != nil { return -1, err } @@ -244,13 +236,17 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet } c.containerCacheMutex.Unlock() - containerEntry.processCacheMutex.Lock() + c.processCacheMutex.Lock() processEntry.ExitStatus = state for _, hook := range processEntry.ExitHooks { hook(state) } - containerEntry.processCacheMutex.Unlock() + c.processCacheMutex.Unlock() c.containerCacheMutex.Lock() + containerEntry.ExitStatus = state + for _, hook := range containerEntry.ExitHooks { + hook(state) + } delete(c.containerCache, id) c.containerCacheMutex.Unlock() }() @@ -276,19 +272,19 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet } logrus.Infof("container process %d exited with exit status %d", p.Pid(), state.ExitCode()) - containerEntry.processCacheMutex.Lock() + c.processCacheMutex.Lock() processEntry.ExitStatus = state for _, hook := range processEntry.ExitHooks { hook(state) } - containerEntry.processCacheMutex.Unlock() + c.processCacheMutex.Unlock() if err := p.Delete(); err != nil { logrus.Error(err) } }() } - containerEntry.processCacheMutex.Lock() + c.processCacheMutex.Lock() // If a processCacheEntry with the given pid already exists in the cache, // this will overwrite it. This behavior is expected. Processes are kept in // the cache even after they exit, which allows for exit hooks registered @@ -301,8 +297,8 @@ func (c *gcsCore) ExecProcess(id string, params prot.ProcessParameters, stdioSet // apply to the old process no longer makes sense, so since the old // process's pid has been reused, its cache entry can also be reused. This // applies to external processes as well. - containerEntry.processCache[p.Pid()] = processEntry - containerEntry.processCacheMutex.Unlock() + c.processCache[p.Pid()] = processEntry + c.processCacheMutex.Unlock() return p.Pid(), nil } @@ -326,30 +322,13 @@ func (c *gcsCore) SignalContainer(id string, signal oslayer.Signal) error { } // SignalProcess sends the signal specified in options to the given process. -func (c *gcsCore) SignalProcess(id string, pid int, options prot.SignalProcessOptions) error { - if id == "" { - c.externalProcessCacheMutex.Lock() - if _, ok := c.externalProcessCache[pid]; !ok { - c.externalProcessCacheMutex.Unlock() - return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) - } - c.externalProcessCacheMutex.Unlock() - } else { - c.containerCacheMutex.Lock() - containerEntry := c.getContainer(id) - if containerEntry == nil { - c.containerCacheMutex.Unlock() - return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) - } - containerEntry.processCacheMutex.Lock() - if _, ok := containerEntry.processCache[pid]; !ok { - containerEntry.processCacheMutex.Unlock() - c.containerCacheMutex.Unlock() - return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) - } - containerEntry.processCacheMutex.Unlock() - c.containerCacheMutex.Unlock() +func (c *gcsCore) SignalProcess(pid int, options prot.SignalProcessOptions) error { + c.processCacheMutex.Lock() + if _, ok := c.processCache[pid]; !ok { + c.processCacheMutex.Unlock() + return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) } + c.processCacheMutex.Unlock() // Interpret signal value 0 as SIGKILL. // TODO: Remove this special casing when we are not worried about breaking @@ -448,7 +427,7 @@ func (c *gcsCore) RunExternalProcess(params prot.ProcessParameters, stdioSet *st relay.Start() } - processEntry := newProcessCacheEntry() + processEntry := newProcessCacheEntry("") processEntry.Tty = relay go func() { if err := cmd.Wait(); err != nil { @@ -467,18 +446,18 @@ func (c *gcsCore) RunExternalProcess(params prot.ProcessParameters, stdioSet *st // Run exit hooks for the process. state := cmd.ExitState() - c.externalProcessCacheMutex.Lock() + c.processCacheMutex.Lock() processEntry.ExitStatus = state for _, hook := range processEntry.ExitHooks { hook(state) } - c.externalProcessCacheMutex.Unlock() + c.processCacheMutex.Unlock() }() pid = cmd.Process().Pid() - c.externalProcessCacheMutex.Lock() - c.externalProcessCache[pid] = processEntry - c.externalProcessCacheMutex.Unlock() + c.processCacheMutex.Lock() + c.processCache[pid] = processEntry + c.processCacheMutex.Unlock() return pid, nil } @@ -562,30 +541,14 @@ func (c *gcsCore) RegisterContainerExitHook(id string, exitHook func(oslayer.Pro // process may have multiple exit hooks registered for it. // This function works for both processes that are running in a container, and // ones that are running externally to a container. -func (c *gcsCore) RegisterProcessExitHook(id string, pid int, exitHook func(oslayer.ProcessExitState)) error { - var ( - entry *processCacheEntry - ok bool - ) - - if id == "" { - c.externalProcessCacheMutex.Lock() - defer c.externalProcessCacheMutex.Unlock() - if entry, ok = c.externalProcessCache[pid]; !ok { - return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) - } - } else { - c.containerCacheMutex.Lock() - defer c.containerCacheMutex.Unlock() - containerEntry := c.getContainer(id) - if containerEntry == nil { - return errors.WithStack(gcserr.NewContainerDoesNotExistError(id)) - } - containerEntry.processCacheMutex.Lock() - defer containerEntry.processCacheMutex.Unlock() - if entry, ok = containerEntry.processCache[pid]; !ok { - return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) - } +func (c *gcsCore) RegisterProcessExitHook(pid int, exitHook func(oslayer.ProcessExitState)) error { + c.processCacheMutex.Lock() + defer c.processCacheMutex.Unlock() + + var entry *processCacheEntry + var ok bool + if entry, ok = c.processCache[pid]; !ok { + return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) } exitStatus := entry.ExitStatus @@ -599,35 +562,15 @@ func (c *gcsCore) RegisterProcessExitHook(id string, pid int, exitHook func(osla return nil } -func (c *gcsCore) ResizeConsole(id string, pid int, height, width uint16) error { - var ( - p *processCacheEntry - ok bool - ) - - if id == "" { - c.externalProcessCacheMutex.Lock() - if p, ok = c.externalProcessCache[pid]; !ok { - c.externalProcessCacheMutex.Unlock() - return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) - } - c.externalProcessCacheMutex.Unlock() - } else { - c.containerCacheMutex.Lock() - containerEntry := c.getContainer(id) - if containerEntry == nil { - c.containerCacheMutex.Unlock() - return errors.WithStack(gcserr.NewContainerDoesNotExistError(id)) - } - containerEntry.processCacheMutex.Lock() - if p, ok = containerEntry.processCache[pid]; !ok { - containerEntry.processCacheMutex.Unlock() - c.containerCacheMutex.Unlock() - return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) - } - containerEntry.processCacheMutex.Unlock() - c.containerCacheMutex.Unlock() +func (c *gcsCore) ResizeConsole(pid int, height, width uint16) error { + c.processCacheMutex.Lock() + var p *processCacheEntry + var ok bool + if p, ok = c.processCache[pid]; !ok { + c.processCacheMutex.Unlock() + return errors.WithStack(gcserr.NewProcessDoesNotExistError(pid)) } + c.processCacheMutex.Unlock() if p.Tty == nil { return fmt.Errorf("pid: %d, is not a tty and cannot be resized", pid) diff --git a/service/gcs/core/gcs/gcs_test.go b/service/gcs/core/gcs/gcs_test.go index 03e58d40..9eb1fc12 100644 --- a/service/gcs/core/gcs/gcs_test.go +++ b/service/gcs/core/gcs/gcs_test.go @@ -854,7 +854,7 @@ var _ = Describe("GCS", func() { sigkillOptions = prot.SignalProcessOptions{Signal: int32(syscall.SIGKILL)} }) JustBeforeEach(func() { - err = coreint.SignalProcess(containerID, processID, sigkillOptions) + err = coreint.SignalProcess(processID, sigkillOptions) }) Context("the process has already been created", func() { BeforeEach(func() { @@ -1069,7 +1069,7 @@ var _ = Describe("GCS", func() { pid int ) JustBeforeEach(func() { - err = coreint.RegisterProcessExitHook(containerID, pid, func(oslayer.ProcessExitState) {}) + err = coreint.RegisterProcessExitHook(pid, func(oslayer.ProcessExitState) {}) }) Context("the container has already been created", func() { BeforeEach(func() { diff --git a/service/gcs/core/mockcore/mockcore.go b/service/gcs/core/mockcore/mockcore.go index fa155f8d..2cb7e136 100644 --- a/service/gcs/core/mockcore/mockcore.go +++ b/service/gcs/core/mockcore/mockcore.go @@ -67,7 +67,6 @@ type RegisterProcessExitHookCall struct { // ResizeConsoleCall captures the arguments of ResizeConsole type ResizeConsoleCall struct { - ID string Pid int Height uint16 Width uint16 @@ -115,7 +114,7 @@ func (c *MockCore) SignalContainer(id string, signal oslayer.Signal) error { } // SignalProcess captures its arguments and returns a nil error. -func (c *MockCore) SignalProcess(id string, pid int, options prot.SignalProcessOptions) error { +func (c *MockCore) SignalProcess(pid int, options prot.SignalProcessOptions) error { c.LastSignalProcess = SignalProcessCall{ Pid: pid, Options: options, @@ -168,7 +167,7 @@ func (c *MockCore) RegisterContainerExitHook(id string, exitHook func(oslayer.Pr // RegisterProcessExitHook captures its arguments, runs the given exit hook on // a process exit state with exit code 103, and returns a nil error. -func (c *MockCore) RegisterProcessExitHook(id string, pid int, exitHook func(oslayer.ProcessExitState)) error { +func (c *MockCore) RegisterProcessExitHook(pid int, exitHook func(oslayer.ProcessExitState)) error { c.LastRegisterProcessExitHook = RegisterProcessExitHookCall{ Pid: pid, ExitHook: exitHook, @@ -178,9 +177,8 @@ func (c *MockCore) RegisterProcessExitHook(id string, pid int, exitHook func(osl } // ResizeConsole captures its arguments and returns a nil error. -func (c *MockCore) ResizeConsole(id string, pid int, height, width uint16) error { +func (c *MockCore) ResizeConsole(pid int, height, width uint16) error { c.LastResizeConsole = ResizeConsoleCall{ - ID: id, Pid: pid, Height: height, Width: width, diff --git a/service/gcs/runtime/runc/runc.go b/service/gcs/runtime/runc/runc.go index b73834e2..6b454cb7 100644 --- a/service/gcs/runtime/runc/runc.go +++ b/service/gcs/runtime/runc/runc.go @@ -536,8 +536,8 @@ func (c *container) startProcess(tempProcessDir string, hasTerminal bool, stdioS } var relay *stdio.TtyRelay - var master *os.File if hasTerminal { + var master *os.File master, err = c.r.getMasterFromSocket(sockListener) if err != nil { cmd.Process.Kill() diff --git a/service/gcs/stdio/stdio.go b/service/gcs/stdio/stdio.go index c78935ed..ced97692 100644 --- a/service/gcs/stdio/stdio.go +++ b/service/gcs/stdio/stdio.go @@ -4,15 +4,10 @@ import ( "io" "os" "sync" - "sync/atomic" - "syscall" - "unsafe" "github.com/Microsoft/opengcs/service/gcs/transport" "github.com/sirupsen/logrus" "github.com/pkg/errors" - - "golang.org/x/sys/unix" ) // ConnectionSet is a structure defining the readers and writers the Core @@ -111,31 +106,22 @@ func (s *ConnectionSet) NewTtyRelay(pty *os.File) *TtyRelay { // TtyRelay relays IO between a set of stdio connections and a master PTY file. type TtyRelay struct { - closing int32 - wg sync.WaitGroup - s *ConnectionSet - pty *os.File + m sync.Mutex + closed bool + wg sync.WaitGroup + s *ConnectionSet + pty *os.File } // ResizeConsole sends the appropriate resize to a pTTY FD func (r *TtyRelay) ResizeConsole(height, width uint16) error { - type consoleSize struct { - Height uint16 - Width uint16 - x uint16 - y uint16 - } + r.m.Lock() + defer r.m.Unlock() - r.wg.Add(1) - defer r.wg.Done() - if atomic.LoadInt32(&r.closing) != 0 { + if r.closed { return errors.New("error resizing console pty is closed") } - - if _, _, err := syscall.Syscall(syscall.SYS_IOCTL, r.pty.Fd(), uintptr(unix.TIOCSWINSZ), uintptr(unsafe.Pointer(&consoleSize{Height: height, Width: width}))); err != 0 { - return err - } - return nil + return ResizeConsole(r.pty, height, width) } // Start starts the relay operation. The caller must call Wait to wait @@ -179,12 +165,10 @@ func (r *TtyRelay) Wait() { // Wait for all users of stdioSet and master to finish before closing them. r.wg.Wait() - // Given the expected use of wait we cannot increment closing before calling r.wg.Wait() - // or all calls to ResizeConsole would fail. However, by calling it after there is a very - // small window that ResizeConsole could still get an invalid Fd. We call wait again to - // enusure that no ResizeConsole call came in before actualling closing the pty. - atomic.StoreInt32(&r.closing, 1) - r.wg.Wait() + r.m.Lock() + defer r.m.Unlock() + r.pty.Close() + r.closed = true r.s.Close() } diff --git a/service/gcs/stdio/tty.go b/service/gcs/stdio/tty.go index d2cdafd9..c1da83a6 100644 --- a/service/gcs/stdio/tty.go +++ b/service/gcs/stdio/tty.go @@ -7,6 +7,7 @@ import ( "unsafe" "github.com/pkg/errors" + "golang.org/x/sys/unix" ) // NewConsole allocates a new console and returns the File for its master and @@ -33,6 +34,19 @@ func NewConsole() (*os.File, string, error) { return master, console, nil } +// ResizeConsole sends the appropriate resize to a pTTY FD +// Synchronization of pty should be handled in the callers context. +func ResizeConsole(pty *os.File, height, width uint16) error { + type consoleSize struct { + Height uint16 + Width uint16 + x uint16 + y uint16 + } + + return ioctl(pty.Fd(), uintptr(unix.TIOCSWINSZ), uintptr(unsafe.Pointer(&consoleSize{Height: height, Width: width}))) +} + func ioctl(fd uintptr, flag, data uintptr) error { if _, _, err := syscall.Syscall(syscall.SYS_IOCTL, fd, flag, data); err != 0 { return err From 32fdcada34a17dc7172ae708bc5826d38dc79e8e Mon Sep 17 00:00:00 2001 From: "Justin Terry (VM)" Date: Fri, 4 Aug 2017 09:11:09 -0700 Subject: [PATCH 5/5] Updating comment --- service/gcs/core/gcs/gcs.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/service/gcs/core/gcs/gcs.go b/service/gcs/core/gcs/gcs.go index 29c8bb54..65219564 100644 --- a/service/gcs/core/gcs/gcs.go +++ b/service/gcs/core/gcs/gcs.go @@ -16,10 +16,10 @@ import ( "github.com/Microsoft/opengcs/service/gcs/prot" "github.com/Microsoft/opengcs/service/gcs/runtime" "github.com/Microsoft/opengcs/service/gcs/stdio" - "github.com/sirupsen/logrus" shellwords "github.com/mattn/go-shellwords" oci "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" + "github.com/sirupsen/logrus" ) // gcsCore is an implementation of the Core interface, defining the @@ -38,9 +38,8 @@ type gcsCore struct { containerCache map[string]*containerCacheEntry processCacheMutex sync.RWMutex - // externalProcessCache stores information about external processes which - // persists between calls into the gcsCore. It is structured as a map from - // pid to cache entry. + // processCache stores information about processes which persists between calls + // into the gcsCore. It is structured as a map from pid to cache entry. processCache map[int]*processCacheEntry }