From fcda14cccef2d6d230cf187e1050fa0026b0f9b7 Mon Sep 17 00:00:00 2001 From: "arvid.nicolaas" Date: Fri, 10 Jun 2022 16:29:36 +0200 Subject: [PATCH] feat(stream): add stream transformers and additional methods to Stream and AsyncStream --- .../async-custom/async-fast-iterator-base.ts | 110 ++++++++ .../async-custom/async-stream-custom.ts | 239 ++++++++++++++---- deno_dist/stream/async/async-transformer.ts | 146 +++++++++++ deno_dist/stream/async/index.ts | 2 + deno_dist/stream/async/interface.ts | 90 +++++-- .../stream/custom/fast-iterator-custom.ts | 108 ++++++++ deno_dist/stream/custom/stream-custom.ts | 138 +++++++++- deno_dist/stream/main/index.ts | 2 + deno_dist/stream/main/interface.ts | 93 ++++++- deno_dist/stream/main/transformer.ts | 141 +++++++++++ .../async-custom/async-fast-iterator-base.ts | 110 ++++++++ .../src/async-custom/async-stream-custom.ts | 239 ++++++++++++++---- .../stream/src/async/async-transformer.ts | 146 +++++++++++ packages/stream/src/async/index.ts | 2 + packages/stream/src/async/interface.ts | 90 +++++-- .../stream/src/custom/fast-iterator-custom.ts | 108 ++++++++ packages/stream/src/custom/stream-custom.ts | 138 +++++++++- packages/stream/src/main/index.ts | 2 + packages/stream/src/main/interface.ts | 93 ++++++- packages/stream/src/main/transformer.ts | 141 +++++++++++ packages/stream/test/async-stream.test.ts | 134 ++++++++++ .../stream/test/async-transformer.test.ts | 70 +++++ packages/stream/test/stream.test.ts | 84 ++++++ packages/stream/test/transformer.test.ts | 60 +++++ 24 files changed, 2326 insertions(+), 160 deletions(-) create mode 100644 deno_dist/stream/async/async-transformer.ts create mode 100644 deno_dist/stream/main/transformer.ts create mode 100644 packages/stream/src/async/async-transformer.ts create mode 100644 packages/stream/src/main/transformer.ts create mode 100644 packages/stream/test/async-transformer.test.ts create mode 100644 packages/stream/test/transformer.test.ts diff --git a/deno_dist/stream/async-custom/async-fast-iterator-base.ts b/deno_dist/stream/async-custom/async-fast-iterator-base.ts index 331d771b9..299f77b05 100644 --- a/deno_dist/stream/async-custom/async-fast-iterator-base.ts +++ b/deno_dist/stream/async-custom/async-fast-iterator-base.ts @@ -1245,3 +1245,113 @@ export class AsyncReduceAllIterator extends AsyncFastIteratorBase { ) as any; } } + +export class AsyncDistinctPreviousIterator extends AsyncFastIteratorBase { + constructor(readonly source: AsyncFastIterator, readonly eq: Eq) { + super(); + this.return = (): Promise => closeIters(source); + } + + readonly previous = [] as T[]; + + async fastNext(otherwise?: AsyncOptLazy): Promise { + const done = Symbol('Done'); + + let next: T | typeof done; + const source = this.source; + const previous = this.previous; + + while (done !== (next = await source.fastNext(done))) { + previous.push(next); + + if (previous.length === 1) { + return next; + } + + const prev = previous.shift()!; + + if (!this.eq(prev, next)) { + return next; + } + } + + return AsyncOptLazy.toMaybePromise(otherwise!); + } +} + +export class AsyncWindowIterator extends AsyncFastIteratorBase { + constructor( + readonly source: AsyncFastIterator, + readonly windowSize: number, + readonly skipAmount: number, + readonly collector: AsyncReducer + ) { + super(); + this.return = (): Promise => closeIters(source); + } + + state = new Set<{ + result: unknown; + size: number; + halted: boolean; + halt: () => void; + }>(); + index = 0; + + async fastNext(otherwise?: AsyncOptLazy): Promise { + const source = this.source; + const collector = this.collector; + const windowSize = this.windowSize; + const skipAmount = this.skipAmount; + const done = Symbol('Done'); + const state = this.state; + + let next: T | typeof done; + let result: R | typeof done = done; + + while (done !== (next = await source.fastNext(done))) { + for (const current of state) { + current.result = await collector.next( + current.result, + next, + current.size, + current.halt + ); + current.size++; + + if (current.size >= windowSize || current.halted) { + result = await collector.stateToResult(current.result); + state.delete(current); + } + } + + if (this.index % skipAmount === 0) { + const newState = { + result: await AsyncOptLazy.toMaybePromise(collector.init), + size: 1, + halted: false, + halt(): void { + this.halted = true; + }, + }; + + newState.result = await collector.next( + AsyncOptLazy.toMaybePromise(collector.init), + next, + 0, + newState.halt + ); + + state.add(newState); + } + + this.index++; + + if (done !== result) { + return result; + } + } + + return AsyncOptLazy.toMaybePromise(otherwise!); + } +} diff --git a/deno_dist/stream/async-custom/async-stream-custom.ts b/deno_dist/stream/async-custom/async-stream-custom.ts index c43d925a0..2a3959611 100644 --- a/deno_dist/stream/async-custom/async-stream-custom.ts +++ b/deno_dist/stream/async-custom/async-stream-custom.ts @@ -15,6 +15,7 @@ import type { AsyncFastIterator, AsyncStream, AsyncStreamSource, + AsyncTransformer, } from '../../stream/async/index.ts'; import { closeIters, @@ -32,6 +33,7 @@ import { AsyncConcatIterator, AsyncDropIterator, AsyncSplitWhereIterator, + AsyncDistinctPreviousIterator, AsyncDropWhileIterator, AsyncFilterIterator, AsyncFilterPureIterator, @@ -52,6 +54,7 @@ import { AsyncStreamConstructors, AsyncTakeIterator, AsyncTakeWhileIterator, + AsyncWindowIterator, } from '../../stream/async-custom/index.ts'; import { Stream } from '../../stream/mod.ts'; import type { StreamSource } from '../../stream/mod.ts'; @@ -196,10 +199,10 @@ export abstract class AsyncStreamBase implements AsyncStream { ); } - flatReduceStream( - reducer: AsyncReducer> + transform( + transformer: AsyncReducer> ): AsyncStream { - return AsyncStreamConstructorsImpl.flatten(this.reduceStream(reducer)); + return AsyncStreamConstructorsImpl.flatten(this.reduceStream(transformer)); } filter( @@ -239,7 +242,7 @@ export abstract class AsyncStreamBase implements AsyncStream { const value = await iter.fastNext(done); if (done === value) { - return AsyncOptLazy.toMaybePromise(otherwise!); + return AsyncOptLazy.toPromise(otherwise!); } await closeIters(iter); return value; @@ -255,11 +258,28 @@ export abstract class AsyncStreamBase implements AsyncStream { lastValue = value; } - if (done === lastValue) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === lastValue) return AsyncOptLazy.toPromise(otherwise!); return lastValue; } + async single(otherwise?: AsyncOptLazy): Promise { + const iterator = this[Symbol.asyncIterator](); + const done = Symbol('Done'); + + const value = await iterator.fastNext(done); + + if (done !== value) { + if (done === (await iterator.fastNext(done))) { + return value; + } + } + + await closeIters(iterator); + + return AsyncOptLazy.toPromise(otherwise!); + } + async count(): Promise { let result = 0; @@ -317,7 +337,7 @@ export abstract class AsyncStreamBase implements AsyncStream { occurrance = 1, otherwise?: AsyncOptLazy ): Promise { - if (occurrance <= 0) return AsyncOptLazy.toMaybePromise(otherwise!); + if (occurrance <= 0) return AsyncOptLazy.toPromise(otherwise!); const done = Symbol('Done'); const iterator = this[Symbol.asyncIterator](); @@ -333,7 +353,7 @@ export abstract class AsyncStreamBase implements AsyncStream { } } - return AsyncOptLazy.toMaybePromise(otherwise!); + return AsyncOptLazy.toPromise(otherwise!); } finally { if (done !== value!) await closeIters(iterator); } @@ -343,7 +363,7 @@ export abstract class AsyncStreamBase implements AsyncStream { index: number, otherwise?: AsyncOptLazy ): Promise { - if (index < 0) return AsyncOptLazy.toMaybePromise(otherwise!); + if (index < 0) return AsyncOptLazy.toPromise(otherwise!); const done = Symbol('Done'); const iterator = this[Symbol.asyncIterator](); @@ -358,7 +378,7 @@ export abstract class AsyncStreamBase implements AsyncStream { i++; } - return AsyncOptLazy.toMaybePromise(otherwise!); + return AsyncOptLazy.toPromise(otherwise!); } finally { if (done !== value!) await closeIters(iterator); } @@ -469,6 +489,34 @@ export abstract class AsyncStreamBase implements AsyncStream { return undefined !== (await this.indexOf(searchValue, amount, eq)); } + async containsSlice( + source: AsyncStreamSource.NonEmpty, + eq = Eq.objectIs + ): Promise { + const iterator = this[Symbol.asyncIterator](); + const sourceStream = fromAsyncStreamSource(source); + let sourceIterator = sourceStream[Symbol.asyncIterator](); + + const done = Symbol('Done'); + + while (true) { + const sourceValue = await sourceIterator.fastNext(done); + if (done === sourceValue) { + await closeIters(iterator); + return true; + } + + const value = await iterator.fastNext(done); + if (done === value) { + return false; + } + + if (!eq(sourceValue, value)) { + sourceIterator = sourceStream[Symbol.asyncIterator](); + } + } + } + takeWhile( pred: (value: T, index: number) => MaybePromise ): AsyncStream { @@ -526,7 +574,7 @@ export abstract class AsyncStreamBase implements AsyncStream { try { result = await iterator.fastNext(done); - if (done === result) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === result) return AsyncOptLazy.toPromise(otherwise!); while (done !== (value = await iterator.fastNext(done))) { if (compare(value, result) < 0) result = value; @@ -555,7 +603,7 @@ export abstract class AsyncStreamBase implements AsyncStream { try { result = await iterator.fastNext(done); - if (done === result) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === result) return AsyncOptLazy.toPromise(otherwise!); while (done !== (value = await iterator.fastNext(done))) { if (compare(value, result) > 0) result = value; @@ -624,6 +672,18 @@ export abstract class AsyncStreamBase implements AsyncStream { return new AsyncSplitOnStream(this, sepElem, eq); } + distinctPrevious(eq: Eq = Eq.objectIs): AsyncStream { + return new AsyncDistinctPreviousStream(this, eq); + } + + window( + windowSize: number, + skipAmount = windowSize, + collector: AsyncReducer = AsyncReducer.toArray() as any + ): AsyncStream { + return new AsyncWindowStream(this, windowSize, skipAmount, collector); + } + async fold( init: AsyncOptLazy, next: ( @@ -763,7 +823,7 @@ export abstract class AsyncStreamBase implements AsyncStream { reduceAllStream( ...reducers: { [K in keyof R]: AsyncReducer } ): any { - return new AsyncReduceAllStream(this, reducers); + return new AsyncReduceAllStream(this, reducers); } async toArray(): Promise { @@ -810,14 +870,14 @@ class AsyncPrependStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncPrependIterator( + return new AsyncPrependIterator( this.source[Symbol.asyncIterator](), this.item ); } async first(): Promise { - return AsyncOptLazy.toMaybePromise(this.item); + return AsyncOptLazy.toPromise(this.item); } async last(): Promise { @@ -835,7 +895,7 @@ class AsyncAppendStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncAppendIterator( + return new AsyncAppendIterator( this.source[Symbol.asyncIterator](), this.item ); @@ -846,7 +906,7 @@ class AsyncAppendStream extends AsyncStreamBase { } async last(): Promise { - return AsyncOptLazy.toMaybePromise(this.item); + return AsyncOptLazy.toPromise(this.item); } async count(): Promise { @@ -889,14 +949,14 @@ class AsyncMapStream extends AsyncStreamBase { async first(otherwise?: AsyncOptLazy): Promise { const done = Symbol('Done'); const value = await this.source.first(done); - if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === value) return AsyncOptLazy.toPromise(otherwise!); return this.mapFun(value, 0); } async last(otherwise?: AsyncOptLazy): Promise { const done = Symbol('Done'); const value = await this.source.last(done); - if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === value) return AsyncOptLazy.toPromise(otherwise!); return this.mapFun(value, 0); } @@ -910,14 +970,14 @@ class AsyncMapStream extends AsyncStreamBase { ): Promise { const done = Symbol('Done'); const value = await this.source.elementAt(index, done); - if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === value) return AsyncOptLazy.toPromise(otherwise!); return this.mapFun(value, index); } map( mapFun: (value: T2, index: number) => MaybePromise ): AsyncStream { - return new AsyncMapStream(this.source, async (value, index) => + return new AsyncMapStream(this.source, async (value, index) => mapFun(await this.mapFun(value, index), index) ); } @@ -937,7 +997,7 @@ class AsyncMapPureStream< } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncMapPureIterator( + return new AsyncMapPureIterator( this.source[Symbol.asyncIterator](), this.mapFun, this.args @@ -947,14 +1007,14 @@ class AsyncMapPureStream< async first(otherwise?: AsyncOptLazy): Promise { const done = Symbol('Done'); const value = await this.source.first(done); - if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === value) return AsyncOptLazy.toPromise(otherwise!); return this.mapFun(value, ...this.args); } async last(otherwise?: AsyncOptLazy): Promise { const done = Symbol('Done'); const value = await this.source.last(done); - if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === value) return AsyncOptLazy.toPromise(otherwise!); return this.mapFun(value, ...this.args); } @@ -968,7 +1028,7 @@ class AsyncMapPureStream< ): Promise { const done = Symbol('Done'); const value = await this.source.elementAt(index, done); - if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === value) return AsyncOptLazy.toPromise(otherwise!); return this.mapFun(value, ...this.args); } } @@ -1033,7 +1093,7 @@ class AsyncFilterStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncFilterIterator( + return new AsyncFilterIterator( this.source[Symbol.asyncIterator](), this.pred, this.invert @@ -1065,7 +1125,7 @@ class AsyncFilterPureStream< } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncFilterPureIterator( + return new AsyncFilterPureIterator( this.source[Symbol.asyncIterator](), this.pred, this.args, @@ -1083,7 +1143,7 @@ class AsyncCollectStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncCollectIterator( + return new AsyncCollectIterator( this.source[Symbol.asyncIterator](), this.collectFun ); @@ -1099,7 +1159,7 @@ class AsyncIndicesWhereStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncIndicesWhereIterator( + return new AsyncIndicesWhereIterator( this.source[Symbol.asyncIterator](), this.pred ); @@ -1116,7 +1176,7 @@ class AsyncIndicesOfStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncIndicesOfIterator( + return new AsyncIndicesOfIterator( this.source[Symbol.asyncIterator](), this.searchValue, this.eq @@ -1133,7 +1193,7 @@ class AsyncTakeWhileStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncTakeWhileIterator( + return new AsyncTakeWhileIterator( this.source[Symbol.asyncIterator](), this.pred ); @@ -1149,7 +1209,7 @@ class AsyncDropWhileStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncDropWhileIterator( + return new AsyncDropWhileIterator( this.source[Symbol.asyncIterator](), this.pred ); @@ -1162,7 +1222,7 @@ class AsyncTakeStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncTakeIterator( + return new AsyncTakeIterator( this.source[Symbol.asyncIterator](), this.amount ); @@ -1180,7 +1240,7 @@ class AsyncDropStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncDropIterator( + return new AsyncDropIterator( this.source[Symbol.asyncIterator](), this.amount ); @@ -1201,7 +1261,7 @@ class AsyncSplitWhereStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncSplitWhereIterator( + return new AsyncSplitWhereIterator( this.source[Symbol.asyncIterator](), this.pred ); @@ -1218,7 +1278,7 @@ class AsyncSplitOnStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncSplitOnIterator( + return new AsyncSplitOnIterator( this.source[Symbol.asyncIterator](), this.sepElem, this.eq @@ -1226,6 +1286,39 @@ class AsyncSplitOnStream extends AsyncStreamBase { } } +class AsyncDistinctPreviousStream extends AsyncStreamBase { + constructor(readonly source: AsyncStream, readonly eq: Eq) { + super(); + } + + [Symbol.asyncIterator](): AsyncFastIterator { + return new AsyncDistinctPreviousIterator( + this.source[Symbol.asyncIterator](), + this.eq + ); + } +} + +class AsyncWindowStream extends AsyncStreamBase { + constructor( + readonly source: AsyncStream, + readonly windowSize: number, + readonly skipAmount: number, + readonly collector: AsyncReducer + ) { + super(); + } + + [Symbol.asyncIterator](): AsyncFastIterator { + return new AsyncWindowIterator( + this.source[Symbol.asyncIterator](), + this.windowSize, + this.skipAmount, + this.collector + ); + } +} + class AsyncReduceStream extends AsyncStreamBase { constructor( readonly source: AsyncStream, @@ -1235,7 +1328,7 @@ class AsyncReduceStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncReduceIterator( + return new AsyncReduceIterator( this.source[Symbol.asyncIterator](), this.reducerDef ); @@ -1251,7 +1344,7 @@ class AsyncReduceAllStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncReduceAllIterator( + return new AsyncReduceAllIterator( this.source[Symbol.asyncIterator](), this.reducers ); @@ -1264,7 +1357,7 @@ export class AsyncOfStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncOfIterator(this.values); + return new AsyncOfIterator(this.values); } } @@ -1280,29 +1373,61 @@ class AsyncEmptyStream return emptyAsyncFastIterator; } + asyncStream(): this { + return this; + } + + async equals(other: AsyncStreamSource): Promise { + if (other === this) return true; + + const done = Symbol('done'); + + return ( + done === + fromAsyncStreamSource(other)[Symbol.asyncIterator]().fastNext(done) + ); + } + + prepend(value: AsyncOptLazy): AsyncStream.NonEmpty { + return AsyncStreamConstructorsImpl.of(value); + } + + append(value: AsyncOptLazy): AsyncStream.NonEmpty { + return AsyncStreamConstructorsImpl.of(value); + } + assumeNonEmpty(): never { RimbuError.throwEmptyCollectionAssumedNonEmptyError(); } - async forEach(): Promise { // } - + async forEachPure(): Promise { + // + } indexed(): AsyncStream<[number, T]> { return this as any; } - map(): AsyncStream { return this as any; } - mapPure(): AsyncStream { return this as any; } - flatMap(): AsyncStream { return this as any; } + flatZip(): AsyncStream<[T, T2]> { + return this as any; + } + transform(transformer: AsyncTransformer): AsyncStream { + return AsyncStreamConstructorsImpl.from( + async () => + await transformer.stateToResult( + await AsyncOptLazy.toMaybePromise(transformer.init) + ) + ); + } filter(): AsyncStream { return this; } @@ -1324,6 +1449,9 @@ class AsyncEmptyStream last(otherwise?: AsyncOptLazy): Promise { return AsyncOptLazy.toPromise(otherwise!); } + single(otherwise?: AsyncOptLazy): Promise { + return AsyncOptLazy.toPromise(otherwise!); + } async count(): Promise<0> { return 0; } @@ -1364,6 +1492,9 @@ class AsyncEmptyStream async contains(): Promise { return false; } + async containsSlice(): Promise { + return false; + } takeWhile(): AsyncStream { return this; } @@ -1417,6 +1548,18 @@ class AsyncEmptyStream } = {}): AsyncStream.NonEmpty { return fromAsyncStreamSource(start).concat(end) as any; } + splitOn(): AsyncStream { + return this as any; + } + splitWhere(): AsyncStream { + return this as any; + } + distinctPrevious(): AsyncStream { + return this; + } + window(): AsyncStream { + return this as any; + } fold(init: AsyncOptLazy): Promise { return AsyncOptLazy.toPromise(init); } @@ -1424,9 +1567,7 @@ class AsyncEmptyStream return this as any; } async reduce(reducer: AsyncReducer): Promise { - return reducer.stateToResult( - await AsyncOptLazy.toMaybePromise(reducer.init) - ); + return reducer.stateToResult(await AsyncOptLazy.toPromise(reducer.init)); } reduceStream(): any { return this; @@ -1447,6 +1588,12 @@ class AsyncEmptyStream toString(): string { return `AsyncStream()`; } + async toJSON(): Promise> { + return { + dataType: 'AsyncStream', + value: [], + }; + } } export class FromSource extends AsyncStreamBase { diff --git a/deno_dist/stream/async/async-transformer.ts b/deno_dist/stream/async/async-transformer.ts new file mode 100644 index 000000000..909405757 --- /dev/null +++ b/deno_dist/stream/async/async-transformer.ts @@ -0,0 +1,146 @@ +import { AsyncOptLazy, AsyncReducer, Eq } from '../../common/mod.ts'; +import { AsyncStream, AsyncStreamSource } from '../../stream/async/index.ts'; + +/** + * An AsyncReducer that produces instances of `AsyncStreamSource`. + * @typeparam T - the input element type + * @typeparam R - the result stream element type + */ +export type AsyncTransformer = AsyncReducer>; + +export namespace AsyncTransformer { + /** + * An AsyncReducer that produces instances `AsyncStreamSource.NonEmpty`. + * @typeparam T - the input element type + * @typeparam R - the result stream element type + */ + export type NonEmpty = AsyncReducer< + T, + AsyncStreamSource.NonEmpty + >; + + /** + * Returns an async transformer that produces windows/collections of `windowSize` size, each + * window starting `skipAmount` of elements after the previous, and optionally collected + * by a custom reducer. + * @typeparam T - the input element type + * @typeparam R - the window type + * @param windowSize - the amount of elements for each window + * @param skipAmount - (default: `windowSize`) the amount of elements between the start of each window + * @param collector - (default: Reducer.toArray()) the reducer to use to convert elements to windows + * @example + * ```ts + * await AsyncStream.of(1, 2, 3, 4, 5, 6) + * .transform(AsyncTransformer.window(3)) + * .toArray() + * // => [[1, 2, 3], [4, 5, 6]] + * ``` + */ + export const window: { + (windowSize: number, skipAmount?: number): AsyncTransformer; + ( + windowSize: number, + skipAmount?: number, + collector?: AsyncReducer + ): AsyncTransformer; + } = ( + windowSize: number, + skipAmount = windowSize, + collector = AsyncReducer.toArray() + ) => { + return AsyncReducer.create< + T, + AsyncStream, + Set<{ result: unknown; size: number; halted: boolean; halt: () => void }> + >( + () => new Set(), + async (state, elem, index) => { + for (const current of state) { + if (current.size >= windowSize || current.halted) { + state.delete(current); + } + + current.result = await collector.next( + current.result, + elem, + current.size, + current.halt + ); + current.size++; + } + + if (index % skipAmount === 0) { + const newState = { + result: await AsyncOptLazy.toMaybePromise(collector.init), + size: 1, + halted: false, + halt(): void { + this.halted = true; + }, + }; + + newState.result = collector.next( + await AsyncOptLazy.toMaybePromise(collector.init), + elem, + 0, + newState.halt + ); + + state.add(newState); + } + + return state; + }, + (current) => { + return AsyncStream.from(current) + .collect((v, _, skip) => + v.size === windowSize + ? AsyncStream.of(collector.stateToResult(v.result) as any) + : skip + ) + .first(AsyncStream.empty()); + } + ); + }; + + /** + * Returns an async transformer that returns only those elements from the input that are different to previous element + * according to the optionally given `eq` function. + * @param eq - (default: `Eq.objectIs`) the equality testing function + * @example + * ```ts + * await AsyncStream.of(1, 1, 2, 3, 2, 2) + * .transform(AsyncTransformer.distinctPrevious()) + * .toArray() + * // => [1, 2, 3, 2] + * ``` + */ + export function distinctPrevious( + eq: Eq = Eq.objectIs + ): AsyncTransformer { + return AsyncReducer.create( + () => [] as T[], + (current, elem) => { + current.push(elem); + + if (current.length > 2) { + current.shift(); + } + + return current; + }, + (state) => { + if (state.length > 0) { + if (state.length === 1) { + return AsyncStream.of(state[0]); + } + if (!eq(state[0], state[1])) { + return AsyncStream.of(state[1]); + } + } + + return AsyncStream.empty(); + } + ); + } +} diff --git a/deno_dist/stream/async/index.ts b/deno_dist/stream/async/index.ts index c327b33aa..d4bf2a0b0 100644 --- a/deno_dist/stream/async/index.ts +++ b/deno_dist/stream/async/index.ts @@ -12,3 +12,5 @@ export * from './async-fast-iterable.ts'; export * from './async-streamable.ts'; export * from './interface.ts'; + +export * from './async-transformer.ts'; diff --git a/deno_dist/stream/async/interface.ts b/deno_dist/stream/async/interface.ts index 3a9c54e12..c80a8b1e5 100644 --- a/deno_dist/stream/async/interface.ts +++ b/deno_dist/stream/async/interface.ts @@ -12,6 +12,7 @@ import type { AsyncFastIterable, AsyncStreamable, AsyncStreamSource, + AsyncTransformer, } from '../../stream/async/index.ts'; import type { AsyncStreamConstructors } from '../../stream/async-custom/index.ts'; import { AsyncStreamConstructorsImpl } from '../../stream/async-custom/index.ts'; @@ -227,18 +228,15 @@ export interface AsyncStream /** * Returns an AsyncStream consisting of the concatenation of AsyncStreamSource elements resulting from applying the given `reducer` to each element. * @typeparam R - the resulting element type - * @param reducer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R - * + * @param transformer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R * @note O(1) * @example * ```ts - * await AsyncStream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.windowReducer(2)).toArray() + * await AsyncStream.of(1, 2, 3, 4, 5, 6).transform(AsyncTransformer.window(3)).toArray() * // => [[1, 2, 3], [4, 5, 6]] * ``` */ - flatReduceStream( - reducer: AsyncReducer> - ): AsyncStream; + transform(transformer: AsyncTransformer): AsyncStream; /** * Returns an AsyncStream containing only those elements from this stream for which the given `pred` function returns true. * @param pred - a potentially asynchronous function taking an element and its index, and returning true if the element should be included in the resulting stream. @@ -330,8 +328,22 @@ export interface AsyncStream * ``` * @note O(1) */ - first(): MaybePromise; + first(): Promise; first(otherwise: AsyncOptLazy): Promise; + /** + * Returns the first element of the Stream if it only has one element, or a fallback value if the Stream does not have exactly one value. + * @typeparam O - the optional value to return if the stream does not have exactly one value. + * @param otherwise - (default: undefined) an `OptLazy` value to return if the Stream does not have exactly one value. + * @example + * ```ts + * await AsyncStream.empty().single() // => undefined + * await AsyncStream.of(1, 2, 3).single() // => undefined + * await AsyncStream.of(1).single() // => 1 + * await AsyncStream.of(1, 2, 3).single(0) // => 0 + * ``` + */ + single(): Promise; + single(otherwise: AsyncOptLazy): Promise; /** * Returns the last element of the AsyncStream, or a fallback value (default undefined) if the stream is empty. * @typeparam O - the optional value type to return if the stream is empty @@ -524,6 +536,23 @@ export interface AsyncStream * @note O(N) */ contains(value: T, amount?: number, eq?: Eq): Promise; + /** + * Returns true if this stream contains the same sequence of elements as the given `source`, + * false otherwise. + * @param source - a non-empty async stream source containing the element sequence to find + * @param eq - (default: `Eq.objectIs`) the function to use to test element equality + * @example + * ```ts + * await AsyncStream.of(1, 2, 3, 4, 5).containsSlice([2, 3, 4]) + * // => true + * await AsyncStream.of(1, 2, 3, 4, 5).containsSlice([4, 3, 2]) + * // => false + * ``` + */ + containsSlice( + source: AsyncStreamSource.NonEmpty, + eq?: Eq + ): Promise; /** * Returns an AsyncStream that contains the elements of this stream up to the first element that does not satisfy given `pred` function. * @param pred - a potentially asynchronous predicate function taking an element and its index @@ -752,6 +781,40 @@ export interface AsyncStream * @note O(1) */ splitOn(sepElem: T, eq?: Eq): AsyncStream; + /** + * Returns an AsyncStream containing non-repetitive elements of the source stream, where repetitive elements + * are compared using the optionally given `eq` equality function. + * @param eq - (default: `Eq.objectIs`) the `Eq` instance to use to test equality of elements + * @example + * ```ts + * await AsyncStream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray() + * // => [1, 2, 3, 1] + * ``` + */ + distinctPrevious(eq?: Eq): AsyncStream; + /** + * Returns an AsyncStream containing `windows` of `windowSize` consecutive elements of the source stream, with each + * window starting `skipAmount` elements after the previous one. + * @typeparam R - the collector reducer result type + * @param windowSize - the size in elements of the windows + * @param skipAmount - (default: `windowSize`) the amount of elements to skip to start the next window + * @param collector - (default: `AsyncArray.toArray()`) the async reducer to use to collect the window values + * @example + * ```ts + * await Stream.of(1, 2, 3, 4, 5, 6, 7).window(3).toArray() + * // => [[1, 2, 3], [4, 5, 6]] + * await Stream.of(1, 2, 3, 4, 5).window(3, 1).toArray() + * // => [[1, 2, 3], [2, 3, 4], [3, 4, 5]] + * await Stream.of(1, 2, 3, 4).window(2, 2, AsyncReducer.toJSSet()).toArray() + * // => [Set(1, 2), Set(3, 4)] + * ``` + */ + window(windowSize: number, skipAmount?: number): AsyncStream; + window( + windowSize: number, + skipAmount?: number, + collector?: AsyncReducer + ): AsyncStream; /** * Returns the value resulting from applying the given the given `next` function to a current state (initially the given `init` value), * and the next stream value, and returning the new state. When all elements are processed, the resulting state is returned. @@ -1045,21 +1108,18 @@ export namespace AsyncStream { /** * Returns an AsyncStream consisting of the concatenation of AsyncStreamSource elements resulting from applying the given `reducer` to each element. * @typeparam R - the resulting element type - * @param reducer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R - * + * @param transformer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R. * @note O(1) * @example * ```ts - * await AsyncStream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.windowReducer(2)).toArray() + * await AsyncStream.of(1, 2, 3, 4, 5, 6).transform(AsyncTransformer.window(3)).toArray() * // => [[1, 2, 3], [4, 5, 6]] * ``` */ - flatReduceStream( - reducer: AsyncReducer> + transform( + transformer: AsyncTransformer.NonEmpty ): AsyncStream.NonEmpty; - flatReduceStream( - reducer: AsyncReducer> - ): AsyncStream; + transform(transformer: AsyncTransformer): AsyncStream; /** * Returns the first element of the AsyncStream. * @example diff --git a/deno_dist/stream/custom/fast-iterator-custom.ts b/deno_dist/stream/custom/fast-iterator-custom.ts index 2e5e4279f..69a764dd6 100644 --- a/deno_dist/stream/custom/fast-iterator-custom.ts +++ b/deno_dist/stream/custom/fast-iterator-custom.ts @@ -1008,3 +1008,111 @@ export class MapPureIterator< return this.mapFun(next, ...this.args); } } + +export class DistinctPreviousIterator extends FastIteratorBase { + constructor(readonly source: FastIterator, readonly eq: Eq) { + super(); + } + + readonly previous = [] as T[]; + + fastNext(otherwise?: OptLazy): T | O { + const done = Symbol('Done'); + + let next: T | typeof done; + const source = this.source; + const previous = this.previous; + + while (done !== (next = source.fastNext(done))) { + previous.push(next); + + if (previous.length === 1) { + return next; + } + + const prev = previous.shift()!; + + if (!this.eq(prev, next)) { + return next; + } + } + + return OptLazy(otherwise!); + } +} + +export class WindowIterator extends FastIteratorBase { + constructor( + readonly source: FastIterator, + readonly windowSize: number, + readonly skipAmount: number, + readonly collector: Reducer + ) { + super(); + } + + state = new Set<{ + result: unknown; + size: number; + halted: boolean; + halt: () => void; + }>(); + index = 0; + + fastNext(otherwise?: OptLazy): R | O { + const source = this.source; + const collector = this.collector; + const windowSize = this.windowSize; + const skipAmount = this.skipAmount; + const done = Symbol('Done'); + const state = this.state; + + let next: T | typeof done; + let result: R | typeof done = done; + + while (done !== (next = source.fastNext(done))) { + for (const current of state) { + current.result = collector.next( + current.result, + next, + current.size, + current.halt + ); + current.size++; + + if (current.size >= windowSize || current.halted) { + result = collector.stateToResult(current.result); + state.delete(current); + } + } + + if (this.index % skipAmount === 0) { + const newState = { + result: OptLazy(collector.init), + size: 1, + halted: false, + halt(): void { + this.halted = true; + }, + }; + + newState.result = collector.next( + OptLazy(collector.init), + next, + 0, + newState.halt + ); + + state.add(newState); + } + + this.index++; + + if (done !== result) { + return result; + } + } + + return OptLazy(otherwise!); + } +} diff --git a/deno_dist/stream/custom/stream-custom.ts b/deno_dist/stream/custom/stream-custom.ts index 1eea16853..f154d6057 100644 --- a/deno_dist/stream/custom/stream-custom.ts +++ b/deno_dist/stream/custom/stream-custom.ts @@ -11,7 +11,12 @@ import { ToJSON, TraverseState, } from '../../common/mod.ts'; -import type { FastIterator, Stream, StreamSource } from '../../stream/mod.ts'; +import type { + FastIterator, + Stream, + StreamSource, + Transformer, +} from '../../stream/mod.ts'; import type { StreamConstructors } from '../../stream/custom/index.ts'; import { isFastIterator, @@ -50,6 +55,8 @@ import { MapIterator, MapPureIterator, PrependIterator, + DistinctPreviousIterator, + WindowIterator, } from '../../stream/custom/index.ts'; import { RimbuError, Token } from '../../base/mod.ts'; @@ -151,8 +158,8 @@ export abstract class StreamBase implements Stream { ); } - flatReduceStream(reducer: Reducer>): Stream { - return StreamConstructorsImpl.flatten(this.reduceStream(reducer)); + transform(transformer: Transformer): Stream { + return StreamConstructorsImpl.flatten(this.reduceStream(transformer)); } filter( @@ -203,6 +210,21 @@ export abstract class StreamBase implements Stream { return lastValue; } + single(otherwise?: OptLazy): T | O { + const iterator = this[Symbol.iterator](); + const done = Symbol('Done'); + + const value = iterator.fastNext(done); + + if (done !== value) { + if (done === iterator.fastNext(done)) { + return value; + } + } + + return OptLazy(otherwise!); + } + count(): number { let result = 0; @@ -358,6 +380,26 @@ export abstract class StreamBase implements Stream { return undefined !== this.indexOf(searchValue, amount, eq); } + containsSlice(source: StreamSource.NonEmpty, eq = Eq.objectIs): boolean { + const iterator = this[Symbol.iterator](); + const sourceStream = fromStreamSource(source); + let sourceIterator = sourceStream[Symbol.iterator](); + + const done = Symbol('Done'); + + while (true) { + const sourceValue = sourceIterator.fastNext(done); + if (done === sourceValue) return true; + + const value = iterator.fastNext(done); + if (done === value) return false; + + if (!eq(sourceValue, value)) { + sourceIterator = sourceStream[Symbol.iterator](); + } + } + } + takeWhile(pred: (value: T, index: number) => boolean): Stream { return new TakeWhileStream(this, pred); } @@ -479,6 +521,18 @@ export abstract class StreamBase implements Stream { return new SplitOnStream(this, sepElem, eq); } + distinctPrevious(eq: Eq = Eq.objectIs): Stream { + return new DistinctPreviousStream(this, eq); + } + + window( + windowSize: number, + skipAmount = windowSize, + collector: Reducer = Reducer.toArray() as any + ): Stream { + return new WindowStream(this, windowSize, skipAmount, collector); + } + fold( init: OptLazy, next: (current: R, value: T, index: number, halt: () => void) => R @@ -1138,14 +1192,29 @@ class EmptyStream extends StreamBase implements Stream { return emptyFastIterator; } + stream(): this { + return this; + } assumeNonEmpty(): never { RimbuError.throwEmptyCollectionAssumedNonEmptyError(); } + equals(other: StreamSource): boolean { + const done = Symbol('Done'); + return done === fromStreamSource(other)[Symbol.iterator]().fastNext(done); + } + prepend(value: OptLazy): Stream.NonEmpty { + return StreamConstructorsImpl.of(OptLazy(value)); + } + append(value: OptLazy): Stream.NonEmpty { + return StreamConstructorsImpl.of(OptLazy(value)); + } forEach(): void { // } - + forEachPure(): void { + // + } indexed(): Stream<[number, T]> { return this as any; } @@ -1161,9 +1230,9 @@ class EmptyStream extends StreamBase implements Stream { flatZip(): Stream<[T, T2]> { return this as any; } - flatReduceStream(reducer: Reducer>): Stream { + transform(transformer: Transformer): Stream { return StreamConstructorsImpl.from( - reducer.stateToResult(OptLazy(reducer.init)) + transformer.stateToResult(OptLazy(transformer.init)) ); } filter(): Stream { @@ -1187,6 +1256,9 @@ class EmptyStream extends StreamBase implements Stream { last(otherwise?: OptLazy): O { return OptLazy(otherwise) as O; } + single(otherwise?: OptLazy): O { + return OptLazy(otherwise) as O; + } count(): 0 { return 0; } @@ -1227,6 +1299,9 @@ class EmptyStream extends StreamBase implements Stream { contains(): false { return false; } + containsSlice(): false { + return false; + } takeWhile(): Stream { return this; } @@ -1275,6 +1350,18 @@ class EmptyStream extends StreamBase implements Stream { } = {}): Stream.NonEmpty { return fromStreamSource(start).concat(end) as any; } + splitOn(): Stream { + return this as any; + } + splitWhere(): Stream { + return this as any; + } + distinctPrevious(): Stream { + return this; + } + window(): Stream { + return this as any; + } fold(init: OptLazy): R { return OptLazy(init); } @@ -1299,6 +1386,12 @@ class EmptyStream extends StreamBase implements Stream { toString(): string { return `Stream()`; } + toJSON(): ToJSON { + return { + dataType: 'Stream', + value: [], + }; + } } export class ArrayStream extends StreamBase { @@ -1657,6 +1750,39 @@ export class RangeStream extends StreamBase { } } +export class DistinctPreviousStream extends StreamBase { + constructor(readonly source: Stream, readonly eq: Eq) { + super(); + } + + [Symbol.iterator](): FastIterator { + return new DistinctPreviousIterator( + this.source[Symbol.iterator](), + this.eq + ); + } +} + +export class WindowStream extends StreamBase { + constructor( + readonly source: Stream, + readonly windowSize: number, + readonly skipAmount: number, + readonly collector: Reducer + ) { + super(); + } + + [Symbol.iterator](): FastIterator { + return new WindowIterator( + this.source[Symbol.iterator](), + this.windowSize, + this.skipAmount, + this.collector + ); + } +} + export const emptyStream: Stream = Object.freeze(new EmptyStream()); export function isStream(obj: any): obj is Stream { diff --git a/deno_dist/stream/main/index.ts b/deno_dist/stream/main/index.ts index 2e95e1d32..d9acc5741 100644 --- a/deno_dist/stream/main/index.ts +++ b/deno_dist/stream/main/index.ts @@ -16,4 +16,6 @@ export * from './stream-source.ts'; export * from './interface.ts'; +export * from './transformer.ts'; + export * from '../async/index.ts'; diff --git a/deno_dist/stream/main/interface.ts b/deno_dist/stream/main/interface.ts index 4f0ec9dad..083c841dd 100644 --- a/deno_dist/stream/main/interface.ts +++ b/deno_dist/stream/main/interface.ts @@ -9,7 +9,12 @@ import type { } from '../../common/mod.ts'; import type { StreamConstructors } from '../../stream/custom/index.ts'; import { StreamConstructorsImpl } from '../../stream/custom/index.ts'; -import type { FastIterable, Streamable, StreamSource } from '../../stream/mod.ts'; +import type { + FastIterable, + Streamable, + StreamSource, + Transformer, +} from '../../stream/mod.ts'; /** * A possibly infinite sequence of elements of type T. @@ -210,16 +215,17 @@ export interface Stream extends FastIterable, Streamable { /** * Returns a Stream consisting of the concatenation of StreamSource elements resulting from applying the given `reducer` to each element. * @typeparam R - the resulting element type - * @param reducer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R - * + * @param transformer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R * @note O(1) * @example * ```ts - * Stream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.windowReducer(2)).toArray() + * Stream.of(1, 2, 3, 4, 5, 6) + * .transform(Transformer.window(3)) + * .toArray() * // => [[1, 2, 3], [4, 5, 6]] * ``` */ - flatReduceStream(reducer: Reducer>): Stream; + transform(transformer: Transformer): Stream; /** * Returns a Stream containing only those elements from this Stream for which the given `pred` function returns true. * @param pred - a function taking an element and its index, and returning true if the element should be included in the resulting Stream. @@ -319,7 +325,7 @@ export interface Stream extends FastIterable, Streamable { /** * Returns the last element of the Stream, or a fallback value (default undefined) if the Stream is empty. * @typeparam O - the optional value type to return if the stream is empty - * @param otherwise - (default: undefined) an `OptLazy` value to be returned if the Stream is empty. + * @param otherwise - (default: undefined) an `OptLazy` value to return if the Stream is empty. * @example * ```ts * Stream.of(1, 2, 3).last() // => 3 @@ -330,6 +336,20 @@ export interface Stream extends FastIterable, Streamable { */ last(): T | undefined; last(otherwise: OptLazy): T | O; + /** + * Returns the first element of the Stream if it only has one element, or a fallback value if the Stream does not have exactly one value. + * @typeparam O - the optional value to return if the stream does not have exactly one value. + * @param otherwise - (default: undefined) an `OptLazy` value to return if the Stream does not have exactly one value. + * @example + * ```ts + * Stream.empty().single() // => undefined + * Stream.of(1, 2, 3).single() // => undefined + * Stream.of(1).single() // => 1 + * Stream.of(1, 2, 3).single(0) // => 0 + * ``` + */ + single(): T | undefined; + single(otherwise: OptLazy): T | O; /** * Returns the amount of elements in the Stream. * @example @@ -497,6 +517,20 @@ export interface Stream extends FastIterable, Streamable { * @note O(N) */ contains(value: T, amount?: number, eq?: Eq): boolean; + /** + * Returns true if this stream contains the same sequence of elements as the given `source`, + * false otherwise. + * @param source - a non-empty stream source containing the element sequence to find + * @param eq - (default: `Eq.objectIs`) the function to use to test element equality + * @example + * ```ts + * Stream.of(1, 2, 3, 4, 5).containsSlice([2, 3, 4]) + * // => true + * Stream.of(1, 2, 3, 4, 5).containsSlice([4, 3, 2]) + * // => false + * ``` + */ + containsSlice(source: StreamSource.NonEmpty, eq?: Eq): boolean; /** * Returns a Stream that contains the elements of this Stream up to the first element that does not satisfy given `pred` function. * @param pred - a predicate function taking an element and its index @@ -708,6 +742,40 @@ export interface Stream extends FastIterable, Streamable { * @note O(1) */ splitOn(sepElem: T, eq?: Eq): Stream; + /** + * Returns a Stream containing non-repetitive elements of the source stream, where repetitive elements + * are compared using the optionally given `eq` equality function. + * @param eq - the `Eq` instance to use to test equality of elements + * @example + * ```ts + * Stream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray() + * // => [1, 2, 3, 1] + * ``` + */ + distinctPrevious(eq?: Eq): Stream; + /** + * Returns a Stream containing `windows` of `windowSize` consecutive elements of the source stream, with each + * window starting `skipAmount` elements after the previous one. + * @typeparam R - the collector reducer result type + * @param windowSize - the size in elements of the windows + * @param skipAmount - (default: windowsize) the amount of elements to skip to start the next window + * @param collector - (optional, default: toArray reducer) the reducer to use to collect the window values + * @example + * ```ts + * console.log(Stream.of(1, 2, 3, 4, 5, 6, 7).window(3).toArray()) + * // => [[1, 2, 3], [4, 5, 6]] + * console.log(Stream.of(1, 2, 3, 4, 5).window(3, 1).toArray()) + * // => [[1, 2, 3], [2, 3, 4], [3, 4, 5]] + * console.log(Stream.of(1, 2, 3, 4).window(2, 2, Reducer.toJSSet()).toArray()) + * // => [Set(1, 2), Set(3, 4)] + * ``` + */ + window(windowSize: number, skipAmount?: number): Stream; + window( + windowSize: number, + skipAmount?: number, + collector?: Reducer + ): Stream; /** * Returns the value resulting from applying the given the given `next` function to a current state (initially the given `init` value), * and the next Stream value, and returning the new state. When all elements are processed, the resulting state is returned. @@ -987,19 +1055,18 @@ export namespace Stream { /** * Returns a Stream consisting of the concatenation of StreamSource elements resulting from applying the given `reducer` to each element. * @typeparam R - the resulting element type - * @param reducer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R - * + * @param transformer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R * @note O(1) * @example * ```ts - * Stream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.createWindowReducer(2)).toArray() + * Stream.of(1, 2, 3, 4, 5, 6) + * .transform(Transformer.window(3)) + * .toArray() * // => [[1, 2, 3], [4, 5, 6]] * ``` */ - flatReduceStream( - reducer: Reducer> - ): Stream.NonEmpty; - flatReduceStream(reducer: Reducer>): Stream; + transform(transformer: Transformer.NonEmpty): Stream.NonEmpty; + transform(transformer: Transformer): Stream; /** * Returns the first element of the Stream. * @example diff --git a/deno_dist/stream/main/transformer.ts b/deno_dist/stream/main/transformer.ts new file mode 100644 index 000000000..48dbab4aa --- /dev/null +++ b/deno_dist/stream/main/transformer.ts @@ -0,0 +1,141 @@ +import { Eq, OptLazy, Reducer } from '../../common/mod.ts'; +import { Stream, StreamSource } from '../../stream/mod.ts'; + +/** + * A Reducer that produces instances of `StreamSource`. + * @typeparam T - the input element type + * @typeparam R - the result stream element type + */ +export type Transformer = Reducer>; + +export namespace Transformer { + /** + * A Reducer that produces instances of `StreamSource.NonEmpty`. + * @typeparam T - the input element type + * @typeparam R - the result stream element type + */ + export type NonEmpty = Reducer>; + + /** + * Returns a transformer that produces windows/collections of `windowSize` size, each + * window starting `skipAmount` of elements after the previous, and optionally collected + * by a custom reducer. + * @typeparam T - the input element type + * @typeparam R - the window type + * @param windowSize - the amount of elements for each window + * @param skipAmount - (default: `windowSize`) the amount of elements between the start of each window + * @param collector - (default: Reducer.toArray()) the reducer to use to convert elements to windows + * @example + * ```ts + * Stream.of(1, 2, 3, 4, 5, 6) + * .transform(Transformer.window(3)) + * .toArray() + * // => [[1, 2, 3], [4, 5, 6]] + * ``` + */ + export const window: { + (windowSize: number, skipAmount?: number): Transformer; + ( + windowSize: number, + skipAmount?: number, + collector?: Reducer + ): Transformer; + } = ( + windowSize: number, + skipAmount = windowSize, + collector = Reducer.toArray() + ) => { + return Reducer.create< + T, + Stream, + Set<{ result: unknown; size: number; halted: boolean; halt: () => void }> + >( + () => new Set(), + (state, elem, index) => { + for (const current of state) { + if (current.size >= windowSize || current.halted) { + state.delete(current); + } + + current.result = collector.next( + current.result, + elem, + current.size, + current.halt + ); + current.size++; + } + + if (index % skipAmount === 0) { + const newState = { + result: OptLazy(collector.init), + size: 1, + halted: false, + halt(): void { + this.halted = true; + }, + }; + + newState.result = collector.next( + OptLazy(collector.init), + elem, + 0, + newState.halt + ); + + state.add(newState); + } + + return state; + }, + (current) => { + return Stream.from(current) + .collect((v, _, skip) => + v.size === windowSize + ? Stream.of(collector.stateToResult(v.result) as any) + : skip + ) + .first(Stream.empty()); + } + ); + }; + + /** + * Returns a transformer that returns only those elements from the input that are different to previous element + * according to the optionally given `eq` function. + * @param eq - (default: `Eq.objectIs`) the equality testing function + * @example + * ```ts + * Stream.of(1, 1, 2, 3, 2, 2) + * .transform(Transformer.distinctPrevious()) + * .toArray() + * // => [1, 2, 3, 2] + * ``` + */ + export function distinctPrevious(eq: Eq = Eq.objectIs): Transformer { + return Reducer.create( + () => [] as T[], + (current, elem) => { + current.push(elem); + + if (current.length > 2) { + current.shift(); + } + + return current; + }, + (state) => { + if (state.length > 0) { + if (state.length === 1) { + return Stream.of(state[0]); + } + if (!eq(state[0], state[1])) { + return Stream.of(state[1]); + } + } + + return Stream.empty(); + } + ); + } +} diff --git a/packages/stream/src/async-custom/async-fast-iterator-base.ts b/packages/stream/src/async-custom/async-fast-iterator-base.ts index e5a68c190..9db99a52c 100644 --- a/packages/stream/src/async-custom/async-fast-iterator-base.ts +++ b/packages/stream/src/async-custom/async-fast-iterator-base.ts @@ -1245,3 +1245,113 @@ export class AsyncReduceAllIterator extends AsyncFastIteratorBase { ) as any; } } + +export class AsyncDistinctPreviousIterator extends AsyncFastIteratorBase { + constructor(readonly source: AsyncFastIterator, readonly eq: Eq) { + super(); + this.return = (): Promise => closeIters(source); + } + + readonly previous = [] as T[]; + + async fastNext(otherwise?: AsyncOptLazy): Promise { + const done = Symbol('Done'); + + let next: T | typeof done; + const source = this.source; + const previous = this.previous; + + while (done !== (next = await source.fastNext(done))) { + previous.push(next); + + if (previous.length === 1) { + return next; + } + + const prev = previous.shift()!; + + if (!this.eq(prev, next)) { + return next; + } + } + + return AsyncOptLazy.toMaybePromise(otherwise!); + } +} + +export class AsyncWindowIterator extends AsyncFastIteratorBase { + constructor( + readonly source: AsyncFastIterator, + readonly windowSize: number, + readonly skipAmount: number, + readonly collector: AsyncReducer + ) { + super(); + this.return = (): Promise => closeIters(source); + } + + state = new Set<{ + result: unknown; + size: number; + halted: boolean; + halt: () => void; + }>(); + index = 0; + + async fastNext(otherwise?: AsyncOptLazy): Promise { + const source = this.source; + const collector = this.collector; + const windowSize = this.windowSize; + const skipAmount = this.skipAmount; + const done = Symbol('Done'); + const state = this.state; + + let next: T | typeof done; + let result: R | typeof done = done; + + while (done !== (next = await source.fastNext(done))) { + for (const current of state) { + current.result = await collector.next( + current.result, + next, + current.size, + current.halt + ); + current.size++; + + if (current.size >= windowSize || current.halted) { + result = await collector.stateToResult(current.result); + state.delete(current); + } + } + + if (this.index % skipAmount === 0) { + const newState = { + result: await AsyncOptLazy.toMaybePromise(collector.init), + size: 1, + halted: false, + halt(): void { + this.halted = true; + }, + }; + + newState.result = await collector.next( + AsyncOptLazy.toMaybePromise(collector.init), + next, + 0, + newState.halt + ); + + state.add(newState); + } + + this.index++; + + if (done !== result) { + return result; + } + } + + return AsyncOptLazy.toMaybePromise(otherwise!); + } +} diff --git a/packages/stream/src/async-custom/async-stream-custom.ts b/packages/stream/src/async-custom/async-stream-custom.ts index b2f27abd9..972101866 100644 --- a/packages/stream/src/async-custom/async-stream-custom.ts +++ b/packages/stream/src/async-custom/async-stream-custom.ts @@ -15,6 +15,7 @@ import type { AsyncFastIterator, AsyncStream, AsyncStreamSource, + AsyncTransformer, } from '@rimbu/stream/async'; import { closeIters, @@ -32,6 +33,7 @@ import { AsyncConcatIterator, AsyncDropIterator, AsyncSplitWhereIterator, + AsyncDistinctPreviousIterator, AsyncDropWhileIterator, AsyncFilterIterator, AsyncFilterPureIterator, @@ -52,6 +54,7 @@ import { AsyncStreamConstructors, AsyncTakeIterator, AsyncTakeWhileIterator, + AsyncWindowIterator, } from '@rimbu/stream/async-custom'; import { Stream } from '@rimbu/stream'; import type { StreamSource } from '@rimbu/stream'; @@ -196,10 +199,10 @@ export abstract class AsyncStreamBase implements AsyncStream { ); } - flatReduceStream( - reducer: AsyncReducer> + transform( + transformer: AsyncReducer> ): AsyncStream { - return AsyncStreamConstructorsImpl.flatten(this.reduceStream(reducer)); + return AsyncStreamConstructorsImpl.flatten(this.reduceStream(transformer)); } filter( @@ -239,7 +242,7 @@ export abstract class AsyncStreamBase implements AsyncStream { const value = await iter.fastNext(done); if (done === value) { - return AsyncOptLazy.toMaybePromise(otherwise!); + return AsyncOptLazy.toPromise(otherwise!); } await closeIters(iter); return value; @@ -255,11 +258,28 @@ export abstract class AsyncStreamBase implements AsyncStream { lastValue = value; } - if (done === lastValue) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === lastValue) return AsyncOptLazy.toPromise(otherwise!); return lastValue; } + async single(otherwise?: AsyncOptLazy): Promise { + const iterator = this[Symbol.asyncIterator](); + const done = Symbol('Done'); + + const value = await iterator.fastNext(done); + + if (done !== value) { + if (done === (await iterator.fastNext(done))) { + return value; + } + } + + await closeIters(iterator); + + return AsyncOptLazy.toPromise(otherwise!); + } + async count(): Promise { let result = 0; @@ -317,7 +337,7 @@ export abstract class AsyncStreamBase implements AsyncStream { occurrance = 1, otherwise?: AsyncOptLazy ): Promise { - if (occurrance <= 0) return AsyncOptLazy.toMaybePromise(otherwise!); + if (occurrance <= 0) return AsyncOptLazy.toPromise(otherwise!); const done = Symbol('Done'); const iterator = this[Symbol.asyncIterator](); @@ -333,7 +353,7 @@ export abstract class AsyncStreamBase implements AsyncStream { } } - return AsyncOptLazy.toMaybePromise(otherwise!); + return AsyncOptLazy.toPromise(otherwise!); } finally { if (done !== value!) await closeIters(iterator); } @@ -343,7 +363,7 @@ export abstract class AsyncStreamBase implements AsyncStream { index: number, otherwise?: AsyncOptLazy ): Promise { - if (index < 0) return AsyncOptLazy.toMaybePromise(otherwise!); + if (index < 0) return AsyncOptLazy.toPromise(otherwise!); const done = Symbol('Done'); const iterator = this[Symbol.asyncIterator](); @@ -358,7 +378,7 @@ export abstract class AsyncStreamBase implements AsyncStream { i++; } - return AsyncOptLazy.toMaybePromise(otherwise!); + return AsyncOptLazy.toPromise(otherwise!); } finally { if (done !== value!) await closeIters(iterator); } @@ -469,6 +489,34 @@ export abstract class AsyncStreamBase implements AsyncStream { return undefined !== (await this.indexOf(searchValue, amount, eq)); } + async containsSlice( + source: AsyncStreamSource.NonEmpty, + eq = Eq.objectIs + ): Promise { + const iterator = this[Symbol.asyncIterator](); + const sourceStream = fromAsyncStreamSource(source); + let sourceIterator = sourceStream[Symbol.asyncIterator](); + + const done = Symbol('Done'); + + while (true) { + const sourceValue = await sourceIterator.fastNext(done); + if (done === sourceValue) { + await closeIters(iterator); + return true; + } + + const value = await iterator.fastNext(done); + if (done === value) { + return false; + } + + if (!eq(sourceValue, value)) { + sourceIterator = sourceStream[Symbol.asyncIterator](); + } + } + } + takeWhile( pred: (value: T, index: number) => MaybePromise ): AsyncStream { @@ -526,7 +574,7 @@ export abstract class AsyncStreamBase implements AsyncStream { try { result = await iterator.fastNext(done); - if (done === result) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === result) return AsyncOptLazy.toPromise(otherwise!); while (done !== (value = await iterator.fastNext(done))) { if (compare(value, result) < 0) result = value; @@ -555,7 +603,7 @@ export abstract class AsyncStreamBase implements AsyncStream { try { result = await iterator.fastNext(done); - if (done === result) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === result) return AsyncOptLazy.toPromise(otherwise!); while (done !== (value = await iterator.fastNext(done))) { if (compare(value, result) > 0) result = value; @@ -624,6 +672,18 @@ export abstract class AsyncStreamBase implements AsyncStream { return new AsyncSplitOnStream(this, sepElem, eq); } + distinctPrevious(eq: Eq = Eq.objectIs): AsyncStream { + return new AsyncDistinctPreviousStream(this, eq); + } + + window( + windowSize: number, + skipAmount = windowSize, + collector: AsyncReducer = AsyncReducer.toArray() as any + ): AsyncStream { + return new AsyncWindowStream(this, windowSize, skipAmount, collector); + } + async fold( init: AsyncOptLazy, next: ( @@ -763,7 +823,7 @@ export abstract class AsyncStreamBase implements AsyncStream { reduceAllStream( ...reducers: { [K in keyof R]: AsyncReducer } ): any { - return new AsyncReduceAllStream(this, reducers); + return new AsyncReduceAllStream(this, reducers); } async toArray(): Promise { @@ -810,14 +870,14 @@ class AsyncPrependStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncPrependIterator( + return new AsyncPrependIterator( this.source[Symbol.asyncIterator](), this.item ); } async first(): Promise { - return AsyncOptLazy.toMaybePromise(this.item); + return AsyncOptLazy.toPromise(this.item); } async last(): Promise { @@ -835,7 +895,7 @@ class AsyncAppendStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncAppendIterator( + return new AsyncAppendIterator( this.source[Symbol.asyncIterator](), this.item ); @@ -846,7 +906,7 @@ class AsyncAppendStream extends AsyncStreamBase { } async last(): Promise { - return AsyncOptLazy.toMaybePromise(this.item); + return AsyncOptLazy.toPromise(this.item); } async count(): Promise { @@ -889,14 +949,14 @@ class AsyncMapStream extends AsyncStreamBase { async first(otherwise?: AsyncOptLazy): Promise { const done = Symbol('Done'); const value = await this.source.first(done); - if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === value) return AsyncOptLazy.toPromise(otherwise!); return this.mapFun(value, 0); } async last(otherwise?: AsyncOptLazy): Promise { const done = Symbol('Done'); const value = await this.source.last(done); - if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === value) return AsyncOptLazy.toPromise(otherwise!); return this.mapFun(value, 0); } @@ -910,14 +970,14 @@ class AsyncMapStream extends AsyncStreamBase { ): Promise { const done = Symbol('Done'); const value = await this.source.elementAt(index, done); - if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === value) return AsyncOptLazy.toPromise(otherwise!); return this.mapFun(value, index); } map( mapFun: (value: T2, index: number) => MaybePromise ): AsyncStream { - return new AsyncMapStream(this.source, async (value, index) => + return new AsyncMapStream(this.source, async (value, index) => mapFun(await this.mapFun(value, index), index) ); } @@ -937,7 +997,7 @@ class AsyncMapPureStream< } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncMapPureIterator( + return new AsyncMapPureIterator( this.source[Symbol.asyncIterator](), this.mapFun, this.args @@ -947,14 +1007,14 @@ class AsyncMapPureStream< async first(otherwise?: AsyncOptLazy): Promise { const done = Symbol('Done'); const value = await this.source.first(done); - if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === value) return AsyncOptLazy.toPromise(otherwise!); return this.mapFun(value, ...this.args); } async last(otherwise?: AsyncOptLazy): Promise { const done = Symbol('Done'); const value = await this.source.last(done); - if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === value) return AsyncOptLazy.toPromise(otherwise!); return this.mapFun(value, ...this.args); } @@ -968,7 +1028,7 @@ class AsyncMapPureStream< ): Promise { const done = Symbol('Done'); const value = await this.source.elementAt(index, done); - if (done === value) return AsyncOptLazy.toMaybePromise(otherwise!); + if (done === value) return AsyncOptLazy.toPromise(otherwise!); return this.mapFun(value, ...this.args); } } @@ -1033,7 +1093,7 @@ class AsyncFilterStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncFilterIterator( + return new AsyncFilterIterator( this.source[Symbol.asyncIterator](), this.pred, this.invert @@ -1065,7 +1125,7 @@ class AsyncFilterPureStream< } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncFilterPureIterator( + return new AsyncFilterPureIterator( this.source[Symbol.asyncIterator](), this.pred, this.args, @@ -1083,7 +1143,7 @@ class AsyncCollectStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncCollectIterator( + return new AsyncCollectIterator( this.source[Symbol.asyncIterator](), this.collectFun ); @@ -1099,7 +1159,7 @@ class AsyncIndicesWhereStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncIndicesWhereIterator( + return new AsyncIndicesWhereIterator( this.source[Symbol.asyncIterator](), this.pred ); @@ -1116,7 +1176,7 @@ class AsyncIndicesOfStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncIndicesOfIterator( + return new AsyncIndicesOfIterator( this.source[Symbol.asyncIterator](), this.searchValue, this.eq @@ -1133,7 +1193,7 @@ class AsyncTakeWhileStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncTakeWhileIterator( + return new AsyncTakeWhileIterator( this.source[Symbol.asyncIterator](), this.pred ); @@ -1149,7 +1209,7 @@ class AsyncDropWhileStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncDropWhileIterator( + return new AsyncDropWhileIterator( this.source[Symbol.asyncIterator](), this.pred ); @@ -1162,7 +1222,7 @@ class AsyncTakeStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncTakeIterator( + return new AsyncTakeIterator( this.source[Symbol.asyncIterator](), this.amount ); @@ -1180,7 +1240,7 @@ class AsyncDropStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncDropIterator( + return new AsyncDropIterator( this.source[Symbol.asyncIterator](), this.amount ); @@ -1201,7 +1261,7 @@ class AsyncSplitWhereStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncSplitWhereIterator( + return new AsyncSplitWhereIterator( this.source[Symbol.asyncIterator](), this.pred ); @@ -1218,7 +1278,7 @@ class AsyncSplitOnStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncSplitOnIterator( + return new AsyncSplitOnIterator( this.source[Symbol.asyncIterator](), this.sepElem, this.eq @@ -1226,6 +1286,39 @@ class AsyncSplitOnStream extends AsyncStreamBase { } } +class AsyncDistinctPreviousStream extends AsyncStreamBase { + constructor(readonly source: AsyncStream, readonly eq: Eq) { + super(); + } + + [Symbol.asyncIterator](): AsyncFastIterator { + return new AsyncDistinctPreviousIterator( + this.source[Symbol.asyncIterator](), + this.eq + ); + } +} + +class AsyncWindowStream extends AsyncStreamBase { + constructor( + readonly source: AsyncStream, + readonly windowSize: number, + readonly skipAmount: number, + readonly collector: AsyncReducer + ) { + super(); + } + + [Symbol.asyncIterator](): AsyncFastIterator { + return new AsyncWindowIterator( + this.source[Symbol.asyncIterator](), + this.windowSize, + this.skipAmount, + this.collector + ); + } +} + class AsyncReduceStream extends AsyncStreamBase { constructor( readonly source: AsyncStream, @@ -1235,7 +1328,7 @@ class AsyncReduceStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncReduceIterator( + return new AsyncReduceIterator( this.source[Symbol.asyncIterator](), this.reducerDef ); @@ -1251,7 +1344,7 @@ class AsyncReduceAllStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncReduceAllIterator( + return new AsyncReduceAllIterator( this.source[Symbol.asyncIterator](), this.reducers ); @@ -1264,7 +1357,7 @@ export class AsyncOfStream extends AsyncStreamBase { } [Symbol.asyncIterator](): AsyncFastIterator { - return new AsyncOfIterator(this.values); + return new AsyncOfIterator(this.values); } } @@ -1280,29 +1373,61 @@ class AsyncEmptyStream return emptyAsyncFastIterator; } + asyncStream(): this { + return this; + } + + async equals(other: AsyncStreamSource): Promise { + if (other === this) return true; + + const done = Symbol('done'); + + return ( + done === + fromAsyncStreamSource(other)[Symbol.asyncIterator]().fastNext(done) + ); + } + + prepend(value: AsyncOptLazy): AsyncStream.NonEmpty { + return AsyncStreamConstructorsImpl.of(value); + } + + append(value: AsyncOptLazy): AsyncStream.NonEmpty { + return AsyncStreamConstructorsImpl.of(value); + } + assumeNonEmpty(): never { RimbuError.throwEmptyCollectionAssumedNonEmptyError(); } - async forEach(): Promise { // } - + async forEachPure(): Promise { + // + } indexed(): AsyncStream<[number, T]> { return this as any; } - map(): AsyncStream { return this as any; } - mapPure(): AsyncStream { return this as any; } - flatMap(): AsyncStream { return this as any; } + flatZip(): AsyncStream<[T, T2]> { + return this as any; + } + transform(transformer: AsyncTransformer): AsyncStream { + return AsyncStreamConstructorsImpl.from( + async () => + await transformer.stateToResult( + await AsyncOptLazy.toMaybePromise(transformer.init) + ) + ); + } filter(): AsyncStream { return this; } @@ -1324,6 +1449,9 @@ class AsyncEmptyStream last(otherwise?: AsyncOptLazy): Promise { return AsyncOptLazy.toPromise(otherwise!); } + single(otherwise?: AsyncOptLazy): Promise { + return AsyncOptLazy.toPromise(otherwise!); + } async count(): Promise<0> { return 0; } @@ -1364,6 +1492,9 @@ class AsyncEmptyStream async contains(): Promise { return false; } + async containsSlice(): Promise { + return false; + } takeWhile(): AsyncStream { return this; } @@ -1417,6 +1548,18 @@ class AsyncEmptyStream } = {}): AsyncStream.NonEmpty { return fromAsyncStreamSource(start).concat(end) as any; } + splitOn(): AsyncStream { + return this as any; + } + splitWhere(): AsyncStream { + return this as any; + } + distinctPrevious(): AsyncStream { + return this; + } + window(): AsyncStream { + return this as any; + } fold(init: AsyncOptLazy): Promise { return AsyncOptLazy.toPromise(init); } @@ -1424,9 +1567,7 @@ class AsyncEmptyStream return this as any; } async reduce(reducer: AsyncReducer): Promise { - return reducer.stateToResult( - await AsyncOptLazy.toMaybePromise(reducer.init) - ); + return reducer.stateToResult(await AsyncOptLazy.toPromise(reducer.init)); } reduceStream(): any { return this; @@ -1447,6 +1588,12 @@ class AsyncEmptyStream toString(): string { return `AsyncStream()`; } + async toJSON(): Promise> { + return { + dataType: 'AsyncStream', + value: [], + }; + } } export class FromSource extends AsyncStreamBase { diff --git a/packages/stream/src/async/async-transformer.ts b/packages/stream/src/async/async-transformer.ts new file mode 100644 index 000000000..4b8df7b60 --- /dev/null +++ b/packages/stream/src/async/async-transformer.ts @@ -0,0 +1,146 @@ +import { AsyncOptLazy, AsyncReducer, Eq } from '@rimbu/common'; +import { AsyncStream, AsyncStreamSource } from '@rimbu/stream/async'; + +/** + * An AsyncReducer that produces instances of `AsyncStreamSource`. + * @typeparam T - the input element type + * @typeparam R - the result stream element type + */ +export type AsyncTransformer = AsyncReducer>; + +export namespace AsyncTransformer { + /** + * An AsyncReducer that produces instances `AsyncStreamSource.NonEmpty`. + * @typeparam T - the input element type + * @typeparam R - the result stream element type + */ + export type NonEmpty = AsyncReducer< + T, + AsyncStreamSource.NonEmpty + >; + + /** + * Returns an async transformer that produces windows/collections of `windowSize` size, each + * window starting `skipAmount` of elements after the previous, and optionally collected + * by a custom reducer. + * @typeparam T - the input element type + * @typeparam R - the window type + * @param windowSize - the amount of elements for each window + * @param skipAmount - (default: `windowSize`) the amount of elements between the start of each window + * @param collector - (default: Reducer.toArray()) the reducer to use to convert elements to windows + * @example + * ```ts + * await AsyncStream.of(1, 2, 3, 4, 5, 6) + * .transform(AsyncTransformer.window(3)) + * .toArray() + * // => [[1, 2, 3], [4, 5, 6]] + * ``` + */ + export const window: { + (windowSize: number, skipAmount?: number): AsyncTransformer; + ( + windowSize: number, + skipAmount?: number, + collector?: AsyncReducer + ): AsyncTransformer; + } = ( + windowSize: number, + skipAmount = windowSize, + collector = AsyncReducer.toArray() + ) => { + return AsyncReducer.create< + T, + AsyncStream, + Set<{ result: unknown; size: number; halted: boolean; halt: () => void }> + >( + () => new Set(), + async (state, elem, index) => { + for (const current of state) { + if (current.size >= windowSize || current.halted) { + state.delete(current); + } + + current.result = await collector.next( + current.result, + elem, + current.size, + current.halt + ); + current.size++; + } + + if (index % skipAmount === 0) { + const newState = { + result: await AsyncOptLazy.toMaybePromise(collector.init), + size: 1, + halted: false, + halt(): void { + this.halted = true; + }, + }; + + newState.result = collector.next( + await AsyncOptLazy.toMaybePromise(collector.init), + elem, + 0, + newState.halt + ); + + state.add(newState); + } + + return state; + }, + (current) => { + return AsyncStream.from(current) + .collect((v, _, skip) => + v.size === windowSize + ? AsyncStream.of(collector.stateToResult(v.result) as any) + : skip + ) + .first(AsyncStream.empty()); + } + ); + }; + + /** + * Returns an async transformer that returns only those elements from the input that are different to previous element + * according to the optionally given `eq` function. + * @param eq - (default: `Eq.objectIs`) the equality testing function + * @example + * ```ts + * await AsyncStream.of(1, 1, 2, 3, 2, 2) + * .transform(AsyncTransformer.distinctPrevious()) + * .toArray() + * // => [1, 2, 3, 2] + * ``` + */ + export function distinctPrevious( + eq: Eq = Eq.objectIs + ): AsyncTransformer { + return AsyncReducer.create( + () => [] as T[], + (current, elem) => { + current.push(elem); + + if (current.length > 2) { + current.shift(); + } + + return current; + }, + (state) => { + if (state.length > 0) { + if (state.length === 1) { + return AsyncStream.of(state[0]); + } + if (!eq(state[0], state[1])) { + return AsyncStream.of(state[1]); + } + } + + return AsyncStream.empty(); + } + ); + } +} diff --git a/packages/stream/src/async/index.ts b/packages/stream/src/async/index.ts index 42c109d1a..ab3746ae9 100644 --- a/packages/stream/src/async/index.ts +++ b/packages/stream/src/async/index.ts @@ -12,3 +12,5 @@ export * from './async-fast-iterable'; export * from './async-streamable'; export * from './interface'; + +export * from './async-transformer'; diff --git a/packages/stream/src/async/interface.ts b/packages/stream/src/async/interface.ts index e246cdcb5..0b50c88a7 100644 --- a/packages/stream/src/async/interface.ts +++ b/packages/stream/src/async/interface.ts @@ -12,6 +12,7 @@ import type { AsyncFastIterable, AsyncStreamable, AsyncStreamSource, + AsyncTransformer, } from '@rimbu/stream/async'; import type { AsyncStreamConstructors } from '@rimbu/stream/async-custom'; import { AsyncStreamConstructorsImpl } from '@rimbu/stream/async-custom'; @@ -227,18 +228,15 @@ export interface AsyncStream /** * Returns an AsyncStream consisting of the concatenation of AsyncStreamSource elements resulting from applying the given `reducer` to each element. * @typeparam R - the resulting element type - * @param reducer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R - * + * @param transformer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R * @note O(1) * @example * ```ts - * await AsyncStream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.windowReducer(2)).toArray() + * await AsyncStream.of(1, 2, 3, 4, 5, 6).transform(AsyncTransformer.window(3)).toArray() * // => [[1, 2, 3], [4, 5, 6]] * ``` */ - flatReduceStream( - reducer: AsyncReducer> - ): AsyncStream; + transform(transformer: AsyncTransformer): AsyncStream; /** * Returns an AsyncStream containing only those elements from this stream for which the given `pred` function returns true. * @param pred - a potentially asynchronous function taking an element and its index, and returning true if the element should be included in the resulting stream. @@ -330,8 +328,22 @@ export interface AsyncStream * ``` * @note O(1) */ - first(): MaybePromise; + first(): Promise; first(otherwise: AsyncOptLazy): Promise; + /** + * Returns the first element of the Stream if it only has one element, or a fallback value if the Stream does not have exactly one value. + * @typeparam O - the optional value to return if the stream does not have exactly one value. + * @param otherwise - (default: undefined) an `OptLazy` value to return if the Stream does not have exactly one value. + * @example + * ```ts + * await AsyncStream.empty().single() // => undefined + * await AsyncStream.of(1, 2, 3).single() // => undefined + * await AsyncStream.of(1).single() // => 1 + * await AsyncStream.of(1, 2, 3).single(0) // => 0 + * ``` + */ + single(): Promise; + single(otherwise: AsyncOptLazy): Promise; /** * Returns the last element of the AsyncStream, or a fallback value (default undefined) if the stream is empty. * @typeparam O - the optional value type to return if the stream is empty @@ -524,6 +536,23 @@ export interface AsyncStream * @note O(N) */ contains(value: T, amount?: number, eq?: Eq): Promise; + /** + * Returns true if this stream contains the same sequence of elements as the given `source`, + * false otherwise. + * @param source - a non-empty async stream source containing the element sequence to find + * @param eq - (default: `Eq.objectIs`) the function to use to test element equality + * @example + * ```ts + * await AsyncStream.of(1, 2, 3, 4, 5).containsSlice([2, 3, 4]) + * // => true + * await AsyncStream.of(1, 2, 3, 4, 5).containsSlice([4, 3, 2]) + * // => false + * ``` + */ + containsSlice( + source: AsyncStreamSource.NonEmpty, + eq?: Eq + ): Promise; /** * Returns an AsyncStream that contains the elements of this stream up to the first element that does not satisfy given `pred` function. * @param pred - a potentially asynchronous predicate function taking an element and its index @@ -752,6 +781,40 @@ export interface AsyncStream * @note O(1) */ splitOn(sepElem: T, eq?: Eq): AsyncStream; + /** + * Returns an AsyncStream containing non-repetitive elements of the source stream, where repetitive elements + * are compared using the optionally given `eq` equality function. + * @param eq - (default: `Eq.objectIs`) the `Eq` instance to use to test equality of elements + * @example + * ```ts + * await AsyncStream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray() + * // => [1, 2, 3, 1] + * ``` + */ + distinctPrevious(eq?: Eq): AsyncStream; + /** + * Returns an AsyncStream containing `windows` of `windowSize` consecutive elements of the source stream, with each + * window starting `skipAmount` elements after the previous one. + * @typeparam R - the collector reducer result type + * @param windowSize - the size in elements of the windows + * @param skipAmount - (default: `windowSize`) the amount of elements to skip to start the next window + * @param collector - (default: `AsyncArray.toArray()`) the async reducer to use to collect the window values + * @example + * ```ts + * await Stream.of(1, 2, 3, 4, 5, 6, 7).window(3).toArray() + * // => [[1, 2, 3], [4, 5, 6]] + * await Stream.of(1, 2, 3, 4, 5).window(3, 1).toArray() + * // => [[1, 2, 3], [2, 3, 4], [3, 4, 5]] + * await Stream.of(1, 2, 3, 4).window(2, 2, AsyncReducer.toJSSet()).toArray() + * // => [Set(1, 2), Set(3, 4)] + * ``` + */ + window(windowSize: number, skipAmount?: number): AsyncStream; + window( + windowSize: number, + skipAmount?: number, + collector?: AsyncReducer + ): AsyncStream; /** * Returns the value resulting from applying the given the given `next` function to a current state (initially the given `init` value), * and the next stream value, and returning the new state. When all elements are processed, the resulting state is returned. @@ -1045,21 +1108,18 @@ export namespace AsyncStream { /** * Returns an AsyncStream consisting of the concatenation of AsyncStreamSource elements resulting from applying the given `reducer` to each element. * @typeparam R - the resulting element type - * @param reducer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R - * + * @param transformer - an async reducer taking elements ot type T as input, and returing an `AsyncStreamSource` of element type R. * @note O(1) * @example * ```ts - * await AsyncStream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.windowReducer(2)).toArray() + * await AsyncStream.of(1, 2, 3, 4, 5, 6).transform(AsyncTransformer.window(3)).toArray() * // => [[1, 2, 3], [4, 5, 6]] * ``` */ - flatReduceStream( - reducer: AsyncReducer> + transform( + transformer: AsyncTransformer.NonEmpty ): AsyncStream.NonEmpty; - flatReduceStream( - reducer: AsyncReducer> - ): AsyncStream; + transform(transformer: AsyncTransformer): AsyncStream; /** * Returns the first element of the AsyncStream. * @example diff --git a/packages/stream/src/custom/fast-iterator-custom.ts b/packages/stream/src/custom/fast-iterator-custom.ts index aed4748bb..4b233af78 100644 --- a/packages/stream/src/custom/fast-iterator-custom.ts +++ b/packages/stream/src/custom/fast-iterator-custom.ts @@ -1008,3 +1008,111 @@ export class MapPureIterator< return this.mapFun(next, ...this.args); } } + +export class DistinctPreviousIterator extends FastIteratorBase { + constructor(readonly source: FastIterator, readonly eq: Eq) { + super(); + } + + readonly previous = [] as T[]; + + fastNext(otherwise?: OptLazy): T | O { + const done = Symbol('Done'); + + let next: T | typeof done; + const source = this.source; + const previous = this.previous; + + while (done !== (next = source.fastNext(done))) { + previous.push(next); + + if (previous.length === 1) { + return next; + } + + const prev = previous.shift()!; + + if (!this.eq(prev, next)) { + return next; + } + } + + return OptLazy(otherwise!); + } +} + +export class WindowIterator extends FastIteratorBase { + constructor( + readonly source: FastIterator, + readonly windowSize: number, + readonly skipAmount: number, + readonly collector: Reducer + ) { + super(); + } + + state = new Set<{ + result: unknown; + size: number; + halted: boolean; + halt: () => void; + }>(); + index = 0; + + fastNext(otherwise?: OptLazy): R | O { + const source = this.source; + const collector = this.collector; + const windowSize = this.windowSize; + const skipAmount = this.skipAmount; + const done = Symbol('Done'); + const state = this.state; + + let next: T | typeof done; + let result: R | typeof done = done; + + while (done !== (next = source.fastNext(done))) { + for (const current of state) { + current.result = collector.next( + current.result, + next, + current.size, + current.halt + ); + current.size++; + + if (current.size >= windowSize || current.halted) { + result = collector.stateToResult(current.result); + state.delete(current); + } + } + + if (this.index % skipAmount === 0) { + const newState = { + result: OptLazy(collector.init), + size: 1, + halted: false, + halt(): void { + this.halted = true; + }, + }; + + newState.result = collector.next( + OptLazy(collector.init), + next, + 0, + newState.halt + ); + + state.add(newState); + } + + this.index++; + + if (done !== result) { + return result; + } + } + + return OptLazy(otherwise!); + } +} diff --git a/packages/stream/src/custom/stream-custom.ts b/packages/stream/src/custom/stream-custom.ts index 7c3a71fa8..b015a38f1 100644 --- a/packages/stream/src/custom/stream-custom.ts +++ b/packages/stream/src/custom/stream-custom.ts @@ -11,7 +11,12 @@ import { ToJSON, TraverseState, } from '@rimbu/common'; -import type { FastIterator, Stream, StreamSource } from '@rimbu/stream'; +import type { + FastIterator, + Stream, + StreamSource, + Transformer, +} from '@rimbu/stream'; import type { StreamConstructors } from '@rimbu/stream/custom'; import { isFastIterator, @@ -50,6 +55,8 @@ import { MapIterator, MapPureIterator, PrependIterator, + DistinctPreviousIterator, + WindowIterator, } from '@rimbu/stream/custom'; import { RimbuError, Token } from '@rimbu/base'; @@ -151,8 +158,8 @@ export abstract class StreamBase implements Stream { ); } - flatReduceStream(reducer: Reducer>): Stream { - return StreamConstructorsImpl.flatten(this.reduceStream(reducer)); + transform(transformer: Transformer): Stream { + return StreamConstructorsImpl.flatten(this.reduceStream(transformer)); } filter( @@ -203,6 +210,21 @@ export abstract class StreamBase implements Stream { return lastValue; } + single(otherwise?: OptLazy): T | O { + const iterator = this[Symbol.iterator](); + const done = Symbol('Done'); + + const value = iterator.fastNext(done); + + if (done !== value) { + if (done === iterator.fastNext(done)) { + return value; + } + } + + return OptLazy(otherwise!); + } + count(): number { let result = 0; @@ -358,6 +380,26 @@ export abstract class StreamBase implements Stream { return undefined !== this.indexOf(searchValue, amount, eq); } + containsSlice(source: StreamSource.NonEmpty, eq = Eq.objectIs): boolean { + const iterator = this[Symbol.iterator](); + const sourceStream = fromStreamSource(source); + let sourceIterator = sourceStream[Symbol.iterator](); + + const done = Symbol('Done'); + + while (true) { + const sourceValue = sourceIterator.fastNext(done); + if (done === sourceValue) return true; + + const value = iterator.fastNext(done); + if (done === value) return false; + + if (!eq(sourceValue, value)) { + sourceIterator = sourceStream[Symbol.iterator](); + } + } + } + takeWhile(pred: (value: T, index: number) => boolean): Stream { return new TakeWhileStream(this, pred); } @@ -479,6 +521,18 @@ export abstract class StreamBase implements Stream { return new SplitOnStream(this, sepElem, eq); } + distinctPrevious(eq: Eq = Eq.objectIs): Stream { + return new DistinctPreviousStream(this, eq); + } + + window( + windowSize: number, + skipAmount = windowSize, + collector: Reducer = Reducer.toArray() as any + ): Stream { + return new WindowStream(this, windowSize, skipAmount, collector); + } + fold( init: OptLazy, next: (current: R, value: T, index: number, halt: () => void) => R @@ -1138,14 +1192,29 @@ class EmptyStream extends StreamBase implements Stream { return emptyFastIterator; } + stream(): this { + return this; + } assumeNonEmpty(): never { RimbuError.throwEmptyCollectionAssumedNonEmptyError(); } + equals(other: StreamSource): boolean { + const done = Symbol('Done'); + return done === fromStreamSource(other)[Symbol.iterator]().fastNext(done); + } + prepend(value: OptLazy): Stream.NonEmpty { + return StreamConstructorsImpl.of(OptLazy(value)); + } + append(value: OptLazy): Stream.NonEmpty { + return StreamConstructorsImpl.of(OptLazy(value)); + } forEach(): void { // } - + forEachPure(): void { + // + } indexed(): Stream<[number, T]> { return this as any; } @@ -1161,9 +1230,9 @@ class EmptyStream extends StreamBase implements Stream { flatZip(): Stream<[T, T2]> { return this as any; } - flatReduceStream(reducer: Reducer>): Stream { + transform(transformer: Transformer): Stream { return StreamConstructorsImpl.from( - reducer.stateToResult(OptLazy(reducer.init)) + transformer.stateToResult(OptLazy(transformer.init)) ); } filter(): Stream { @@ -1187,6 +1256,9 @@ class EmptyStream extends StreamBase implements Stream { last(otherwise?: OptLazy): O { return OptLazy(otherwise) as O; } + single(otherwise?: OptLazy): O { + return OptLazy(otherwise) as O; + } count(): 0 { return 0; } @@ -1227,6 +1299,9 @@ class EmptyStream extends StreamBase implements Stream { contains(): false { return false; } + containsSlice(): false { + return false; + } takeWhile(): Stream { return this; } @@ -1275,6 +1350,18 @@ class EmptyStream extends StreamBase implements Stream { } = {}): Stream.NonEmpty { return fromStreamSource(start).concat(end) as any; } + splitOn(): Stream { + return this as any; + } + splitWhere(): Stream { + return this as any; + } + distinctPrevious(): Stream { + return this; + } + window(): Stream { + return this as any; + } fold(init: OptLazy): R { return OptLazy(init); } @@ -1299,6 +1386,12 @@ class EmptyStream extends StreamBase implements Stream { toString(): string { return `Stream()`; } + toJSON(): ToJSON { + return { + dataType: 'Stream', + value: [], + }; + } } export class ArrayStream extends StreamBase { @@ -1657,6 +1750,39 @@ export class RangeStream extends StreamBase { } } +export class DistinctPreviousStream extends StreamBase { + constructor(readonly source: Stream, readonly eq: Eq) { + super(); + } + + [Symbol.iterator](): FastIterator { + return new DistinctPreviousIterator( + this.source[Symbol.iterator](), + this.eq + ); + } +} + +export class WindowStream extends StreamBase { + constructor( + readonly source: Stream, + readonly windowSize: number, + readonly skipAmount: number, + readonly collector: Reducer + ) { + super(); + } + + [Symbol.iterator](): FastIterator { + return new WindowIterator( + this.source[Symbol.iterator](), + this.windowSize, + this.skipAmount, + this.collector + ); + } +} + export const emptyStream: Stream = Object.freeze(new EmptyStream()); export function isStream(obj: any): obj is Stream { diff --git a/packages/stream/src/main/index.ts b/packages/stream/src/main/index.ts index a0bbe673b..f8b7bcd50 100644 --- a/packages/stream/src/main/index.ts +++ b/packages/stream/src/main/index.ts @@ -16,4 +16,6 @@ export * from './stream-source'; export * from './interface'; +export * from './transformer'; + export * from '../async'; diff --git a/packages/stream/src/main/interface.ts b/packages/stream/src/main/interface.ts index 4fefe91c2..b85a8bb7f 100644 --- a/packages/stream/src/main/interface.ts +++ b/packages/stream/src/main/interface.ts @@ -9,7 +9,12 @@ import type { } from '@rimbu/common'; import type { StreamConstructors } from '@rimbu/stream/custom'; import { StreamConstructorsImpl } from '@rimbu/stream/custom'; -import type { FastIterable, Streamable, StreamSource } from '@rimbu/stream'; +import type { + FastIterable, + Streamable, + StreamSource, + Transformer, +} from '@rimbu/stream'; /** * A possibly infinite sequence of elements of type T. @@ -210,16 +215,17 @@ export interface Stream extends FastIterable, Streamable { /** * Returns a Stream consisting of the concatenation of StreamSource elements resulting from applying the given `reducer` to each element. * @typeparam R - the resulting element type - * @param reducer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R - * + * @param transformer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R * @note O(1) * @example * ```ts - * Stream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.windowReducer(2)).toArray() + * Stream.of(1, 2, 3, 4, 5, 6) + * .transform(Transformer.window(3)) + * .toArray() * // => [[1, 2, 3], [4, 5, 6]] * ``` */ - flatReduceStream(reducer: Reducer>): Stream; + transform(transformer: Transformer): Stream; /** * Returns a Stream containing only those elements from this Stream for which the given `pred` function returns true. * @param pred - a function taking an element and its index, and returning true if the element should be included in the resulting Stream. @@ -319,7 +325,7 @@ export interface Stream extends FastIterable, Streamable { /** * Returns the last element of the Stream, or a fallback value (default undefined) if the Stream is empty. * @typeparam O - the optional value type to return if the stream is empty - * @param otherwise - (default: undefined) an `OptLazy` value to be returned if the Stream is empty. + * @param otherwise - (default: undefined) an `OptLazy` value to return if the Stream is empty. * @example * ```ts * Stream.of(1, 2, 3).last() // => 3 @@ -330,6 +336,20 @@ export interface Stream extends FastIterable, Streamable { */ last(): T | undefined; last(otherwise: OptLazy): T | O; + /** + * Returns the first element of the Stream if it only has one element, or a fallback value if the Stream does not have exactly one value. + * @typeparam O - the optional value to return if the stream does not have exactly one value. + * @param otherwise - (default: undefined) an `OptLazy` value to return if the Stream does not have exactly one value. + * @example + * ```ts + * Stream.empty().single() // => undefined + * Stream.of(1, 2, 3).single() // => undefined + * Stream.of(1).single() // => 1 + * Stream.of(1, 2, 3).single(0) // => 0 + * ``` + */ + single(): T | undefined; + single(otherwise: OptLazy): T | O; /** * Returns the amount of elements in the Stream. * @example @@ -497,6 +517,20 @@ export interface Stream extends FastIterable, Streamable { * @note O(N) */ contains(value: T, amount?: number, eq?: Eq): boolean; + /** + * Returns true if this stream contains the same sequence of elements as the given `source`, + * false otherwise. + * @param source - a non-empty stream source containing the element sequence to find + * @param eq - (default: `Eq.objectIs`) the function to use to test element equality + * @example + * ```ts + * Stream.of(1, 2, 3, 4, 5).containsSlice([2, 3, 4]) + * // => true + * Stream.of(1, 2, 3, 4, 5).containsSlice([4, 3, 2]) + * // => false + * ``` + */ + containsSlice(source: StreamSource.NonEmpty, eq?: Eq): boolean; /** * Returns a Stream that contains the elements of this Stream up to the first element that does not satisfy given `pred` function. * @param pred - a predicate function taking an element and its index @@ -708,6 +742,40 @@ export interface Stream extends FastIterable, Streamable { * @note O(1) */ splitOn(sepElem: T, eq?: Eq): Stream; + /** + * Returns a Stream containing non-repetitive elements of the source stream, where repetitive elements + * are compared using the optionally given `eq` equality function. + * @param eq - the `Eq` instance to use to test equality of elements + * @example + * ```ts + * Stream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray() + * // => [1, 2, 3, 1] + * ``` + */ + distinctPrevious(eq?: Eq): Stream; + /** + * Returns a Stream containing `windows` of `windowSize` consecutive elements of the source stream, with each + * window starting `skipAmount` elements after the previous one. + * @typeparam R - the collector reducer result type + * @param windowSize - the size in elements of the windows + * @param skipAmount - (default: windowsize) the amount of elements to skip to start the next window + * @param collector - (optional, default: toArray reducer) the reducer to use to collect the window values + * @example + * ```ts + * console.log(Stream.of(1, 2, 3, 4, 5, 6, 7).window(3).toArray()) + * // => [[1, 2, 3], [4, 5, 6]] + * console.log(Stream.of(1, 2, 3, 4, 5).window(3, 1).toArray()) + * // => [[1, 2, 3], [2, 3, 4], [3, 4, 5]] + * console.log(Stream.of(1, 2, 3, 4).window(2, 2, Reducer.toJSSet()).toArray()) + * // => [Set(1, 2), Set(3, 4)] + * ``` + */ + window(windowSize: number, skipAmount?: number): Stream; + window( + windowSize: number, + skipAmount?: number, + collector?: Reducer + ): Stream; /** * Returns the value resulting from applying the given the given `next` function to a current state (initially the given `init` value), * and the next Stream value, and returning the new state. When all elements are processed, the resulting state is returned. @@ -987,19 +1055,18 @@ export namespace Stream { /** * Returns a Stream consisting of the concatenation of StreamSource elements resulting from applying the given `reducer` to each element. * @typeparam R - the resulting element type - * @param reducer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R - * + * @param transformer - a reducer taking elements ot type T as input, and returing a `StreamSource` of element type R * @note O(1) * @example * ```ts - * Stream.of(1, 2, 3, 4, 5, 6).flatReduceStream(Reducer.createWindowReducer(2)).toArray() + * Stream.of(1, 2, 3, 4, 5, 6) + * .transform(Transformer.window(3)) + * .toArray() * // => [[1, 2, 3], [4, 5, 6]] * ``` */ - flatReduceStream( - reducer: Reducer> - ): Stream.NonEmpty; - flatReduceStream(reducer: Reducer>): Stream; + transform(transformer: Transformer.NonEmpty): Stream.NonEmpty; + transform(transformer: Transformer): Stream; /** * Returns the first element of the Stream. * @example diff --git a/packages/stream/src/main/transformer.ts b/packages/stream/src/main/transformer.ts new file mode 100644 index 000000000..2d990b0cc --- /dev/null +++ b/packages/stream/src/main/transformer.ts @@ -0,0 +1,141 @@ +import { Eq, OptLazy, Reducer } from '@rimbu/common'; +import { Stream, StreamSource } from '@rimbu/stream'; + +/** + * A Reducer that produces instances of `StreamSource`. + * @typeparam T - the input element type + * @typeparam R - the result stream element type + */ +export type Transformer = Reducer>; + +export namespace Transformer { + /** + * A Reducer that produces instances of `StreamSource.NonEmpty`. + * @typeparam T - the input element type + * @typeparam R - the result stream element type + */ + export type NonEmpty = Reducer>; + + /** + * Returns a transformer that produces windows/collections of `windowSize` size, each + * window starting `skipAmount` of elements after the previous, and optionally collected + * by a custom reducer. + * @typeparam T - the input element type + * @typeparam R - the window type + * @param windowSize - the amount of elements for each window + * @param skipAmount - (default: `windowSize`) the amount of elements between the start of each window + * @param collector - (default: Reducer.toArray()) the reducer to use to convert elements to windows + * @example + * ```ts + * Stream.of(1, 2, 3, 4, 5, 6) + * .transform(Transformer.window(3)) + * .toArray() + * // => [[1, 2, 3], [4, 5, 6]] + * ``` + */ + export const window: { + (windowSize: number, skipAmount?: number): Transformer; + ( + windowSize: number, + skipAmount?: number, + collector?: Reducer + ): Transformer; + } = ( + windowSize: number, + skipAmount = windowSize, + collector = Reducer.toArray() + ) => { + return Reducer.create< + T, + Stream, + Set<{ result: unknown; size: number; halted: boolean; halt: () => void }> + >( + () => new Set(), + (state, elem, index) => { + for (const current of state) { + if (current.size >= windowSize || current.halted) { + state.delete(current); + } + + current.result = collector.next( + current.result, + elem, + current.size, + current.halt + ); + current.size++; + } + + if (index % skipAmount === 0) { + const newState = { + result: OptLazy(collector.init), + size: 1, + halted: false, + halt(): void { + this.halted = true; + }, + }; + + newState.result = collector.next( + OptLazy(collector.init), + elem, + 0, + newState.halt + ); + + state.add(newState); + } + + return state; + }, + (current) => { + return Stream.from(current) + .collect((v, _, skip) => + v.size === windowSize + ? Stream.of(collector.stateToResult(v.result) as any) + : skip + ) + .first(Stream.empty()); + } + ); + }; + + /** + * Returns a transformer that returns only those elements from the input that are different to previous element + * according to the optionally given `eq` function. + * @param eq - (default: `Eq.objectIs`) the equality testing function + * @example + * ```ts + * Stream.of(1, 1, 2, 3, 2, 2) + * .transform(Transformer.distinctPrevious()) + * .toArray() + * // => [1, 2, 3, 2] + * ``` + */ + export function distinctPrevious(eq: Eq = Eq.objectIs): Transformer { + return Reducer.create( + () => [] as T[], + (current, elem) => { + current.push(elem); + + if (current.length > 2) { + current.shift(); + } + + return current; + }, + (state) => { + if (state.length > 0) { + if (state.length === 1) { + return Stream.of(state[0]); + } + if (!eq(state[0], state[1])) { + return Stream.of(state[1]); + } + } + + return Stream.empty(); + } + ); + } +} diff --git a/packages/stream/test/async-stream.test.ts b/packages/stream/test/async-stream.test.ts index 3c6f9cf29..8cd18cc61 100644 --- a/packages/stream/test/async-stream.test.ts +++ b/packages/stream/test/async-stream.test.ts @@ -717,6 +717,28 @@ describe('AsyncStream methods', () => { await createResourceStream([1, 2, 3]).last(); expect(close).toBeCalledTimes(1); }); + it('single', async () => { + expect(await AsyncStream.empty().single()).toBeUndefined(); + expect(await AsyncStream.empty().single(1)).toBe(1); + expect(await AsyncStream.of(1).single()).toBe(1); + expect(await AsyncStream.of(1).single('a')).toBe(1); + expect(await AsyncStream.of(1, 2, 3).single()).toBeUndefined(); + expect(await AsyncStream.of(1, 2, 3).single('a')).toBe('a'); + + for (const source of sources) { + const value = + (await source.count()) === 1 ? await source.first() : undefined; + expect(await source.single()).toBe(value); + expect(await source.single('a')).toBe(value ?? 'a'); + } + }); + it('last close', async () => { + await createResourceStream([1, 2, 3]).single(); + expect(close).toBeCalledTimes(1); + close.mockReset(); + await createResourceStream([1]).single(); + expect(close).toBeCalledTimes(1); + }); it('count', async () => { expect(await AsyncStream.empty().count()).toBe(0); expect(await AsyncStream.of(1, 2, 3).count()).toBe(3); @@ -983,6 +1005,27 @@ describe('AsyncStream methods', () => { await createResourceStream([1, 2, 3]).contains(100); expect(close).toBeCalledTimes(1); }); + it('containsSlice', async () => { + expect(await AsyncStream.empty().containsSlice([1, 2, 3])).toBe(false); + expect(await AsyncStream.of(1, 2).containsSlice([1, 2, 3])).toBe(false); + expect(await AsyncStream.of(1, 2, 3).containsSlice([1, 2, 3])).toBe(true); + expect(await AsyncStream.of(9, 8, 1, 2, 3).containsSlice([1, 2, 3])).toBe( + true + ); + expect(await AsyncStream.of(1, 2, 3, 9, 8).containsSlice([1, 2, 3])).toBe( + true + ); + expect( + await AsyncStream.of(9, 8, 1, 2, 3, 9, 8).containsSlice([1, 2, 3]) + ).toBe(true); + }); + it('containsSlice close', async () => { + await createResourceStream([1, 2, 3]).containsSlice([2]); + expect(close).toBeCalledTimes(1); + close.mockReset(); + await createResourceStream([1, 2, 3]).containsSlice([100]); + expect(close).toBeCalledTimes(1); + }); it('takeWhile', async () => { expect(AsyncStream.empty().takeWhile((v) => true)).toBe( AsyncStream.empty() @@ -1459,6 +1502,97 @@ describe('AsyncStream methods', () => { await testResForEach(createResourceStream([1, 2, 3]).splitOn(2)); await testResForEach(createResourceStream([1, 2, 3]).splitOn(10)); }); + it('distinctPrevious', async () => { + expect( + await AsyncStream.empty().distinctPrevious().toArray() + ).toEqual([]); + expect(await AsyncStream.of(1, 2, 3).distinctPrevious().toArray()).toEqual([ + 1, 2, 3, + ]); + expect( + await AsyncStream.of(1, 2, 2, 3).distinctPrevious().toArray() + ).toEqual([1, 2, 3]); + expect( + await AsyncStream.of(1, 2, 2, 3, 1, 1, 3).distinctPrevious().toArray() + ).toEqual([1, 2, 3, 1, 3]); + }); + it('distinctPrevious close', async () => { + await testResForEach(createResourceStream([1, 2, 3]).distinctPrevious()); + await testResForEach( + createResourceStream([1, 1, 1, 2, 2, 3, 1, 1, 3]).distinctPrevious() + ); + }); + it('window', async () => { + expect(await AsyncStream.empty().window(3).toArray()).toEqual([]); + expect(await AsyncStream.of(1, 2).window(3).toArray()).toEqual([]); + expect(await AsyncStream.of(1, 2, 3).window(3).toArray()).toEqual([ + [1, 2, 3], + ]); + expect(await AsyncStream.of(1, 2, 3, 4, 5).window(3).toArray()).toEqual([ + [1, 2, 3], + ]); + expect(await AsyncStream.of(1, 2, 3, 4, 5, 6).window(3).toArray()).toEqual([ + [1, 2, 3], + [4, 5, 6], + ]); + expect( + await AsyncStream.of(1, 2, 3, 4, 5, 6).window(3, 1).toArray() + ).toEqual([ + [1, 2, 3], + [2, 3, 4], + [3, 4, 5], + [4, 5, 6], + ]); + expect( + await AsyncStream.of(1, 2, 3, 4, 5, 6).window(2, 3).toArray() + ).toEqual([ + [1, 2], + [4, 5], + ]); + }); + it('window collector', async () => { + expect( + await AsyncStream.empty() + .window(3, undefined, AsyncReducer.toJSSet()) + .toArray() + ).toEqual([]); + expect( + await AsyncStream.of(1, 2) + .window(3, undefined, AsyncReducer.toJSSet()) + .toArray() + ).toEqual([]); + expect( + await AsyncStream.of(1, 2, 3) + .window(3, undefined, AsyncReducer.toJSSet()) + .toArray() + ).toEqual([new Set([1, 2, 3])]); + expect( + await AsyncStream.of(1, 2, 3, 4, 5) + .window(3, undefined, AsyncReducer.toJSSet()) + .toArray() + ).toEqual([new Set([1, 2, 3])]); + expect( + await AsyncStream.of(1, 2, 3, 4, 5, 6) + .window(3, undefined, AsyncReducer.toJSSet()) + .toArray() + ).toEqual([new Set([1, 2, 3]), new Set([4, 5, 6])]); + expect( + await AsyncStream.of(1, 2, 3, 4, 5, 6) + .window(3, 1, AsyncReducer.toJSSet()) + .toArray() + ).toEqual([ + new Set([1, 2, 3]), + new Set([2, 3, 4]), + new Set([3, 4, 5]), + new Set([4, 5, 6]), + ]); + }); + it('window close', async () => { + await testResForEach(createResourceStream([1, 2, 3]).window(2)); + await testResForEach( + createResourceStream([1, 1, 1, 2, 2, 3, 1, 1, 3]).window(2, 3) + ); + }); it('fold', async () => { async function sum( current: number, diff --git a/packages/stream/test/async-transformer.test.ts b/packages/stream/test/async-transformer.test.ts new file mode 100644 index 000000000..a24874d27 --- /dev/null +++ b/packages/stream/test/async-transformer.test.ts @@ -0,0 +1,70 @@ +import { AsyncReducer } from '@rimbu/common'; +import { AsyncStream, AsyncTransformer } from '@rimbu/stream/async'; + +describe('AsyncCollector', () => { + it('window', async () => { + expect( + await AsyncStream.empty().transform(AsyncTransformer.window(3)).toArray() + ).toEqual([]); + expect( + await AsyncStream.of(1, 2).transform(AsyncTransformer.window(3)).toArray() + ).toEqual([]); + expect( + await AsyncStream.of(1, 2, 3) + .transform(AsyncTransformer.window(3)) + .toArray() + ).toEqual([[1, 2, 3]]); + expect( + await AsyncStream.of(1, 2, 3, 4, 5) + .transform(AsyncTransformer.window(3)) + .toArray() + ).toEqual([[1, 2, 3]]); + expect( + await AsyncStream.of(1, 2, 3, 4, 5, 6) + .transform(AsyncTransformer.window(3)) + .toArray() + ).toEqual([ + [1, 2, 3], + [4, 5, 6], + ]); + }); + + it('sliding window', async () => { + expect( + await AsyncStream.of(1, 2, 3, 4, 5, 6) + .transform(AsyncTransformer.window(3, 1)) + .toArray() + ).toEqual([ + [1, 2, 3], + [2, 3, 4], + [3, 4, 5], + [4, 5, 6], + ]); + }); + + it('collects to different targets', async () => { + expect( + await AsyncStream.of(1, 2, 3, 4, 5, 6) + .transform(AsyncTransformer.window(3, 1, AsyncReducer.toJSSet())) + .toArray() + ).toEqual([ + new Set([1, 2, 3]), + new Set([2, 3, 4]), + new Set([3, 4, 5]), + new Set([4, 5, 6]), + ]); + }); + + it('distinctPrevious', async () => { + expect( + await AsyncStream.empty() + .transform(AsyncTransformer.distinctPrevious()) + .toArray() + ).toEqual([]); + expect( + await AsyncStream.of(1, 2, 2, 3, 1, 1, 3) + .transform(AsyncTransformer.distinctPrevious()) + .toArray() + ).toEqual([1, 2, 3, 1, 3]); + }); +}); diff --git a/packages/stream/test/stream.test.ts b/packages/stream/test/stream.test.ts index ed623692e..40ef96419 100644 --- a/packages/stream/test/stream.test.ts +++ b/packages/stream/test/stream.test.ts @@ -517,6 +517,21 @@ describe('Stream methods', () => { }); }); + it('single', () => { + expect(Stream.empty().single()).toBeUndefined(); + expect(Stream.empty().single(1)).toBe(1); + expect(Stream.of(1).single()).toBe(1); + expect(Stream.of(1).single('a')).toBe(1); + expect(Stream.of(1, 2, 3).single()).toBeUndefined(); + expect(Stream.of(1, 2, 3).single('a')).toBe('a'); + + sources.forEach((source) => { + const value = source.count() === 1 ? source.first() : undefined; + expect(source.single()).toBe(value); + expect(source.single('a')).toBe(value ?? 'a'); + }); + }); + it('count', () => { expect(Stream.empty().count()).toBe(0); expect(Stream.of(1, 2, 3).count()).toBe(3); @@ -671,6 +686,15 @@ describe('Stream methods', () => { }); }); + it('containsSlice', () => { + expect(Stream.empty().containsSlice([1, 2, 3])).toBe(false); + expect(Stream.of(1, 2).containsSlice([1, 2, 3])).toBe(false); + expect(Stream.of(1, 2, 3).containsSlice([1, 2, 3])).toBe(true); + expect(Stream.of(9, 8, 1, 2, 3).containsSlice([1, 2, 3])).toBe(true); + expect(Stream.of(1, 2, 3, 9, 8).containsSlice([1, 2, 3])).toBe(true); + expect(Stream.of(9, 8, 1, 2, 3, 9, 8).containsSlice([1, 2, 3])).toBe(true); + }); + it('takeWhile', () => { expect(Stream.empty().takeWhile((v) => true)).toBe(Stream.empty()); expect(Stream.empty().takeWhile((v) => false)).toBe(Stream.empty()); @@ -1124,4 +1148,64 @@ describe('Stream methods', () => { expect(u2l.toArray()).toEqual([1, 2]); expect(u2r.toArray()).toEqual(['a', 'b']); }); + + it('distinctPrevious', () => { + expect(Stream.empty().distinctPrevious().toArray()).toEqual([]); + expect(Stream.of(1, 2, 3).distinctPrevious().toArray()).toEqual([1, 2, 3]); + expect(Stream.of(1, 2, 2, 3).distinctPrevious().toArray()).toEqual([ + 1, 2, 3, + ]); + expect(Stream.of(1, 2, 2, 3, 1, 1, 3).distinctPrevious().toArray()).toEqual( + [1, 2, 3, 1, 3] + ); + }); + + it('window', () => { + expect(Stream.empty().window(3).toArray()).toEqual([]); + expect(Stream.of(1, 2).window(3).toArray()).toEqual([]); + expect(Stream.of(1, 2, 3).window(3).toArray()).toEqual([[1, 2, 3]]); + expect(Stream.of(1, 2, 3, 4, 5).window(3).toArray()).toEqual([[1, 2, 3]]); + expect(Stream.of(1, 2, 3, 4, 5, 6).window(3).toArray()).toEqual([ + [1, 2, 3], + [4, 5, 6], + ]); + expect(Stream.of(1, 2, 3, 4, 5, 6).window(3, 1).toArray()).toEqual([ + [1, 2, 3], + [2, 3, 4], + [3, 4, 5], + [4, 5, 6], + ]); + expect(Stream.of(1, 2, 3, 4, 5, 6).window(2, 3).toArray()).toEqual([ + [1, 2], + [4, 5], + ]); + }); + + it('window collector', () => { + expect( + Stream.empty().window(3, undefined, Reducer.toJSSet()).toArray() + ).toEqual([]); + expect( + Stream.of(1, 2).window(3, undefined, Reducer.toJSSet()).toArray() + ).toEqual([]); + expect( + Stream.of(1, 2, 3).window(3, undefined, Reducer.toJSSet()).toArray() + ).toEqual([new Set([1, 2, 3])]); + expect( + Stream.of(1, 2, 3, 4, 5).window(3, undefined, Reducer.toJSSet()).toArray() + ).toEqual([new Set([1, 2, 3])]); + expect( + Stream.of(1, 2, 3, 4, 5, 6) + .window(3, undefined, Reducer.toJSSet()) + .toArray() + ).toEqual([new Set([1, 2, 3]), new Set([4, 5, 6])]); + expect( + Stream.of(1, 2, 3, 4, 5, 6).window(3, 1, Reducer.toJSSet()).toArray() + ).toEqual([ + new Set([1, 2, 3]), + new Set([2, 3, 4]), + new Set([3, 4, 5]), + new Set([4, 5, 6]), + ]); + }); }); diff --git a/packages/stream/test/transformer.test.ts b/packages/stream/test/transformer.test.ts new file mode 100644 index 000000000..51afacaeb --- /dev/null +++ b/packages/stream/test/transformer.test.ts @@ -0,0 +1,60 @@ +import { Reducer } from '@rimbu/common'; +import { Stream, Transformer } from '@rimbu/stream'; + +describe('Collector', () => { + it('window', () => { + expect(Stream.empty().transform(Transformer.window(3)).toArray()).toEqual( + [] + ); + expect(Stream.of(1, 2).transform(Transformer.window(3)).toArray()).toEqual( + [] + ); + expect( + Stream.of(1, 2, 3).transform(Transformer.window(3)).toArray() + ).toEqual([[1, 2, 3]]); + expect( + Stream.of(1, 2, 3, 4, 5).transform(Transformer.window(3)).toArray() + ).toEqual([[1, 2, 3]]); + expect( + Stream.of(1, 2, 3, 4, 5, 6).transform(Transformer.window(3)).toArray() + ).toEqual([ + [1, 2, 3], + [4, 5, 6], + ]); + }); + + it('sliding window', () => { + expect( + Stream.of(1, 2, 3, 4, 5, 6).transform(Transformer.window(3, 1)).toArray() + ).toEqual([ + [1, 2, 3], + [2, 3, 4], + [3, 4, 5], + [4, 5, 6], + ]); + }); + + it('collects to different targets', () => { + expect( + Stream.of(1, 2, 3, 4, 5, 6) + .transform(Transformer.window(3, 1, Reducer.toJSSet())) + .toArray() + ).toEqual([ + new Set([1, 2, 3]), + new Set([2, 3, 4]), + new Set([3, 4, 5]), + new Set([4, 5, 6]), + ]); + }); + + it('distinctPrevious', () => { + expect( + Stream.empty().transform(Transformer.distinctPrevious()).toArray() + ).toEqual([]); + expect( + Stream.of(1, 2, 2, 3, 1, 1, 3) + .transform(Transformer.distinctPrevious()) + .toArray() + ).toEqual([1, 2, 3, 1, 3]); + }); +});