Permalink
Browse files

[pegasus] Netty HTTP/2 client implementation

RB=727325
G=si-core-reviewers
R=dhoa,axu
A=axu
  • Loading branch information...
1 parent 6d1e432 commit a481ef278062c466e81491e68c11699bc38fe869 @ssheng ssheng committed Aug 1, 2016
Showing with 5,475 additions and 1,485 deletions.
  1. +1 −1 build.gradle
  2. +66 −0 r2-core/src/main/java/com/linkedin/r2/transport/common/bridge/common/RequestWithCallback.java
  3. +56 −0 r2-core/src/main/java/com/linkedin/r2/transport/common/bridge/common/ResponseWithCallback.java
  4. +39 −0 r2-core/src/main/java/com/linkedin/r2/transport/http/client/AsyncPoolHandle.java
  5. +1 −1 r2-core/src/main/java/com/linkedin/r2/transport/http/client/AsyncSharedPoolImpl.java
  6. +89 −0 r2-core/src/main/java/com/linkedin/r2/transport/http/client/TimeoutAsyncPoolHandle.java
  7. +182 −0 r2-core/src/test/java/test/r2/transport/http/client/TestTimeoutAsyncPoolHandle.java
  8. +53 −0 r2-int-test/src/test/java/test/r2/integ/AbstractHttp2EchoServiceTest.java
  9. +244 −0 r2-int-test/src/test/java/test/r2/integ/AbstractHttpServerTest.java
  10. +30 −9 r2-int-test/src/test/java/test/r2/integ/AbstractStreamTest.java
  11. +22 −0 r2-int-test/src/test/java/test/r2/integ/EchoHandler.java
  12. +11 −0 r2-int-test/src/test/java/test/r2/integ/HttpProtocolVersion.java
  13. +35 −34 r2-int-test/src/test/java/test/r2/integ/TestClientShutdown.java
  14. +30 −12 r2-int-test/src/test/java/test/r2/integ/TestClientTimeout.java
  15. +68 −0 r2-int-test/src/test/java/test/r2/integ/TestHttp2RestEcho.java
  16. +61 −0 r2-int-test/src/test/java/test/r2/integ/TestHttp2Server.java
  17. +136 −103 r2-int-test/src/test/java/test/r2/integ/TestHttpClient.java
  18. +11 −198 r2-int-test/src/test/java/test/r2/integ/TestHttpServer.java
  19. +1 −1 r2-int-test/src/test/java/test/r2/integ/TestJetty404.java
  20. +21 −10 r2-int-test/src/test/java/test/r2/integ/TestQueryTunnel.java
  21. +49 −35 r2-int-test/src/test/java/test/r2/integ/TestRequestCompression.java
  22. +50 −36 r2-int-test/src/test/java/test/r2/integ/TestResponseCompression.java
  23. +30 −12 r2-int-test/src/test/java/test/r2/integ/TestRestCompressionEcho.java
  24. +86 −67 r2-int-test/src/test/java/test/r2/integ/TestStreamEcho.java
  25. +115 −94 r2-int-test/src/test/java/test/r2/integ/TestStreamRequest.java
  26. +71 −57 r2-int-test/src/test/java/test/r2/integ/TestStreamResponse.java
  27. +11 −2 r2-jetty/src/main/java/com/linkedin/r2/transport/http/server/HttpServerFactory.java
  28. +7 −0 r2-netty/build.gradle
  29. +330 −0 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/AbstractNettyStreamClient.java
  30. +164 −0 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/Http2AlpnHandler.java
  31. +51 −0 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/Http2ChannelPoolHandler.java
  32. +143 −0 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/Http2ClientPipelineInitializer.java
  33. +407 −0 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/Http2FrameListener.java
  34. +200 −0 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/Http2InitializerHandler.java
  35. +334 −0 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/Http2NettyStreamClient.java
  36. +68 −0 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/Http2SchemeHandler.java
  37. +306 −0 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/Http2StreamCodec.java
  38. +153 −0 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/Http2StreamCodecBuilder.java
  39. +82 −0 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/Http2StreamResponseHandler.java
  40. +150 −0 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/Http2UpgradeHandler.java
  41. +55 −23 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/HttpClientFactory.java
  42. +195 −502 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/HttpNettyStreamClient.java
  43. +86 −5 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/NettyRequestAdapter.java
  44. +6 −6 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/RAPClientCodec.java
  45. +140 −0 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/RAPClientPipelineInitializer.java
  46. +10 −9 r2-netty/src/main/java/com/linkedin/r2/transport/http/client/RAPResponseDecoder.java
  47. +6 −6 r2-netty/src/main/java/com/linkedin/r2/transport/http/server/RAPServerCodec.java
  48. +20 −0 r2-netty/src/test/java/com/linkedin/r2/transport/http/client/HttpClientBuilder.java
  49. +201 −0 r2-netty/src/test/java/com/linkedin/r2/transport/http/client/HttpServerBuilder.java
  50. +165 −112 r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestHttpClientFactory.java
  51. +503 −138 r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestHttpNettyStreamClient.java
  52. +97 −7 r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestNettyRequestAdapter.java
  53. +5 −4 r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestRAPClientCodec.java
  54. +22 −1 r2-sample/src/main/java/com/linkedin/r2/sample/Bootstrap.java
