Skip to content

Commit

Permalink
Merge 33d6fe2 into 8663361
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky committed Jun 26, 2019
2 parents 8663361 + 33d6fe2 commit 04d5e19
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 227 deletions.
94 changes: 3 additions & 91 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ const childProcess = require('child_process');
const crossSpawn = require('cross-spawn');
const stripFinalNewline = require('strip-final-newline');
const npmRunPath = require('npm-run-path');
const isStream = require('is-stream');
const getStream = require('get-stream');
const mergeStream = require('merge-stream');
const pFinally = require('p-finally');
const makeError = require('./lib/error');
const normalizeStdio = require('./lib/stdio');
const {spawnedKill, spawnedCancel, setupTimeout, setExitHandler, cleanup} = require('./lib/kill');
const {handleInput, getSpawnedResult, makeAllStream, validateInputSync} = require('./lib/stream.js');

const DEFAULT_MAX_BUFFER = 1000 * 1000 * 100;

Expand Down Expand Up @@ -59,20 +57,6 @@ const handleArgs = (file, args, options = {}) => {
return {file, args, options, parsed};
};

const handleInput = (spawned, input) => {
// Checking for stdin is workaround for https://github.com/nodejs/node/issues/26852
// TODO: Remove `|| spawned.stdin === undefined` once we drop support for Node.js <=12.2.0
if (input === undefined || spawned.stdin === undefined) {
return;
}

if (isStream(input)) {
input.pipe(spawned.stdin);
} else {
spawned.stdin.end(input);
}
};

