@@ -24,16 +24,28 @@ export function readdirIterator(dir: string, options: Options & { stats: true })
24
24
export function readdirIterator < T > ( dir : string , options ?: Options ) : AsyncIterableIterator < T > {
25
25
let reader = new DirectoryReader ( dir , options , iteratorFacade ) ;
26
26
let stream = reader . stream ;
27
-
27
+ let pendingValues : T [ ] = [ ] ;
28
+ let pendingNext : Promise < IteratorResult < T > > | undefined ;
29
+ let resolvePendingNext : ( ( result : IteratorResult < T > ) => void ) | undefined ;
30
+ let rejectPendingNext : ( ( error : Error ) => void ) | undefined ;
28
31
let error : Error | undefined ;
29
- stream . on ( "error" , ( err : Error ) => {
32
+ let readable = false ;
33
+ let done = false ;
34
+
35
+ stream . on ( "error" , function streamError ( err : Error ) {
30
36
error = err ;
31
37
stream . pause ( ) ;
38
+ fulfillPendingNextIfPossible ( ) ;
32
39
} ) ;
33
40
34
- let done = false ;
35
- stream . on ( "end" , ( ) => {
41
+ stream . on ( "end" , function streamEnd ( ) {
36
42
done = true ;
43
+ fulfillPendingNextIfPossible ( ) ;
44
+ } ) ;
45
+
46
+ stream . on ( "readable" , function streamReadable ( ) {
47
+ readable = true ;
48
+ fulfillPendingNextIfPossible ( ) ;
37
49
} ) ;
38
50
39
51
return {
@@ -43,40 +55,70 @@ export function readdirIterator<T>(dir: string, options?: Options): AsyncIterabl
43
55
44
56
// tslint:disable-next-line: promise-function-async
45
57
next ( ) {
46
- return new Promise ( ( resolve , reject ) => {
47
- shortCircuit ( ) || stream . on ( "readable" , readNextResult ) ;
48
-
49
- function shortCircuit ( ) {
50
- if ( error ) {
51
- reject ( error ) ;
52
- return true ;
53
- }
54
- else if ( done ) {
55
- resolve ( { done, value : undefined } ) ;
56
- return true ;
57
- }
58
+ if ( ! pendingNext ) {
59
+ pendingNext = new Promise ( ( resolve , reject ) => {
60
+ resolvePendingNext = resolve ;
61
+ rejectPendingNext = reject ;
62
+ } ) ;
63
+ }
64
+
65
+ // tslint:disable-next-line: no-floating-promises
66
+ Promise . resolve ( ) . then ( fulfillPendingNextIfPossible ) ;
67
+ return pendingNext ;
68
+ }
69
+ } ;
70
+
71
+ function fulfillPendingNextIfPossible ( ) {
72
+ let fulfill , result ;
73
+
74
+ if ( resolvePendingNext && rejectPendingNext ) {
75
+ if ( error ) {
76
+ fulfill = rejectPendingNext ;
77
+ result = error ;
78
+ }
79
+ else {
80
+ let value = getNextValue ( ) ;
81
+
82
+ if ( value ) {
83
+ fulfill = resolvePendingNext ;
84
+ result = { value } ;
85
+ }
86
+ else if ( done ) {
87
+ fulfill = resolvePendingNext ;
88
+ result = { done, value } ;
58
89
}
90
+ }
59
91
60
- function readNextResult ( ) {
61
- try {
62
- if ( shortCircuit ( ) ) {
63
- return ;
64
- }
92
+ if ( fulfill ) {
93
+ // NOTE: It's important to clear these BEEFORE fulfilling the Promise;
94
+ // otherwise a sporadic race condition can occur.
95
+ pendingNext = resolvePendingNext = rejectPendingNext = undefined ;
65
96
66
- let value = stream . read ( ) as T | null ;
67
- stream . off ( "readable" , readNextResult ) ;
97
+ // @ts -ignore
98
+ fulfill ( result ) ;
99
+ }
100
+ }
101
+ }
68
102
69
- if ( value === null ) {
70
- done = true ;
71
- }
103
+ function getNextValue ( ) : T | undefined {
104
+ let value = pendingValues . shift ( ) ;
105
+ if ( value ) {
106
+ return value ;
107
+ }
108
+ else if ( readable ) {
109
+ readable = false ;
72
110
73
- shortCircuit ( ) || resolve ( { value : value ! } ) ;
74
- }
75
- catch ( err ) {
76
- reject ( err as Error ) ;
77
- }
111
+ while ( true ) {
112
+ value = stream . read ( ) as T | undefined ;
113
+ if ( value ) {
114
+ pendingValues . push ( value ) ;
78
115
}
79
- } ) ;
80
- } ,
81
- } ;
116
+ else {
117
+ break ;
118
+ }
119
+ }
120
+
121
+ return pendingValues . shift ( ) ;
122
+ }
123
+ }
82
124
}
0 commit comments