@@ -476,10 +476,14 @@ function onStreamReady(streamHandle, id, push_id) {
476
476
const stream = new QuicStream ( {
477
477
...session [ kStreamOptions ] ,
478
478
writable : ! ( id & 0b10 ) ,
479
- } , session , push_id ) ;
480
- stream [ kSetHandle ] ( streamHandle ) ;
481
- session [ kAddStream ] ( id , stream ) ;
482
- process . nextTick ( emit . bind ( session , 'stream' , stream ) ) ;
479
+ } , session , streamHandle , push_id ) ;
480
+ process . nextTick ( ( ) => {
481
+ try {
482
+ session . emit ( 'stream' , stream ) ;
483
+ } catch ( error ) {
484
+ stream . destroy ( error ) ;
485
+ }
486
+ } ) ;
483
487
}
484
488
485
489
// Called by the C++ internals when a stream is closed and
@@ -2188,16 +2192,11 @@ class QuicSession extends EventEmitter {
2188
2192
if ( handle === undefined )
2189
2193
throw new ERR_OPERATION_FAILED ( 'Unable to create QuicStream' ) ;
2190
2194
2191
- const stream = new QuicStream ( {
2195
+ return new QuicStream ( {
2192
2196
highWaterMark,
2193
2197
defaultEncoding,
2194
2198
readable : ! halfOpen
2195
- } , this ) ;
2196
-
2197
- stream [ kSetHandle ] ( handle ) ;
2198
- this [ kAddStream ] ( stream . id , stream ) ;
2199
-
2200
- return stream ;
2199
+ } , this , handle ) ;
2201
2200
}
2202
2201
2203
2202
get duration ( ) {
@@ -2556,7 +2555,7 @@ class QuicStream extends Duplex {
2556
2555
stats : undefined ,
2557
2556
} ;
2558
2557
2559
- constructor ( options , session , push_id ) {
2558
+ constructor ( options , session , handle , push_id ) {
2560
2559
const {
2561
2560
highWaterMark,
2562
2561
defaultEncoding,
@@ -2583,11 +2582,7 @@ class QuicStream extends Duplex {
2583
2582
this . _readableState . readingMore = true ;
2584
2583
this . on ( 'pause' , streamOnPause ) ;
2585
2584
2586
- // The QuicStream writes are corked until kSetHandle
2587
- // is set, ensuring that writes are buffered in JavaScript
2588
- // until we have somewhere to send them.
2589
- // TODO(@jasnell): We need a better mechanism for this.
2590
- this . cork ( ) ;
2585
+ this [ kSetHandle ] ( handle ) ;
2591
2586
}
2592
2587
2593
2588
// Set handle is called once the QuicSession has been able
@@ -2607,8 +2602,7 @@ class QuicStream extends Duplex {
2607
2602
state . dataRateHistogram = new Histogram ( handle . rate ) ;
2608
2603
state . dataSizeHistogram = new Histogram ( handle . size ) ;
2609
2604
state . dataAckHistogram = new Histogram ( handle . ack ) ;
2610
- this . uncork ( ) ;
2611
- this . emit ( 'ready' ) ;
2605
+ state . session [ kAddStream ] ( state . id , this ) ;
2612
2606
} else {
2613
2607
if ( state . dataRateHistogram )
2614
2608
state . dataRateHistogram [ kDestroyHistogram ] ( ) ;
@@ -3008,15 +3002,11 @@ class QuicStream extends Duplex {
3008
3002
'Push is either disabled or currently blocked.' ) ;
3009
3003
}
3010
3004
3011
- const stream = new QuicStream ( {
3005
+ return new QuicStream ( {
3012
3006
readable : false ,
3013
3007
highWaterMark,
3014
3008
defaultEncoding,
3015
- } , this . session ) ;
3016
-
3017
- stream [ kSetHandle ] ( handle ) ;
3018
- this . session [ kAddStream ] ( stream . id , stream ) ;
3019
- return stream ;
3009
+ } , this . session , handle ) ;
3020
3010
}
3021
3011
3022
3012
submitInformationalHeaders ( headers = { } ) {
0 commit comments