Skip to content

Commit 2f46354

Browse files
committed
[netty] Add simple chain for handling http requests
1 parent 83e99ac commit 2f46354

File tree

7 files changed

+439
-0
lines changed

7 files changed

+439
-0
lines changed

java/server/server.iml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
<orderEntry type="library" name="jaeger" level="project" />
2020
<orderEntry type="library" name="jetty" level="project" />
2121
<orderEntry type="library" name="junit" level="project" />
22+
<orderEntry type="library" name="netty" level="project" />
2223
<orderEntry type="library" name="objenesis" level="project" />
2324
<orderEntry type="library" name="okhttp" level="project" />
2425
<orderEntry type="library" name="opentracing" level="project" />
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
java_library(
2+
name = "server",
3+
srcs = glob(["*.java"]),
4+
deps = [
5+
"//java/client/src/org/openqa/selenium/remote/http",
6+
"//third_party/java/guava",
7+
"//third_party/java/netty:netty-all",
8+
],
9+
)
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Licensed to the Software Freedom Conservancy (SFC) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The SFC licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.openqa.selenium.netty.server;
19+
20+
import static org.openqa.selenium.remote.http.Contents.utf8String;
21+
22+
import org.openqa.selenium.remote.http.Contents;
23+
import org.openqa.selenium.remote.http.HttpHandler;
24+
import org.openqa.selenium.remote.http.HttpResponse;
25+
26+
import io.netty.bootstrap.ServerBootstrap;
27+
import io.netty.channel.Channel;
28+
import io.netty.channel.EventLoopGroup;
29+
import io.netty.channel.nio.NioEventLoopGroup;
30+
import io.netty.channel.socket.nio.NioServerSocketChannel;
31+
import io.netty.handler.logging.LogLevel;
32+
import io.netty.handler.logging.LoggingHandler;
33+
34+
import java.util.Objects;
35+
36+
public class NettyServer {
37+
38+
private HttpHandler handler;
39+
40+
public NettyServer(HttpHandler handler) throws InterruptedException {
41+
this.handler = Objects.requireNonNull(handler, "Handler to use must be set.");
42+
43+
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
44+
EventLoopGroup workerGroup = new NioEventLoopGroup();
45+
try {
46+
ServerBootstrap b = new ServerBootstrap();
47+
b.group(bossGroup, workerGroup)
48+
.channel(NioServerSocketChannel.class)
49+
.handler(new LoggingHandler(LogLevel.INFO))
50+
.childHandler(new SeleniumHttpInitializer(handler));
51+
52+
Channel ch = b.bind(4444).sync().channel();
53+
54+
System.err.println("Open your web browser and navigate to http://127.0.0.1:4444/");
55+
56+
ch.closeFuture().sync();
57+
} finally {
58+
bossGroup.shutdownGracefully();
59+
workerGroup.shutdownGracefully();
60+
}
61+
}
62+
63+
public NettyServer start() {
64+
return this;
65+
}
66+
67+
public static void main(String[] args) throws InterruptedException {
68+
NettyServer server = new NettyServer(req -> {
69+
System.out.println(Contents.string(req));
70+
HttpResponse res = new HttpResponse();
71+
res.setContent(utf8String("Hello, World!\n"));
72+
73+
return res;
74+
});
75+
76+
server.start();
77+
}
78+
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Licensed to the Software Freedom Conservancy (SFC) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The SFC licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.openqa.selenium.netty.server;
19+
20+
import static org.openqa.selenium.remote.http.Contents.bytes;
21+
22+
import com.google.common.io.ByteStreams;
23+
24+
import org.openqa.selenium.remote.http.Contents;
25+
import org.openqa.selenium.remote.http.HttpMethod;
26+
import org.openqa.selenium.remote.http.HttpRequest;
27+
28+
import io.netty.buffer.ByteBuf;
29+
import io.netty.buffer.ByteBufInputStream;
30+
import io.netty.channel.ChannelHandlerContext;
31+
import io.netty.channel.SimpleChannelInboundHandler;
32+
import io.netty.handler.codec.http.FullHttpRequest;
33+
import io.netty.handler.codec.http.HttpContent;
34+
import io.netty.handler.codec.http.HttpObject;
35+
import io.netty.handler.codec.http.LastHttpContent;
36+
37+
import java.io.ByteArrayOutputStream;
38+
import java.io.IOException;
39+
import java.io.InputStream;
40+
import java.io.PipedInputStream;
41+
import java.io.PipedOutputStream;
42+
import java.io.UncheckedIOException;
43+
import java.util.concurrent.ExecutorService;
44+
import java.util.concurrent.Executors;
45+
import java.util.logging.Logger;
46+
47+
48+
class RequestConverter extends SimpleChannelInboundHandler<HttpObject> {
49+
50+
private static final Logger LOG = Logger.getLogger(RequestConverter.class.getName());
51+
private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
52+
private PipedOutputStream out;
53+
54+
@Override
55+
protected void channelRead0(
56+
ChannelHandlerContext ctx,
57+
HttpObject msg) throws Exception {
58+
LOG.fine("Incoming message: " + msg);
59+
60+
if (msg instanceof FullHttpRequest) {
61+
LOG.fine("Is full http request: " + msg);
62+
reset();
63+
FullHttpRequest nettyRequest = (FullHttpRequest) msg;
64+
HttpRequest req = createRequest(nettyRequest);
65+
66+
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
67+
ByteBufInputStream bis = new ByteBufInputStream(nettyRequest.content())) {
68+
ByteStreams.copy(bis, bos);
69+
byte[] bytes = bos.toByteArray();
70+
req.setContent(bytes(bytes));
71+
}
72+
73+
ctx.fireChannelRead(req);
74+
ctx.flush();
75+
return;
76+
}
77+
78+
if (msg instanceof io.netty.handler.codec.http.HttpRequest) {
79+
LOG.fine("Is start of http request: " + msg);
80+
reset();
81+
io.netty.handler.codec.http.HttpRequest nettyRequest =
82+
(io.netty.handler.codec.http.HttpRequest) msg;
83+
84+
HttpRequest req = new HttpRequest(
85+
HttpMethod.valueOf(nettyRequest.method().name()),
86+
nettyRequest.uri());
87+
88+
nettyRequest.headers().entries().stream()
89+
.filter(entry -> entry.getKey() != null)
90+
.forEach(entry -> req.addHeader(entry.getKey(), entry.getValue()));
91+
92+
out = new PipedOutputStream();
93+
InputStream in = new PipedInputStream(out);
94+
95+
req.setContent(Contents.memoize(() -> in));
96+
ctx.fireChannelRead(req);
97+
ctx.flush();
98+
}
99+
100+
if (msg instanceof HttpContent) {
101+
ByteBuf buf = ((HttpContent) msg).content().retain();
102+
EXECUTOR.submit(() -> {
103+
try (InputStream inputStream = new ByteBufInputStream(buf)) {
104+
ByteStreams.copy(inputStream, out);
105+
} catch (IOException e) {
106+
throw new UncheckedIOException(e);
107+
} finally {
108+
buf.release();
109+
}
110+
});
111+
}
112+
113+
if (msg instanceof LastHttpContent) {
114+
LOG.info("Closing input pipe.");
115+
EXECUTOR.submit(() -> {
116+
try {
117+
out.close();
118+
} catch (IOException e) {
119+
throw new UncheckedIOException(e);
120+
}
121+
});
122+
}
123+
}
124+
125+
@Override
126+
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
127+
ctx.flush();
128+
reset();
129+
super.channelReadComplete(ctx);
130+
}
131+
132+
@Override
133+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
134+
cause.printStackTrace();
135+
ctx.close();
136+
reset();
137+
}
138+
139+
private HttpRequest createRequest(io.netty.handler.codec.http.HttpRequest nettyRequest) {
140+
HttpRequest req = new HttpRequest(
141+
HttpMethod.valueOf(nettyRequest.method().name()),
142+
nettyRequest.uri());
143+
144+
nettyRequest.headers().entries().stream()
145+
.filter(entry -> entry.getKey() != null)
146+
.forEach(entry -> req.addHeader(entry.getKey(), entry.getValue()));
147+
148+
return req;
149+
}
150+
151+
private void reset() throws Exception {
152+
}
153+
154+
155+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Licensed to the Software Freedom Conservancy (SFC) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The SFC licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.openqa.selenium.netty.server;
19+
20+
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
21+
import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING;
22+
import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED;
23+
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
24+
25+
import org.openqa.selenium.remote.http.HttpResponse;
26+
27+
import io.netty.buffer.Unpooled;
28+
import io.netty.channel.ChannelFuture;
29+
import io.netty.channel.ChannelHandlerContext;
30+
import io.netty.channel.ChannelOutboundHandlerAdapter;
31+
import io.netty.channel.ChannelPromise;
32+
import io.netty.handler.codec.http.DefaultFullHttpResponse;
33+
import io.netty.handler.codec.http.DefaultHttpContent;
34+
import io.netty.handler.codec.http.DefaultHttpResponse;
35+
import io.netty.handler.codec.http.HttpChunkedInput;
36+
import io.netty.handler.codec.http.HttpResponseStatus;
37+
import io.netty.handler.stream.ChunkedStream;
38+
39+
import java.io.InputStream;
40+
41+
public class ResponseConverter extends ChannelOutboundHandlerAdapter {
42+
43+
private static final int CHUNK_SIZE = 1024 * 1024;
44+
45+
@Override
46+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
47+
throws Exception {
48+
if (!(msg instanceof HttpResponse)) {
49+
return;
50+
}
51+
52+
HttpResponse seResponse = (HttpResponse) msg;
53+
54+
// We may not know how large the response is, but figure it out if we can.
55+
byte[] ary = new byte[CHUNK_SIZE];
56+
InputStream is = seResponse.getContent().get();
57+
int byteCount = is.read(ary);
58+
59+
DefaultHttpResponse first;
60+
if (byteCount < CHUNK_SIZE) {
61+
is.close();
62+
first = new DefaultFullHttpResponse(
63+
HTTP_1_1,
64+
HttpResponseStatus.valueOf(seResponse.getStatus()),
65+
Unpooled.wrappedBuffer(ary));
66+
first.headers().addInt(CONTENT_LENGTH, byteCount);
67+
copyHeaders(seResponse, first);
68+
ctx.write(first);
69+
} else {
70+
first = new DefaultHttpResponse(
71+
HTTP_1_1,
72+
HttpResponseStatus.valueOf(seResponse.getStatus()));
73+
first.headers().set(TRANSFER_ENCODING, CHUNKED);
74+
copyHeaders(seResponse, first);
75+
ctx.write(first);
76+
77+
// We need to write the first response.
78+
ctx.write(new DefaultHttpContent(Unpooled.wrappedBuffer(ary)));
79+
80+
HttpChunkedInput writer = new HttpChunkedInput(new ChunkedStream(is));
81+
ChannelFuture future = ctx.write(writer);
82+
future.addListener(ignored -> {
83+
is.close();
84+
ctx.flush();
85+
});
86+
}
87+
}
88+
89+
private void copyHeaders(HttpResponse seResponse, DefaultHttpResponse first) {
90+
for (String name : seResponse.getHeaderNames()) {
91+
if (CONTENT_LENGTH.contentEquals(name) || TRANSFER_ENCODING.contentEquals(name)) {
92+
continue;
93+
}
94+
for (String value : seResponse.getHeaders(name)) {
95+
first.headers().add(name, value);
96+
}
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)