Skip to content

Commit

Permalink
Propagate backpressure to the transport channel #394
Browse files Browse the repository at this point in the history
Backpressure of reactive commands, more specific the missing demand, is propagated to the channel. Reactive commands receive a batch of data and are queried whether they still have a demand for data or whether their demand is satisfied (because of no demand/saturated demand). A command without demand will disable further channel reads until the command is completed/canceled or signals further demand.

A significant amount of unread data will eventually cause the TCP receive buffer to overflow. This change moves the buffer responsibility away from the application into the operating system layer which requires then appropriate receive buffer sizing. On the other side, the application is protected against buffer overflows inside the application and reduces the memory footprint because of a reduced amount of data that is held inside the application.
  • Loading branch information
mp911de committed Jan 21, 2017
1 parent d610704 commit af27880
Show file tree
Hide file tree
Showing 8 changed files with 430 additions and 77 deletions.
149 changes: 97 additions & 52 deletions src/main/java/com/lambdaworks/redis/RedisPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.lambdaworks.redis.internal.LettuceAssert;
import com.lambdaworks.redis.output.StreamingOutput;
import com.lambdaworks.redis.protocol.CommandWrapper;
import com.lambdaworks.redis.protocol.DemandAware;
import com.lambdaworks.redis.protocol.RedisCommand;

import io.netty.util.internal.logging.InternalLogger;
Expand Down Expand Up @@ -131,7 +132,8 @@ static class RedisSubscription<T> implements Subscription, StreamingOutput.Subsc
static final AtomicLongFieldUpdater<RedisSubscription> DEMAND = AtomicLongFieldUpdater.newUpdater(
RedisSubscription.class, "demand");

final boolean traceEnabled = LOG.isTraceEnabled();
private final SubscriptionCommand<?, ?, T> subscriptionCommand;
private final boolean traceEnabled = LOG.isTraceEnabled();

final Queue<T> data = QueueSupplier.<T> unbounded().get();
final StatefulConnection<?, ?> connection;
Expand All @@ -145,6 +147,7 @@ static class RedisSubscription<T> implements Subscription, StreamingOutput.Subsc

Subscriber<? super T> subscriber;

@SuppressWarnings("unchecked")
RedisSubscription(StatefulConnection<?, ?> connection, RedisCommand<?, ?, T> command, boolean dissolve) {

LettuceAssert.notNull(connection, "Connection must not be null");
Expand All @@ -153,6 +156,19 @@ static class RedisSubscription<T> implements Subscription, StreamingOutput.Subsc
this.connection = connection;
this.command = command;
this.dissolve = dissolve;

if (command.getOutput() instanceof StreamingOutput<?>) {
StreamingOutput<T> streamingOutput = (StreamingOutput<T>) command.getOutput();

if (connection instanceof StatefulRedisConnection<?, ?> && ((StatefulRedisConnection) connection).isMulti()) {
streamingOutput.setSubscriber(new CompositeSubscriber<>(
Arrays.asList(this, streamingOutput.getSubscriber())));
} else {
streamingOutput.setSubscriber(this);
}
}

this.subscriptionCommand = new SubscriptionCommand<>(command, this, dissolve);
}

/**
Expand Down Expand Up @@ -201,10 +217,6 @@ public final void cancel() {
state().cancel(this);
}

private RedisPublisher.State state() {
return this.state.get();
}

/**
* Called by {@link StreamingOutput} to dispatch data (push).
*
Expand All @@ -215,6 +227,10 @@ public void onNext(T t) {

LettuceAssert.notNull(t, "Data must not be null");

if (state() == State.COMPLETED) {
return;
}

if (!data.offer(t)) {

Throwable e = Operators.onOperatorError(this, Exceptions.failWithOverflow(), t);
Expand All @@ -235,7 +251,7 @@ final void onDataAvailable() {
LOG.trace("{} onDataAvailable()", state());
}

this.state.get().onDataAvailable(this);
state().onDataAvailable(this);
}

/**
Expand All @@ -248,7 +264,7 @@ final void onAllDataRead() {
LOG.trace("{} onAllDataRead()", state());
}

this.state.get().onAllDataRead(this);
state().onAllDataRead(this);
}

/**
Expand All @@ -262,34 +278,7 @@ final void onError(Throwable t) {
LOG.trace("{} onError(): {}", state(), t.toString(), t);
}

this.state.get().onError(this, t);
}

/**
* Reads and publishes data from the input. Continues until either there is no more demand, or until there is no more
* data to be read.
*
* @return {@literal true} if there is more demand, {@literal false} otherwise.
*/
private boolean readAndPublish() throws IOException {

while (hasDemand()) {

T data = read();

if (data != null) {

this.subscriber.onNext(data);

if (Operators.addAndGet(DEMAND, this, -1) == 0) {
return false;
}
} else {
return true;
}
}

return false;
state().onError(this, t);
}

