Skip to content

Commit

Permalink
feat(stream): add stream transformers and additional methods to Strea…
Browse files Browse the repository at this point in the history
…m and AsyncStream
  • Loading branch information
vitoke committed Jun 10, 2022
1 parent 4d6b741 commit fcda14c
Show file tree
Hide file tree
Showing 24 changed files with 2,326 additions and 160 deletions.
110 changes: 110 additions & 0 deletions deno_dist/stream/async-custom/async-fast-iterator-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1245,3 +1245,113 @@ export class AsyncReduceAllIterator<I, R> extends AsyncFastIteratorBase<R> {
) as any;
}
}

export class AsyncDistinctPreviousIterator<T> extends AsyncFastIteratorBase<T> {
constructor(readonly source: AsyncFastIterator<T>, readonly eq: Eq<T>) {
super();
this.return = (): Promise<void> => closeIters(source);
}

readonly previous = [] as T[];

async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
const done = Symbol('Done');

let next: T | typeof done;
const source = this.source;
const previous = this.previous;

while (done !== (next = await source.fastNext(done))) {
previous.push(next);

if (previous.length === 1) {
return next;
}

const prev = previous.shift()!;

if (!this.eq(prev, next)) {
return next;
}
}

return AsyncOptLazy.toMaybePromise(otherwise!);
}
}

export class AsyncWindowIterator<T, R> extends AsyncFastIteratorBase<R> {
constructor(
readonly source: AsyncFastIterator<T>,
readonly windowSize: number,
readonly skipAmount: number,
readonly collector: AsyncReducer<T, R>
) {
super();
this.return = (): Promise<void> => closeIters(source);
}

state = new Set<{
result: unknown;
size: number;
halted: boolean;
halt: () => void;
}>();
index = 0;

async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O> {
const source = this.source;
const collector = this.collector;
const windowSize = this.windowSize;
const skipAmount = this.skipAmount;
const done = Symbol('Done');
const state = this.state;

let next: T | typeof done;
let result: R | typeof done = done;

while (done !== (next = await source.fastNext(done))) {
for (const current of state) {
current.result = await collector.next(
current.result,
next,
current.size,
current.halt
);
current.size++;

if (current.size >= windowSize || current.halted) {
result = await collector.stateToResult(current.result);
state.delete(current);
}
}

if (this.index % skipAmount === 0) {
const newState = {
result: await AsyncOptLazy.toMaybePromise(collector.init),
size: 1,
halted: false,
halt(): void {
this.halted = true;
},
};

newState.result = await collector.next(
AsyncOptLazy.toMaybePromise(collector.init),
next,
0,
newState.halt
);

state.add(newState);
}

this.index++;

if (done !== result) {
return result;
}
}

return AsyncOptLazy.toMaybePromise(otherwise!);
}
}

0 comments on commit fcda14c

Please sign in to comment.