Skip to content

Commit

Permalink
Improve abstraction over internal state of UniOperatorProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
Sanne committed Aug 24, 2021
1 parent 4704146 commit 9be8567
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void onSubscribe(UniSubscription subscription) {
timeoutFuture = executor.schedule(this::doTimeout, timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
// Executor out of service.
upstream.set(CANCELLED);
getAndSetUpstreamSubscription(CANCELLED);
subscription.cancel();
downstream.onSubscribe(EmptyUniSubscription.DONE);
downstream.onFailure(e);
Expand All @@ -61,7 +61,7 @@ public void onSubscribe(UniSubscription subscription) {

@Override
public void onItem(I item) {
UniSubscription sub = upstream.getAndSet(CANCELLED);
UniSubscription sub = getAndSetUpstreamSubscription(CANCELLED);
if (sub != CANCELLED) {
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
Expand All @@ -72,7 +72,7 @@ public void onItem(I item) {

@Override
public void onFailure(Throwable failure) {
UniSubscription sub = upstream.getAndSet(CANCELLED);
UniSubscription sub = getAndSetUpstreamSubscription(CANCELLED);
if (sub != CANCELLED) {
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void onFailure(Throwable failure) {
@Override
public void cancel() {
if (state.compareAndSet(State.INIT, State.CANCELLED)) {
UniSubscription sub = upstream.getAndSet(CANCELLED);
UniSubscription sub = getAndSetUpstreamSubscription(CANCELLED);
callback.run();
if (sub != null) {
sub.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void onFailure(Throwable failure) {
@Override
public void cancel() {
if (state.compareAndSet(State.INIT, State.CANCELLED)) {
UniSubscription sub = upstream.getAndSet(CANCELLED);
UniSubscription sub = getAndSetUpstreamSubscription(CANCELLED);
execute().subscribe().with(
ignoredItem -> {
if (sub != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public UniOnFailureFlatMapProcessor(UniSubscriber<? super I> downstream) {

@Override
public void onSubscribe(UniSubscription subscription) {
if (upstream.get() == null) {
if (getCurrentUpstreamSubscription() == null) {
super.onSubscribe(subscription);
} else if (innerSubscription == null) {
this.innerSubscription = subscription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public UniOnItemOrFailureFlatMapProcessor(UniSubscriber<? super O> downstream) {

@Override
public void onSubscribe(UniSubscription subscription) {
if (upstream.get() == null) {
if (getCurrentUpstreamSubscription() == null) {
super.onSubscribe(subscription);
} else if (innerSubscription == null) {
this.innerSubscription = subscription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public UniOnItemTransformToUniProcessor(UniSubscriber<? super O> downstream) {

@Override
public void onSubscribe(UniSubscription subscription) {
if (upstream.get() == null) {
if (getCurrentUpstreamSubscription() == null) {
super.onSubscribe(subscription);
} else if (innerSubscription == null) {
this.innerSubscription = subscription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ public abstract class UniOperatorProcessor<I, O> implements UniSubscriber<I>, Un

protected final UniSubscriber<? super O> downstream;

private static final AtomicReferenceFieldUpdater<UniOperatorProcessor,UniSubscription> updater = AtomicReferenceFieldUpdater.newUpdater(UniOperatorProcessor.class, UniSubscription.class, "upstream");
private static final AtomicReferenceFieldUpdater<UniOperatorProcessor, UniSubscription> updater = AtomicReferenceFieldUpdater
.newUpdater(UniOperatorProcessor.class, UniSubscription.class, "upstream");

protected volatile UniSubscription upstream;
private volatile UniSubscription upstream;

public UniOperatorProcessor(UniSubscriber<? super O> downstream) {
this.downstream = ParameterValidation.nonNull(downstream, "downstream");
Expand Down Expand Up @@ -61,4 +62,17 @@ public void cancel() {
public boolean isCancelled() {
return upstream == CANCELLED;
}

protected final UniSubscription getCurrentUpstreamSubscription() {
return upstream;
}

protected final UniSubscription getAndSetUpstreamSubscription(UniSubscription newValue) {
return updater.getAndSet(this, newValue);
}

protected final boolean compareAndSetUpstreamSubscription(UniSubscription expect, UniSubscription update) {
return updater.compareAndSet(this, expect, update);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public UniRetryAtMostProcessor(UniSubscriber<? super T> downstream) {
@Override
public void onSubscribe(UniSubscription subscription) {
int count = counter.incrementAndGet();
if (upstream.compareAndSet(null, subscription)) {
if (compareAndSetUpstreamSubscription(null, subscription)) {
if (count == 1) {
downstream.onSubscribe(this);
}
Expand All @@ -61,7 +61,7 @@ public void onFailure(Throwable failure) {
downstream.onFailure(failure);
return;
}
UniSubscription previousSubscription = upstream.getAndSet(null);
UniSubscription previousSubscription = getAndSetUpstreamSubscription(null);
if (previousSubscription != null) {
previousSubscription.cancel();
}
Expand Down

0 comments on commit 9be8567

Please sign in to comment.