Skip to content

Commit

Permalink
agent: refactor termination and stream management
Browse files Browse the repository at this point in the history
We've already seen several manifestations of golang/go#23019, so this
commit refactors the way that agent input, output, and error streams are
managed, as well as the way that agent process termination is signaled.

Historically (in 6c1a47c), we avoided closing
standard input/output pipes because they were blocking, meaning that a
close wouldn't preempt a read/write and that the close itself could
potentially block. However, this hasn't been the case since Go 1.9, when
os.File was switched to use polling I/O (at least for pipes and other
pollable files).

As such, we can now use closure of standard input as a signal to agent
processes (via their intermediate transport processes) that they should
terminate. Failing that, we still fall back to signal-based termination,
but this standard input closure mechanism is particularly important on
Windows, where no "soft" signaling mechanism (like SIGTERM) exists and
transport process termination via TerminateProcess often triggers
golang/go#23019. This is especially problematic with Docker Desktop,
where an intermediate com.docker.cli process is used underneath the
primary Docker CLI, and standard input closure is the only reliable
termination signaling mechanism.

Just to clarify, there are mechanisms like WM_CLOSE and CTRL_CLOSE_EVENT
on Windows, which some runtimes (such as Go's) will convert to a
SIGTERM, but there's no telling how intermediate transport processes
will interpret these messages. They don't necessarily have the same
semantics as SIGTERM.

And, just in case none of our signaling mechanisms works, we now avoid
using os/exec.Cmd's forwarding Goroutines entirely, meaning that its
Wait method will close all pipes as soon as the child process is
terminated.

As part of this refactor, I've also looked at switching to a more
systematic approach to manage the process hierarchies that could
potentially be generated by transport processes. Things like killpg on
POSIX or job objects on Windows could theoretically facilitate such
management. However, the fact of the matter is that there's simply no
way to reliably enforce termination of such hierarchies and, more
importantly, no way for Mutagen to know what termination signaling
mechanism would be appropriate for any intermediate processes.
Essentially, we just have to rely on transport processes to either
correctly forwarded standard input closure (especially on Windows)
and/or correctly forward SIGTERM on POSIX. But, if they don't, we will
forcibly terminate them and any associated resources in the Mutagen
daemon. If their subprocesses linger on afterward, that's a bug in the
transport process, not Mutagen.

This commit also takes the opportunity to impose a size limit on the
in-memory error buffer used to capture transport errors after a
handshake failure.

Updates #223
Updates mutagen-io/mutagen-compose#11

Backport of 8556e07

Signed-off-by: Jacob Howard <jacob@mutagen.io>
  • Loading branch information
xenoscopic committed Feb 4, 2022
1 parent 4debd08 commit 2ae56da
Show file tree
Hide file tree
Showing 7 changed files with 312 additions and 203 deletions.
72 changes: 40 additions & 32 deletions pkg/agent/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ import (
"time"
"unicode/utf8"

transportpkg "github.com/mutagen-io/mutagen/pkg/agent/transport"
"github.com/mutagen-io/mutagen/pkg/filesystem"
"github.com/mutagen-io/mutagen/pkg/logging"
"github.com/mutagen-io/mutagen/pkg/mutagen"
"github.com/mutagen-io/mutagen/pkg/process"
"github.com/mutagen-io/mutagen/pkg/prompting"
streampkg "github.com/mutagen-io/mutagen/pkg/stream"
)

const (
// agentKillDelay is the maximum amount of time that Mutagen will wait for
// an agent process to exit on its own in the event of an error before
// issuing a kill request. It's necessary to make this non-zero because
// agent transport executables need to be allowed time to exit on failure so
// that we can get their true exit codes instead of killing them immediately
// on a handshake error.
agentKillDelay = 5 * time.Second
// agentTerminationDelay is the maximum amount of time that Mutagen will
// wait for an agent process to terminate on its own in the event of a
// handshake error before forcing termination.
agentTerminationDelay = 5 * time.Second
// agentErrorInMemoryCutoff is the maximum number of bytes that Mutagen will
// capture in memory from the standard error output of an agent process.
agentErrorInMemoryCutoff = 32 * 1024
)

// connect connects to an agent-based endpoint using the specified transport,
Expand Down Expand Up @@ -67,7 +67,7 @@ func connect(logger *logging.Logger, transport Transport, mode, prompter string,
// Compute the command to invoke.
command := fmt.Sprintf("%s %s", agentInvocationPath, mode)

// Create an agent process.
// Set up (but do not start) an agent process.
message := "Connecting to agent (POSIX)..."
if cmdExe {
message = "Connecting to agent (Windows)..."
Expand All @@ -80,28 +80,36 @@ func connect(logger *logging.Logger, transport Transport, mode, prompter string,
return nil, false, false, fmt.Errorf("unable to create agent command: %w", err)
}

// Create a stream that wraps the process' standard input/output. We set a
// non-zero kill delay so that, if there's a handshake failure, the process
// will be allowed to exit with its natural exit code (instead of an exit
// code due to forced termination) that we can use to diagnose the issue.
stream, err := process.NewStream(agentProcess, agentKillDelay)
if err != nil {
return nil, false, false, fmt.Errorf("unable to create agent process stream: %w", err)
}

// Create a buffer that we can use to capture the process' standard error
// output in order to give better feedback when there's an error.
errorBuffer := bytes.NewBuffer(nil)

// Wrap the error buffer in a valveWriter and defer the closure of that
// writer. This avoids continuing to pipe output into the buffer for the
// lifetime of the process.
errorWriter := streampkg.NewValveWriter(errorBuffer)
defer errorWriter.Shut()

// Redirect the process' standard error output to a tee'd writer that writes
// to both our buffer (via the valveWriter) and the logger.
agentProcess.Stderr = io.MultiWriter(errorWriter, logger.Sublogger("remote").Writer(logging.LevelInfo))
// Create a cutoff for the error buffer that avoids using large amounts of
// memory (while still being sufficiently large to capture any reasonable
// human-readable error message).
errorCutoff := streampkg.NewCutoffWriter(errorBuffer, agentErrorInMemoryCutoff)

// Create a valve that we can use to stop recording the error output once
// this function returns (at which point the error will already have been
// captured or not have occurred).
errorValve := streampkg.NewValveWriter(errorCutoff)
defer errorValve.Shut()

// Create a splitter that will forward standard error output to both the
// error buffer and the logger.
errorTee := io.MultiWriter(errorValve, logger.Sublogger("remote").Writer(logging.LevelInfo))

// Create a transport stream to communicate with the process and forward
// standard error output. Set a non-zero termination delay for the stream so
// that (in the event of a handshake failure) the process will be allowed to
// exit with its natural exit code (instead of an exit code due to forced
// termination) and will be able to yield some error output for diagnosing
// the issue.
stream, err := transportpkg.NewStream(agentProcess, errorTee)
if err != nil {
return nil, false, false, fmt.Errorf("unable to create agent process stream: %w", err)
}
stream.SetTerminationDelay(agentTerminationDelay)

// Start the process.
if err = agentProcess.Start(); err != nil {
Expand All @@ -111,10 +119,10 @@ func connect(logger *logging.Logger, transport Transport, mode, prompter string,
// Perform a handshake with the remote to ensure that we're talking with a
// Mutagen agent.
if err := ClientHandshake(stream); err != nil {
// Close the stream to ensure that the underlying process and its
// Close the stream to ensure that the underlying process and any
// I/O-forwarding Goroutines have terminated. The error returned from
// Close will be non-nil if the process exits with a non-0 exit code, so
// we don't want to check it, but process.Stream guarantees that if
// we don't want to check it, but transport.Stream guarantees that if
// Close returns, then the underlying process has fully terminated,
// which is all we care about.
stream.Close()
Expand Down Expand Up @@ -152,9 +160,9 @@ func connect(logger *logging.Logger, transport Transport, mode, prompter string,
return nil, tryInstall, cmdExe, errors.New("unable to handshake with agent process")
}

// Now that we've successfully connected, disable the kill delay on the
// process stream.
stream.SetKillDelay(time.Duration(0))
// Now that we've successfully connected, disable the termination delay on
// the process stream.
stream.SetTerminationDelay(time.Duration(0))

// Perform a version handshake.
if err := mutagen.ClientVersionHandshake(stream); err != nil {
Expand Down
209 changes: 209 additions & 0 deletions pkg/agent/transport/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package transport

import (
"fmt"
"io"
"os/exec"
"runtime"
"sync"
"syscall"
"time"
)

// Stream implements io.ReadWriteCloser using the standard input and output
// streams of an agent process, with closure implemented via termination
// signaling heuristics designed to shut down agent processes reliably. It
// guarantees that its Close method unblocks pending Read and Write calls. It
// also provides optional forwarding of the process' standard error stream.
type Stream struct {
// process is the underlying process.
process *exec.Cmd
// standardInput is the process' standard input stream.
standardInput io.WriteCloser
// standardOutput is the process' standard output stream.
standardOutput io.Reader
// terminationDelayLock restricts access to terminationDelay.
terminationDelayLock sync.Mutex
// terminationDelay specifies the duration that the stream should wait for
// the underlying process to exit on its own before performing termination.
terminationDelay time.Duration
}

// NewStream creates a new Stream instance that wraps the specified command
// object. It must be called before the corresponding process is started and no
// other I/O redirection should be performed for the process. If this function
// fails, the command should be considered unusable. If standardErrorReceiver is
// non-nil, then the process' standard error output will be forwarded to it.
func NewStream(process *exec.Cmd, standardErrorReceiver io.Writer) (*Stream, error) {
// Create a pipe to the process' standard input stream.
standardInput, err := process.StdinPipe()
if err != nil {
return nil, fmt.Errorf("unable to redirect process input: %w", err)
}

// Create a pipe from the process' standard output stream.
standardOutput, err := process.StdoutPipe()
if err != nil {
standardInput.Close()
return nil, fmt.Errorf("unable to redirect process output: %w", err)
}

// If a standard error receiver has been specified, then create a pipe from
// the process' standard error stream and forward it to the receiver. We do
// this manually (instead of just assigning the receiver to process.Stderr)
// to avoid golang/go#23019. We perform the same closure on the standard
// error stream as os/exec's standard forwarding Goroutines, a fix designed
// to avoid golang/go#10400.
if standardErrorReceiver != nil {
standardError, err := process.StderrPipe()
if err != nil {
standardInput.Close()
standardOutput.Close()
return nil, fmt.Errorf("unable to redirect process error output: %w", err)
}
go func() {
io.Copy(standardErrorReceiver, standardError)
standardError.Close()
}()
}

// Create the result.
return &Stream{
process: process,
standardInput: standardInput,
standardOutput: standardOutput,
}, nil
}

// Read implements io.Reader.Read.
func (s *Stream) Read(buffer []byte) (int, error) {
return s.standardOutput.Read(buffer)
}

// Write implements io.Writer.Write.
func (s *Stream) Write(buffer []byte) (int, error) {
return s.standardInput.Write(buffer)
}

// SetTerminationDelay sets the termination delay for the stream. This method
// will panic if terminationDelay is negative. This method is safe to call
// concurrently with Close, though, if called concurrently, there is no
// guarantee that the new delay will be set in time for Close to use it.
func (s *Stream) SetTerminationDelay(terminationDelay time.Duration) {
// Validate the kill delay time.
if terminationDelay < 0 {
panic("negative termination delay specified")
}

// Lock and defer release of the termination delay lock.
s.terminationDelayLock.Lock()
defer s.terminationDelayLock.Unlock()

// Set the termination delay.
s.terminationDelay = terminationDelay
}

// Close closes the process' streams and terminates the process using heuristics
// designed for agent transport processes. These heuristics are necessary to
// avoid the problem described in golang/go#23019 and experienced in
// mutagen-io/mutagen#223 and mutagen-io/mutagen-compose#11.
//
// First, if a non-negative, non-zero termination delay has been specified, then
// this method will wait (up to the specified duration) for the process to exit
// on its own. If the process exits on its own, then its standard input, output,
// and error streams are closed and this method returns.
//
// Second, the process' standard input stream will be closed. The process will
// then be given up to one second to exit on its own. If it does, then the
// standard output and error streams are closed and this method returns. Closure
// of the standard input stream is recognized by the Mutagen agent as indicating
// termination and should thus be sufficient to cause termination for transport
// processes that forward this closure correctly.
//
// Third, on POSIX systems only, the process will be sent a SIGTERM signal. The
// process will then be given up to one second to exit on its own. If it does,
// then the standard output and error streams are closed and this method
// returns. Reception of SIGTERM is also recognized by the Mutagen agent as
// indicating termination and should thus be sufficient to cause termination for
// transport processes that correctly forward this signal. Windows lacks a
// directly equivalent termination mechanism (the closest analog would be
// sending WM_CLOSE, but reception and processing of such a message may have
// unpredictable effects in different runtimes).
//
// Finally, the process will be sent a SIGKILL signal (on POSIX) or terminated
// via TerminateProcess (on Windows). This method will then wait for the process
// to exit before closing the standard output and error streams and returning.
//
// This method guarantees that, by the time it returns, the underlying transport
// process has terminated and its associated standard input, output, and error
// stream handles in the current process have been closed. The error returned by
// this function will be that returned by os/exec.Cmd.Wait. Note, however, that
// this method cannot guarantee that any or all child processes spawned by the
// transport process have terminated by the time this method returns. This is
// mostly due to operating system API limitations. Specifically, POSIX provides
// no away to restrict subprocesses to a single process group and therefore
// cannot guarantee that a call to killpg will reach all the subprocesses that
// have been spawned. Even if that were possible, there is no mechanism to wait
// for an entire process group to exit, and it's not well-defined exactly what
// signals or stream closures should be used to signal those processes anyway,
// because Mutagen is not privy to the internals of the transport process(es).
// Windows, while it does provide a "job" API for managing and terminating
// process hierarchies, is even less nuanced in its process signaling mechanism
// (essentially offering only the equivalent of SIGKILL) and it's thus even less
// clear how to signal termination there with arbitrary and opaque process
// hierarchies. We thus rely on a certain level of well-behavedness when it
// comes to transport processes. Specifically, we assume that they know how to
// correctly handle and forward standard input closure and SIGTERM signals, and
// that they'll terminate when their underlying agent process terminates.
func (s *Stream) Close() error {
// Start a background Goroutine that will wait for the process to exit and
// return the wait result. We'll rely on this call to Wait to close the
// standard output and error streams. We don't have to worry about
// golang/go#23019 in this case because we're only using pipes and thus Wait
// doesn't have any internal copying Goroutines to wait on.
waitResults := make(chan error, 1)
go func() {
waitResults <- s.process.Wait()
}()

// Start by waiting for the process to terminate on its own.
s.terminationDelayLock.Lock()
terminationDelay := s.terminationDelay
s.terminationDelayLock.Unlock()
waitTimer := time.NewTimer(terminationDelay)
select {
case err := <-waitResults:
waitTimer.Stop()
return err
case <-waitTimer.C:
}

// Close the process' standard input and wait up to one second for it to
// terminate on its own.
s.standardInput.Close()
waitTimer.Reset(time.Second)
select {
case err := <-waitResults:
waitTimer.Stop()
return err
case <-waitTimer.C:
}

// If this is a POSIX system, then send SIGTERM to the process and wait up
// to one second for it to terminate on its own.
if runtime.GOOS != "windows" {
s.process.Process.Signal(syscall.SIGTERM)
waitTimer.Reset(time.Second)
select {
case err := <-waitResults:
waitTimer.Stop()
return err
case <-waitTimer.C:
}
}

// Kill the process (via SIGKILL on POSIX and TerminateProcess on Windows)
// and wait for it to exit.
s.process.Process.Kill()
return <-waitResults
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package process
package transport

// TODO: Implement.
Loading

0 comments on commit 2ae56da

Please sign in to comment.