Skip to content

Commit

Permalink
fix: destroy source streams when the merged stream is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
mrmlnc committed Nov 23, 2019
1 parent 0bd3ae4 commit b00a65f
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 1 deletion.
18 changes: 18 additions & 0 deletions src/providers/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,23 @@ describe('Providers → ProviderStream', () => {
done();
});
});

it('should destroy source stream when the destination stream is closed', (done) => {
const provider = getProvider();
const task = tests.task.builder().base('.').positive('*').build();
const stream = new PassThrough();

provider.reader.dynamic.returns(stream);

const actual = provider.read(task);

actual.once('close', () => {
assert.ok(stream.destroyed);

done();
});

actual.emit('close');
});
});
});
3 changes: 3 additions & 0 deletions src/providers/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ export default class ProviderStream extends Provider<Readable> {
.on('data', (entry: Entry) => destination.emit('data', options.transform(entry)))
.once('end', () => destination.emit('end'));

destination
.once('close', () => source.destroy());

return destination;
}

Expand Down
24 changes: 23 additions & 1 deletion src/utils/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ describe('Utils → Stream', () => {
const first = new stream.PassThrough();
const second = new stream.PassThrough();

const expected = 2;
const expected = 3;

const mergedStream = util.merge([first, second]);

Expand Down Expand Up @@ -40,5 +40,27 @@ describe('Utils → Stream', () => {
second.emit('error', 2);
mergedStream.emit('error', 3);
});

it('should propagate close event to source streams', (done) => {
const first = new stream.PassThrough();
const second = new stream.PassThrough();

const mergedStream = util.merge([first, second]);

const expected = [1, 2];

const actual: number[] = [];

first.once('close', () => actual.push(1));
second.once('close', () => actual.push(2));

mergedStream.once('finish', () => {
assert.deepStrictEqual(actual, expected);

done();
});

mergedStream.emit('close');
});
});
});
4 changes: 4 additions & 0 deletions src/utils/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,9 @@ export function merge(streams: Readable[]): NodeJS.ReadableStream {
stream.once('error', (error) => mergedStream.emit('error', error));
});

mergedStream.once('close', () => {
streams.forEach((stream) => stream.emit('close'));
});

return mergedStream;
}

0 comments on commit b00a65f

Please sign in to comment.