View
@@ -62,7 +62,7 @@ project.ext.externalDependency = [
'log4j2Core': 'org.apache.logging.log4j:log4j-core:2.0.2',
'log4jLog4j2': 'org.apache.logging.log4j:log4j-1.2-api:2.0.2',
'mail': 'javax.mail:mail:1.4.1',
- 'netty': 'io.netty:netty-all:4.0.27.Final',
+ 'netty': 'io.netty:netty-all:4.1.4.Final',
'objenesis': 'org.objenesis:objenesis:1.2',
'parseq': 'com.linkedin.parseq:parseq:1.4.2',
'parseq_tracevis': 'com.linkedin.parseq:parseq-tracevis:2.3.4',
@@ -0,0 +1,66 @@
+/*
+ Copyright (c) 2016 LinkedIn Corp.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * $Id: $
+ */
+
+package com.linkedin.r2.transport.common.bridge.common;
+
+import com.linkedin.r2.message.Request;
+import com.linkedin.r2.transport.http.client.AsyncPoolHandle;
+import com.linkedin.util.ArgumentUtil;
+
+
+/**
+ * Simple wrapper of an R2 {@link Request} implementation and a {@link TransportCallback}
+ *
+ * @param <R> An implementation of R2 {@link Request}
+ * @param <C> An implementation of {@link TransportCallback}
+ * @param <H> An implementation of {@link AsyncPoolHandle}
+ */
+public class RequestWithCallback<R extends Request, C extends TransportCallback<?>, H extends AsyncPoolHandle<?>>
+{
+ private final R _request;
+ private final C _callback;
+ private final H _handle;
+
+ public RequestWithCallback(R request, C callback, H handle)
+ {
+ ArgumentUtil.notNull(request, "request");
+ ArgumentUtil.notNull(callback, "callback");
+ ArgumentUtil.notNull(handle, "handle");
+
+ _request = request;
+ _callback = callback;
+ _handle = handle;
+ }
+
+ public R request()
+ {
+ return _request;
+ }
+
+ public C callback()
+ {
+ return _callback;
+ }
+
+ public H handle()
+ {
+ return _handle;
+ }
+}
@@ -0,0 +1,56 @@
+/*
+ Copyright (c) 2016 LinkedIn Corp.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * $Id: $
+ */
+
+package com.linkedin.r2.transport.common.bridge.common;
+
+import com.linkedin.r2.message.Response;
+import com.linkedin.util.ArgumentUtil;
+
+
+/**
+ * Simple wrapper of an R2 {@link Response} implementation and a {@link TransportCallback}
+ *
+ * @param <R> An implementation of R2 {@link Response}
+ * @param <C> An implementation of {@link TransportCallback}
+ */
+public class ResponseWithCallback<R extends Response, C extends TransportCallback<?>>
+{
+ private final R _response;
+ private final C _callback;
+
+ public ResponseWithCallback(R response, C callback)
+ {
+ ArgumentUtil.notNull(response, "response");
+ ArgumentUtil.notNull(callback, "callback");
+
+ _response = response;
+ _callback = callback;
+ }
+
+ public R response()
+ {
+ return _response;
+ }
+
+ public C callback()
+ {
+ return _callback;
+ }
+}
@@ -0,0 +1,39 @@
+/*
+ Copyright (c) 2016 LinkedIn Corp.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * $Id: $
+ */
+
+package com.linkedin.r2.transport.http.client;
+
+/**
+ * Provides a handle for each {@link AsyncPool} object to be returned or disposed
+ * back to the pool they were created.
+ */
+public interface AsyncPoolHandle<T>
+{
+ /**
+ * Releases the handle and returns the object back to the pool
+ */
+ void release();
+
+ /**
+ * Gets the reference to the {@link AsyncPool} where the object was originally created
+ * @return Reference to the async pool
+ */
+ AsyncPool<T> pool();
+}
@@ -600,7 +600,7 @@ private boolean doDispose(T item)
*/
private void doDestroy(final T item, final boolean bad, SimpleCallback callback)
{
- LOG.debug("{}: disposing an item {}", _name, item);
+ LOG.debug("{}: destroying an item {}", _name, item);
if (bad)
{
_statsTracker.incrementBadDestroyed();
@@ -0,0 +1,89 @@
+/*
+ Copyright (c) 2016 LinkedIn Corp.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+/**
+ * $Id: $
+ */
+
+package com.linkedin.r2.transport.http.client;
+
+import com.linkedin.r2.util.Timeout;
+import com.linkedin.r2.util.TimeoutExecutor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Wraps an {@link AsyncPool} object with an associated timeout. Provides an interface to return or
+ * dispose the pool object by invoking #put or #dispose respectively. If either #put or #dispose method
+ * is invoked prior to timeout expires, the timeout is cancelled. Otherwise, when timeout expires, the
+ * wrapped item is disposed to the async pool and subsequent invocations to #put and #dispose become no-op.
+ *
+ * @author Sean Sheng
+ * @param <T>
+ */
+public class TimeoutAsyncPoolHandle<T> implements AsyncPoolHandle<T>, TimeoutExecutor
+{
+ private final AsyncPool<T> _pool;
+ private final Timeout<T> _timeout;
+
+ private volatile boolean _error = false;
+
+ public TimeoutAsyncPoolHandle(
+ AsyncPool<T> pool, ScheduledExecutorService scheduler, long timeout, TimeUnit unit, T item)
+ {
+ _pool = pool;
+ _timeout = new Timeout<>(scheduler, timeout, unit, item);
+ _timeout.addTimeoutTask(() -> _pool.dispose(item));
+ }
+
+ public TimeoutAsyncPoolHandle<T> error()
+ {
+ _error = true;
+ return this;
+ }
+
+ @Override
+ public void release()
+ {
+ T item = _timeout.getItem();
+ if (item == null)
+ {
+ return;
+ }
+
+ if (_error)
+ {
+ _pool.dispose(item);
+ }
+ else
+ {
+ _pool.put(item);
+ }
+ }
+
+ @Override
+ public AsyncPool<T> pool()
+ {
+ return _pool;
+ }
+
+ @Override
+ public void addTimeoutTask(Runnable task)
+ {
+ _timeout.addTimeoutTask(task);
+ }
+}
Oops, something went wrong.

0 comments on commit a481ef2

Please sign in to comment.