Skip to content

Commit

Permalink
1. New Netty Http2 Channel Abstraction APIs we used to unify the Nett…
Browse files Browse the repository at this point in the history
…y Channel Pipeline has been deprecated from Netty 4.1.37 onwards and refactored in to lightweight APIs. Fix the deprecated APIs before the larger rollout and upgrade to latest Netty 4.1.39

2. When the deprecated APIs are refactored in to new light weight APIs - Unfortunally Netty did miss to special handle the newly created APIS in Http2ClientUpgradeCodec and so did not correctly add Http2MultiplexHandler to the pipeline before calling Http2FrameCodec onHttpClientUpgrade(...). This did lead to the situation that we did not correctly receive the event on the Http2MultiplexHandler and so did not correctly created the Http2StreamChannel for the upgrade stream. Because of this it ended up with an NPE if a frame was dispatched to the upgrade stream later on.

I have created a Netty Pull Request (PR) in github to fix this issue and has been acknwoledged by Norman (One of the Netty maintainers). The details of the bug and the pull request can be found @ netty/netty#9495
For now - I have included the fixed Http2ClientUpgradeCodec in pegasus until the next Netty release is available - to help continue the rollout of this feature !
3. In the previous release - due to known issue with Http2 flowcontrol in Netty - we have disabled backpressure for Http2 streams. Since the flowcontrol is taken care in Netty 4.1.39 - this limitation has been addressed and the new Http2 code pipeline is fully functional now !
4. All  the  special handling of Http2Stream channel test cases has been reverted as the known issues has been taken care now. Now all the integration test cases works for both old and new code path without any special handling - :)

RB=1775062
BUG=SI-11340
G=si-core-reviewers
R=fcapponi,ssheng,bsoetarm,crzhang
A=fcapponi,bsoetarm
  • Loading branch information
nizarm committed Aug 27, 2019
1 parent 0bf6bd3 commit cc9ebef
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 100 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
27.4.4
------

27.4.3
------
(RB=1775062)
Upgrade to Netty 4.1.39, Fix Deprecated Http2 APIs, Fix Netty Bug

27.4.2
-------
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ project.ext.externalDependency = [
'log4j2Core': 'org.apache.logging.log4j:log4j-core:2.0.2',
'log4jLog4j2': 'org.apache.logging.log4j:log4j-1.2-api:2.0.2',
'mail': 'javax.mail:mail:1.4.1',
'netty': 'io.netty:netty-all:4.1.34.Final',
'netty': 'io.netty:netty-all:4.1.39.Final',
'objenesis': 'org.objenesis:objenesis:1.2',
'parseq': 'com.linkedin.parseq:parseq:2.6.31',
'parseq_tracevis': 'com.linkedin.parseq:parseq-tracevis:2.6.31',
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=27.4.2
version=27.4.3
sonatypeUsername=please_set_in_home_dir_if_uploading_to_maven_central
sonatypePassword=please_set_in_home_dir_if_uploading_to_maven_central
org.gradle.configureondemand=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,8 @@ public void testTimeoutDuringResponse() throws Exception
}

@Test
// TODO need to adjust this test case to generalize for Http2 Stream Channel as well
// we do not need to throw the timeout exception when the response is already reached the client stack...
public void testReadAfterTimeout() throws Exception
{
if(isHttp2StreamBasedChannel())
return;
StreamRequest request = new StreamRequestBuilder(_clientProvider.createHttpURI(_port, NORMAL_URI)).build(EntityStreams.emptyStream());
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<StreamResponse> response = new AtomicReference<StreamResponse>();
Expand Down Expand Up @@ -157,60 +153,6 @@ public void onError(Throwable ex)
Assert.assertTrue(rootCause instanceof TimeoutException);
}

