diff --git a/core/pom.xml b/core/pom.xml
index 5ba7c1f..1951a21 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -19,14 +19,34 @@
19.0
- io.vertx
- vertx-web
- 3.2.1
+ org.json
+ json
+ 20160212
- io.vertx
- vertx-rx-java
- 3.2.1
+ io.netty
+ netty-common
+ 4.1.0.CR6
+
+
+ io.netty
+ netty-buffer
+ 4.1.0.CR6
+
+
+ io.netty
+ netty-handler
+ 4.1.0.CR6
+
+
+ io.netty
+ netty-transport
+ 4.1.0.CR6
+
+
+ io.netty
+ netty-codec-http
+ 4.1.0.CR6
com.github.davidmoten
@@ -68,18 +88,6 @@
objenesis
2.2
-
- junit
- junit
- 4.12
- test
-
-
- org.mockito
- mockito-all
- 1.10.19
- test
-
io.reactivex
rxjava
@@ -95,5 +103,23 @@
logback-classic
1.1.3
+
+ junit
+ junit
+ 4.12
+ test
+
+
+ org.mockito
+ mockito-all
+ 1.10.19
+ test
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ 2.7.3
+ test
+
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/Client.java b/core/src/main/java/org/chodavarapu/datamill/http/Client.java
index d5b2ef2..97b7e42 100644
--- a/core/src/main/java/org/chodavarapu/datamill/http/Client.java
+++ b/core/src/main/java/org/chodavarapu/datamill/http/Client.java
@@ -1,6 +1,8 @@
package org.chodavarapu.datamill.http;
import com.google.common.base.Joiner;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import org.chodavarapu.datamill.http.impl.*;
import org.chodavarapu.datamill.values.Value;
import org.slf4j.Logger;
@@ -12,10 +14,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
+import java.net.URLEncoder;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -29,7 +34,8 @@ public class Client {
public Observable request(Function builder) {
Request request = builder.apply(new RequestBuilderImpl());
- return request(request.method(), request.headers(), request.uri(), request.uriParameters(), request.options(), request.entity());
+ return request(request.method(), request.headers(), request.uri(), request.uriParameters(),
+ request.queryParameters(), request.options(), request.entity());
}
public Observable request(Method method, Map headers, String uri, Value entity) {
@@ -41,20 +47,23 @@ protected URLConnection createConnection(String uri) throws IOException {
}
public Observable request(Method method, Map headers, String uri, Entity entity) {
- return request(method, headers, uri, null, null, entity);
+ return request(method, headers != null ? Multimaps.forMap(headers) : null, uri, null, null, null, entity);
}
public Observable request(
Method method,
- Map headers,
+ Multimap headers,
String uri,
Map uriParameters,
+ Multimap queryParameters,
Map options,
Entity entity) {
if (uriParameters != null && uriParameters.size() > 0) {
uri = uriBuilder.build(uri, uriParameters);
}
+ uri = appendQueryParameters(uri, queryParameters);
+
final String composedUri = uri;
return Async.fromCallable(() -> {
@@ -71,7 +80,7 @@ public Observable request(
}
if (headers != null) {
- for (Map.Entry header : headers.entrySet()) {
+ for (Map.Entry header : headers.entries()) {
httpConnection.addRequestProperty(header.getKey(), header.getValue());
}
}
@@ -83,7 +92,7 @@ public Observable request(
logger.debug("Making HTTP request {} {}", method.name(), composedUri);
if (headers != null && logger.isDebugEnabled()) {
logger.debug(" HTTP request headers:");
- for (Map.Entry header : headers.entrySet()) {
+ for (Map.Entry header : headers.entries()) {
logger.debug(" {}: {}", header.getKey(), header.getValue());
}
}
@@ -105,6 +114,33 @@ public Observable request(
}, Schedulers.io());
}
+ private String appendQueryParameters(String uri, Multimap queryParameters) {
+ if (queryParameters != null && queryParameters.size() > 0) {
+ try {
+ StringBuilder queryBuilder = new StringBuilder("?");
+ Iterator> iterator = queryParameters.entries().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry parameter = iterator.next();
+
+ queryBuilder.append(URLEncoder.encode(parameter.getKey(), "UTF-8"));
+ queryBuilder.append('=');
+
+ if (parameter.getValue() != null) {
+ queryBuilder.append(URLEncoder.encode(parameter.getValue()));
+ }
+
+ if (iterator.hasNext()) {
+ queryBuilder.append('&');
+ }
+ }
+
+ uri = uri + queryBuilder.toString();
+ } catch (UnsupportedEncodingException e) {
+ }
+ }
+ return uri;
+ }
+
private void writeEntityOutOverConnection(Entity entity, HttpURLConnection httpConnection) throws IOException {
httpConnection.setDoOutput(true);
OutputStream outputStream = httpConnection.getOutputStream();
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/Method.java b/core/src/main/java/org/chodavarapu/datamill/http/Method.java
index ec4c569..5078997 100644
--- a/core/src/main/java/org/chodavarapu/datamill/http/Method.java
+++ b/core/src/main/java/org/chodavarapu/datamill/http/Method.java
@@ -7,7 +7,7 @@
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public enum Method {
- OPTIONS, GET, HEAD, POST, PUT, DELETE, TRACE, CONNECT, PATCH;
+ OPTIONS, GET, HEAD, POST, PUT, DELETE, TRACE, CONNECT, PATCH, UNKNOWN;
private static final Set methods =
EnumSet.allOf(Method.class);
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/Request.java b/core/src/main/java/org/chodavarapu/datamill/http/Request.java
index f5afad6..f0ca91c 100644
--- a/core/src/main/java/org/chodavarapu/datamill/http/Request.java
+++ b/core/src/main/java/org/chodavarapu/datamill/http/Request.java
@@ -1,9 +1,9 @@
package org.chodavarapu.datamill.http;
+import com.google.common.collect.Multimap;
import org.chodavarapu.datamill.values.Value;
import java.util.Map;
-import java.util.Optional;
/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
@@ -13,16 +13,22 @@ public interface Request {
Entity entity();
- Map headers();
+ Multimap headers();
- Optional header(String header);
+ Value firstHeader(String header);
- Optional header(RequestHeader header);
+ Value firstHeader(RequestHeader header);
+
+ Value firstQueryParameter(String name);
Method method();
Map options();
+ Multimap queryParameters();
+
+ String rawMethod();
+
String uri();
Value uriParameter(String parameter);
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/RequestBuilder.java b/core/src/main/java/org/chodavarapu/datamill/http/RequestBuilder.java
index fd1b9c6..f706817 100644
--- a/core/src/main/java/org/chodavarapu/datamill/http/RequestBuilder.java
+++ b/core/src/main/java/org/chodavarapu/datamill/http/RequestBuilder.java
@@ -14,6 +14,7 @@ public interface RequestBuilder {
RequestBuilder header(RequestHeader header, String value);
RequestBuilder method(Method method);
RequestBuilder method(String method);
+ RequestBuilder queryParameter(String name, String value);
RequestBuilder uri(String uri);
RequestBuilder uriParameter(String name, T value);
}
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/Response.java b/core/src/main/java/org/chodavarapu/datamill/http/Response.java
index e3d14c0..46e773b 100644
--- a/core/src/main/java/org/chodavarapu/datamill/http/Response.java
+++ b/core/src/main/java/org/chodavarapu/datamill/http/Response.java
@@ -1,12 +1,12 @@
package org.chodavarapu.datamill.http;
-import java.util.Map;
+import com.google.common.collect.Multimap;
/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public interface Response {
Entity entity();
- Map headers();
+ Multimap headers();
Status status();
}
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/Server.java b/core/src/main/java/org/chodavarapu/datamill/http/Server.java
index 70c4890..b6ba374 100644
--- a/core/src/main/java/org/chodavarapu/datamill/http/Server.java
+++ b/core/src/main/java/org/chodavarapu/datamill/http/Server.java
@@ -1,30 +1,41 @@
package org.chodavarapu.datamill.http;
-import com.github.davidmoten.rx.Obs;
-import io.vertx.core.AbstractVerticle;
-import io.vertx.core.Future;
-import io.vertx.core.Vertx;
-import io.vertx.core.VertxOptions;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.core.http.HttpServer;
-import io.vertx.core.http.HttpServerRequest;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
import org.chodavarapu.datamill.http.builder.RouteBuilder;
-import org.chodavarapu.datamill.http.impl.ServerRequestImpl;
+import org.chodavarapu.datamill.http.impl.ClientToServerChannelInitializer;
import org.chodavarapu.datamill.http.impl.RouteBuilderImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import rx.Observable;
-import rx.schedulers.Schedulers;
-import java.util.Map;
+import javax.net.ssl.SSLException;
+import java.security.cert.CertificateException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
-public class Server extends AbstractVerticle {
- private final Function routeConstructor;
+public class Server {
+ private static final Logger logger = LoggerFactory.getLogger(Server.class);
+
private final BiFunction> errorResponseConstructor;
- private HttpServer server;
+ private EventLoopGroup eventLoopGroup;
+ private final Function routeConstructor;
+ private Channel serverChannel;
+ private final ExecutorService threadPool = Executors.newCachedThreadPool();
public Server(Function routeConstructor) {
this(routeConstructor, null);
@@ -37,72 +48,42 @@ public Server(
this.errorResponseConstructor = errorResponseConstructor;
}
- private Observable sendResponse(Response response, HttpServerRequest originalRequest) {
- if (response != null) {
- originalRequest.response().setStatusCode(response.status().getCode());
-
- if (response.headers() != null) {
- for (Map.Entry header : response.headers().entrySet()) {
- originalRequest.response().headers().add(header.getKey(), header.getValue());
- }
+ public Server listen(String host, int port, boolean secure) {
+ SslContext sslContext = null;
+ try {
+ if (secure) {
+ SelfSignedCertificate certificate = new SelfSignedCertificate();
+ sslContext = SslContextBuilder.forServer(certificate.certificate(), certificate.privateKey()).build();
}
+ } catch (SSLException | CertificateException e) {
- if (response.entity() == null) {
- originalRequest.response().end();
- } else {
- return response.entity().asBytes()
- .doOnNext(bytes -> originalRequest.response().end(Buffer.buffer(bytes)))
- .doOnError(throwable -> originalRequest.response().end());
- }
}
- return Observable.just(null);
- }
-
- private void sendGeneralServerError(HttpServerRequest originalRequest) {
- originalRequest.response().setStatusCode(500).end();
- }
-
- @Override
- public void start(Future startFuture) throws Exception {
Route route = routeConstructor.apply(new RouteBuilderImpl());
- server = vertx.createHttpServer();
- server.requestHandler(r -> {
- Observable responseObservable = route.apply(new ServerRequestImpl(r));
- if (responseObservable != null) {
- responseObservable.flatMap(routeResponse -> sendResponse(routeResponse, r))
- .onErrorResumeNext(t -> {
- if (errorResponseConstructor != null) {
- Observable errorResponseObservable =
- errorResponseConstructor.apply(new ServerRequestImpl(r), t);
- if (errorResponseObservable != null) {
- return errorResponseObservable.flatMap(errorResponse -> sendResponse(errorResponse, r))
- .doOnError(secondError -> sendGeneralServerError(r))
- .map(__ -> null);
- } else {
- sendGeneralServerError(r);
- }
- } else {
- sendGeneralServerError(r);
- }
-
- return Observable.just(null);
- })
- .subscribe();
- } else {
- r.response().setStatusCode(404).end();
- }
- });
-
- startFuture.complete();
- }
+ eventLoopGroup = new NioEventLoopGroup();
+
+ ServerBootstrap bootstrap = new ServerBootstrap()
+ .group(eventLoopGroup)
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.SO_BACKLOG, 8)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 15000)
+ .handler(new LoggingHandler())
+ .childHandler(new ClientToServerChannelInitializer(null, threadPool, route, errorResponseConstructor))
+ .childOption(ChannelOption.TCP_NODELAY, true)
+ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+
+ try {
+ logger.debug("Starting HTTP server on {}:{}", host, port);
+ serverChannel = bootstrap.bind(host, port).sync().channel();
+ logger.debug("HTTP server listening on port {}:{}", host, port);
+ } catch (InterruptedException e) {
+ logger.debug("Error occurred while HTTP server was listening on {}:{}", host, port, e);
+ stop();
+ }
- public Server listen(String host, int port, boolean secure) {
- Vertx.vertx(new VertxOptions().setBlockedThreadCheckInterval(1000 * 60 * 60))
- .deployVerticle(this, (verticle) -> {
- server.listen(port, host);
- });
return this;
}
@@ -117,4 +98,16 @@ public Server listen(int port) {
public Server listen(int port, boolean secure) {
return listen("localhost", port, secure);
}
+
+ public void stop() {
+ try {
+ logger.debug("Shutting down HTTP server");
+ serverChannel.close().sync();
+ } catch (InterruptedException e) {
+ logger.debug("Error occurred during HTTP server shut down", e);
+ } finally {
+ eventLoopGroup.shutdownGracefully();
+ logger.debug("HTTP server was shut down");
+ }
+ }
}
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/ServerRequest.java b/core/src/main/java/org/chodavarapu/datamill/http/ServerRequest.java
index a5c6b88..e943cc1 100644
--- a/core/src/main/java/org/chodavarapu/datamill/http/ServerRequest.java
+++ b/core/src/main/java/org/chodavarapu/datamill/http/ServerRequest.java
@@ -1,8 +1,14 @@
package org.chodavarapu.datamill.http;
+import com.google.common.collect.Multimap;
+import org.chodavarapu.datamill.values.Value;
+
/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public interface ServerRequest extends Request {
+ Value firstTrailingHeader(String header);
+ Value firstTrailingHeader(RequestHeader header);
ResponseBuilder respond();
+ Multimap trailingHeaders();
}
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/impl/AbstractRequestImpl.java b/core/src/main/java/org/chodavarapu/datamill/http/impl/AbstractRequestImpl.java
new file mode 100644
index 0000000..b8a571d
--- /dev/null
+++ b/core/src/main/java/org/chodavarapu/datamill/http/impl/AbstractRequestImpl.java
@@ -0,0 +1,106 @@
+package org.chodavarapu.datamill.http.impl;
+
+import com.google.common.collect.Multimap;
+import org.chodavarapu.datamill.http.Entity;
+import org.chodavarapu.datamill.http.Method;
+import org.chodavarapu.datamill.http.Request;
+import org.chodavarapu.datamill.http.RequestHeader;
+import org.chodavarapu.datamill.values.StringValue;
+import org.chodavarapu.datamill.values.Value;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * @author Ravi Chodavarapu (rchodava@gmail.com)
+ */
+public abstract class AbstractRequestImpl implements Request {
+ protected static Value firstValue(Multimap entries, String name) {
+ if (entries != null) {
+ Collection values = entries.get(name);
+ if (values.size() > 0) {
+ return new StringValue(values.iterator().next());
+ }
+ }
+
+ return null;
+ }
+
+ private final Multimap headers;
+ private final String method;
+ private final String uri;
+ private Map uriParameters;
+ private final Entity entity;
+
+ protected AbstractRequestImpl(String method, Multimap headers, String uri, Entity entity) {
+ this.method = method;
+ this.headers = headers;
+ this.uri = uri;
+ this.entity = entity;
+ }
+
+ @Override
+ public Entity entity() {
+ return entity;
+ }
+
+ @Override
+ public Value firstHeader(String header) {
+ return firstValue(headers, header);
+ }
+
+ @Override
+ public Value firstHeader(RequestHeader header) {
+ return firstHeader(header.getName());
+ }
+
+ @Override
+ public Value firstQueryParameter(String name) {
+ return firstValue(queryParameters(), name);
+ }
+
+ @Override
+ public Multimap headers() {
+ return headers;
+ }
+
+ @Override
+ public Method method() {
+ try {
+ return Method.valueOf(method);
+ } catch (IllegalArgumentException e) {
+ return Method.UNKNOWN;
+ }
+ }
+
+ @Override
+ public String rawMethod() {
+ return method;
+ }
+
+ protected void setUriParameters(Map uriParameters) {
+ this.uriParameters = uriParameters;
+ }
+
+ @Override
+ public String uri() {
+ return uri;
+ }
+
+ @Override
+ public Value uriParameter(String parameter) {
+ if (uriParameters != null) {
+ String value = uriParameters.get(parameter);
+ if (value != null) {
+ return new StringValue(value);
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public Map uriParameters() {
+ return uriParameters;
+ }
+}
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/impl/ClientToServerChannelHandler.java b/core/src/main/java/org/chodavarapu/datamill/http/impl/ClientToServerChannelHandler.java
new file mode 100644
index 0000000..78b7c85
--- /dev/null
+++ b/core/src/main/java/org/chodavarapu/datamill/http/impl/ClientToServerChannelHandler.java
@@ -0,0 +1,230 @@
+package org.chodavarapu.datamill.http.impl;
+
+import com.google.common.collect.Multimap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http.*;
+import org.chodavarapu.datamill.http.Entity;
+import org.chodavarapu.datamill.http.Response;
+import org.chodavarapu.datamill.http.Route;
+import org.chodavarapu.datamill.http.ServerRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.subjects.ReplaySubject;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+
+/**
+ * @author Ravi Chodavarapu (rchodava@gmail.com)
+ */
+public class ClientToServerChannelHandler extends ChannelInboundHandlerAdapter {
+ private static final Logger logger = LoggerFactory.getLogger(ClientToServerChannelHandler.class);
+
+ private final BiFunction> errorResponseConstructor;
+ private final Route route;
+ private final ExecutorService threadPool;
+
+ private ReplaySubject entityStream;
+ private ServerRequestImpl serverRequest;
+
+ public ClientToServerChannelHandler(
+ ExecutorService threadPool,
+ Route route,
+ BiFunction> errorResponseConstructor) {
+ this.threadPool = threadPool;
+ this.route = route;
+ this.errorResponseConstructor = errorResponseConstructor;
+ }
+
+ private void sendGeneralServerError(ChannelHandlerContext context) {
+ context.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR));
+ }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext context) {
+ context.flush();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext context, Object message) {
+ if (message instanceof HttpRequest) {
+ HttpRequest request = (HttpRequest) message;
+
+ if (HttpUtil.is100ContinueExpected(request)) {
+ sendContinueResponse(context);
+ }
+
+ entityStream = ReplaySubject.create();
+ serverRequest = ServerRequestBuilder.buildServerRequest(request, entityStream);
+
+ processRequest(context, request);
+
+ if (request.decoderResult().isFailure()) {
+ entityStream.onError(request.decoderResult().cause());
+ }
+ }
+
+ if (message instanceof HttpContent) {
+ HttpContent httpContent = (HttpContent) message;
+
+ ByteBuf content = httpContent.content();
+ if (content.isReadable()) {
+ byte[] chunk = new byte[content.readableBytes()];
+ content.readBytes(chunk);
+ entityStream.onNext(chunk);
+
+ if (httpContent.decoderResult().isFailure()) {
+ entityStream.onError(httpContent.decoderResult().cause());
+ }
+ }
+
+ if (message instanceof LastHttpContent) {
+ LastHttpContent trailer = (LastHttpContent) message;
+ if (!trailer.trailingHeaders().isEmpty()) {
+ serverRequest.setTrailingHeaders(ServerRequestBuilder.buildHeadersMap(trailer.trailingHeaders()));
+ }
+
+ entityStream.onCompleted();
+ }
+ }
+ }
+
+ private void processRequest(ChannelHandlerContext context, HttpRequest originalRequest) {
+ threadPool.execute(() -> {
+ try {
+ Observable responseObservable = route.apply(serverRequest);
+ if (responseObservable != null) {
+ threadPool.execute(() -> {
+ Response response = responseObservable.onErrorResumeNext(throwable -> {
+ Observable errorResponse = errorResponseConstructor.apply(serverRequest, throwable);
+ if (errorResponse != null) {
+ logger.debug("Error occurred handling request, invoking application error handler");
+ return errorResponse.onErrorResumeNext(Observable.just(null));
+ }
+
+ return Observable.just(null);
+ }).toBlocking().lastOrDefault(null);
+
+ sendResponse(context, originalRequest, response);
+ });
+ } else {
+ logger.debug("Error occurred handling request, sending a generic server error (500)");
+ sendGeneralServerError(context);
+ }
+ } catch (Exception e) {
+ logger.debug("Error occurred handling request, sending a generic server error (500)", e);
+ sendGeneralServerError(context);
+ }
+ });
+ }
+
+ private void fillResponse(HttpRequest originalRequest, HttpResponse response,
+ Multimap headers, int contentLength) {
+ boolean keepAlive = HttpUtil.isKeepAlive(originalRequest);
+ if (keepAlive) {
+ // http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
+ response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
+ }
+
+ if (contentLength > -1) {
+ response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, contentLength);
+ }
+
+ if (headers != null && headers.size() > 0) {
+ for (Map.Entry header : headers.entries()) {
+ response.headers().add(header.getKey(), header.getValue());
+ }
+ }
+
+ }
+
+ private void sendResponseStart(ChannelHandlerContext context, HttpRequest originalRequest,
+ int status, Multimap headers, int contentLength) {
+ HttpResponse response = new DefaultHttpResponse(
+ originalRequest.protocolVersion(),
+ HttpResponseStatus.valueOf(status));
+
+ fillResponse(originalRequest, response, headers, contentLength);
+
+ context.write(response);
+ }
+
+ private void sendContent(ChannelHandlerContext context, byte[] responseBytes) {
+ HttpContent content = new DefaultHttpContent(responseBytes == null ?
+ Unpooled.EMPTY_BUFFER :
+ Unpooled.wrappedBuffer(responseBytes));
+
+ context.write(content);
+ }
+
+ private void sendResponseEnd(ChannelHandlerContext context, HttpRequest originalRequest) {
+ writeAndFlush(context, originalRequest, LastHttpContent.EMPTY_LAST_CONTENT);
+ }
+
+ private void sendFullResponse(ChannelHandlerContext context, HttpRequest originalRequest,
+ int status, Multimap headers) {
+ FullHttpResponse response = new DefaultFullHttpResponse(
+ originalRequest.protocolVersion(),
+ HttpResponseStatus.valueOf(status),
+ Unpooled.EMPTY_BUFFER);
+
+ fillResponse(originalRequest, response, headers, 0);
+
+ writeAndFlush(context, originalRequest, response);
+ }
+
+ private void sendResponse(ChannelHandlerContext context, HttpRequest originalRequest, Response serverResponse) {
+ Entity responseEntity = serverResponse.entity();
+ if (responseEntity != null) {
+ threadPool.execute(() -> {
+ boolean[] first = {true};
+ responseEntity.asChunks()
+ .doOnNext(bytes -> {
+ if (first[0]) {
+ sendResponseStart(context, originalRequest,
+ serverResponse.status().getCode(),
+ serverResponse.headers(),
+ bytes == null ? -1 : bytes.length);
+ sendContent(context, bytes);
+
+ first[0] = false;
+ } else {
+ sendContent(context, bytes);
+ }
+ })
+ .finallyDo(() -> {
+ sendResponseEnd(context, originalRequest);
+ })
+ .toBlocking().lastOrDefault(null);
+ });
+ } else {
+ sendFullResponse(context, originalRequest, serverResponse.status().getCode(), serverResponse.headers());
+ }
+ }
+
+ private static void sendContinueResponse(ChannelHandlerContext context) {
+ FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
+ context.write(response);
+ }
+
+ private void writeAndFlush(ChannelHandlerContext context, HttpRequest originalRequest, HttpObject response) {
+ ChannelFuture writeFuture = context.writeAndFlush(response);
+ boolean keepAlive = HttpUtil.isKeepAlive(originalRequest);
+ if (!keepAlive) {
+ writeFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ cause.printStackTrace();
+ context.close();
+ }
+}
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/impl/ClientToServerChannelInitializer.java b/core/src/main/java/org/chodavarapu/datamill/http/impl/ClientToServerChannelInitializer.java
new file mode 100644
index 0000000..2d3f3d0
--- /dev/null
+++ b/core/src/main/java/org/chodavarapu/datamill/http/impl/ClientToServerChannelInitializer.java
@@ -0,0 +1,55 @@
+package org.chodavarapu.datamill.http.impl;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpContentCompressor;
+import io.netty.handler.codec.http.HttpContentDecompressor;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.ssl.SslContext;
+import org.chodavarapu.datamill.http.Response;
+import org.chodavarapu.datamill.http.Route;
+import org.chodavarapu.datamill.http.ServerRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+
+/**
+ * @author Ravi Chodavarapu (rchodava@gmail.com)
+ */
+public class ClientToServerChannelInitializer extends ChannelInitializer {
+ private static final Logger logger = LoggerFactory.getLogger(ClientToServerChannelInitializer.class);
+
+ private final BiFunction> errorResponseConstructor;
+ private final Route route;
+ private final SslContext sslContext;
+ private final ExecutorService threadPool;
+
+ public ClientToServerChannelInitializer(SslContext sslContext, ExecutorService threadPool,
+ Route route, BiFunction> errorResponseConstructor) {
+ this.sslContext = sslContext;
+ this.threadPool = threadPool;
+
+ this.route = route;
+ this.errorResponseConstructor = errorResponseConstructor;
+ }
+
+ @Override
+ protected void initChannel(SocketChannel channel) throws Exception {
+ logger.debug("Initializing channel from client {} to the server", channel.remoteAddress());
+
+ ChannelPipeline pipeline = channel.pipeline();
+
+ if (sslContext != null) {
+ pipeline.addLast(sslContext.newHandler(channel.alloc()));
+ }
+
+ pipeline.addLast(new HttpServerCodec(4096, 8192, 65536));
+ pipeline.addLast(new HttpContentDecompressor());
+ pipeline.addLast(new HttpContentCompressor());
+ pipeline.addLast(new ClientToServerChannelHandler(threadPool, route, errorResponseConstructor));
+ }
+}
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/impl/InputStreamEntity.java b/core/src/main/java/org/chodavarapu/datamill/http/impl/InputStreamEntity.java
index f2f0637..79ff847 100644
--- a/core/src/main/java/org/chodavarapu/datamill/http/impl/InputStreamEntity.java
+++ b/core/src/main/java/org/chodavarapu/datamill/http/impl/InputStreamEntity.java
@@ -9,7 +9,6 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/impl/RequestBuilderImpl.java b/core/src/main/java/org/chodavarapu/datamill/http/impl/RequestBuilderImpl.java
index 532b0cd..161507a 100644
--- a/core/src/main/java/org/chodavarapu/datamill/http/impl/RequestBuilderImpl.java
+++ b/core/src/main/java/org/chodavarapu/datamill/http/impl/RequestBuilderImpl.java
@@ -1,5 +1,7 @@
package org.chodavarapu.datamill.http.impl;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
import org.chodavarapu.datamill.http.*;
import org.chodavarapu.datamill.values.Value;
@@ -11,15 +13,16 @@
*/
public class RequestBuilderImpl implements RequestBuilder {
private Entity entity;
- private final Map headers = new HashMap<>();
+ private final Multimap headers = LinkedListMultimap.create();
private String method;
private final Map options = new HashMap<>();
+ private final Multimap queryParameters = LinkedListMultimap.create();
private String uri;
private final Map uriParameters = new HashMap<>();
@Override
public Request build() {
- return new RequestImpl(method, headers, uri, uriParameters, options, entity);
+ return new RequestImpl(method, headers, uri, queryParameters, uriParameters, options, entity);
}
@Override
@@ -63,6 +66,12 @@ public RequestBuilder method(String method) {
return this;
}
+ @Override
+ public RequestBuilder queryParameter(String name, String value) {
+ queryParameters.put(name, value);
+ return this;
+ }
+
@Override
public RequestBuilder uri(String uri) {
this.uri = uri;
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/impl/RequestEntity.java b/core/src/main/java/org/chodavarapu/datamill/http/impl/RequestEntity.java
index 4840d11..30a8997 100644
--- a/core/src/main/java/org/chodavarapu/datamill/http/impl/RequestEntity.java
+++ b/core/src/main/java/org/chodavarapu/datamill/http/impl/RequestEntity.java
@@ -1,7 +1,5 @@
package org.chodavarapu.datamill.http.impl;
-import io.vertx.core.http.HttpServerRequest;
-import io.vertx.rx.java.RxHelper;
import org.chodavarapu.datamill.http.Entity;
import org.chodavarapu.datamill.http.HttpException;
import org.chodavarapu.datamill.json.JsonObject;
@@ -9,15 +7,18 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.charset.Charset;
/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class RequestEntity implements Entity {
- private final HttpServerRequest request;
+ private final Observable chunks;
+ private final Charset charset;
- public RequestEntity(HttpServerRequest request) {
- this.request = request;
+ public RequestEntity(Observable chunks, Charset charset) {
+ this.chunks = chunks;
+ this.charset = charset;
}
@Override
@@ -35,7 +36,7 @@ public Observable asBytes() {
@Override
public Observable asChunks() {
- return RxHelper.toObservable(request).map(b -> b.getBytes());
+ return chunks;
}
@Override
@@ -45,6 +46,6 @@ public Observable asJson() {
@Override
public Observable asString() {
- return asBytes().map(bytes -> new String(bytes));
+ return asBytes().map(bytes -> new String(bytes, charset));
}
}
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/impl/RequestImpl.java b/core/src/main/java/org/chodavarapu/datamill/http/impl/RequestImpl.java
index 6a76151..538b7b2 100644
--- a/core/src/main/java/org/chodavarapu/datamill/http/impl/RequestImpl.java
+++ b/core/src/main/java/org/chodavarapu/datamill/http/impl/RequestImpl.java
@@ -1,65 +1,31 @@
package org.chodavarapu.datamill.http.impl;
-import org.chodavarapu.datamill.http.Method;
-import org.chodavarapu.datamill.http.Request;
+import com.google.common.collect.Multimap;
import org.chodavarapu.datamill.http.Entity;
-import org.chodavarapu.datamill.http.RequestHeader;
-import org.chodavarapu.datamill.values.StringValue;
-import org.chodavarapu.datamill.values.Value;
import java.util.Map;
-import java.util.Optional;
/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
-public class RequestImpl implements Request {
- private final Entity entity;
- private final String method;
- private final Map headers;
+public class RequestImpl extends AbstractRequestImpl {
private final Map options;
- private final String uri;
+ private final Multimap queryParameters;
private final Map uriParameters;
public RequestImpl(
String method,
- Map headers,
+ Multimap headers,
String uri,
+ Multimap queryParameters,
Map uriParameters,
Map options,
Entity entity) {
- this.method = method;
+ super(method, headers, uri, entity);
+
+ this.queryParameters = queryParameters;
this.options = options;
- this.uri = uri;
this.uriParameters = uriParameters;
- this.headers = headers;
- this.entity = entity;
- }
-
- @Override
- public Entity entity() {
- return entity;
- }
-
- @Override
- public Map headers() {
- return headers;
- }
-
- @Override
- public Optional header(String header) {
- String value = headers.get(header);
- return Optional.ofNullable(value != null ? new StringValue(value) : null);
- }
-
- @Override
- public Optional header(RequestHeader header) {
- return header(header.getName());
- }
-
- @Override
- public Method method() {
- return Method.valueOf(method);
}
@Override
@@ -68,18 +34,8 @@ public Map options() {
}
@Override
- public String uri() {
- return uri;
- }
-
- @Override
- public Value uriParameter(String parameter) {
- String value = uriParameters.get(parameter);
- if (value == null) {
- return null;
- }
-
- return new StringValue(value);
+ public Multimap queryParameters() {
+ return queryParameters;
}
@Override
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/impl/ResponseBuilderImpl.java b/core/src/main/java/org/chodavarapu/datamill/http/impl/ResponseBuilderImpl.java
index 4537bd0..a294fde 100644
--- a/core/src/main/java/org/chodavarapu/datamill/http/impl/ResponseBuilderImpl.java
+++ b/core/src/main/java/org/chodavarapu/datamill/http/impl/ResponseBuilderImpl.java
@@ -1,19 +1,18 @@
package org.chodavarapu.datamill.http.impl;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
import org.chodavarapu.datamill.http.Response;
import org.chodavarapu.datamill.http.ResponseBuilder;
import org.chodavarapu.datamill.http.Status;
import org.chodavarapu.datamill.values.StringValue;
import rx.Observable;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class ResponseBuilderImpl implements ResponseBuilder {
- private final Map headers = new HashMap<>();
+ private final Multimap headers = LinkedListMultimap.create();
@Override
public Observable badRequest() {
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/impl/ResponseImpl.java b/core/src/main/java/org/chodavarapu/datamill/http/impl/ResponseImpl.java
index 635ff63..5dae2f3 100644
--- a/core/src/main/java/org/chodavarapu/datamill/http/impl/ResponseImpl.java
+++ b/core/src/main/java/org/chodavarapu/datamill/http/impl/ResponseImpl.java
@@ -1,5 +1,7 @@
package org.chodavarapu.datamill.http.impl;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import org.chodavarapu.datamill.http.Entity;
import org.chodavarapu.datamill.http.Response;
import org.chodavarapu.datamill.http.Status;
@@ -10,23 +12,31 @@
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class ResponseImpl implements Response {
- private Map headers;
+ private Multimap headers;
private Status status;
private Entity entity;
public ResponseImpl(Status status) {
- this(status, null, null);
+ this(status, (Multimap) null, null);
}
public ResponseImpl(Status status, Map headers) {
this(status, headers, null);
}
+ public ResponseImpl(Status status, Multimap headers) {
+ this(status, headers, null);
+ }
+
public ResponseImpl(Status status, Entity entity) {
- this(status, null, entity);
+ this(status, (Multimap) null, entity);
}
public ResponseImpl(Status status, Map headers, Entity entity) {
+ this(status, Multimaps.forMap(headers), entity);
+ }
+
+ public ResponseImpl(Status status, Multimap headers, Entity entity) {
this.status = status;
this.headers = headers;
this.entity = entity;
@@ -38,7 +48,7 @@ public Entity entity() {
}
@Override
- public Map headers() {
+ public Multimap headers() {
return headers;
}
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/impl/ServerRequestBuilder.java b/core/src/main/java/org/chodavarapu/datamill/http/impl/ServerRequestBuilder.java
new file mode 100644
index 0000000..0df5b8c
--- /dev/null
+++ b/core/src/main/java/org/chodavarapu/datamill/http/impl/ServerRequestBuilder.java
@@ -0,0 +1,50 @@
+package org.chodavarapu.datamill.http.impl;
+
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpUtil;
+import rx.Observable;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+
+/**
+ * @author Ravi Chodavarapu (rchodava@gmail.com)
+ */
+public class ServerRequestBuilder {
+ public static ServerRequestImpl buildServerRequest(HttpRequest request, Observable entityStream) {
+ Charset messageCharset = HttpUtil.getCharset(request);
+ return new ServerRequestImpl(
+ request.method().name(),
+ buildHeadersMap(request.headers()),
+ request.uri(),
+ messageCharset,
+ new RequestEntity(entityStream, messageCharset));
+ }
+
+ public static Multimap buildHeadersMap(HttpHeaders headers) {
+ Multimap headersMap;
+
+ HttpHeaders requestHeaders = headers;
+ if (!requestHeaders.isEmpty()) {
+ ImmutableMultimap.Builder builder = ImmutableMultimap.builder();
+
+ for (Map.Entry header : requestHeaders) {
+ String key = header.getKey();
+ String value = header.getValue();
+
+ if (key != null && value != null) {
+ builder.put(key, value);
+ }
+ }
+
+ headersMap = builder.build();
+ } else {
+ headersMap = null;
+ }
+
+ return headersMap;
+ }
+}
diff --git a/core/src/main/java/org/chodavarapu/datamill/http/impl/ServerRequestImpl.java b/core/src/main/java/org/chodavarapu/datamill/http/impl/ServerRequestImpl.java
index 0a8077d..5c18966 100644
--- a/core/src/main/java/org/chodavarapu/datamill/http/impl/ServerRequestImpl.java
+++ b/core/src/main/java/org/chodavarapu/datamill/http/impl/ServerRequestImpl.java
@@ -1,77 +1,70 @@
package org.chodavarapu.datamill.http.impl;
-import com.google.common.base.Joiner;
-import io.vertx.core.MultiMap;
-import io.vertx.core.http.HttpServerRequest;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import io.netty.handler.codec.http.QueryStringDecoder;
import org.chodavarapu.datamill.http.*;
-import org.chodavarapu.datamill.values.StringValue;
import org.chodavarapu.datamill.values.Value;
+import java.nio.charset.Charset;
import java.util.*;
/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
-public class ServerRequestImpl implements ServerRequest {
- private Entity entity;
+public class ServerRequestImpl extends AbstractRequestImpl implements ServerRequest {
+ private Multimap queryParameters;
+ private QueryStringDecoder queryStringDecoder;
+ private Multimap trailingHeaders;
- private final HttpServerRequest request;
+ public ServerRequestImpl(String method, Multimap headers, String uri, Charset charset, Entity entity) {
+ super(method, headers, uri, entity);
- private Map uriParameters;
-
- public ServerRequestImpl(HttpServerRequest request) {
- this.request = request;
- this.entity = new RequestEntity(request);
+ this.queryStringDecoder = new QueryStringDecoder(uri, charset);
}
- @Override
- public Entity entity() {
- return entity;
- }
+ private Multimap extractQueryParameters() {
+ Multimap queryParameters;
- @Override
- public Map headers() {
- Map headers = new HashMap<>();
-
- MultiMap requestHeaders = request.headers();
- for (String header : requestHeaders.names()) {
- List values = requestHeaders.getAll(header);
- if (values.size() > 1) {
- headers.put(header, Joiner.on(',').join(values));
- } else if (values.size() > 0) {
- headers.put(header, values.get(0));
+ Map> params = queryStringDecoder.parameters();
+ if (!params.isEmpty()) {
+ ImmutableMultimap.Builder builder = ImmutableMultimap.builder();
+
+ for (Map.Entry> p: params.entrySet()) {
+ String key = p.getKey();
+
+ List values = p.getValue();
+ for (String value : values) {
+ builder.put(key, value);
+ }
}
+
+ queryParameters = builder.build();
+ } else {
+ queryParameters = null;
+ queryStringDecoder = null;
}
- return headers;
+ return queryParameters;
}
@Override
- public Optional header(String header) {
- String value = request.getHeader(header);
- return Optional.ofNullable(value != null ? new StringValue(value) : null);
+ public Value firstTrailingHeader(String header) {
+ return firstValue(trailingHeaders, header);
}
@Override
- public Optional header(RequestHeader header) {
- return header(header.getName());
+ public Value firstTrailingHeader(RequestHeader header) {
+ return firstTrailingHeader(header.getName());
}
@Override
- public Method method() {
- switch (request.method()) {
- case OPTIONS: return Method.OPTIONS;
- case GET: return Method.GET;
- case HEAD: return Method.HEAD;
- case POST: return Method.POST;
- case PUT: return Method.PUT;
- case DELETE: return Method.DELETE;
- case TRACE: return Method.TRACE;
- case CONNECT: return Method.CONNECT;
- case PATCH: return Method.PATCH;
+ public Multimap queryParameters() {
+ if (queryParameters == null && queryStringDecoder != null) {
+ queryParameters = extractQueryParameters();
}
- return null;
+ return queryParameters;
}
@Override
@@ -84,28 +77,12 @@ public ResponseBuilder respond() {
return new ResponseBuilderImpl();
}
- @Override
- public String uri() {
- return request.uri();
- }
-
- void setUriParameters(Map uriParameters) {
- this.uriParameters = uriParameters;
- }
-
- @Override
- public Value uriParameter(String parameter) {
- if (uriParameters != null) {
- String value = uriParameters.get(parameter);
- if (value != null) {
- return new StringValue(value);
- }
- }
- return null;
+ public void setTrailingHeaders(Multimap trailingHeaders) {
+ this.trailingHeaders = trailingHeaders;
}
@Override
- public Map uriParameters() {
- return uriParameters;
+ public Multimap trailingHeaders() {
+ return trailingHeaders;
}
}
diff --git a/core/src/main/java/org/chodavarapu/datamill/json/JsonArray.java b/core/src/main/java/org/chodavarapu/datamill/json/JsonArray.java
index e208edd..bb9b8d3 100644
--- a/core/src/main/java/org/chodavarapu/datamill/json/JsonArray.java
+++ b/core/src/main/java/org/chodavarapu/datamill/json/JsonArray.java
@@ -2,6 +2,8 @@
import org.chodavarapu.datamill.values.ReflectableValue;
import org.chodavarapu.datamill.values.Value;
+import org.json.JSONArray;
+import org.json.JSONObject;
import java.time.LocalDateTime;
import java.util.ArrayList;
@@ -13,34 +15,35 @@
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class JsonArray implements ReflectableValue {
- final io.vertx.core.json.JsonArray array;
+ final JSONArray array;
public JsonArray() {
- array = new io.vertx.core.json.JsonArray();
+ array = new JSONArray();
}
public JsonArray(String json) {
- array = new io.vertx.core.json.JsonArray(json);
+ array = new JSONArray(json);
}
- JsonArray(io.vertx.core.json.JsonArray array) {
+ JsonArray(JSONArray array) {
this.array = array;
}
public JsonArray(List values) {
- ArrayList objects = new ArrayList<>();
+ ArrayList objects = new ArrayList<>();
for (JsonObject value : values) {
objects.add(value.object);
}
- array = new io.vertx.core.json.JsonArray(objects);
+
+ array = new JSONArray(objects);
}
public JsonArray(String[] values) {
- array = new io.vertx.core.json.JsonArray(Arrays.asList(values));
+ array = new JSONArray(Arrays.asList(values));
}
public JsonArray add(JsonObject value) {
- array.add(value.object);
+ array.put(value.object);
return this;
}
@@ -96,7 +99,7 @@ public short asShort() {
@Override
public String asString() {
- return array.encode();
+ return array.toString();
}
@Override
diff --git a/core/src/main/java/org/chodavarapu/datamill/json/JsonMappers.java b/core/src/main/java/org/chodavarapu/datamill/json/JsonMappers.java
deleted file mode 100644
index 3036f92..0000000
--- a/core/src/main/java/org/chodavarapu/datamill/json/JsonMappers.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.chodavarapu.datamill.json;
-
-import org.chodavarapu.datamill.json.patch.OperationType;
-import org.chodavarapu.datamill.json.patch.Operation;
-
-import java.util.function.Function;
-import java.util.stream.Stream;
-
-/**
- * @author Ravi Chodavarapu (rchodava@gmail.com)
- */
-public interface JsonMappers {
-// Function> JSON_TO_JSON_PATCH_OPERATIONS =
-// j -> j.children().stream().map(e ->
-// new Operation(
-// OperationType.fromString(e.get("op").asString()),
-// e.get("path").asString(),
-// e.get("value")));
-}
diff --git a/core/src/main/java/org/chodavarapu/datamill/json/JsonObject.java b/core/src/main/java/org/chodavarapu/datamill/json/JsonObject.java
index f423b8d..dd16f12 100644
--- a/core/src/main/java/org/chodavarapu/datamill/json/JsonObject.java
+++ b/core/src/main/java/org/chodavarapu/datamill/json/JsonObject.java
@@ -3,6 +3,9 @@
import org.chodavarapu.datamill.reflection.Member;
import org.chodavarapu.datamill.values.ReflectableValue;
import org.chodavarapu.datamill.values.Value;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
import java.time.LocalDateTime;
import java.util.Map;
@@ -12,22 +15,22 @@
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class JsonObject implements ReflectableValue {
- final io.vertx.core.json.JsonObject object;
+ final JSONObject object;
- private JsonObject(io.vertx.core.json.JsonObject object) {
+ private JsonObject(JSONObject object) {
this.object = object;
}
public JsonObject() {
- object = new io.vertx.core.json.JsonObject();
+ object = new JSONObject();
}
public JsonObject(String json) {
- object = new io.vertx.core.json.JsonObject(json);
+ object = new JSONObject(json);
}
public JsonObject(Map values) {
- object = new io.vertx.core.json.JsonObject(values);
+ object = new JSONObject(values);
}
@Override
@@ -82,7 +85,7 @@ public short asShort() {
@Override
public String asString() {
- return object.encode();
+ return object.toString();
}
public JsonProperty get(String property) {
@@ -181,19 +184,36 @@ public boolean asBoolean() {
@Override
public byte asByte() {
- return (byte) (int) object.getInteger(name);
+ return (byte) (int) object.getInt(name);
}
@Override
public byte[] asByteArray() {
- return object.getBinary(name);
+ try {
+ JSONArray array = object.getJSONArray(name);
+ if (array != null) {
+ byte[] bytes = new byte[array.length()];
+ for (int i = 0; i < bytes.length; i++) {
+ bytes[i] = (byte) array.getInt(i);
+ }
+
+ return bytes;
+ }
+ } catch (JSONException e) {
+ String value = asString();
+ if (value != null) {
+ return value.getBytes();
+ }
+ }
+
+ return null;
}
@Override
public char asCharacter() {
try {
- return (char) (int) object.getInteger(name);
- } catch (ClassCastException e) {
+ return (char) (int) object.getInt(name);
+ } catch (JSONException e) {
String value = object.getString(name);
if (value.length() == 1) {
return value.charAt(0);
@@ -210,25 +230,40 @@ public double asDouble() {
@Override
public float asFloat() {
- return object.getFloat(name);
+ return object.getBigDecimal(name).floatValue();
}
@Override
public int asInteger() {
- return object.getInteger(name);
+ return object.getInt(name);
}
public JsonArray asJsonArray() {
- return new JsonArray(object.getJsonArray(name));
+ JSONArray array = object.optJSONArray(name);
+ if (array != null) {
+ return new JsonArray(array);
+ }
+
+ return null;
}
public JsonObject asJson() {
- return new JsonObject(object.getJsonObject(name));
+ JSONObject json = object.optJSONObject(name);
+ if (json != null) {
+ return new JsonObject(json);
+ }
+
+ return null;
}
@Override
public LocalDateTime asLocalDateTime() {
- return LocalDateTime.parse(object.getString(name));
+ String value = object.optString(name);
+ if (value != null) {
+ return LocalDateTime.parse(value);
+ }
+
+ return null;
}
@Override
@@ -238,12 +273,12 @@ public long asLong() {
@Override
public short asShort() {
- return (short) (int) object.getInteger(name);
+ return (short) (int) object.getInt(name);
}
@Override
public String asString() {
- return object.getString(name);
+ return object.optString(name);
}
@Override
diff --git a/core/src/main/java/org/chodavarapu/datamill/json/patch/Operation.java b/core/src/main/java/org/chodavarapu/datamill/json/patch/Operation.java
deleted file mode 100644
index b2fed65..0000000
--- a/core/src/main/java/org/chodavarapu/datamill/json/patch/Operation.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.chodavarapu.datamill.json.patch;
-
-import org.chodavarapu.datamill.json.JsonObject;
-
-/**
- * @author Ravi Chodavarapu (rchodava@gmail.com)
- */
-public class Operation {
- private final OperationType type;
-
- public Operation(OperationType type, String path, JsonObject value) {
- this.type = type;
- }
-
- public OperationType getType() {
- return type;
- }
-}
diff --git a/core/src/main/java/org/chodavarapu/datamill/json/patch/OperationType.java b/core/src/main/java/org/chodavarapu/datamill/json/patch/OperationType.java
deleted file mode 100644
index 4e43c72..0000000
--- a/core/src/main/java/org/chodavarapu/datamill/json/patch/OperationType.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.chodavarapu.datamill.json.patch;
-
-/**
- * @author Ravi Chodavarapu (rchodava@gmail.com)
- */
-public enum OperationType {
- ADD,
- COPY,
- MOVE,
- REMOVE,
- REPLACE,
- TEST;
-
- public static OperationType fromString(String type) {
- switch (type) {
- case "add": return ADD;
- case "copy": return COPY;
- case "move": return MOVE;
- case "remove": return REMOVE;
- case "replace": return REPLACE;
- case "test": return TEST;
- }
-
- return null;
- }
-}
diff --git a/core/src/test/java/org/chodavarapu/datamill/http/impl/ClientToServerChannelHandlerTest.java b/core/src/test/java/org/chodavarapu/datamill/http/impl/ClientToServerChannelHandlerTest.java
new file mode 100644
index 0000000..fd41bbf
--- /dev/null
+++ b/core/src/test/java/org/chodavarapu/datamill/http/impl/ClientToServerChannelHandlerTest.java
@@ -0,0 +1,164 @@
+package org.chodavarapu.datamill.http.impl;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.*;
+import org.chodavarapu.datamill.http.Route;
+import org.chodavarapu.datamill.http.ServerRequest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Ravi Chodavarapu (rchodava@gmail.com)
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ClientToServerChannelHandlerTest {
+ @Mock
+ private ChannelHandlerContext context;
+
+ @Captor
+ private ArgumentCaptor lastContentCaptor;
+
+ @Mock
+ private Route route;
+
+ @Mock
+ private ExecutorService threadPool;
+
+ @Captor
+ private ArgumentCaptor requestCaptor;
+
+ @Captor
+ private ArgumentCaptor responseCaptor;
+
+ @Captor
+ private ArgumentCaptor responseFragmentsCaptor;
+
+ private void waitForExecutorToFinishAllTasks(ExecutorService executor) throws Exception {
+ for (int i = 0; i < 5; i++) {
+ // We submit an empty task and wait for it so that other tasks submitted ahead of this get executed first
+ // and we wait for their completion. But each of those may have submitted others which we need to wait for
+ // as well which is why this is done in a loop.
+ executor.submit(() -> {}).get();
+ }
+ }
+
+ @Test
+ public void sendContinueResponseIfRequested() {
+ ClientToServerChannelHandler handler = new ClientToServerChannelHandler(threadPool, route, null);
+
+ DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "localhost");
+ request.headers().add(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE);
+
+ handler.channelRead(context, request);
+
+ verify(context).write(responseCaptor.capture());
+
+ FullHttpResponse response = responseCaptor.getValue();
+ assertEquals(HttpVersion.HTTP_1_1, response.protocolVersion());
+ assertEquals(HttpResponseStatus.CONTINUE, response.status());
+ }
+
+ @Test
+ public void readEntitySentWithFullRequest() throws Exception {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+
+ ClientToServerChannelHandler handler = new ClientToServerChannelHandler(service, route, null);
+
+ DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "localhost");
+ request.content().writeBytes("Test Content".getBytes());
+
+ when(route.apply(any())).thenReturn(new ResponseBuilderImpl().ok());
+
+ handler.channelRead(context, request);
+
+ waitForExecutorToFinishAllTasks(service);
+
+ verify(route).apply(requestCaptor.capture());
+
+ ServerRequest appliedRequest = requestCaptor.getValue();
+ assertEquals("Test Content", appliedRequest.entity().asString().toBlocking().last());
+ }
+
+ @Test
+ public void readEntitySentWithMultipleChunks() throws Exception {
+ when(route.apply(any())).thenReturn(new ResponseBuilderImpl().ok());
+
+ ExecutorService service = Executors.newSingleThreadExecutor();
+
+ ClientToServerChannelHandler handler = new ClientToServerChannelHandler(service, route, null);
+
+ DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "localhost");
+ handler.channelRead(context, request);
+
+ DefaultHttpContent content = new DefaultHttpContent(Unpooled.wrappedBuffer("Test Content ".getBytes()));
+ handler.channelRead(context, content);
+
+ content = new DefaultHttpContent(Unpooled.wrappedBuffer("Additional Content".getBytes()));
+ handler.channelRead(context, content);
+
+ DefaultLastHttpContent last = new DefaultLastHttpContent();
+ handler.channelRead(context, last);
+
+ waitForExecutorToFinishAllTasks(service);
+
+ verify(route).apply(requestCaptor.capture());
+
+ ServerRequest appliedRequest = requestCaptor.getValue();
+ assertEquals("Test Content Additional Content", appliedRequest.entity().asString().toBlocking().last());
+ }
+
+ @Test
+ public void singleChunkResponseSent() throws Exception {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ ClientToServerChannelHandler handler = new ClientToServerChannelHandler(service, route, null);
+
+ DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "localhost");
+
+ when(route.apply(any())).thenReturn(new ResponseBuilderImpl().ok());
+
+ handler.channelRead(context, request);
+
+ waitForExecutorToFinishAllTasks(service);
+
+ verify(context).writeAndFlush(responseCaptor.capture());
+
+ assertEquals(HttpResponseStatus.OK, responseCaptor.getValue().status());
+ }
+
+ @Test
+ public void multipeResponseChunksSent() throws Exception {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ ClientToServerChannelHandler handler = new ClientToServerChannelHandler(service, route, null);
+
+ DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "localhost");
+
+ when(route.apply(any())).thenReturn(new ResponseBuilderImpl().ok("Test Content"));
+
+ handler.channelRead(context, request);
+
+ waitForExecutorToFinishAllTasks(service);
+
+ verify(context, times(2)).write(responseFragmentsCaptor.capture());
+ verify(context).writeAndFlush(lastContentCaptor.capture());
+
+ assertEquals(HttpResponseStatus.OK, ((HttpResponse) responseFragmentsCaptor.getAllValues().get(0)).status());
+ byte[] bytes = new byte[((HttpContent) responseFragmentsCaptor.getAllValues().get(1)).content().readableBytes()];
+ ((HttpContent) responseFragmentsCaptor.getAllValues().get(1)).content().readBytes(bytes);
+ assertArrayEquals("Test Content".getBytes(), bytes);
+ }
+}
diff --git a/core/src/test/java/org/chodavarapu/datamill/http/impl/RequestBuilderImplTest.java b/core/src/test/java/org/chodavarapu/datamill/http/impl/RequestBuilderImplTest.java
index 6588bf4..d27bb53 100644
--- a/core/src/test/java/org/chodavarapu/datamill/http/impl/RequestBuilderImplTest.java
+++ b/core/src/test/java/org/chodavarapu/datamill/http/impl/RequestBuilderImplTest.java
@@ -25,8 +25,8 @@ public void requestBuilding() {
.uri("http://sample.com")
.build();
- assertEquals("application/json", request.header("Accept").get().asString());
- assertEquals("application/json", request.header(RequestHeader.CONTENT_TYPE).get().asString());
+ assertEquals("application/json", request.firstHeader("Accept").asString());
+ assertEquals("application/json", request.firstHeader(RequestHeader.CONTENT_TYPE).asString());
assertEquals(Method.GET, request.method());
assertEquals("http://sample.com", request.uri());
assertEquals(500, request.options().get(Request.OPTION_CONNECT_TIMEOUT));
diff --git a/core/src/test/java/org/chodavarapu/datamill/http/impl/ServerRequestImplTest.java b/core/src/test/java/org/chodavarapu/datamill/http/impl/ServerRequestImplTest.java
new file mode 100644
index 0000000..e9a7b0d
--- /dev/null
+++ b/core/src/test/java/org/chodavarapu/datamill/http/impl/ServerRequestImplTest.java
@@ -0,0 +1,28 @@
+package org.chodavarapu.datamill.http.impl;
+
+import com.google.common.collect.ImmutableMultimap;
+import org.chodavarapu.datamill.http.Method;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Ravi Chodavarapu (rchodava@gmail.com)
+ */
+public class ServerRequestImplTest {
+ @Test
+ public void basicRequestProperties() {
+ ServerRequestImpl request = new ServerRequestImpl(
+ "GET", ImmutableMultimap.of("header1", "valueh1v1", "header1", "valueh1v2", "header2", "valueh2v1"),
+ "http://localhost:8080?param1=value1¶m2=value2¶m2=value3", Charset.defaultCharset(), null);
+ assertEquals(Method.GET, request.method());
+ assertEquals("GET", request.rawMethod());
+ assertEquals("valueh1v1", request.firstHeader("header1").asString());
+ assertEquals("valueh2v1", request.firstHeader("header2").asString());
+ assertEquals(null, request.firstHeader("header3"));
+ assertEquals("value1", request.firstQueryParameter("param1").asString());
+ assertEquals("value2", request.firstQueryParameter("param2").asString());
+ }
+}
diff --git a/core/src/test/java/org/chodavarapu/datamill/json/JsonObjectTest.java b/core/src/test/java/org/chodavarapu/datamill/json/JsonObjectTest.java
index ca4c517..7681afe 100644
--- a/core/src/test/java/org/chodavarapu/datamill/json/JsonObjectTest.java
+++ b/core/src/test/java/org/chodavarapu/datamill/json/JsonObjectTest.java
@@ -6,9 +6,7 @@
import java.util.function.Consumer;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
@@ -36,6 +34,7 @@ public void unsupportedConversions() {
testThrowsConversionException(j -> j.asDouble());
testThrowsConversionException(j -> j.asFloat());
testThrowsConversionException(j -> j.asInteger());
+ testThrowsConversionException(j -> j.asLocalDateTime());
testThrowsConversionException(j -> j.asLong());
testThrowsConversionException(j -> j.asShort());
@@ -46,6 +45,7 @@ public void unsupportedConversions() {
assertFalse(testObject.isFloat());
assertFalse(testObject.isInteger());
assertFalse(testObject.isLong());
+ assertFalse(testObject.isNumeric());
assertFalse(testObject.isShort());
assertFalse(testObject.isString());
}
@@ -64,7 +64,7 @@ public void stringConversion() throws Exception {
@Test
public void propertyConversions() {
- JsonObject json = new JsonObject("{\"character\": \"v\", \"numeric\": 2, \"boolean\": true, \"string\": \"value\"}");
+ JsonObject json = new JsonObject("{\"character\": \"v\", \"numeric\": 2, \"boolean\": true, \"string\": \"value\", \"bytes\": [1, 2]}");
assertEquals(true, json.get("boolean").asBoolean());
assertEquals(2, json.get("numeric").asByte());
assertEquals('v', json.get("character").asCharacter());
@@ -75,6 +75,8 @@ public void propertyConversions() {
assertEquals(2l, json.get("numeric").asLong());
assertEquals(2, json.get("numeric").asShort());
assertEquals("value", json.get("string").asString());
+ assertArrayEquals("value".getBytes(), json.get("string").asByteArray());
+ assertArrayEquals(new byte[] {(byte) 1, (byte) 2}, json.get("bytes").asByteArray());
testThrowsConversionException(json, j -> j.get("string").asCharacter());
}