Skip to content

Commit

Permalink
improves onDiscard support in sinks
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: OlegDokuka <odokuka@vmware.com>
  • Loading branch information
Oleh Dokuka committed Jun 2, 2023
1 parent 8549191 commit 7bc2fd2
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 41 deletions.
178 changes: 154 additions & 24 deletions reactor-core/src/main/java/reactor/core/publisher/EmitterProcessor.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.stream.Stream;

import org.reactivestreams.Publisher;
Expand All @@ -33,6 +34,8 @@
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
Expand Down Expand Up @@ -70,6 +73,8 @@
public final class EmitterProcessor<T> extends FluxProcessor<T, T> implements InternalManySink<T>,
Sinks.ManyWithUpstream<T> {

static final Logger log = Loggers.getLogger(EmitterProcessor.class);

@SuppressWarnings("rawtypes")
static final FluxPublish.PubSubInner[] EMPTY = new FluxPublish.PublishInner[0];

Expand Down Expand Up @@ -133,13 +138,32 @@ public static <E> EmitterProcessor<E> create(int bufferSize) {
*/
@Deprecated
public static <E> EmitterProcessor<E> create(int bufferSize, boolean autoCancel) {
return new EmitterProcessor<>(autoCancel, bufferSize);
return new EmitterProcessor<>(autoCancel, bufferSize, null);
}

/**
* Create a new {@link EmitterProcessor} using the provided backlog size and auto-cancellation.
*
* @param <E> Type of processed signals
* @param bufferSize the internal buffer size to hold signals
* @param autoCancel automatically cancel
*
* @return a fresh processor
* @deprecated use {@link Sinks.MulticastSpec#onBackpressureBuffer(int, boolean) Sinks.many().multicast().onBackpressureBuffer(bufferSize, autoCancel)}
* (or the unsafe variant if you're sure about external synchronization). To be removed in 3.5.
*/
@Deprecated
public static <E> EmitterProcessor<E> create(int bufferSize, boolean autoCancel, Consumer<? super E> onDiscardHook) {
return new EmitterProcessor<>(autoCancel, bufferSize, onDiscardHook);
}

final int prefetch;

final boolean autoCancel;

@Nullable
final Consumer<? super T> onDiscard;

volatile Subscription s;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<EmitterProcessor, Subscription> S =
Expand Down Expand Up @@ -182,12 +206,13 @@ public static <E> EmitterProcessor<E> create(int bufferSize, boolean autoCancel)
Throwable.class,
"error");

EmitterProcessor(boolean autoCancel, int prefetch) {
EmitterProcessor(boolean autoCancel, int prefetch, @Nullable Consumer<? super T> onDiscard) {
if (prefetch < 1) {
throw new IllegalArgumentException("bufferSize must be strictly positive, " + "was: " + prefetch);
}
this.autoCancel = autoCancel;
this.prefetch = prefetch;
this.onDiscard = onDiscard;
//doesn't use INIT/CANCELLED distinction, contrary to FluxPublish)
//see remove()
SUBSCRIBERS.lazySet(this, EMPTY);
Expand All @@ -200,7 +225,12 @@ public Stream<? extends Scannable> inners() {

@Override
public Context currentContext() {
return Operators.multiSubscribersContext(subscribers);
if (onDiscard != null) {
return Operators.enableOnDiscard(Operators.multiSubscribersContext(subscribers), onDiscard);
}
else {
return Operators.multiSubscribersContext(subscribers);
}
}


Expand All @@ -213,9 +243,18 @@ private boolean detach() {
done = true;
CancellationException detachException = new CancellationException("the ManyWithUpstream sink had a Subscription to an upstream which has been manually cancelled");
if (ERROR.compareAndSet(EmitterProcessor.this, null, detachException)) {
if (WIP.getAndIncrement(this) != 0) {
return true;
}
Queue<T> q = queue;
if (q != null) {
q.clear();
Consumer<? super T> hook = this.onDiscard;
if (hook != null) {
discardQueue(q, hook);
}
else {
q.clear();
}
}
for (FluxPublish.PubSubInner<T> inner : terminate()) {
inner.actual.onError(detachException);
Expand Down Expand Up @@ -250,7 +289,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
if (inner.isCancelled()) {
remove(inner);
}
drain();
drain(null);
}
else {
Throwable e = error;
Expand All @@ -275,7 +314,7 @@ public EmitResult tryEmitComplete() {
return EmitResult.FAIL_TERMINATED;
}
done = true;
drain();
drain(null);
return EmitResult.OK;
}

Expand All @@ -292,7 +331,7 @@ public EmitResult tryEmitError(Throwable t) {
}
if (Exceptions.addThrowable(ERROR, this, t)) {
done = true;
drain();
drain(null);
return EmitResult.OK;
}
else {
Expand All @@ -303,7 +342,7 @@ public EmitResult tryEmitError(Throwable t) {
@Override
public void onNext(T t) {
if (sourceMode == Fuseable.ASYNC) {
drain();
drain(t);
return;
}
emitNext(t, Sinks.EmitFailureHandler.FAIL_FAST);
Expand Down Expand Up @@ -340,7 +379,8 @@ public EmitResult tryEmitNext(T t) {
if (!q.offer(t)) {
return subscribers == EMPTY ? EmitResult.FAIL_ZERO_SUBSCRIBER : EmitResult.FAIL_OVERFLOW;
}
drain();

drain(t);
return EmitResult.OK;
}

Expand Down Expand Up @@ -387,7 +427,7 @@ public void onSubscribe(final Subscription s) {
if (m == Fuseable.SYNC) {
sourceMode = m;
queue = f;
drain();
drain(null);
return;
}
else if (m == Fuseable.ASYNC) {
Expand Down Expand Up @@ -443,8 +483,23 @@ public Object scanUnsafe(Attr key) {
return super.scanUnsafe(key);
}

final void drain() {
final void drain(@Nullable T dataSignalOfferedBeforeDrain) {
if (WIP.getAndIncrement(this) != 0) {
Consumer<? super T> discardHook = this.onDiscard;
if (dataSignalOfferedBeforeDrain != null) {
if (discardHook != null && isCancelled()) {
try {
discardHook.accept(dataSignalOfferedBeforeDrain);
}
catch (Throwable t) {
log.warn("Error in discard hook", t);
}
}
else if (done) {
Operators.onNextDropped(dataSignalOfferedBeforeDrain,
currentContext());
}
}
return;
}

Expand All @@ -458,11 +513,12 @@ final void drain() {

boolean empty = q == null || q.isEmpty();

if (checkTerminated(d, empty)) {
if (checkTerminated(d, empty, null)) {
return;
}

FluxPublish.PubSubInner<T>[] a = subscribers;
Consumer<? super T> onDiscardHook = onDiscard;

if (a != EMPTY && !empty) {
long maxRequested = Long.MAX_VALUE;
Expand Down Expand Up @@ -492,9 +548,13 @@ final void drain() {
d = true;
v = null;
}
if (checkTerminated(d, v == null)) {
empty = v == null;
if (checkTerminated(d, empty, v)) {
return;
}
if (!empty && onDiscardHook != null) {
discard(v, onDiscardHook);
}
if (sourceMode != Fuseable.SYNC) {
s.request(1);
}
Expand All @@ -519,7 +579,7 @@ final void drain() {

empty = v == null;

if (checkTerminated(d, empty)) {
if (checkTerminated(d, empty, v)) {
return;
}

Expand All @@ -528,7 +588,7 @@ final void drain() {
if (sourceMode == Fuseable.SYNC) {
//the q is empty
done = true;
checkTerminated(true, true);
checkTerminated(true, true, null);
}
break;
}
Expand All @@ -555,7 +615,7 @@ final void drain() {
}
else if ( sourceMode == Fuseable.SYNC ) {
done = true;
if (checkTerminated(true, empty)) { //empty can be true if no subscriber
if (checkTerminated(true, empty, null)) { //empty can be true if no subscriber
break;
}
}
Expand All @@ -572,13 +632,23 @@ FluxPublish.PubSubInner<T>[] terminate() {
return SUBSCRIBERS.getAndSet(this, TERMINATED);
}

boolean checkTerminated(boolean d, boolean empty) {
boolean checkTerminated(boolean d, boolean empty, @Nullable T value) {
if (s == Operators.cancelledSubscription()) {
if (autoCancel) {
terminate();

Queue<T> q = queue;
if (q != null) {
q.clear();
Consumer<? super T> hook = this.onDiscard;
if (hook != null) {
if (value != null) {
discard(value, hook);
}
discardQueue(q, hook);
}
else {
q.clear();
}
}
}
return true;
Expand All @@ -588,7 +658,16 @@ boolean checkTerminated(boolean d, boolean empty) {
if (e != null && e != Exceptions.TERMINATED) {
Queue<T> q = queue;
if (q != null) {
q.clear();
Consumer<? super T> hook = this.onDiscard;
if (hook != null) {
if (value != null) {
discard(value, hook);
}
discardQueue(q, hook);
}
else {
q.clear();
}
}
for (FluxPublish.PubSubInner<T> inner : terminate()) {
inner.actual.onError(e);
Expand All @@ -605,6 +684,31 @@ else if (empty) {
return false;
}

static <T> void discardQueue(Queue<T> q, Consumer<? super T> hook) {
for (; ; ) {
T toDiscard = q.poll();
if (toDiscard == null) {
break;
}

try {
hook.accept(toDiscard);
}
catch (Throwable t) {
log.warn("Error while discarding a queue element, continuing with next queue element", t);
}
}
}

static <T> void discard(T value, Consumer<? super T> hook) {
try {
hook.accept(value);
}
catch (Throwable t) {
log.warn("Error while discarding a queue element, continuing with next queue element", t);
}
}

final boolean add(EmitterInner<T> inner) {
for (; ; ) {
FluxPublish.PubSubInner<T>[] a = subscribers;
Expand All @@ -624,7 +728,27 @@ final boolean add(EmitterInner<T> inner) {
final void remove(FluxPublish.PubSubInner<T> inner) {
for (; ; ) {
FluxPublish.PubSubInner<T>[] a = subscribers;
if (a == TERMINATED || a == EMPTY) {
if (a == EMPTY) {
// means cancelled without adding
if (autoCancel && Operators.terminate(S, this)) {
if (WIP.getAndIncrement(this) != 0) {
return;
}
terminate();
Queue<T> q = queue;
if (q != null) {
Consumer<? super T> hook = this.onDiscard;
if (hook != null) {
discardQueue(q, hook);
}
else {
q.clear();
}
}
}
return;
}
else if (a == TERMINATED) {
return;
}
int n = a.length;
Expand Down Expand Up @@ -659,7 +783,13 @@ final void remove(FluxPublish.PubSubInner<T> inner) {
terminate();
Queue<T> q = queue;
if (q != null) {
q.clear();
Consumer<? super T> hook = this.onDiscard;
if (hook != null) {
discardQueue(q, hook);
}
else {
q.clear();
}
}
}
return;
Expand All @@ -683,13 +813,13 @@ static final class EmitterInner<T> extends FluxPublish.PubSubInner<T> {

@Override
void drainParent() {
parent.drain();
parent.drain(null);
}

@Override
void removeAndDrainParent() {
parent.remove(this);
parent.drain();
parent.drain(null);
}
}

Expand Down
Loading

0 comments on commit 7bc2fd2

Please sign in to comment.