Skip to content

Commit

Permalink
feat: Add transformSafely.
Browse files Browse the repository at this point in the history
  • Loading branch information
RubenVerborgh authored and joachimvh committed Jan 11, 2021
1 parent 61aa2e1 commit 995a2dc
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 3 deletions.
57 changes: 55 additions & 2 deletions src/util/StreamUtil.ts
@@ -1,5 +1,5 @@
import type { Writable, ReadableOptions } from 'stream';
import { Readable } from 'stream';
import type { Writable, ReadableOptions, DuplexOptions } from 'stream';
import { Readable, Transform } from 'stream';
import arrayifyStream from 'arrayify-stream';
import { getLoggerFor } from '../logging/LogUtil';
import type { Guarded } from './GuardedStream';
Expand Down Expand Up @@ -43,6 +43,59 @@ export function pipeSafely<T extends Writable>(readable: NodeJS.ReadableStream,
return guardStream(destination);
}

export interface AsyncTransformOptions<T = any> extends DuplexOptions {
/**
* Transforms data from the source by calling the `push` method
*/
transform?: (this: Transform, data: T, encoding: string) => any | Promise<any>;

/**
* Performs any final actions after the source has ended
*/
flush?: (this: Transform) => any | Promise<any>;
}

/**
* Transforms a stream, ensuring that all errors are forwarded.
* @param source - The stream to be transformed
* @param options - The transformation options
*
* @returns The transformed stream
*/
export function transformSafely<T = any>(
source: NodeJS.ReadableStream,
{
transform = function(data): void {
this.push(data);
},
flush = (): null => null,
...options
}: AsyncTransformOptions<T> = {},
):
Guarded<Transform> {
return pipeSafely(source, new Transform({
...options,
async transform(data, encoding, callback): Promise<void> {
let error: Error | null = null;
try {
await transform.call(this, data, encoding);
} catch (err: unknown) {
error = err as Error;
}
callback(error);
},
async flush(callback): Promise<void> {
let error: Error | null = null;
try {
await flush.call(this);
} catch (err: unknown) {
error = err as Error;
}
callback(error);
},
}));
}

/**
* Converts an iterable to a stream and applies an error guard so that it is {@link Guarded}.
* @param iterable - Data to stream.
Expand Down
101 changes: 100 additions & 1 deletion test/unit/util/StreamUtil.test.ts
@@ -1,6 +1,7 @@
import { PassThrough } from 'stream';
import arrayifyStream from 'arrayify-stream';
import streamifyArray from 'streamify-array';
import { guardedStreamFrom, pipeSafely, readableToString } from '../../../src/util/StreamUtil';
import { guardedStreamFrom, pipeSafely, transformSafely, readableToString } from '../../../src/util/StreamUtil';

describe('StreamUtil', (): void => {
describe('#readableToString', (): void => {
Expand Down Expand Up @@ -41,6 +42,104 @@ describe('StreamUtil', (): void => {
});
});

describe('#transformSafely', (): void => {
it('can transform a stream without arguments.', async(): Promise<void> => {
const source = streamifyArray([ 'data' ]);
const transformed = transformSafely(source);
transformed.setEncoding('utf8');
const result = await arrayifyStream(transformed);
expect(result).toEqual([ 'data' ]);
});

it('can transform a stream synchronously.', async(): Promise<void> => {
const source = streamifyArray([ 'data' ]);
const transformed = transformSafely<string>(source, {
encoding: 'utf8',
transform(data: string): void {
this.push(`${data}1`);
this.push(`${data}2`);
},
flush(): void {
this.push(`data3`);
},
});
const result = await arrayifyStream(transformed);
expect(result).toEqual([ 'data1', 'data2', 'data3' ]);
});

it('can transform a stream asynchronously.', async(): Promise<void> => {
const source = streamifyArray([ 'data' ]);
const transformed = transformSafely<string>(source, {
encoding: 'utf8',
async transform(data: string): Promise<void> {
await new Promise((resolve): any => setImmediate(resolve));
this.push(`${data}1`);
this.push(`${data}2`);
},
async flush(): Promise<void> {
await new Promise((resolve): any => setImmediate(resolve));
this.push(`data3`);
},
});
const result = await arrayifyStream(transformed);
expect(result).toEqual([ 'data1', 'data2', 'data3' ]);
});

it('catches source errors.', async(): Promise<void> => {
const error = new Error('stream error');
const source = new PassThrough();
const transformed = transformSafely<string>(source);
source.emit('error', error);
await expect(arrayifyStream(transformed)).rejects.toThrow(error);
});

it('catches synchronous errors on transform.', async(): Promise<void> => {
const error = new Error('stream error');
const source = streamifyArray([ 'data' ]);
const transformed = transformSafely<string>(source, {
transform(): never {
throw error;
},
});
await expect(arrayifyStream(transformed)).rejects.toThrow(error);
});

it('catches synchronous errors on flush.', async(): Promise<void> => {
const error = new Error('stream error');
const source = streamifyArray([ 'data' ]);
const transformed = transformSafely<string>(source, {
async flush(): Promise<never> {
await new Promise((resolve): any => setImmediate(resolve));
throw error;
},
});
await expect(arrayifyStream(transformed)).rejects.toThrow(error);
});

it('catches asynchronous errors on transform.', async(): Promise<void> => {
const error = new Error('stream error');
const source = streamifyArray([ 'data' ]);
const transformed = transformSafely<string>(source, {
transform(): never {
throw error;
},
});
await expect(arrayifyStream(transformed)).rejects.toThrow(error);
});

it('catches asynchronous errors on flush.', async(): Promise<void> => {
const error = new Error('stream error');
const source = streamifyArray([ 'data' ]);
const transformed = transformSafely<string>(source, {
async flush(): Promise<never> {
await new Promise((resolve): any => setImmediate(resolve));
throw error;
},
});
await expect(arrayifyStream(transformed)).rejects.toThrow(error);
});
});

describe('#guardedStreamFrom', (): void => {
it('converts data to a guarded stream.', async(): Promise<void> => {
const data = [ 'a', 'b' ];
Expand Down

0 comments on commit 995a2dc

Please sign in to comment.