@@ -119,7 +119,7 @@ void connect(Flow.Subscriber<? super List<ByteBuffer>> downReader,
119119 // Connect the read sink first. That's the left-hand side
120120 // downstream subscriber from the HttpConnection (or more
121121 // accurately, the SSLSubscriberWrapper that will wrap it
122- // when SSLTube::connectFlows is called.
122+ // when SSLTube::connectFlows is called) .
123123 reader .subscribe (downReader );
124124
125125 // Connect the right hand side tube (the socket tube).
@@ -191,7 +191,7 @@ public boolean isFinished() {
191191 private volatile Flow .Subscription readSubscription ;
192192
193193 // The DelegateWrapper wraps a subscribed {@code Flow.Subscriber} and
194- // tracks the subscriber's state. In particular it makes sure that
194+ // tracks the subscriber's state. In particular, it makes sure that
195195 // onComplete/onError are not called before onSubscribed.
196196 static final class DelegateWrapper implements FlowTube .TubeSubscriber {
197197 private final FlowTube .TubeSubscriber delegate ;
@@ -302,7 +302,7 @@ public String toString() {
302302
303303 // Used to read data from the SSLTube.
304304 final class SSLSubscriberWrapper implements FlowTube .TubeSubscriber {
305- private AtomicReference <DelegateWrapper > pendingDelegate =
305+ private final AtomicReference <DelegateWrapper > pendingDelegate =
306306 new AtomicReference <>();
307307 private volatile DelegateWrapper subscribed ;
308308 private volatile boolean onCompleteReceived ;
@@ -353,15 +353,15 @@ void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
353353 return ;
354354 }
355355 // sslDelegate field should have been initialized by the
356- // the time we reach here, as there can be no subscriber
356+ // time we reach here, as there can be no subscriber
357357 // until SSLTube is fully constructed.
358358 if (handleNow || !sslDelegate .resumeReader ()) {
359359 processPendingSubscriber ();
360360 }
361361 }
362362
363- // Can be called outside of the flow if an error has already been
364- // raise . Otherwise, must be called within the SSLFlowDelegate
363+ // Can be called outside the flow if an error has already been
364+ // raised . Otherwise, must be called within the SSLFlowDelegate
365365 // downstream reader flow.
366366 // If there is a subscription, and if there is a pending delegate,
367367 // calls dropSubscription() on the previous delegate (if any),
@@ -619,43 +619,73 @@ final class SSLSubscriptionWrapper implements Flow.Subscription {
619619 private volatile boolean cancelled ;
620620
621621 void setSubscription (Flow .Subscription sub ) {
622- long demand = writeDemand .get (); // FIXME: isn't it a racy way of passing the demand?
623- delegate = sub ;
624- if (debug .on ())
625- debug .log ("setSubscription: demand=%d, cancelled:%s" , demand , cancelled );
622+ long demand ;
623+ // Avoid race condition and requesting demand twice if
624+ // request() runs concurrently with setSubscription()
625+ boolean cancelled ;
626+ synchronized (this ) {
627+ demand = writeDemand .get ();
628+ delegate = sub ;
629+ cancelled = this .cancelled ;
630+ }
631+ if (debug .on ()) {
632+ debug .log ("setSubscription: demand=%d, cancelled:%s, new subscription %s" ,
633+ demand , cancelled , sub );
634+ }
626635
627636 if (cancelled )
628- delegate .cancel ();
637+ sub .cancel ();
629638 else if (demand > 0 )
630639 sub .request (demand );
631640 }
632641
633642 @ Override
634643 public void request (long n ) {
635- writeDemand .increase (n );
636- if (debug .on ()) debug .log ("request: n=%d" , n );
637- Flow .Subscription sub = delegate ;
644+ // Avoid race condition and requesting demand twice if
645+ // request() runs concurrently with setSubscription()
646+ Flow .Subscription sub ;
647+ long demanded ;
648+ synchronized (this ) {
649+ sub = delegate ;
650+ demanded = writeDemand .get ();
651+ writeDemand .increase (n );
652+ }
653+ if (debug .on ()) {
654+ debug .log ("request: n=%s to %s (%s already demanded)" ,
655+ n , sub , demanded );
656+ }
638657 if (sub != null && n > 0 ) {
658+ if (debug .on ()) debug .log ("requesting %s from %s" , n , sub );
639659 sub .request (n );
640660 }
641661 }
642662
643663 @ Override
644664 public void cancel () {
645- cancelled = true ;
646- if (delegate != null )
647- delegate .cancel ();
665+ Flow .Subscription sub ;
666+ synchronized (this ) {
667+ cancelled = true ;
668+ sub = delegate ;
669+ }
670+ if (debug .on ()) debug .log ("cancel: cancelling subscription: " + sub );
671+ if (sub != null ) sub .cancel ();
648672 }
649673 }
650674
651675 /* Subscriber - writing side */
652676 @ Override
653677 public void onSubscribe (Flow .Subscription subscription ) {
654678 Objects .requireNonNull (subscription );
655- Flow .Subscription x = writeSubscription .delegate ;
656- if (x != null )
657- x .cancel ();
679+ Flow .Subscription old ;
680+ synchronized (this ) {
681+ old = writeSubscription .delegate ;
682+ }
683+ if (old != null && old != subscription ) {
684+ if (debug .on ()) debug .log ("onSubscribe: cancelling old subscription: " + old );
685+ old .cancel ();
686+ }
658687
688+ if (debug .on ()) debug .log ("onSubscribe: new subscription: " + subscription );
659689 writeSubscription .setSubscription (subscription );
660690 }
661691
@@ -664,8 +694,10 @@ public void onNext(List<ByteBuffer> item) {
664694 Objects .requireNonNull (item );
665695 boolean decremented = writeDemand .tryDecrement ();
666696 assert decremented : "Unexpected writeDemand: " ;
667- if (debug .on ())
668- debug .log ("sending %d buffers to SSL flow delegate" , item .size ());
697+ if (debug .on ()) {
698+ debug .log ("sending %s buffers to SSL flow delegate (%s bytes)" ,
699+ item .size (), Utils .remaining (item ));
700+ }
669701 sslDelegate .upstreamWriter ().onNext (item );
670702 }
671703
0 commit comments