Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement flow-control on the output pipe. #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

roadrunner2
Copy link
Contributor

Otherwise a slow reader on the output will cause internal stream buffers
to grow unbounded.

Note that the input pipe already has implicit flow-control via the
callback.

Otherwise a slow reader on the output will cause internal stream buffers
to grow unbounded.

Note that the input pipe already has implicit flow-control via the
callback.
@junosuarez
Copy link
Owner

Thanks for the pull request! Can you please add tests for this functionality in https://github.com/jden/exec-stream/blob/master/test/test.js ? The tests should fail without the PR and pass with it.

Thanks!

@missinglink
Copy link

I tested this with pv and got the following error:

pv: (stdin): select call failed: -1: Operation not supported on socket

@missinglink
Copy link

My issue seems to be resolved by using a bookkeeping boolean to ensure that SIGCONT / SIGSTOP kill signals are not being repeated.

@missinglink
Copy link

  var stopped = false

  AB._read = function (n) {
    if (stopped) {
      stopped = false
      proc.kill('SIGCONT')
    }
  }

  B.on('readable', function () {
    if (!AB.push(B.read()) && !stopped){
      stopped = true
      proc.kill('SIGSTOP')
    }
  })

@missinglink
Copy link

I know this issue is super old but this PR is a very nice addition, I couldn't find any other libraries doing this correctly.
Without this, if you pipe to something like bzip2 the internal nodejs buffer will grow uncontrollably while backpressure exists and can result in a crash.

@missinglink
Copy link

If anyone else is interested, this is the version I've had success with:

// https://github.com/junosuarez/exec-stream/pull/2

const Stream = require('stream')
const spawn = require('child_process').spawn

// optionally increase the internal buffer (up from 64KB)
// this is a tradeoff between using more RAM for the
// buffer and having more frequent stop/starts of
// the subprocess due to back-pressure.
// note: my testing showed no benefit in increasing this
const streamOptions = {
  // highWaterMark: 1e+6 // 1MB
}

function spawnDuplex(cmd, args, options) {

  const AB = new Stream.Duplex(streamOptions)
  const A = new Stream.PassThrough(streamOptions)
  const B = new Stream.PassThrough(streamOptions)

  // keep track of signal state
  // see: https://github.com/junosuarez/exec-stream/pull/2
  let SIGSTOP = false

  AB._write = (chunk, encoding, cb) => {
    return A.write(chunk, encoding, cb)
  }

  AB.on('finish', () => {
    A.end()
  })

  AB._read = function (n) {
    // send SIGCONT to continue
    if (SIGSTOP) {
      SIGSTOP = false
      proc.kill('SIGCONT')
    }
  }

  B.on('readable', () => {
    // send SIGSTOP to handle backpressure
    if (!AB.push(B.read()) && !SIGSTOP){
      SIGSTOP = true
      proc.kill('SIGSTOP')
    }
  })

  B.on('end', () => {
    AB.push(null)
  })

  B.on('error', (err) => {
    AB.emit('error', err)
  })

  A.on('error', (err) => {
    AB.emit('error', err)
  })

  // spawn the child process
  const proc = spawn(cmd, args, options)

  // connect pipes
  A.pipe(proc.stdin)
  proc.stdout.pipe(B)

  // do not discard stderr stream, send to process
  proc.stderr.pipe(process.stderr)

  proc.on('error', (err) => {
    AB.emit('error', err)
  })

  return AB
}

module.exports = spawnDuplex

@missinglink
Copy link

missinglink commented Mar 4, 2020

In order to reproduce the bug, you just need to have a fast writer and a slow reader, you can use a tool like htop to see the memory growing and eventually the process will run out of memory and die:

const execStream = require('exec-stream')

execStream('cat', ['/dev/random'])
  .pipe(execStream('bzip2'))
  .pipe(execStream('bash', ['-c', '1> /dev/null']))

Screenshot 2020-03-04 at 10 52 12

@junosuarez
Copy link
Owner

Thanks for chiming in on this! I no longer have access to the npm account that published this package, nor do I intend to maintain this module further. If you're interested, I would be happy to transfer this repo to your github account.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants