Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
  • Loading branch information
OlegDokuka committed Mar 30, 2023
1 parent 1542a2a commit 5050581
Showing 1 changed file with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.IntConsumer;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -59,9 +58,7 @@ final class FluxReceive extends Flux<Object> implements Subscription, Disposable

volatile IntConsumer receiverCancel;

volatile int once;
static final AtomicIntegerFieldUpdater<FluxReceive> ONCE =
AtomicIntegerFieldUpdater.newUpdater(FluxReceive.class, "once");
boolean subscribedOnce;

// Please note, in this specific case WIP is non-volatile since all operation that
// involves work-in-progress pattern is within Netty Event-Loops which guarantees
Expand Down Expand Up @@ -149,11 +146,13 @@ public void subscribe(CoreSubscriber<? super Object> s) {

final void startReceiver(CoreSubscriber<? super Object> s) {
log.info(format(channel, "{}: subscribing before inbound receiver {}"), this, s);
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
if (!subscribedOnce) {
subscribedOnce = true;
// if (log.isDebugEnabled()) {
log.info(format(channel, "{}: subscribing inbound receiver"), this);
// }
if (inboundDone && getPending() == 0) {
if ((inboundDone && getPending() == 0) || isCancelled()) {
log.info(format(channel, "{}: delivering terminal to late inbound receiver"), this);
if (inboundError != null) {
Operators.error(s, inboundError);
return;
Expand Down

0 comments on commit 5050581

Please sign in to comment.