Skip to content

Commit

Permalink
fix(stream): improve Stream and AsyncStream API and fix and extend ts…
Browse files Browse the repository at this point in the history
…d tests
  • Loading branch information
vitoke committed Jun 10, 2022
1 parent fcda14c commit 050facc
Show file tree
Hide file tree
Showing 10 changed files with 335 additions and 54 deletions.
2 changes: 1 addition & 1 deletion deno_dist/stream/async-custom/async-stream-custom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1731,7 +1731,7 @@ export const AsyncStreamConstructorsImpl: AsyncStreamConstructors =
};
},
zip(...sources): any {
return AsyncStreamConstructorsImpl.zipWith(...sources)(Array);
return AsyncStreamConstructorsImpl.zipWith(...(sources as any))(Array);
},
zipAllWith(...sources): any {
return (fillValue: any, zipFun: any): any => {
Expand Down
16 changes: 8 additions & 8 deletions deno_dist/stream/async-custom/constructors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ export interface AsyncStreamConstructors {
* ```
* @note ends the AsyncStream when any of the given streams ends
*/
zipWith<I extends readonly unknown[]>(
zipWith<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): <R>(zipFun: (...values: I) => R) => AsyncStream.NonEmpty<R>;
zipWith<I extends readonly unknown[]>(
zipWith<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
): <R>(zipFun: (...values: I) => R) => AsyncStream<R>;
/**
Expand All @@ -55,10 +55,10 @@ export interface AsyncStreamConstructors {
* ```
* @note ends the AsyncStream when any of the given streams ends
*/
zip<I extends readonly unknown[]>(
zip<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): AsyncStream.NonEmpty<I>;
zip<I extends readonly unknown[]>(
zip<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
): AsyncStream<I>;
/**
Expand All @@ -80,13 +80,13 @@ export interface AsyncStreamConstructors {
* // => [10, 13, 5]
* ```
*/
zipAllWith<I extends readonly unknown[]>(
zipAllWith<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): <O, R>(
fillValue: AsyncOptLazy<O>,
zipFun: (...values: { [K in keyof I]: I[K] | O }) => MaybePromise<R>
) => AsyncStream.NonEmpty<R>;
zipAllWith<I extends readonly unknown[]>(
zipAllWith<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
): <O, R>(
fillValue: AsyncOptLazy<O>,
Expand All @@ -110,11 +110,11 @@ export interface AsyncStreamConstructors {
* ```
* @note ends the AsyncStream when any of the given streams ends
*/
zipAll<I extends readonly unknown[], O>(
zipAll<I extends readonly [unknown, ...unknown[]], O>(
fillValue: AsyncOptLazy<O>,
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): AsyncStream.NonEmpty<{ [K in keyof I]: I[K] | O }>;
zipAll<I extends readonly unknown[], O>(
zipAll<I extends readonly [unknown, ...unknown[]], O>(
fillValue: AsyncOptLazy<O>,
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
): AsyncStream<{ [K in keyof I]: I[K] | O }>;
Expand Down
12 changes: 12 additions & 0 deletions deno_dist/stream/async/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1237,6 +1237,18 @@ export namespace AsyncStream {
start?: AsyncStreamSource<T>;
end?: AsyncStreamSource<T>;
}): AsyncStream.NonEmpty<T>;
/**
* Returns a non-empty AsyncStream containing non-repetitive elements of the source stream, where repetitive elements
* are compared using the optionally given `eq` equality function.
* @param eq - (default: `Eq.objectIs`) the `Eq` instance to use to test equality of elements
* @example
* ```ts
* await AsyncStream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray()
* // => [1, 2, 3, 1]
* ```
*/
distinctPrevious(eq?: Eq<T>): AsyncStream.NonEmpty<T>;

/**
* Returns an AsyncStream containing the values resulting from applying the given the given `next` function to a current state (initially the given `init` value),
* and the next stream value, and returning the new state.
Expand Down
37 changes: 37 additions & 0 deletions deno_dist/stream/main/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,43 @@ export namespace Stream {
start?: StreamSource<T>;
end?: StreamSource<T>;
}): Stream.NonEmpty<T>;
/**
* Returns a non-empty Stream containing non-repetitive elements of the source stream, where repetitive elements
* are compared using the optionally given `eq` equality function.
* @param eq - the `Eq` instance to use to test equality of elements
* @example
* ```ts
* Stream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray()
* // => [1, 2, 3, 1]
* ```
*/
distinctPrevious(eq?: Eq<T>): Stream.NonEmpty<T>;
/**
* Returns a Stream containing the values resulting from applying the given the given `next` function to a current state (initially the given `init` value),
* and the next Stream value, and returning the new state.
* @typeparam R - the resulting element type
* @param init - the initial result/state value
* @param next - a function taking the parameters below and returning the new result/state value<br/>
* - current: the current result/state value, initially `init`.<br/>
* - value: the next Stream value<br/>
* - index: the index of the given value<br/>
* - halt: a function that, if called, ensures that no new elements are passed
* @example
* ```ts
* console.log(
* Stream.empty<number>()
* .foldStream(5, (current, value) => current + value)
* .toArray()
* )
* // => []
* console.log(
* Stream.of(1, 2, 3)
* .foldStream(5, (current, value) => current + value)
* .toArray()
* )
* // => [6, 8, 11]
* ```
*/
foldStream<R>(
init: OptLazy<R>,
next: (current: R, value: T, index: number) => R
Expand Down
2 changes: 1 addition & 1 deletion packages/stream/src/async-custom/async-stream-custom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1731,7 +1731,7 @@ export const AsyncStreamConstructorsImpl: AsyncStreamConstructors =
};
},
zip(...sources): any {
return AsyncStreamConstructorsImpl.zipWith(...sources)(Array);
return AsyncStreamConstructorsImpl.zipWith(...(sources as any))(Array);
},
zipAllWith(...sources): any {
return (fillValue: any, zipFun: any): any => {
Expand Down
16 changes: 8 additions & 8 deletions packages/stream/src/async-custom/constructors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ export interface AsyncStreamConstructors {
* ```
* @note ends the AsyncStream when any of the given streams ends
*/
zipWith<I extends readonly unknown[]>(
zipWith<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): <R>(zipFun: (...values: I) => R) => AsyncStream.NonEmpty<R>;
zipWith<I extends readonly unknown[]>(
zipWith<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
): <R>(zipFun: (...values: I) => R) => AsyncStream<R>;
/**
Expand All @@ -55,10 +55,10 @@ export interface AsyncStreamConstructors {
* ```
* @note ends the AsyncStream when any of the given streams ends
*/
zip<I extends readonly unknown[]>(
zip<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): AsyncStream.NonEmpty<I>;
zip<I extends readonly unknown[]>(
zip<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
): AsyncStream<I>;
/**
Expand All @@ -80,13 +80,13 @@ export interface AsyncStreamConstructors {
* // => [10, 13, 5]
* ```
*/
zipAllWith<I extends readonly unknown[]>(
zipAllWith<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): <O, R>(
fillValue: AsyncOptLazy<O>,
zipFun: (...values: { [K in keyof I]: I[K] | O }) => MaybePromise<R>
) => AsyncStream.NonEmpty<R>;
zipAllWith<I extends readonly unknown[]>(
zipAllWith<I extends readonly [unknown, ...unknown[]]>(
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
): <O, R>(
fillValue: AsyncOptLazy<O>,
Expand All @@ -110,11 +110,11 @@ export interface AsyncStreamConstructors {
* ```
* @note ends the AsyncStream when any of the given streams ends
*/
zipAll<I extends readonly unknown[], O>(
zipAll<I extends readonly [unknown, ...unknown[]], O>(
fillValue: AsyncOptLazy<O>,
...sources: { [K in keyof I]: AsyncStreamSource.NonEmpty<I[K]> } & unknown[]
): AsyncStream.NonEmpty<{ [K in keyof I]: I[K] | O }>;
zipAll<I extends readonly unknown[], O>(
zipAll<I extends readonly [unknown, ...unknown[]], O>(
fillValue: AsyncOptLazy<O>,
...sources: { [K in keyof I]: AsyncStreamSource<I[K]> } & unknown[]
): AsyncStream<{ [K in keyof I]: I[K] | O }>;
Expand Down
12 changes: 12 additions & 0 deletions packages/stream/src/async/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1237,6 +1237,18 @@ export namespace AsyncStream {
start?: AsyncStreamSource<T>;
end?: AsyncStreamSource<T>;
}): AsyncStream.NonEmpty<T>;
/**
* Returns a non-empty AsyncStream containing non-repetitive elements of the source stream, where repetitive elements
* are compared using the optionally given `eq` equality function.
* @param eq - (default: `Eq.objectIs`) the `Eq` instance to use to test equality of elements
* @example
* ```ts
* await AsyncStream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray()
* // => [1, 2, 3, 1]
* ```
*/
distinctPrevious(eq?: Eq<T>): AsyncStream.NonEmpty<T>;

/**
* Returns an AsyncStream containing the values resulting from applying the given the given `next` function to a current state (initially the given `init` value),
* and the next stream value, and returning the new state.
Expand Down
37 changes: 37 additions & 0 deletions packages/stream/src/main/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,43 @@ export namespace Stream {
start?: StreamSource<T>;
end?: StreamSource<T>;
}): Stream.NonEmpty<T>;
/**
* Returns a non-empty Stream containing non-repetitive elements of the source stream, where repetitive elements
* are compared using the optionally given `eq` equality function.
* @param eq - the `Eq` instance to use to test equality of elements
* @example
* ```ts
* Stream.of(1, 1, 2, 2, 3, 1).distinctPrevious().toArray()
* // => [1, 2, 3, 1]
* ```
*/
distinctPrevious(eq?: Eq<T>): Stream.NonEmpty<T>;
/**
* Returns a Stream containing the values resulting from applying the given the given `next` function to a current state (initially the given `init` value),
* and the next Stream value, and returning the new state.
* @typeparam R - the resulting element type
* @param init - the initial result/state value
* @param next - a function taking the parameters below and returning the new result/state value<br/>
* - current: the current result/state value, initially `init`.<br/>
* - value: the next Stream value<br/>
* - index: the index of the given value<br/>
* - halt: a function that, if called, ensures that no new elements are passed
* @example
* ```ts
* console.log(
* Stream.empty<number>()
* .foldStream(5, (current, value) => current + value)
* .toArray()
* )
* // => []
* console.log(
* Stream.of(1, 2, 3)
* .foldStream(5, (current, value) => current + value)
* .toArray()
* )
* // => [6, 8, 11]
* ```
*/
foldStream<R>(
init: OptLazy<R>,
next: (current: R, value: T, index: number) => R
Expand Down

0 comments on commit 050facc

Please sign in to comment.