@Test
// The below test case work only in the new code path...we are not timing out if the entire request is already reached
// client side
public void testReadAfterTimeoutHttp2StreamChannel() throws Exception
{
if(!isHttp2StreamBasedChannel())
return;
StreamRequest request = new StreamRequestBuilder(_clientProvider.createHttpURI(_port, NORMAL_URI)).build(EntityStreams.emptyStream());
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<StreamResponse> response = new AtomicReference<StreamResponse>();
_client.streamRequest(request, new Callback<StreamResponse>()
{
@Override
public void onError(Throwable e)
{
latch.countDown();
}

@Override
public void onSuccess(StreamResponse result)
{
response.set(result);
latch.countDown();
}
});
latch.await(5000, TimeUnit.MILLISECONDS);
Assert.assertNotNull(response.get());

// let it timeout before we read
Thread.sleep(5000);

final AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
final AtomicReference<Boolean> readDone = new AtomicReference<Boolean>(false);
final CountDownLatch errorLatch = new CountDownLatch(1);
Reader reader = new DrainReader()
{
@Override
public void onError(Throwable ex)
{
throwable.set(ex);
errorLatch.countDown();
}

public void onDone()
{
readDone.set(true);
}
};
response.get().getEntityStream().setReader(reader);
errorLatch.await(5000, TimeUnit.MILLISECONDS);
Assert.assertNull(throwable.get());
Assert.assertTrue(readDone.get());
}

