Skip to content

Commit

Permalink
refactor(rstream-csp): use options object arg
Browse files Browse the repository at this point in the history
BREAKING CHANGE: use options object arg for fromChannel()
  • Loading branch information
postspectacular committed Nov 23, 2019
1 parent b466ebc commit b39f4d0
Showing 1 changed file with 36 additions and 26 deletions.
62 changes: 36 additions & 26 deletions packages/rstream-csp/src/from/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,46 @@ import { Channel } from "@thi.ng/csp";
import { LOGGER, Stream } from "@thi.ng/rstream";
import { CommonOpts } from "@thi.ng/rstream/api";

export interface FromChannelOpts extends CommonOpts {
/**
* If true, the parent CSP channel will be closed when this stream
* closes.
*
* @defaultValue true
*/
closeChannel: boolean;
}

/**
* Returns a stream of values received from given
* {@link @thi.ng/csp#Channel}.
*
* @param src
* @param closeWhenCancelled
* @param opts
*/
export const fromChannel = <T>(
src: Channel<T>,
closeWhenCancelled = true,
opts?: Partial<CommonOpts>
) =>
new Stream<T>(
(stream) => {
let isActive = true;
(async () => {
let x;
while (((x = null), (x = await src.read())) !== undefined) {
if (x === undefined || !isActive) {
break;
}
stream.next(x);
}
stream.done();
})();
return () => {
if (closeWhenCancelled) {
src.close(true);
LOGGER.info("closed channel", src.id);
opts?: Partial<FromChannelOpts>
) => {
opts = { id: `channel-${src.id}`, closeChannel: true, ...opts };
return new Stream<T>((stream) => {
let isActive = true;
(async () => {
let x;
while (((x = null), (x = await src.read())) !== undefined) {
if (x === undefined || !isActive) {
break;
}
isActive = false;
};
},
{ id: `channel-${src.id}`, ...opts }
);
stream.next(x);
}
stream.done();
})();
return () => {
if (opts!.closeChannel !== false) {
src.close(true);
LOGGER.info("closed channel", src.id);
}
isActive = false;
};
}, opts);
};

0 comments on commit b39f4d0

Please sign in to comment.