1
1
'use strict' ;
2
2
3
3
const {
4
+ ArrayPrototypePush,
5
+ ArrayPrototypeShift,
6
+ Error,
4
7
FunctionPrototypeCall,
5
8
ObjectDefineProperty,
6
9
ObjectSetPrototypeOf,
@@ -12,9 +15,11 @@ const {
12
15
AbortError,
13
16
UVException,
14
17
codes : {
18
+ ERR_FS_WATCH_QUEUE_OVERFLOW ,
15
19
ERR_INVALID_ARG_VALUE ,
16
20
} ,
17
21
} = require ( 'internal/errors' ) ;
22
+
18
23
const {
19
24
kEmptyObject,
20
25
} = require ( 'internal/util' ) ;
@@ -45,6 +50,8 @@ const {
45
50
validateBoolean,
46
51
validateObject,
47
52
validateUint32,
53
+ validateInteger,
54
+ validateOneOf,
48
55
} = require ( 'internal/validators' ) ;
49
56
50
57
const {
@@ -309,11 +316,15 @@ async function* watch(filename, options = kEmptyObject) {
309
316
persistent = true ,
310
317
recursive = false ,
311
318
encoding = 'utf8' ,
319
+ maxQueue = 2048 ,
320
+ overflow = 'ignore' ,
312
321
signal,
313
322
} = options ;
314
323
315
324
validateBoolean ( persistent , 'options.persistent' ) ;
316
325
validateBoolean ( recursive , 'options.recursive' ) ;
326
+ validateInteger ( maxQueue , 'options.maxQueue' ) ;
327
+ validateOneOf ( overflow , 'options.overflow' , [ 'ignore' , 'error' ] ) ;
317
328
validateAbortSignal ( signal , 'options.signal' ) ;
318
329
319
330
if ( encoding && ! isEncoding ( encoding ) ) {
@@ -325,10 +336,11 @@ async function* watch(filename, options = kEmptyObject) {
325
336
throw new AbortError ( undefined , { cause : signal . reason } ) ;
326
337
327
338
const handle = new FSEvent ( ) ;
328
- let { promise, resolve, reject } = PromiseWithResolvers ( ) ;
339
+ let { promise, resolve } = PromiseWithResolvers ( ) ;
340
+ const queue = [ ] ;
329
341
const oncancel = ( ) => {
330
342
handle . close ( ) ;
331
- reject ( new AbortError ( undefined , { cause : signal ?. reason } ) ) ;
343
+ resolve ( ) ;
332
344
} ;
333
345
334
346
try {
@@ -345,11 +357,20 @@ async function* watch(filename, options = kEmptyObject) {
345
357
} ) ;
346
358
error . filename = filename ;
347
359
handle . close ( ) ;
348
- reject ( error ) ;
360
+ ArrayPrototypePush ( queue , error ) ;
361
+ resolve ( ) ;
349
362
return ;
350
363
}
351
-
352
- resolve ( { eventType, filename } ) ;
364
+ if ( queue . length < maxQueue ) {
365
+ ArrayPrototypePush ( queue , { __proto__ : null , eventType, filename } ) ;
366
+ resolve ( ) ;
367
+ } else if ( overflow === 'error' ) {
368
+ queue . length = 0 ;
369
+ ArrayPrototypePush ( queue , new ERR_FS_WATCH_QUEUE_OVERFLOW ( maxQueue ) ) ;
370
+ resolve ( ) ;
371
+ } else {
372
+ process . emitWarning ( 'fs.watch maxQueue exceeded' ) ;
373
+ }
353
374
} ;
354
375
355
376
const err = handle . start ( path , persistent , recursive , encoding ) ;
@@ -367,10 +388,20 @@ async function* watch(filename, options = kEmptyObject) {
367
388
}
368
389
369
390
while ( ! signal ?. aborted ) {
370
- yield await promise ;
371
- ( { promise, resolve, reject } = PromiseWithResolvers ( ) ) ;
391
+ await promise ;
392
+ while ( queue . length ) {
393
+ const item = ArrayPrototypeShift ( queue ) ;
394
+ if ( item instanceof Error ) {
395
+ throw item ;
396
+ } else {
397
+ yield item ;
398
+ }
399
+ }
400
+ ( { promise, resolve } = PromiseWithResolvers ( ) ) ;
401
+ }
402
+ if ( signal ?. aborted ) {
403
+ throw new AbortError ( undefined , { cause : signal ?. reason } ) ;
372
404
}
373
- throw new AbortError ( undefined , { cause : signal ?. reason } ) ;
374
405
} finally {
375
406
handle . close ( ) ;
376
407
signal ?. removeEventListener ( 'abort' , oncancel ) ;
0 commit comments