From b00a65f71dc881f4ebc622cfad14b2abd99340ee Mon Sep 17 00:00:00 2001 From: mrmlnc Date: Sat, 23 Nov 2019 19:52:49 +0300 Subject: [PATCH] fix: destroy source streams when the merged stream is closed --- src/providers/stream.spec.ts | 18 ++++++++++++++++++ src/providers/stream.ts | 3 +++ src/utils/stream.spec.ts | 24 +++++++++++++++++++++++- src/utils/stream.ts | 4 ++++ 4 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/providers/stream.spec.ts b/src/providers/stream.spec.ts index a2d02332..99c6979b 100644 --- a/src/providers/stream.spec.ts +++ b/src/providers/stream.spec.ts @@ -100,5 +100,23 @@ describe('Providers → ProviderStream', () => { done(); }); }); + + it('should destroy source stream when the destination stream is closed', (done) => { + const provider = getProvider(); + const task = tests.task.builder().base('.').positive('*').build(); + const stream = new PassThrough(); + + provider.reader.dynamic.returns(stream); + + const actual = provider.read(task); + + actual.once('close', () => { + assert.ok(stream.destroyed); + + done(); + }); + + actual.emit('close'); + }); }); }); diff --git a/src/providers/stream.ts b/src/providers/stream.ts index 33ce3473..790efcb8 100644 --- a/src/providers/stream.ts +++ b/src/providers/stream.ts @@ -20,6 +20,9 @@ export default class ProviderStream extends Provider { .on('data', (entry: Entry) => destination.emit('data', options.transform(entry))) .once('end', () => destination.emit('end')); + destination + .once('close', () => source.destroy()); + return destination; } diff --git a/src/utils/stream.spec.ts b/src/utils/stream.spec.ts index d7a947ef..772789f6 100644 --- a/src/utils/stream.spec.ts +++ b/src/utils/stream.spec.ts @@ -9,7 +9,7 @@ describe('Utils → Stream', () => { const first = new stream.PassThrough(); const second = new stream.PassThrough(); - const expected = 2; + const expected = 3; const mergedStream = util.merge([first, second]); @@ -40,5 +40,27 @@ describe('Utils → Stream', () => { second.emit('error', 2); mergedStream.emit('error', 3); }); + + it('should propagate close event to source streams', (done) => { + const first = new stream.PassThrough(); + const second = new stream.PassThrough(); + + const mergedStream = util.merge([first, second]); + + const expected = [1, 2]; + + const actual: number[] = []; + + first.once('close', () => actual.push(1)); + second.once('close', () => actual.push(2)); + + mergedStream.once('finish', () => { + assert.deepStrictEqual(actual, expected); + + done(); + }); + + mergedStream.emit('close'); + }); }); }); diff --git a/src/utils/stream.ts b/src/utils/stream.ts index 14aff96a..76d0c620 100644 --- a/src/utils/stream.ts +++ b/src/utils/stream.ts @@ -9,5 +9,9 @@ export function merge(streams: Readable[]): NodeJS.ReadableStream { stream.once('error', (error) => mergedStream.emit('error', error)); }); + mergedStream.once('close', () => { + streams.forEach((stream) => stream.emit('close')); + }); + return mergedStream; }