Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add handlers to provide some generic backpressure implementations. #6662

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.backpressure;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;

/**
* {@link ChannelInboundHandlerAdapter} implementation which empowers back-pressure by stop reading from the remote peer
* once a {@link Channel} becomes unwritable. In this case {@link ChannelHandlerContext#flush()} is called and
* reads are continueed once the {@link Channel} becomes writable again.
* This ensures we stop reading from the remote peer if we are writing faster then the remote peer can handle.
*
* Use this handler if {@link ChannelOption#AUTO_READ} is set to {@code true}, which is the
* default. If {@link ChannelOption#AUTO_READ} is set to {@code false} you must use {@link BackPressureHandler}.
*/
public final class BackPressureAutoReadHandler extends ChannelInboundHandlerAdapter {
// Keep track of the auto read state that we changed things to. This will be used when removing the
// handler to detect if we need to restore the auto read setting.
private boolean autoReadState;

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isWritable()) {
ctx.channel().config().setAutoRead(true);
autoReadState = true;
} else {
ctx.channel().config().setAutoRead(false);
autoReadState = false;
ctx.flush();
}
// Propergate the event as the user may still want to do something based on it.
ctx.fireChannelWritabilityChanged();
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (!ctx.channel().config().isAutoRead()) {
throw new IllegalStateException(ChannelOption.AUTO_READ.name() + " must be set to true");
}
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (autoReadState) {
// We should restore the auto-read setting as otherwise we may produce a stale as the user will
// not know we stopped reading.
ctx.channel().config().setAutoRead(true);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.backpressure;

import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;

/**
* {@link ChannelDuplexHandler} implementation which empowers back-pressure by stop reading from the remote peer
* once a {@link Channel} becomes unwritable. In this case {@link ChannelHandlerContext#flush()} is called and
* reads are continueed once the {@link Channel} becomes writable again.
* This ensures we stop reading from the remote peer if we are writing faster then the remote peer can handle.
*
* Use this handler if {@link ChannelOption#AUTO_READ} is set to {@code false}, which is <strong>NOT</strong> the
* default. If {@link ChannelOption#AUTO_READ} is set to {@code true} use {@link BackPressureAutoReadHandler} for
* maximal performance.
*/
public final class BackPressureHandler extends ChannelDuplexHandler {

private boolean readPending;
private boolean writable = true;

@Override
public void read(ChannelHandlerContext ctx) throws Exception {
if (writable) {
ctx.read();
} else {
readPending = true;
}
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
writable = ctx.channel().isWritable();
if (writable) {
if (readPending) {
readPending = false;
ctx.read();
}
} else {
ctx.flush();
}
// Propergate the event as the user may still want to do something based on it.
ctx.fireChannelWritabilityChanged();
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (readPending) {
ctx.read();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

/**
* Package to control back-pressure.
*/
package io.netty.handler.backpressure;
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.backpressure;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertEquals;

public class BackPressureAutoReadHandlerTest {

@Test
public void testBackpressure() {
final AtomicBoolean writableOnFlush = new AtomicBoolean(true);
final AtomicInteger flushCount = new AtomicInteger();
EmbeddedChannel channel = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
flushCount.incrementAndGet();
writableOnFlush.set(ctx.channel().isWritable());
ctx.flush();
}
}, new BackPressureAutoReadHandler());
channel.config().setWriteBufferWaterMark(new WriteBufferWaterMark(128, 512));
assertTrue(channel.config().isAutoRead());
assertEquals(0, flushCount.get());
ChannelFuture future = channel.write(Unpooled.buffer().writeZero(129));
assertFalse(future.isDone());
assertEquals(0, flushCount.get());
assertTrue(channel.config().isAutoRead());
// This should trigger a flush and while the flush is executed the Channel should still be non-writable
ChannelFuture future2 = channel.write(Unpooled.buffer().writeZero(1024));
assertEquals(1, flushCount.get());
assertFalse(writableOnFlush.get());

assertTrue(future.isDone());
assertTrue(future2.isDone());

ByteBuf buffer = channel.readOutbound();
buffer.release();

buffer = channel.readOutbound();
buffer.release();

assertFalse(channel.finish());
}

@Test
public void testThrowIfNotAutoRead() {
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
EmbeddedChannel channel = new EmbeddedChannel();
channel.config().setAutoRead(false);
channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
error.compareAndSet(null, cause);
}
});
channel.pipeline().addFirst(new BackPressureAutoReadHandler());
assertTrue(error.get().getCause() instanceof IllegalStateException);
assertFalse(channel.finish());
}

@Test
public void testNotResetAutoRead() {
EmbeddedChannel channel = new EmbeddedChannel();
channel.pipeline().addFirst("backpressure", new BackPressureAutoReadHandler());
channel.config().setAutoRead(false);
channel.pipeline().remove("backpressure");
assertFalse(channel.config().isAutoRead());
assertFalse(channel.finish());
}
}
Loading