Skip to content

Commit

Permalink
Add handlers to provide some generic backpressure implementations.
Browse files Browse the repository at this point in the history
Motivation:

Often what can be done to implement back-pressure is to stop reading from the socket until we were able to flush out enough data. We should provide some generic re-usable handlers.

Modifications:

Add two handlers which will add simple back-pressure when added to the pipeline.

Result:

Include simple handlers to support back-pressure.
  • Loading branch information
normanmaurer committed Apr 24, 2017
1 parent 3ac6d07 commit 918f61d
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.backpressure;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;

/**
* {@link ChannelInboundHandlerAdapter} implementation which empowers back-pressure by stop reading from the remote peer
* once a {@link Channel} becomes unwritable. In this case {@link ChannelHandlerContext#flush()} is called and
* reads are continueed once the {@link Channel} becomes writable again.
* This ensures we stop reading from the remote peer if we are writing faster then the remote peer can handle.
*
* Use this handler if {@link ChannelOption#AUTO_READ} is set to {@code true}, which is the
* default. If {@link ChannelOption#AUTO_READ} is set to {@code false} you must use {@link BackPressureHandler}.
*/
@ChannelHandler.Sharable
public final class BackPressureAutoReadHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isWritable()) {
ctx.channel().config().setAutoRead(true);
} else {
ctx.channel().config().setAutoRead(false);
ctx.flush();
}
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (!ctx.channel().config().isAutoRead()) {
throw new IllegalStateException(ChannelOption.AUTO_READ.name() + " must be set to true");
}
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// We should restore the auto-read setting as otherwise we may produce a stale as the user will not know
// we stopped reading.
ctx.channel().config().setAutoRead(true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.backpressure;

import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;

/**
* {@link ChannelDuplexHandler} implementation which empowers back-pressure by stop reading from the remote peer
* once a {@link Channel} becomes unwritable. In this case {@link ChannelHandlerContext#flush()} is called and
* reads are continueed once the {@link Channel} becomes writable again.
* This ensures we stop reading from the remote peer if we are writing faster then the remote peer can handle.
*
* Use this handler if {@link ChannelOption#AUTO_READ} is set to {@code false}, which is <strong>NOT</strong> the
* default. If {@link ChannelOption#AUTO_READ} is set to {@code true} use {@link BackPressureAutoReadHandler} for
* maximal performance.
*/
public final class BackPressureHandler extends ChannelDuplexHandler {

private boolean readPending;
private boolean writable = true;

@Override
public void read(ChannelHandlerContext ctx) throws Exception {
if (writable) {
ctx.read();
} else {
readPending = true;
}
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
writable = ctx.channel().isWritable();
if (writable) {
if (readPending) {
readPending = false;
ctx.read();
}
} else {
ctx.flush();
}
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (readPending) {
ctx.read();
}
}

@Override
public boolean isSharable() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

/**
* Package to control back-pressure.
*/
package io.netty.handler.backpressure;

0 comments on commit 918f61d

Please sign in to comment.