@@ -443,63 +443,76 @@ function createBlobReaderStream(reader) {
443443 // There really should only be one read at a time so using an
444444 // array here is purely defensive.
445445 this . pendingPulls = [ ] ;
446+ // Register a wakeup callback that the C++ side can invoke
447+ // when new data is available after a STATUS_BLOCK.
448+ reader . setWakeup ( ( ) => {
449+ if ( this . pendingPulls . length > 0 ) {
450+ this . readNext ( c ) ;
451+ }
452+ } ) ;
446453 } ,
447454 pull ( c ) {
448455 const { promise, resolve, reject } = PromiseWithResolvers ( ) ;
449456 this . pendingPulls . push ( { resolve, reject } ) ;
450- const readNext = ( ) => {
451- reader . pull ( ( status , buffer ) => {
452- // If pendingPulls is empty here, the stream had to have
453- // been canceled, and we don't really care about the result.
454- // We can simply exit.
455- if ( this . pendingPulls . length === 0 ) {
456- return ;
457- }
458- if ( status === 0 ) {
459- // EOS
460- c . close ( ) ;
461- // This is to signal the end for byob readers
462- // see https://streams.spec.whatwg.org/#example-rbs-pull
463- c . byobRequest ?. respond ( 0 ) ;
464- const pending = this . pendingPulls . shift ( ) ;
465- pending . resolve ( ) ;
466- return ;
467- } else if ( status < 0 ) {
468- // The read could fail for many different reasons when reading
469- // from a non-memory resident blob part (e.g. file-backed blob).
470- // The error details the system error code.
471- const error = lazyDOMException ( 'The blob could not be read' , 'NotReadableError' ) ;
472- const pending = this . pendingPulls . shift ( ) ;
473- c . error ( error ) ;
474- pending . reject ( error ) ;
457+ this . readNext ( c ) ;
458+ return promise ;
459+ } ,
460+ readNext ( c ) {
461+ reader . pull ( ( status , buffer ) => {
462+ // If pendingPulls is empty here, the stream had to have
463+ // been canceled, and we don't really care about the result.
464+ // We can simply exit.
465+ if ( this . pendingPulls . length === 0 ) {
466+ return ;
467+ }
468+ if ( status === 0 ) {
469+ // EOS
470+ c . close ( ) ;
471+ // This is to signal the end for byob readers
472+ // see https://streams.spec.whatwg.org/#example-rbs-pull
473+ c . byobRequest ?. respond ( 0 ) ;
474+ const pending = this . pendingPulls . shift ( ) ;
475+ pending . resolve ( ) ;
476+ return ;
477+ } else if ( status < 0 ) {
478+ // The read could fail for many different reasons when reading
479+ // from a non-memory resident blob part (e.g. file-backed blob).
480+ // The error details the system error code.
481+ const error =
482+ lazyDOMException ( 'The blob could not be read' ,
483+ 'NotReadableError' ) ;
484+ const pending = this . pendingPulls . shift ( ) ;
485+ c . error ( error ) ;
486+ pending . reject ( error ) ;
487+ return ;
488+ } else if ( status === 2 ) {
489+ // STATUS_BLOCK: No data available yet. The wakeup callback
490+ // registered in start() will re-invoke readNext when data
491+ // arrives.
492+ return ;
493+ }
494+ // ReadableByteStreamController.enqueue errors if we submit a
495+ // 0-length buffer. We need to check for that here.
496+ if ( buffer !== undefined && buffer . byteLength !== 0 ) {
497+ c . enqueue ( new Uint8Array ( buffer ) ) ;
498+ }
499+ // We keep reading until we either reach EOS, some error, or
500+ // we hit the flow rate of the stream (c.desiredSize).
501+ // We use setImmediate here because we have to allow the event
502+ // loop to turn in order to process any pending i/o. Using
503+ // queueMicrotask won't allow the event loop to turn.
504+ setImmediate ( ( ) => {
505+ if ( c . desiredSize < 0 ) {
506+ // A manual backpressure check.
507+ if ( this . pendingPulls . length !== 0 ) {
508+ const pending = this . pendingPulls . shift ( ) ;
509+ pending . resolve ( ) ;
510+ }
475511 return ;
476512 }
477- // ReadableByteStreamController.enqueue errors if we submit a 0-length
478- // buffer. We need to check for that here.
479- if ( buffer !== undefined && buffer . byteLength !== 0 ) {
480- c . enqueue ( new Uint8Array ( buffer ) ) ;
481- }
482- // We keep reading until we either reach EOS, some error, or we
483- // hit the flow rate of the stream (c.desiredSize).
484- // We use set immediate here because we have to allow the event
485- // loop to turn in order to process any pending i/o. Using
486- // queueMicrotask won't allow the event loop to turn.
487- setImmediate ( ( ) => {
488- if ( c . desiredSize < 0 ) {
489- // A manual backpressure check.
490- if ( this . pendingPulls . length !== 0 ) {
491- // A case of waiting pull finished (= not yet canceled)
492- const pending = this . pendingPulls . shift ( ) ;
493- pending . resolve ( ) ;
494- }
495- return ;
496- }
497- readNext ( ) ;
498- } ) ;
513+ this . readNext ( c ) ;
499514 } ) ;
500- } ;
501- readNext ( ) ;
502- return promise ;
515+ } ) ;
503516 } ,
504517 cancel ( reason ) {
505518 // Reject any currently pending pulls here.
0 commit comments