/**
Expand All @@ -315,26 +304,51 @@ void checkCommandDispatch() {

@SuppressWarnings({ "unchecked", "rawtypes" })
void dispatchCommand() {
connection.dispatch((RedisCommand) subscriptionCommand);
}

if (command.getOutput() instanceof StreamingOutput<?>) {
StreamingOutput<T> streamingOutput = (StreamingOutput<T>) command.getOutput();
void checkOnDataAvailable() {

if (connection instanceof StatefulRedisConnection<?, ?> && ((StatefulRedisConnection) connection).isMulti()) {
streamingOutput.setSubscriber(new CompositeSubscriber<T>(Arrays.asList(this,
streamingOutput.getSubscriber())));
} else {
streamingOutput.setSubscriber(this);
if (data.isEmpty()) {
if (hasDemand()) {
state().readData(this);
}
}

connection.dispatch(new SubscriptionCommand(command, this, dissolve));
if (!data.isEmpty()) {
onDataAvailable();
}
}

void checkOnDataAvailable() {
/**
* Reads and publishes data from the input. Continues until either there is no more demand, or until there is no more
* data to be read.
*
* @return {@literal true} if there is more demand, {@literal false} otherwise.
*/
private boolean readAndPublish() throws IOException {

if (!data.isEmpty()) {
onDataAvailable();
while (hasDemand()) {

T data = read();

if (data != null) {

this.subscriber.onNext(data);

if (Operators.addAndGet(DEMAND, this, -1) == 0) {
return false;
}
} else {
return true;
}
}

return false;
}

RedisPublisher.State state() {
return this.state.get();
}
}

Expand Down Expand Up @@ -425,7 +439,7 @@ void request(RedisSubscription<?> subscription, long n) {

if (Operators.checkRequest(n, subscription.subscriber)) {

Operators.getAndAddCap(subscription.DEMAND, subscription, n);
Operators.getAndAddCap(RedisSubscription.DEMAND, subscription, n);

if (subscription.changeState(this, DEMAND)) {

Expand Down Expand Up @@ -473,7 +487,7 @@ void onDataAvailable(RedisSubscription<?> subscription) {
void request(RedisSubscription<?> subscription, long n) {

if (Operators.checkRequest(n, subscription.subscriber)) {
Operators.getAndAddCap(subscription.DEMAND, subscription, n);
Operators.getAndAddCap(RedisSubscription.DEMAND, subscription, n);
}
}

Expand All @@ -484,7 +498,7 @@ void request(RedisSubscription<?> subscription, long n) {
void request(RedisSubscription<?> subscription, long n) {

if (Operators.checkRequest(n, subscription.subscriber)) {
Operators.getAndAddCap(subscription.DEMAND, subscription, n);
Operators.getAndAddCap(RedisSubscription.DEMAND, subscription, n);
}
}
},
Expand Down Expand Up @@ -526,7 +540,16 @@ void request(RedisSubscription<?> subscription, long n) {
void cancel(RedisSubscription<?> subscription) {

subscription.command.cancel();
subscription.changeState(this, COMPLETED);
if (subscription.changeState(this, COMPLETED)) {
readData(subscription);
}
}

void readData(RedisSubscription<?> subscription) {
DemandAware.Source source = subscription.subscriptionCommand.source;
if (source != null) {
source.requestMore();
}
}

void onDataAvailable(RedisSubscription<?> subscription) {
Expand All @@ -538,6 +561,7 @@ void onAllDataRead(RedisSubscription<?> subscription) {
subscription.allDataRead = true;

if (subscription.data.isEmpty() && subscription.changeState(this, COMPLETED)) {
readData(subscription);
if (subscription.subscriber != null) {
subscription.subscriber.onComplete();
}
Expand All @@ -547,6 +571,7 @@ void onAllDataRead(RedisSubscription<?> subscription) {
void onError(RedisSubscription<?> subscription, Throwable t) {

if (subscription.changeState(this, COMPLETED)) {
readData(subscription);
if (subscription.subscriber != null) {
subscription.subscriber.onError(t);
}
Expand All @@ -561,11 +586,12 @@ void onError(RedisSubscription<?> subscription, Throwable t) {
* @param <V> value type
* @param <T> response type
*/
private static class SubscriptionCommand<K, V, T> extends CommandWrapper<K, V, T> {
private static class SubscriptionCommand<K, V, T> extends CommandWrapper<K, V, T> implements DemandAware.Sink {

private final boolean dissolve;
private final RedisSubscription<T> subscription;
private boolean completed = false;
private volatile boolean completed = false;
private volatile DemandAware.Source source;

public SubscriptionCommand(RedisCommand<K, V, T> command, RedisSubscription<T> subscription, boolean dissolve) {

Expand All @@ -575,6 +601,13 @@ public SubscriptionCommand(RedisCommand<K, V, T> command, RedisSubscription<T> s
this.dissolve = dissolve;
}

@Override
public boolean hasDemand() {

// signal demand as completed commands just no-op on incoming data.
return completed || subscription.state() == State.COMPLETED || subscription.demand > subscription.data.size();
}

@Override
@SuppressWarnings("unchecked")
public void complete() {
Expand All @@ -598,7 +631,9 @@ public void complete() {
if (!(getOutput() instanceof StreamingOutput<?>) && result != null) {

if (dissolve && result instanceof Collection) {

Collection<T> collection = (Collection<T>) result;

for (T t : collection) {
if (t != null) {
subscription.onNext(t);
Expand All @@ -617,6 +652,16 @@ public void complete() {
}
}

@Override
public void setSource(DemandAware.Source source) {
this.source = source;
}

@Override
public void removeSource() {
this.source = null;
}

@Override
public void cancel() {

Expand Down

0 comments on commit af27880

Please sign in to comment.