Skip to content

Commit

Permalink
Merge #2756 into 2.0.0-M4
Browse files Browse the repository at this point in the history
The test is dropped as it relies that the EmbeddedEventLoop can be accessed by multiple threads.
This is not allowed any more in Netty 5
netty/netty#12871
  • Loading branch information
violetagg committed Apr 4, 2023
2 parents cb26eea + 61d1315 commit 505eb54
Showing 1 changed file with 4 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.IntConsumer;

Expand Down Expand Up @@ -60,9 +59,7 @@ final class FluxReceive extends Flux<Object> implements Subscription, Disposable

volatile IntConsumer receiverCancel;

volatile int once;
static final AtomicIntegerFieldUpdater<FluxReceive> ONCE =
AtomicIntegerFieldUpdater.newUpdater(FluxReceive.class, "once");
boolean subscribedOnce;

// Please note, in this specific case WIP is non-volatile since all operation that
// involves work-in-progress pattern is within Netty Event-Loops which guarantees
Expand Down Expand Up @@ -151,11 +148,12 @@ public void subscribe(CoreSubscriber<? super Object> s) {
}

final void startReceiver(CoreSubscriber<? super Object> s) {
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
if (!subscribedOnce) {
subscribedOnce = true;
if (log.isDebugEnabled()) {
log.debug(format(channel, "{}: subscribing inbound receiver"), this);
}
if (inboundDone && getPending() == 0) {
if ((inboundDone && getPending() == 0) || isCancelled()) {
if (inboundError != null) {
Operators.error(s, inboundError);
return;
Expand Down

0 comments on commit 505eb54

Please sign in to comment.