const handleOutput = (options, value, error) => {
if (typeof value !== 'string' && !Buffer.isBuffer(value)) {
// When `execa.sync()` errors, we normalize it to '' to mimic `execa()`
Expand All @@ -86,76 +70,6 @@ const handleOutput = (options, value, error) => {
return value;
};

const makeAllStream = spawned => {
if (!spawned.stdout && !spawned.stderr) {
return;
}

const mixed = mergeStream();

if (spawned.stdout) {
mixed.add(spawned.stdout);
}

if (spawned.stderr) {
mixed.add(spawned.stderr);
}

return mixed;
};

const getBufferedData = async (stream, streamPromise) => {
if (!stream) {
return;
}

stream.destroy();

try {
return await streamPromise;
} catch (error) {
return error.bufferedData;
}
};

const getStreamPromise = (stream, {encoding, buffer, maxBuffer}) => {
if (!stream) {
return;
}

if (!buffer) {
// TODO: Use `ret = util.promisify(stream.finished)(stream);` when targeting Node.js 10
return new Promise((resolve, reject) => {
stream
.once('end', resolve)
.once('error', reject);
});
}

if (encoding) {
return getStream(stream, {encoding, maxBuffer});
}

return getStream.buffer(stream, {maxBuffer});
};

const getPromiseResult = async ({stdout, stderr, all}, {encoding, buffer, maxBuffer}, processDone) => {
const stdoutPromise = getStreamPromise(stdout, {encoding, buffer, maxBuffer});
const stderrPromise = getStreamPromise(stderr, {encoding, buffer, maxBuffer});
const allPromise = getStreamPromise(all, {encoding, buffer, maxBuffer: maxBuffer * 2});

try {
return await Promise.all([processDone, stdoutPromise, stderrPromise, allPromise]);
} catch (error) {
return Promise.all([
{error, code: error.code, signal: error.signal},
getBufferedData(stdout, stdoutPromise),
getBufferedData(stderr, stderrPromise),
getBufferedData(all, allPromise)
]);
}
};

const joinCommand = (file, args = []) => {
if (!Array.isArray(args)) {
return file;
Expand Down Expand Up @@ -248,7 +162,7 @@ const execa = (file, args, options) => {
});

const handlePromise = async () => {
const [result, stdout, stderr, all] = await getPromiseResult(spawned, parsed.options, processDone);
const [result, stdout, stderr, all] = await getSpawnedResult(spawned, parsed.options, processDone);
result.stdout = handleOutput(parsed.options, stdout);
result.stderr = handleOutput(parsed.options, stderr);
result.all = handleOutput(parsed.options, all);
Expand Down Expand Up @@ -299,9 +213,7 @@ module.exports.sync = (file, args, options) => {
const parsed = handleArgs(file, args, options);
const command = joinCommand(file, args);

if (isStream(parsed.options.input)) {
throw new TypeError('The `input` option cannot be a stream in sync mode');
}
validateInputSync(parsed.options);

let result;
try {
Expand Down
105 changes: 105 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
const isStream = require('is-stream');
const getStream = require('get-stream');
const mergeStream = require('merge-stream');

// `input` option
const handleInput = (spawned, input) => {
// Checking for stdin is workaround for https://github.com/nodejs/node/issues/26852
// TODO: Remove `|| spawned.stdin === undefined` once we drop support for Node.js <=12.2.0
if (input === undefined || spawned.stdin === undefined) {
return;
}

if (isStream(input)) {
input.pipe(spawned.stdin);
} else {
spawned.stdin.end(input);
}
};

// `all` interleaves `stdout` and `stderr`
const makeAllStream = spawned => {
if (!spawned.stdout && !spawned.stderr) {
return;
}

const mixed = mergeStream();

if (spawned.stdout) {
mixed.add(spawned.stdout);
}

if (spawned.stderr) {
mixed.add(spawned.stderr);
}

return mixed;
};

// On failure, `result.stdout|stderr|all` should contain the currently buffered stream
const getBufferedData = async (stream, streamPromise) => {
if (!stream) {
return;
}

stream.destroy();

try {
return await streamPromise;
} catch (error) {
return error.bufferedData;
}
};

const getStreamPromise = (stream, {encoding, buffer, maxBuffer}) => {
if (!stream) {
return;
}

if (!buffer) {
// TODO: Use `ret = util.promisify(stream.finished)(stream);` when targeting Node.js 10
return new Promise((resolve, reject) => {
stream
.once('end', resolve)
.once('error', reject);
});
}

if (encoding) {
return getStream(stream, {encoding, maxBuffer});
}

return getStream.buffer(stream, {maxBuffer});
};

// Retrieve result of child process: exit code, signal, error, streams (stdout/stderr/all)
const getSpawnedResult = async ({stdout, stderr, all}, {encoding, buffer, maxBuffer}, processDone) => {
const stdoutPromise = getStreamPromise(stdout, {encoding, buffer, maxBuffer});
const stderrPromise = getStreamPromise(stderr, {encoding, buffer, maxBuffer});
const allPromise = getStreamPromise(all, {encoding, buffer, maxBuffer: maxBuffer * 2});

try {
return await Promise.all([processDone, stdoutPromise, stderrPromise, allPromise]);
} catch (error) {
return Promise.all([
{error, code: error.code, signal: error.signal},
getBufferedData(stdout, stdoutPromise),
getBufferedData(stderr, stderrPromise),
getBufferedData(all, allPromise)
]);
}
};

const validateInputSync = ({input}) => {
if (isStream(input)) {
throw new TypeError('The `input` option cannot be a stream in sync mode');
}
};

module.exports = {
handleInput,
makeAllStream,
getSpawnedResult,
validateInputSync
};

141 changes: 141 additions & 0 deletions test/stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import path from 'path';
import fs from 'fs';
import stream from 'stream';
import test from 'ava';
import getStream from 'get-stream';
import tempfile from 'tempfile';
import execa from '..';

process.env.PATH = path.join(__dirname, 'fixtures') + path.delimiter + process.env.PATH;

test('buffer', async t => {
const {stdout} = await execa('noop', ['foo'], {encoding: null});
t.true(Buffer.isBuffer(stdout));
t.is(stdout.toString(), 'foo');
});

test('pass `stdout` to a file descriptor', async t => {
const file = tempfile('.txt');
await execa('test/fixtures/noop', ['foo bar'], {stdout: fs.openSync(file, 'w')});
t.is(fs.readFileSync(file, 'utf8'), 'foo bar\n');
});

test('pass `stderr` to a file descriptor', async t => {
const file = tempfile('.txt');
await execa('test/fixtures/noop-err', ['foo bar'], {stderr: fs.openSync(file, 'w')});
t.is(fs.readFileSync(file, 'utf8'), 'foo bar\n');
});

test.serial('result.all shows both `stdout` and `stderr` intermixed', async t => {
const {all} = await execa('noop-132');
t.is(all, '132');
});

test('stdout/stderr/all are undefined if ignored', async t => {
const {stdout, stderr, all} = await execa('noop', {stdio: 'ignore'});
t.is(stdout, undefined);
t.is(stderr, undefined);
t.is(all, undefined);
});

test('stdout/stderr/all are undefined if ignored in sync mode', t => {
const {stdout, stderr, all} = execa.sync('noop', {stdio: 'ignore'});
t.is(stdout, undefined);
t.is(stderr, undefined);
t.is(all, undefined);
});

test('input option can be a String', async t => {
const {stdout} = await execa('stdin', {input: 'foobar'});
t.is(stdout, 'foobar');
});

test('input option can be a Buffer', async t => {
const {stdout} = await execa('stdin', {input: 'testing12'});
t.is(stdout, 'testing12');
});

test('input can be a Stream', async t => {
const s = new stream.PassThrough();
s.write('howdy');
s.end();
const {stdout} = await execa('stdin', {input: s});
t.is(stdout, 'howdy');
});

test('you can write to child.stdin', async t => {
const child = execa('stdin');
child.stdin.end('unicorns');
t.is((await child).stdout, 'unicorns');
});

test('input option can be a String - sync', t => {
const {stdout} = execa.sync('stdin', {input: 'foobar'});
t.is(stdout, 'foobar');
});

test('input option can be a Buffer - sync', t => {
const {stdout} = execa.sync('stdin', {input: Buffer.from('testing12', 'utf8')});
t.is(stdout, 'testing12');
});

test('opts.stdout:ignore - stdout will not collect data', async t => {
const {stdout} = await execa('stdin', {
input: 'hello',
stdio: [undefined, 'ignore', undefined]
});
t.is(stdout, undefined);
});

test('helpful error trying to provide an input stream in sync mode', t => {
t.throws(
() => {
execa.sync('stdin', {input: new stream.PassThrough()});
},
/The `input` option cannot be a stream in sync mode/
);
});

test('maxBuffer affects stdout', async t => {
await t.notThrowsAsync(execa('max-buffer', ['stdout', '10'], {maxBuffer: 10}));
const {stdout, all} = await t.throwsAsync(execa('max-buffer', ['stdout', '11'], {maxBuffer: 10}), /max-buffer stdout/);
t.is(stdout, '.'.repeat(10));
t.is(all, '.'.repeat(10));
});

test('maxBuffer affects stderr', async t => {
await t.notThrowsAsync(execa('max-buffer', ['stderr', '10'], {maxBuffer: 10}));
const {stderr, all} = await t.throwsAsync(execa('max-buffer', ['stderr', '11'], {maxBuffer: 10}), /max-buffer stderr/);
t.is(stderr, '.'.repeat(10));
t.is(all, '.'.repeat(10));
});

test('do not buffer stdout when `buffer` set to `false`', async t => {
const promise = execa('max-buffer', ['stdout', '10'], {buffer: false});
const [result, stdout] = await Promise.all([
promise,
getStream(promise.stdout),
getStream(promise.all)
]);

t.is(result.stdout, undefined);
t.is(stdout, '.........\n');
});

test('do not buffer stderr when `buffer` set to `false`', async t => {
const promise = execa('max-buffer', ['stderr', '10'], {buffer: false});
const [result, stderr] = await Promise.all([
promise,
getStream(promise.stderr),
getStream(promise.all)
]);

t.is(result.stderr, undefined);
t.is(stderr, '.........\n');
});

test('do not buffer when streaming', async t => {
const {stdout} = execa('max-buffer', ['stdout', '21'], {maxBuffer: 10});
const result = await getStream(stdout);
t.is(result, '....................\n');
});
Loading

0 comments on commit 04d5e19

Please sign in to comment.