private class DelayBeforeResponseHandler implements StreamRequestHandler
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ private void testNormalEcho(long bytesNum, URI uri) throws Exception
@Test
public void testBackPressureEcho() throws Exception
{
if(isHttp2StreamBasedChannel()) // TODO Currently back pressure is not supported
return;

TimedBytesWriter writer = new TimedBytesWriter(SMALL_BYTES_NUM, BYTE);
StreamRequest request = new StreamRequestBuilder(_clientProvider.createHttpURI(_port, ECHO_URI))
.build(EntityStreams.newEntityStream(writer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,6 @@ public void onSuccess(StreamResponse result)
@Test
public void testBackPressure() throws Exception
{
if(isHttp2StreamBasedChannel()) // TODO Currently back pressure is not supported
return;

final long totalBytes = SMALL_BYTES_NUM;
TimedBytesWriter writer = new TimedBytesWriter(totalBytes, BYTE);
EntityStream entityStream = EntityStreams.newEntityStream(writer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,24 @@
import com.linkedin.r2.netty.handler.http2.Http2MessageEncoders;
import com.linkedin.r2.netty.handler.http2.Http2ProtocolUpgradeHandler;
import com.linkedin.r2.netty.handler.http2.UnsupportedHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpClientUpgradeHandler;
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
import io.netty.handler.codec.http2.Http2MultiplexCodec;
import io.netty.handler.codec.http2.Http2MultiplexCodecBuilder;
import io.netty.handler.codec.http2.Http2ClientUpgradeCodecTemporaryFix;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.IdentityCipherSuiteFilter;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.ssl.SslContext;

import java.util.Arrays;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
Expand All @@ -59,27 +61,27 @@
* {@link SessionResumptionSslHandler} and {@link Http2AlpnHandler} to perform application level
* protocol negotiation. If SSL handshake or ALPN failed to negotiate HTTP/2, appropriate failure
* exception will be set to the initialization {@link ChannelPromise}. If SSL handshake and ALPN
* succeed, {@link Http2MultiplexCodec} is added and the pipeline should be setup with the following
* handlers.
* succeed, {@link Http2FrameCodec} and {@link Http2MultiplexHandler}
* are added and the pipeline should be setup with the following handlers.
*
* DefaultChannelPipeline {
* (sslHandler = {@link io.netty.handler.ssl.SslHandler}),
* (CertificateHandler = {@link CertificateHandler}),
* (sslHandshakeTimingHandler = {@link SslHandshakeTimingHandler}),
* (Http2MultiplexCodec#0 = {@link io.netty.handler.codec.http2.Http2MultiplexCodec})
* (Http2FrameCodec = {@link Http2FrameCodec})
* }
*
* During clear text channel initialization, the channel pipeline is first configured with
* {@link HttpClientCodec}, {@link Http2ClientUpgradeCodec}, and {@link Http2ProtocolUpgradeHandler}.
* {@link HttpClientCodec}, {@link Http2ClientUpgradeCodecTemporaryFix}, and {@link Http2ProtocolUpgradeHandler}.
* An upgrade request is sent immediately upon the channel becoming active. If upgrade to
* HTTP/2 fails, appropriate failure exception will be set to the initialization {@link ChannelPromise}.
* If upgrade succeed, {@link Http2MultiplexCodec} is added and the pipeline should be setup with
* the following handlers.
* If upgrade succeed, {@link Http2MultiplexHandler} is added
* and the pipeline should be setup with the following handlers.
*
* DefaultChannelPipeline{
* (HttpClientCodec#0 = {@link io.netty.handler.codec.http.HttpClientCodec}),
* (HttpClientUpgradeHandler#0 = {@link io.netty.handler.codec.http.HttpClientUpgradeHandler}),
* (Http2MultiplexCodec#0 = {@link io.netty.handler.codec.http2.Http2MultiplexCodec})
* (HttpClientCodec#0 = {@link HttpClientCodec}),
* (HttpClientUpgradeHandler#0 = {@link HttpClientUpgradeHandler}),
* (Http2MultiplexHandler#0 = {@link Http2MultiplexHandler})
* }
*
* Common to both SSL and clear text, HTTP/2 streams are represented as child channel of the parent
Expand All @@ -102,7 +104,7 @@
* Remote created streams (even number streams) are not supported on the client side. The pipeline
* of remote created streams are setup with a single handler to log errors.
*
* Http2MultiplexCodec$DefaultHttp2StreamChannel$1{
* Http2MultiplexHandler$DefaultHttp2StreamChannel$1{
* (unsupportedHandler = {@link UnsupportedHandler})
* }
*
Expand All @@ -113,7 +115,6 @@ class Http2ChannelInitializer extends ChannelInitializer<NioSocketChannel>
{
private static final long MAX_INITIAL_STREAM_WINDOW_SIZE = 8 * 1024 * 1024;
private static final boolean IS_CLIENT = true;

private final SSLContext _sslContext;
private final SSLParameters _sslParameters;
private final int _maxInitialLineLength;
Expand Down Expand Up @@ -192,12 +193,16 @@ private void configureClearText(NioSocketChannel channel)
{
final HttpClientCodec sourceCodec = new HttpClientCodec(_maxInitialLineLength, _maxHeaderSize, _maxChunkSize);

Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(
Http2MultiplexCodecBuilder
.forClient(new UnsupportedHandler())
UnsupportedHandler unsupportedHandler = new UnsupportedHandler();
Http2MultiplexHandler multiplexHandler = new Http2MultiplexHandler(unsupportedHandler, unsupportedHandler);

Http2ClientUpgradeCodecTemporaryFix upgradeCodec = new Http2ClientUpgradeCodecTemporaryFix(
(Http2ConnectionHandler) Http2FrameCodecBuilder
.forClient()
.initialSettings(createHttp2Settings())
.withUpgradeStreamHandler(new ChannelInboundHandlerAdapter())
.build());
.build(),
multiplexHandler
);

final ChannelPromise upgradePromise = channel.newPromise();
channel.attr(NettyChannelAttributes.INITIALIZATION_FUTURE).set(upgradePromise);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,9 @@ public void onDataAvailable(ByteString data)

if (_bufferedBytes > BUFFER_HIGH_WATER_MARK && _ctx.channel().config().isAutoRead())
{
//TODO: Currently Netty's Http2StreamChannel is not respecting channel.config().autoRead() property. So the
// flow control/back pressure functionality is currently disabled for Http2StreamChannel as a known limitation.
if (!(_ctx.channel() instanceof Http2StreamChannel))
{
_ctx.channel().config().setAutoRead(false);
}
else
{
LOG.warn("Flow control is not currently available in Http2StreamChannel !");
}
_ctx.channel().config().setAutoRead(false);
}


if (_wh != null)
{
doWrite();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2MultiplexCodecBuilder;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
Expand Down Expand Up @@ -62,10 +63,11 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol)
switch (protocol)
{
case ApplicationProtocolNames.HTTP_2:
ctx.pipeline().addLast(Http2MultiplexCodecBuilder
.forClient(new UnsupportedHandler())
ctx.pipeline().addLast(Http2FrameCodecBuilder
.forClient()
.initialSettings(_http2Settings)
.build());
ctx.pipeline().addLast(new Http2MultiplexHandler(new UnsupportedHandler()));
_alpnPromise.setSuccess();
break;
default:
Expand Down

0 comments on commit cc9ebef

Please sign in to comment.