11/*
2- * Copyright (c) 2017, 2023 , Oracle and/or its affiliates. All rights reserved.
2+ * Copyright (c) 2017, 2024 , Oracle and/or its affiliates. All rights reserved.
33 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44 *
55 * This code is free software; you can redistribute it and/or modify it
2929import java .nio .ByteBuffer ;
3030import java .util .List ;
3131import java .util .Objects ;
32+ import java .util .concurrent .ConcurrentLinkedQueue ;
3233import java .util .concurrent .Flow ;
3334import java .util .concurrent .atomic .AtomicLong ;
3435import java .util .concurrent .atomic .AtomicReference ;
@@ -557,31 +558,33 @@ private final class InternalReadPublisher
557558 implements Flow .Publisher <List <ByteBuffer >> {
558559 private final InternalReadSubscription subscriptionImpl
559560 = new InternalReadSubscription ();
560- AtomicReference <ReadSubscription > pendingSubscription = new AtomicReference <>();
561+ ConcurrentLinkedQueue <ReadSubscription > pendingSubscriptions = new ConcurrentLinkedQueue <>();
561562 private volatile ReadSubscription subscription ;
562563
563564 @ Override
564565 public void subscribe (Flow .Subscriber <? super List <ByteBuffer >> s ) {
565566 Objects .requireNonNull (s );
566567
567568 TubeSubscriber sub = FlowTube .asTubeSubscriber (s );
568- ReadSubscription target = new ReadSubscription (subscriptionImpl , sub );
569- ReadSubscription previous = pendingSubscription .getAndSet (target );
570-
571- if (previous != null && previous != target ) {
569+ ReadSubscription previous ;
570+ while ((previous = pendingSubscriptions .poll ()) != null ) {
572571 if (debug .on ())
573572 debug .log ("read publisher: dropping pending subscriber: "
574573 + previous .subscriber );
575574 previous .errorRef .compareAndSet (null , errorRef .get ());
575+ // make sure no data will be routed to the old subscriber.
576+ previous .stopReading ();
576577 previous .signalOnSubscribe ();
577578 if (subscriptionImpl .completed ) {
578579 previous .signalCompletion ();
579580 } else {
580581 previous .subscriber .dropSubscription ();
581582 }
582583 }
584+ ReadSubscription target = new ReadSubscription (subscriptionImpl , sub );
585+ pendingSubscriptions .offer (target );
583586
584- if (debug .on ()) debug .log ("read publisher got subscriber" );
587+ if (debug .on ()) debug .log ("read publisher got new subscriber: " + s );
585588 subscriptionImpl .signalSubscribe ();
586589 debugState ("leaving read.subscribe: " );
587590 }
@@ -606,6 +609,7 @@ final class ReadSubscription implements Flow.Subscription {
606609 volatile boolean subscribed ;
607610 volatile boolean cancelled ;
608611 volatile boolean completed ;
612+ private volatile boolean stopped ;
609613
610614 public ReadSubscription (InternalReadSubscription impl ,
611615 TubeSubscriber subscriber ) {
@@ -623,11 +627,12 @@ public void cancel() {
623627
624628 @ Override
625629 public void request (long n ) {
626- if (!cancelled ) {
630+ if (!cancelled && !stopped ) {
631+ // should be safe to not synchronize here.
627632 impl .request (n );
628633 } else {
629634 if (debug .on ())
630- debug .log ("subscription cancelled, ignoring request %d" , n );
635+ debug .log ("subscription stopped or cancelled, ignoring request %d" , n );
631636 }
632637 }
633638
@@ -661,6 +666,32 @@ void signalOnSubscribe() {
661666 signalCompletion ();
662667 }
663668 }
669+
670+ /**
671+ * Called when switching subscriber on the {@link InternalReadSubscription}.
672+ * This subscriber is the old subscriber. Demand on the internal
673+ * subscription will be reset and reading will be paused until the
674+ * new subscriber is subscribed.
675+ * This should ensure that no data is routed to this subscriber
676+ * until the new subscriber is subscribed.
677+ */
678+ synchronized void stopReading () {
679+ stopped = true ;
680+ impl .demand .reset ();
681+ }
682+
683+ synchronized boolean tryDecrementDemand () {
684+ if (stopped ) return false ;
685+ return impl .demand .tryDecrement ();
686+ }
687+
688+ synchronized boolean isStopped () {
689+ return stopped ;
690+ }
691+
692+ synchronized void increaseDemand (long n ) {
693+ if (!stopped ) impl .demand .increase (n );
694+ }
664695 }
665696
666697 final class InternalReadSubscription implements Flow .Subscription {
@@ -835,7 +866,7 @@ final void read() {
835866
836867 // If we reach here then we must be in the selector thread.
837868 assert client .isSelectorThread ();
838- if (demand . tryDecrement ()) {
869+ if (current . tryDecrementDemand ()) {
839870 // we have demand.
840871 try {
841872 List <ByteBuffer > bytes = readAvailable (current .bufferSource );
@@ -881,8 +912,10 @@ final void read() {
881912 // event. This ensures that this loop is
882913 // executed again when the socket becomes
883914 // readable again.
884- demand .increase (1 );
885- resumeReadEvent ();
915+ if (!current .isStopped ()) {
916+ current .increaseDemand (1 );
917+ resumeReadEvent ();
918+ }
886919 if (errorRef .get () != null ) continue ;
887920 debugState ("leaving read() loop with no bytes" );
888921 return ;
@@ -922,30 +955,35 @@ final void read() {
922955 }
923956
924957 boolean handlePending () {
925- ReadSubscription pending = pendingSubscription .getAndSet (null );
926- if (pending == null ) return false ;
927- if (debug .on ())
928- debug .log ("handling pending subscription for %s" ,
958+ ReadSubscription pending ;
959+ boolean subscribed = false ;
960+ while ((pending = pendingSubscriptions .poll ()) != null ) {
961+ subscribed = true ;
962+ if (debug .on ())
963+ debug .log ("handling pending subscription for %s" ,
929964 pending .subscriber );
930- ReadSubscription current = subscription ;
931- if (current != null && current != pending && !completed ) {
932- current .subscriber .dropSubscription ();
933- }
934- if (debug .on ()) debug .log ("read demand reset to 0" );
935- subscriptionImpl .demand .reset (); // subscriber will increase demand if it needs to.
936- pending .errorRef .compareAndSet (null , errorRef .get ());
937- if (!readScheduler .isStopped ()) {
938- subscription = pending ;
939- } else {
940- if (debug .on ()) debug .log ("socket tube is already stopped" );
941- }
942- if (debug .on ()) debug .log ("calling onSubscribe" );
943- pending .signalOnSubscribe ();
944- if (completed ) {
965+ ReadSubscription current = subscription ;
966+ if (current != null && current != pending && !completed ) {
967+ debug .log ("dropping pending subscription for current %s" ,
968+ current .subscriber );
969+ current .subscriber .dropSubscription ();
970+ }
971+ if (debug .on ()) debug .log ("read demand reset to 0" );
972+ subscriptionImpl .demand .reset (); // subscriber will increase demand if it needs to.
945973 pending .errorRef .compareAndSet (null , errorRef .get ());
946- pending .signalCompletion ();
974+ if (!readScheduler .isStopped ()) {
975+ subscription = pending ;
976+ } else {
977+ if (debug .on ()) debug .log ("socket tube is already stopped" );
978+ }
979+ if (debug .on ()) debug .log ("calling onSubscribe on " + pending .subscriber );
980+ pending .signalOnSubscribe ();
981+ if (completed ) {
982+ pending .errorRef .compareAndSet (null , errorRef .get ());
983+ pending .signalCompletion ();
984+ }
947985 }
948- return true ;
986+ return subscribed ;
949987 }
950988 }
951989
0 commit comments