Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Polishing #824
Add synchronization to subscription termination to guarantee only a single signal (error/completion) is sent and to avoid duplicate signals.
  • Loading branch information
mp911de committed Jul 31, 2018
1 parent dea99cb commit 08d2836
Showing 1 changed file with 26 additions and 18 deletions.
44 changes: 26 additions & 18 deletions src/main/java/io/lettuce/core/ScanStream.java
Expand Up @@ -18,6 +18,7 @@
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;

Expand Down Expand Up @@ -255,14 +256,23 @@ private static <V, C extends ScanCursor> void scan(FluxSink<V> sink, Mono<C> ini
*/
static class SubscriptionAdapter<T, C extends ScanCursor> implements Completable {

private static final AtomicReferenceFieldUpdater<SubscriptionAdapter, ScanSubscriber> SUBSCRIBER_ACCESSOR = AtomicReferenceFieldUpdater
private static final AtomicReferenceFieldUpdater<SubscriptionAdapter, ScanSubscriber> SUBSCRIBER = AtomicReferenceFieldUpdater
.newUpdater(SubscriptionAdapter.class, ScanSubscriber.class, "currentSubscription");

// Access via SUBSCRIBER_UPDATER.
private static final AtomicIntegerFieldUpdater<SubscriptionAdapter> STATUS = AtomicIntegerFieldUpdater.newUpdater(
SubscriptionAdapter.class, "status");

private static final int STATUS_ACTIVE = 0;
private static final int STATUS_TERMINATED = 0;

// Access via SUBSCRIBER.
@SuppressWarnings("unused")
private volatile ScanSubscriber<T, C> currentSubscription;
private volatile boolean canceled;
private volatile boolean terminated = false;

// Access via STATUS.
@SuppressWarnings("unused")
private volatile int status = STATUS_ACTIVE;

private final FluxSink<T> sink;
private final Context context;
Expand Down Expand Up @@ -301,7 +311,7 @@ void onDemand(long n) {

if (current == null) {
current = new ScanSubscriber<>(this, sink, context, manyMapper);
if (SUBSCRIBER_ACCESSOR.compareAndSet(this, null, current)) {
if (SUBSCRIBER.compareAndSet(this, null, current)) {
initial.subscribe(current);
}

Expand All @@ -328,7 +338,7 @@ void onDemand(long n) {
Mono<C> next = scanFunction.apply(cursor);

ScanSubscriber<T, C> nextSubscriber = new ScanSubscriber<>(this, sink, context, manyMapper);
if (SUBSCRIBER_ACCESSOR.compareAndSet(this, current, nextSubscriber)) {
if (SUBSCRIBER.compareAndSet(this, current, nextSubscriber)) {
next.subscribe(nextSubscriber);
}
}
Expand Down Expand Up @@ -361,29 +371,27 @@ public void chunkCompleted() {
}
}

if (!this.terminated) {
this.terminated = true;
if (terminate()) {
sink.complete();
} else {
this.terminated = true;
}
}
}

ScanSubscriber<T, C> getCurrentSubscriber() {
return SUBSCRIBER_ACCESSOR.get(this);
return SUBSCRIBER.get(this);
}

@Override
public void onError(Throwable throwable) {

if (!this.canceled && !this.terminated) {
this.terminated = true;
if (!this.canceled && terminate()) {
sink.error(throwable);
} else {
this.terminated = true;
}
}

protected boolean terminate() {
return STATUS.compareAndSet(this, STATUS_ACTIVE, STATUS_TERMINATED);
}
}

/**
Expand All @@ -394,7 +402,7 @@ public void onError(Throwable throwable) {
*/
static class ScanSubscriber<T, C extends ScanCursor> extends BaseSubscriber<C> {

private static final AtomicReferenceFieldUpdater<ScanSubscriber, ScanCursor> CURSOR_ACCESSOR = AtomicReferenceFieldUpdater
private static final AtomicReferenceFieldUpdater<ScanSubscriber, ScanCursor> CURSOR = AtomicReferenceFieldUpdater
.newUpdater(ScanSubscriber.class, ScanCursor.class, "cursor");

private final Completable completable;
Expand All @@ -406,7 +414,7 @@ static class ScanSubscriber<T, C extends ScanCursor> extends BaseSubscriber<C> {
volatile boolean exhausted = false;
volatile boolean canceled;

// see CURSOR_UPDATER
// see CURSOR
@SuppressWarnings("unused")
private volatile C cursor;

Expand All @@ -425,7 +433,7 @@ public Context currentContext() {
@Override
protected void hookOnNext(C cursor) {

if (!CURSOR_ACCESSOR.compareAndSet(this, null, cursor)) {
if (!CURSOR.compareAndSet(this, null, cursor)) {
Operators.onOperatorError(this, new IllegalStateException("Cannot propagate Cursor"), cursor, context);
return;
}
Expand Down Expand Up @@ -517,7 +525,7 @@ protected void hookOnCancel() {
}

public ScanCursor getCursor() {
return CURSOR_ACCESSOR.get(this);
return CURSOR.get(this);
}
}

Expand Down

0 comments on commit 08d2836

Please sign in to comment.