Skip to content

Commit

Permalink
Optimize field access #394
Browse files Browse the repository at this point in the history
Increase field visibility to package level to prevent synthetic accessors. Use AtomicLongFieldUpdater for demand updating that was previously an AtomicLong to reduce memory footprint. Use CommandDispatch state to perform command dispatching. That change prevents conditional code to be executed if data is requested (no demand -> demand) while command execution is allowed to happen only once.
  • Loading branch information
mp911de committed Jan 8, 2017
1 parent 48e54b6 commit 832b5c8
Showing 1 changed file with 61 additions and 27 deletions.
88 changes: 61 additions & 27 deletions src/main/java/com/lambdaworks/redis/RedisPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import java.util.Collection;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

Expand All @@ -32,6 +30,7 @@

import reactor.core.Exceptions;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.QueueSupplier;

import com.lambdaworks.redis.api.StatefulConnection;
import com.lambdaworks.redis.api.StatefulRedisConnection;
Expand Down Expand Up @@ -125,23 +124,26 @@ public void subscribe(Subscriber<? super T> subscriber) {
*
* @param <T> data element type
*/
private static class RedisSubscription<T> implements Subscription, StreamingOutput.Subscriber<T> {
static class RedisSubscription<T> implements Subscription, StreamingOutput.Subscriber<T> {

private static final InternalLogger LOG = InternalLoggerFactory.getInstance(RedisPublisher.class);
private final boolean traceEnabled = LOG.isTraceEnabled();
static final InternalLogger LOG = InternalLoggerFactory.getInstance(RedisPublisher.class);

private final AtomicLong demand = new AtomicLong();
private final Queue<T> data = new ConcurrentLinkedQueue<T>();
private final AtomicBoolean dispatched = new AtomicBoolean();
private volatile boolean allDataRead = false;
static final AtomicLongFieldUpdater<RedisSubscription> DEMAND = AtomicLongFieldUpdater.newUpdater(
RedisSubscription.class, "demand");

private final StatefulConnection<?, ?> connection;
private final RedisCommand<?, ?, T> command;
private final boolean dissolve;
final boolean traceEnabled = LOG.isTraceEnabled();

final Queue<T> data = QueueSupplier.<T> unbounded().get();
final StatefulConnection<?, ?> connection;
final RedisCommand<?, ?, T> command;
final boolean dissolve;
final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
final AtomicReference<CommandDispatch> commandDispatch = new AtomicReference<>(CommandDispatch.UNDISPATCHED);

private Subscriber<? super T> subscriber;
volatile long demand;
volatile boolean allDataRead = false;

private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
Subscriber<? super T> subscriber;

RedisSubscription(StatefulConnection<?, ?> connection, RedisCommand<?, ?, T> command, boolean dissolve) {

Expand Down Expand Up @@ -279,7 +281,7 @@ private boolean readAndPublish() throws IOException {

this.subscriber.onNext(data);

if (Operators.addAndGet(this.demand, -1) == 0) {
if (Operators.addAndGet(DEMAND, this, -1) == 0) {
return false;
}
} else {
Expand All @@ -292,26 +294,23 @@ private boolean readAndPublish() throws IOException {

/**
* Reads data from the input, if possible.
*
*
* @return the data that was read or {@literal null}
*/
protected T read() {
return data.poll();
}

private boolean hasDemand() {
return this.demand.get() > 0;
return this.demand > 0;
}

private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
}

void checkCommandDispatch() {

if (!dispatched.get() && dispatched.compareAndSet(false, true)) {
dispatchCommand();
}
commandDispatch.get().dispatch(this);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down Expand Up @@ -340,7 +339,42 @@ void checkOnDataAvailable() {
}

/**
* Represents a state for the {@link Subscription} to be in. The following figure indicate the four different states that
* Represents a state for command dispatch of the {@link Subscription}. The following figure indicates the two different
* states that exist, and the relationships between them.
*
* <pre>
* UNDISPATCHED
* |
* v
* DISPATCHED
* </pre>
*
* Refer to the individual states for more information.
*/
private enum CommandDispatch {

/**
* Initial state. Will respond to {@link #dispatch(RedisSubscription)} by changing the state to {@link #DISPATCHED} and
* dispatch the command.
*/
UNDISPATCHED {

@Override
void dispatch(RedisSubscription<?> redisSubscription) {

if (redisSubscription.commandDispatch.compareAndSet(this, DISPATCHED)) {
redisSubscription.dispatchCommand();
}
}
},
DISPATCHED;

void dispatch(RedisSubscription<?> redisSubscription) {
}
}

/**
* Represents a state for the {@link Subscription} to be in. The following figure indicates the four different states that
* exist, and the relationships between them.
*
* <pre>
Expand All @@ -355,7 +389,7 @@ void checkOnDataAvailable() {
* | v |
* ------------> COMPLETED <---------
* </pre>
*
*
* Refer to the individual states for more information.
*/
private enum State {
Expand Down Expand Up @@ -391,7 +425,7 @@ void request(RedisSubscription<?> subscription, long n) {

if (Operators.checkRequest(n, subscription.subscriber)) {

Operators.addAndGet(subscription.demand, n);
Operators.getAndAddCap(subscription.DEMAND, subscription, n);

if (subscription.changeState(this, DEMAND)) {

Expand Down Expand Up @@ -439,7 +473,7 @@ void onDataAvailable(RedisSubscription<?> subscription) {
void request(RedisSubscription<?> subscription, long n) {

if (Operators.checkRequest(n, subscription.subscriber)) {
Operators.addAndGet(subscription.demand, n);
Operators.getAndAddCap(subscription.DEMAND, subscription, n);
}
}

Expand All @@ -450,7 +484,7 @@ void request(RedisSubscription<?> subscription, long n) {
void request(RedisSubscription<?> subscription, long n) {

if (Operators.checkRequest(n, subscription.subscriber)) {
Operators.addAndGet(subscription.demand, n);
Operators.getAndAddCap(subscription.DEMAND, subscription, n);
}
}
},
Expand Down

0 comments on commit 832b5c8

Please sign in to comment.