Skip to content

Commit

Permalink
agent: refactor termination and stream management
Browse files Browse the repository at this point in the history
We've been bitten again by golang/go#23019, so rather than a quick hack,
it's time to properly fix agent termination signaling. While the
original plan was to use killpg and Windows job objects to manage agent
process hierarchies, it quickly became clear when trying to implement
that behavior that the APIs needed to accomplish it simply weren't there
on POSIX or Windows. Moreover, it became even less clear what the
correct signaling and forceful termination mechanisms should be,
especially since Mutagen has no insight into transport process
hierarchies. Thus, Mutagen now takes a staged approach to transport
process termination, first waiting, then closing standard input, then
(on POSIX) sending SIGTERM, and finally forcing termination. It relies
on transport processes to correctly forward these mechanisms to the
Mutagen agent and to manage their own internal process hierarchies
accordingly once the Mutagen agent terminates.

This commit also takes the opportunity to impose a size limit on the
in-memory error buffer used to capture transport errors after a
handshake failure.

Updates #223
Updates mutagen-io/mutagen-compose#11

Signed-off-by: Jacob Howard <jacob@mutagen.io>
  • Loading branch information
xenoscopic committed Feb 3, 2022
1 parent 488fb86 commit b910ae7
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 197 deletions.
66 changes: 40 additions & 26 deletions pkg/agent/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ import (
"time"
"unicode/utf8"

transportpkg "github.com/mutagen-io/mutagen/pkg/agent/transport"
"github.com/mutagen-io/mutagen/pkg/filesystem"
"github.com/mutagen-io/mutagen/pkg/logging"
"github.com/mutagen-io/mutagen/pkg/mutagen"
"github.com/mutagen-io/mutagen/pkg/process"
"github.com/mutagen-io/mutagen/pkg/prompting"
streampkg "github.com/mutagen-io/mutagen/pkg/stream"
)

const (
// agentKillDelay is the maximum amount of time that Mutagen will wait for
// an agent process to exit on its own in the event of an error before
// issuing a kill request. It's necessary to make this non-zero because
// agent transport executables need to be allowed time to exit on failure so
// that we can get their true exit codes instead of killing them immediately
// on a handshake error.
agentKillDelay = 5 * time.Second
// agentTerminationDelay is the maximum amount of time that Mutagen will
// wait for an agent process to terminate on its own in the event of a
// handshake error before forcing termination.
agentTerminationDelay = 5 * time.Second
// agentErrorInMemoryCutoff is the maximum number of bytes that Mutagen will
// capture in memory from the standard error output of an agent process.
agentErrorInMemoryCutoff = 32 * 1024
)

