Skip to content

Commit 62dab89

Browse files
Refactored the iterator interface to support next() being called multiple times without waiting for the promise to resolve between each one
1 parent cc44e9d commit 62dab89

File tree

2 files changed

+69
-35
lines changed

2 files changed

+69
-35
lines changed

src/iterator/index.ts

Lines changed: 25 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import * as fs from "fs";
22
import { asyncForEach as forEach } from "../async/for-each";
33
import { DirectoryReader } from "../directory-reader";
44
import { Options, Stats } from "../types-public";
5+
import { Pending, pending } from "./pending";
56

67
const iteratorFacade = { fs, forEach };
78

@@ -25,27 +26,25 @@ export function readdirIterator<T>(dir: string, options?: Options): AsyncIterabl
2526
let reader = new DirectoryReader(dir, options, iteratorFacade);
2627
let stream = reader.stream;
2728
let pendingValues: T[] = [];
28-
let pendingNext: Promise<IteratorResult<T>> | undefined;
29-
let resolvePendingNext: ((result: IteratorResult<T>) => void) | undefined;
30-
let rejectPendingNext: ((error: Error) => void) | undefined;
29+
let pendingReads: Array<Pending<IteratorResult<T>>> = [];
3130
let error: Error | undefined;
3231
let readable = false;
3332
let done = false;
3433

3534
stream.on("error", function streamError(err: Error) {
3635
error = err;
3736
stream.pause();
38-
fulfillPendingNextIfPossible();
37+
fulfillPendingReads();
3938
});
4039

4140
stream.on("end", function streamEnd() {
4241
done = true;
43-
fulfillPendingNextIfPossible();
42+
fulfillPendingReads();
4443
});
4544

4645
stream.on("readable", function streamReadable() {
4746
readable = true;
48-
fulfillPendingNextIfPossible();
47+
fulfillPendingReads();
4948
});
5049

5150
return {
@@ -55,47 +54,38 @@ export function readdirIterator<T>(dir: string, options?: Options): AsyncIterabl
5554

5655
// tslint:disable-next-line: promise-function-async
5756
next() {
58-
if (!pendingNext) {
59-
pendingNext = new Promise((resolve, reject) => {
60-
resolvePendingNext = resolve;
61-
rejectPendingNext = reject;
62-
});
63-
}
57+
let pendingRead = pending<IteratorResult<T>>();
58+
pendingReads.push(pendingRead);
6459

6560
// tslint:disable-next-line: no-floating-promises
66-
Promise.resolve().then(fulfillPendingNextIfPossible);
67-
return pendingNext;
61+
Promise.resolve().then(fulfillPendingReads);
62+
63+
return pendingRead.promise;
6864
}
6965
};
7066

71-
function fulfillPendingNextIfPossible() {
72-
let fulfill, result;
73-
74-
if (resolvePendingNext && rejectPendingNext) {
75-
if (error) {
76-
fulfill = rejectPendingNext;
77-
result = error;
67+
function fulfillPendingReads() {
68+
if (error) {
69+
while (pendingReads.length > 0) {
70+
let pendingRead = pendingReads.shift()!;
71+
pendingRead.reject(error);
7872
}
79-
else {
73+
}
74+
else if (pendingReads.length > 0) {
75+
while (pendingReads.length > 0) {
76+
let pendingRead = pendingReads.shift()!;
8077
let value = getNextValue();
8178

8279
if (value) {
83-
fulfill = resolvePendingNext;
84-
result = { value };
80+
pendingRead.resolve({ value });
8581
}
8682
else if (done) {
87-
fulfill = resolvePendingNext;
88-
result = { done, value };
83+
pendingRead.resolve({ done, value });
84+
}
85+
else {
86+
pendingReads.unshift(pendingRead);
87+
break;
8988
}
90-
}
91-
92-
if (fulfill) {
93-
// NOTE: It's important to clear these BEEFORE fulfilling the Promise;
94-
// otherwise a sporadic race condition can occur.
95-
pendingNext = resolvePendingNext = rejectPendingNext = undefined;
96-
97-
// @ts-ignore
98-
fulfill(result);
9989
}
10090
}
10191
}

src/iterator/pending.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/**
2+
* Returns a `Promise` and the functions to resolve or reject it.
3+
* @internal
4+
*/
5+
export function pending<T>(): Pending<T> {
6+
let resolve: Resolve<T>, reject: Reject;
7+
8+
let promise = new Promise<T>((res, rej) => {
9+
resolve = res;
10+
reject = rej;
11+
});
12+
13+
return {
14+
promise,
15+
resolve(result) {
16+
// tslint:disable-next-line: no-floating-promises
17+
Promise.resolve(result).then(resolve);
18+
},
19+
reject(reason: Error) {
20+
Promise.reject(reason).catch(reject);
21+
}
22+
};
23+
}
24+
25+
/**
26+
* A pending `Promise`, and the functions to resolve or reject it.
27+
* @internal
28+
*/
29+
export interface Pending<T> {
30+
promise: Promise<T>;
31+
32+
/**
33+
* Resolves the promise with the given value.
34+
*/
35+
resolve(result: T | PromiseLike<T>): void;
36+
37+
/**
38+
* Rejects the promise with the given reason.
39+
*/
40+
reject(reason: Error): void;
41+
}
42+
43+
type Resolve<T> = (result: T | PromiseLike<T>) => void;
44+
type Reject = (error: Error) => void;

0 commit comments

Comments
 (0)