// connect connects to an agent-based endpoint using the specified transport,
Expand Down Expand Up @@ -67,7 +67,7 @@ func connect(logger *logging.Logger, transport Transport, mode, prompter string,
// Compute the command to invoke.
command := fmt.Sprintf("%s %s", agentInvocationPath, mode)

// Create an agent process.
// Set up (but do not start) an agent process.
message := "Connecting to agent (POSIX)..."
if cmdExe {
message = "Connecting to agent (Windows)..."
Expand All @@ -80,28 +80,42 @@ func connect(logger *logging.Logger, transport Transport, mode, prompter string,
return nil, false, false, fmt.Errorf("unable to create agent command: %w", err)
}

// Create a stream that wraps the process' standard input/output. We set a
// non-zero kill delay so that, if there's a handshake failure, the process
// will be allowed to exit with its natural exit code (instead of an exit
// code due to forced termination) that we can use to diagnose the issue.
stream, err := process.NewStream(agentProcess, agentKillDelay)
// Create a transport stream to communicate with the process.
stream, err := transportpkg.NewStream(agentProcess)
if err != nil {
return nil, false, false, fmt.Errorf("unable to create agent process stream: %w", err)
}

// Set a non-zero termination delay so that (in the event of a handshake
// failure) the process will be allowed to exit with its natural exit code
// (instead of an exit code due to forced termination) and able to yield
// some error output for diagnosing the issue.
stream.SetTerminationDelay(agentTerminationDelay)

// Create a buffer that we can use to capture the process' standard error
// output in order to give better feedback when there's an error.
errorBuffer := bytes.NewBuffer(nil)

// Wrap the error buffer in a valveWriter and defer the closure of that
// writer. This avoids continuing to pipe output into the buffer for the
// lifetime of the process.
errorWriter := streampkg.NewValveWriter(errorBuffer)
defer errorWriter.Shut()
// Create a cutoff for the error buffer that avoids using large amounts of
// memory (while still being sufficiently large to capture any reasonable
// human-readable error message).
errorCutoff := streampkg.NewCutoffWriter(errorBuffer, agentErrorInMemoryCutoff)

// Create a valve that we can use to stop recording the error output once
// this function returns (at which point the error will already have been
// captured or not have occurred).
errorValve := streampkg.NewValveWriter(errorCutoff)
defer errorValve.Shut()

// Create a splitter that will forward standard error output to both the
// error buffer and the logger.
errorTee := io.MultiWriter(errorValve, logger.Sublogger("remote").Writer(logging.LevelInfo))

// Redirect the process' standard error output to a tee'd writer that writes
// to both our buffer (via the valveWriter) and the logger.
agentProcess.Stderr = io.MultiWriter(errorWriter, logger.Sublogger("remote").Writer(logging.LevelInfo))
// Forward the process' standard error output to a tee'd writer that writes
// to both our error buffer and the logger. We do this manually to avoid
// golang/go#23019. The Goroutine will terminate automatically when the
// transport stream is closed.
go io.Copy(errorTee, stream.StandardError)

// Start the process.
if err = agentProcess.Start(); err != nil {
Expand All @@ -114,7 +128,7 @@ func connect(logger *logging.Logger, transport Transport, mode, prompter string,
// Close the stream to ensure that the underlying process and its
// I/O-forwarding Goroutines have terminated. The error returned from
// Close will be non-nil if the process exits with a non-0 exit code, so
// we don't want to check it, but process.Stream guarantees that if
// we don't want to check it, but transport.Stream guarantees that if
// Close returns, then the underlying process has fully terminated,
// which is all we care about.
stream.Close()
Expand Down Expand Up @@ -152,9 +166,9 @@ func connect(logger *logging.Logger, transport Transport, mode, prompter string,
return nil, tryInstall, cmdExe, errors.New("unable to handshake with agent process")
}

// Now that we've successfully connected, disable the kill delay on the
// process stream.
stream.SetKillDelay(time.Duration(0))
// Now that we've successfully connected, disable the termination delay on
// the process stream.
stream.SetTerminationDelay(time.Duration(0))

// Perform a version handshake.
if err := mutagen.ClientVersionHandshake(stream); err != nil {
Expand Down
199 changes: 199 additions & 0 deletions pkg/agent/transport/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package transport

import (
"fmt"
"io"
"os/exec"
"runtime"
"sync"
"syscall"
"time"
)

// Stream manages the input, output, and error streams an agent process, as well
// the lifetime of that process. It implements io.ReadWriteCloser and guarantees
// that its Close method unblocks pending Read and Write calls.
type Stream struct {
// process is the underlying process.
process *exec.Cmd
// standardInput is the process' standard input stream.
standardInput io.WriteCloser
// standardOutput is the process' standard output stream.
standardOutput io.Reader
// StandardError is the process' standard error stream.
StandardError io.Reader
// terminationDelayLock restricts access to terminationDelay.
terminationDelayLock sync.Mutex
// terminationDelay specifies the duration that the stream should wait for
// the underlying process to exit on its own before performing termination.
terminationDelay time.Duration
}

// NewStream creates a new Stream instance that wraps the specified command
// object. It must be called before the corresponding process is started and no
// other I/O redirection must be performed for the process. If this function
// fails, the command should be considered unusable.
func NewStream(process *exec.Cmd) (*Stream, error) {
// Create a pipe to the process' standard input stream.
standardInput, err := process.StdinPipe()
if err != nil {
return nil, fmt.Errorf("unable to redirect process input: %w", err)
}

// Create a pipe from the process' standard output stream.
standardOutput, err := process.StdoutPipe()
if err != nil {
standardInput.Close()
return nil, fmt.Errorf("unable to redirect process output: %w", err)
}

// Create a pipe from the process' standard error stream.
standardError, err := process.StderrPipe()
if err != nil {
standardInput.Close()
standardOutput.Close()
return nil, fmt.Errorf("unable to redirect process error output: %w", err)
}

// Create the result.
return &Stream{
process: process,
standardInput: standardInput,
standardOutput: standardOutput,
StandardError: standardError,
}, nil
}

// Read implements io.Reader.Read.
func (s *Stream) Read(buffer []byte) (int, error) {
return s.standardOutput.Read(buffer)
}

// Write implements io.Writer.Write.
func (s *Stream) Write(buffer []byte) (int, error) {
return s.standardInput.Write(buffer)
}

// SetTerminationDelay sets the termination delay for the stream. This method
// will panic if terminationDelay is negative. This method is safe to call
// concurrently with Close, though, if called concurrently, there is no
// guarantee that the new delay will be set in time for Close to use it.
func (s *Stream) SetTerminationDelay(terminationDelay time.Duration) {
// Validate the kill delay time.
if terminationDelay < 0 {
panic("negative termination delay specified")
}

// Lock and defer release of the termination delay lock.
s.terminationDelayLock.Lock()
defer s.terminationDelayLock.Unlock()

// Set the termination delay.
s.terminationDelay = terminationDelay
}

// Close closes the process' streams and terminates the process using heuristics
// designed for agent transport processes. These heuristics are necessary to
// avoid the problem described in golang/go#23019 and experienced in
// mutagen-io/mutagen#223 and mutagen-io/mutagen-compose#11.
//
// First, if a non-negative, non-zero termination delay has been specified, then
// this method will wait (up to the specified duration) for the process to exit
// on its own. If the process exits on its own, then its standard input, output,
// and error streams are closed and this method returns.
//
// Second, the process' standard input stream will be closed. The process will
// then be given up to one second to exit on its own. If it does, then the
// standard output and error streams are closed and this method returns. Closure
// of the standard input stream is recognized by the Mutagen agent as indicating
// termination and should thus be sufficient to cause termination for transport
// processes that forward this closure correctly.
//
// Third, on POSIX systems only, the process will be sent a SIGTERM signal. The
// process will then be given up to one second to exit on its own. If it does,
// then the standard output and error streams are closed and this method
// returns. Reception of SIGTERM is also recognized by the Mutagen agent as
// indicating termination and should thus be sufficient to cause termination for
// transport processes that correctly forward this signal. Windows lacks a
// directly equivalent termination mechanism (the closest analog would be
// sending WM_CLOSE, but reception and processing of such a message may have
// unpredictable effects in different runtimes).
//
// Finally, the process will be sent a SIGKILL signal (on POSIX) or terminated
// via TerminateProcess (on Windows). This method will then wait for the process
// to exit before closing the standard output and error streams and returning.
//
// This method guarantees that, by the time it returns, the underlying transport
// process has terminated and its associated standard input, output, and error
// stream handles in the current process have been closed. The error returned by
// this function will be that returned by os/exec.Cmd.Wait. Note, however, that
// this method cannot guarantee that any or all child processes spawned by the
// transport process have terminated by the time this method returns. This is
// mostly due to operating system API limitations. Specifically, POSIX provides
// no away to restrict subprocesses to a single process group and therefore
// cannot guarantee that a call to killpg will reach all the subprocesses that
// have been spawned. Even if that were possible, there is no mechanism to wait
// for an entire process group to exit, and it's not well-defined exactly what
// signals or stream closures should be used to signal those processes anyway,
// because Mutagen is not privy to the internals of the transport process(es).
// Windows, while it does provide a "job" API for managing and terminating
// process hierarchies, is even less nuanced in its process signaling mechanism
// (essentially offering only the equivalent of SIGKILL) and it's thus even less
// clear how to signal termination there with arbitrary and opaque process
// hierarchies. We thus rely on a certain level of well-behavedness when it
// comes to transport processes. Specifically, we assume that they know how to
// correctly handle and forward standard input closure and SIGTERM signals, and
// that they'll terminate when their underlying agent process terminates.
func (s *Stream) Close() error {
// Start a background Goroutine that will wait for the process to exit and
// return the wait result. We'll rely on this call to Wait to close the
// standard output and error streams. We don't have to worry about
// golang/go#23019 in this case because we're only using pipes and thus Wait
// doesn't have any internal copying Goroutines to wait on.
waitResults := make(chan error, 1)
go func() {
waitResults <- s.process.Wait()
}()

// Start by waiting for the process to terminate on its own.
s.terminationDelayLock.Lock()
terminationDelay := s.terminationDelay
s.terminationDelayLock.Unlock()
waitTimer := time.NewTimer(terminationDelay)
select {
case err := <-waitResults:
waitTimer.Stop()
s.standardInput.Close()
return err
case <-waitTimer.C:
}

// Close the process' standard input and wait up to one second for it to
// terminate on its own.
s.standardInput.Close()
waitTimer.Reset(time.Second)
select {
case err := <-waitResults:
waitTimer.Stop()
return err
case <-waitTimer.C:
}

// If this is a POSIX system, then send SIGTERM to the process and wait up
// to one second for it to terminate on its own.
if runtime.GOOS != "windows" {
s.process.Process.Signal(syscall.SIGTERM)
waitTimer.Reset(time.Second)
select {
case err := <-waitResults:
waitTimer.Stop()
return err
case <-waitTimer.C:
}
}

// Kill the process (via SIGKILL on POSIX and TerminateProcess on Windows)
// and wait for it to exit.
s.process.Process.Kill()
return <-waitResults
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package process
package transport

// TODO: Implement.
Loading

0 comments on commit b910ae7

Please sign in to comment.