Skip to content

Commit

Permalink
Merge pull request #24 from rchodava/rc/streaming-response-entities
Browse files Browse the repository at this point in the history
Streaming response entities
  • Loading branch information
rchodava committed Apr 11, 2016
2 parents 6ed87e7 + 4a5ef5c commit c75e002
Show file tree
Hide file tree
Showing 14 changed files with 171 additions and 33 deletions.
28 changes: 20 additions & 8 deletions core/src/main/java/org/chodavarapu/datamill/db/DatabaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,27 @@ public DatabaseClient(String url, String username, String password) {
this.password = password;
}

private void setupConnectionProvider() {
if (dataSource != null) {
connectionProvider = new DelegatingConnectionProvider(new ConnectionProviderFromDataSource(dataSource));
database = Database.from(connectionProvider);
} else if (url != null) {
connectionProvider = new DelegatingConnectionProvider(new ConnectionProviderPooled(url, username, password, 0, 10));
database = Database.from(connectionProvider);
}
}

private DelegatingConnectionProvider getConnectionProvider() {
if (connectionProvider == null) {
setupConnectionProvider();
}

return connectionProvider;
}

private Database getDatabase() {
if (database == null) {
if (dataSource != null) {
connectionProvider = new DelegatingConnectionProvider(new ConnectionProviderFromDataSource(dataSource));
database = Database.from(connectionProvider);
} else if (url != null) {
connectionProvider = new DelegatingConnectionProvider(new ConnectionProviderPooled(url, username, password, 0, 10));
database = Database.from(connectionProvider);
}
setupConnectionProvider();
}

return database;
Expand Down Expand Up @@ -115,7 +127,7 @@ public UpdateQueryExecution update(String sql, Object... parameters) {
}

public DatabaseClient changeCatalog(String catalog) {
connectionProvider.setCatalog(catalog);
getConnectionProvider().setCatalog(catalog);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package org.chodavarapu.datamill.http;

import org.chodavarapu.datamill.json.Json;
import rx.Observer;

import java.util.function.Consumer;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
Expand All @@ -14,6 +19,8 @@ public interface ResponseBuilder {
Response ok();
Response ok(String content);
Response ok(byte[] content);
ResponseBuilder streamingEntity(Consumer<Observer<byte[]>> entityStreamer);
ResponseBuilder streamingJson(Consumer<Observer<Json>> jsonStreamer);
Response unauthorized();
Response unauthorized(String content);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void channelRead(ChannelHandlerContext context, Object message) {
}

entityStream = ReplaySubject.create();
serverRequest = ServerRequestBuilder.buildServerRequest(request, entityStream);
serverRequest = ServerRequestBuilder.buildServerRequest(request, entityStream, threadPool);

processRequest(context, request);

Expand Down Expand Up @@ -161,7 +161,7 @@ private void sendContent(ChannelHandlerContext context, byte[] responseBytes) {
Unpooled.EMPTY_BUFFER :
Unpooled.wrappedBuffer(responseBytes));

context.write(content);
context.writeAndFlush(content);
}

private void sendResponseEnd(ChannelHandlerContext context, HttpRequest originalRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,39 @@

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import org.chodavarapu.datamill.http.Entity;
import org.chodavarapu.datamill.http.Response;
import org.chodavarapu.datamill.http.ResponseBuilder;
import org.chodavarapu.datamill.http.Status;
import org.chodavarapu.datamill.json.Json;
import org.chodavarapu.datamill.values.StringValue;
import rx.Observer;
import rx.subjects.ReplaySubject;

import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class ResponseBuilderImpl implements ResponseBuilder {
private final ExecutorService streamingEntityThreadPool;
private final Multimap<String, String> headers = LinkedListMultimap.create();
private Entity entity;

public ResponseBuilderImpl(ExecutorService threadPool) {
this.streamingEntityThreadPool = threadPool;
}

// Test hook
ResponseBuilderImpl() {
this.streamingEntityThreadPool = null;
}

@Override
public Response badRequest() {
return new ResponseImpl(Status.BAD_REQUEST, headers);
return new ResponseImpl(Status.BAD_REQUEST, headers, entity);
}

@Override
Expand All @@ -31,7 +50,7 @@ public <T> ResponseBuilder header(String name, T value) {

@Override
public Response internalServerError() {
return new ResponseImpl(Status.INTERNAL_SERVER_ERROR, headers);
return new ResponseImpl(Status.INTERNAL_SERVER_ERROR, headers, entity);
}

@Override
Expand All @@ -46,12 +65,12 @@ public Response noContent() {

@Override
public Response notFound() {
return new ResponseImpl(Status.NOT_FOUND, headers);
return new ResponseImpl(Status.NOT_FOUND, headers, entity);
}

@Override
public Response ok() {
return new ResponseImpl(Status.OK, headers);
return new ResponseImpl(Status.OK, headers, entity);
}

@Override
Expand All @@ -64,9 +83,55 @@ public Response ok(byte[] content) {
return new ResponseImpl(Status.OK, headers, new BytesEntity(content));
}

@Override
public ResponseBuilder streamingEntity(Consumer<Observer<byte[]>> entityStreamer) {
ReplaySubject<byte[]> entitySubject = ReplaySubject.create();

streamingEntityThreadPool.execute(() -> entityStreamer.accept(entitySubject));

this.entity = new StreamedChunksEntity(entitySubject, Charset.defaultCharset());
return this;
}

@Override
public ResponseBuilder streamingJson(Consumer<Observer<Json>> jsonStreamer) {
boolean firstJsonObject[] = new boolean[] { true };
return streamingEntity(byteObserver -> {
jsonStreamer.accept(new Observer<Json>() {
{
byteObserver.onNext("[".getBytes());
}

@Override
public void onCompleted() {
byteObserver.onNext("]".getBytes());
byteObserver.onCompleted();
}

@Override
public void onError(Throwable e) {
byteObserver.onError(e);
}

@Override
public void onNext(Json jsonObject) {
if (jsonObject != null) {
if (!firstJsonObject[0]) {
byteObserver.onNext(",".getBytes());
} else {
firstJsonObject[0] = false;
}

byteObserver.onNext(jsonObject.toString().getBytes());
}
}
});
});
}

@Override
public Response unauthorized() {
return new ResponseImpl(Status.UNAUTHORIZED, headers);
return new ResponseImpl(Status.UNAUTHORIZED, headers, entity);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@

import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class ServerRequestBuilder {
public static ServerRequestImpl buildServerRequest(HttpRequest request, Observable<byte[]> entityStream) {
public static ServerRequestImpl buildServerRequest(HttpRequest request, Observable<byte[]> entityStream, ExecutorService threadPool) {
Charset messageCharset = HttpUtil.getCharset(request);
return new ServerRequestImpl(
request.method().name(),
buildHeadersMap(request.headers()),
request.uri(),
messageCharset,
new RequestEntity(entityStream, messageCharset));
new StreamedChunksEntity(entityStream, messageCharset),
threadPool);
}

public static Multimap<String, String> buildHeadersMap(HttpHeaders headers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,29 @@
import io.netty.handler.codec.http.QueryStringDecoder;
import org.chodavarapu.datamill.http.*;
import org.chodavarapu.datamill.values.Value;
import rx.*;
import rx.Observable;

import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class ServerRequestImpl extends AbstractRequestImpl implements ServerRequest {
private final ExecutorService entityStreamingThreadPool;

private Multimap<String, String> queryParameters;
private QueryStringDecoder queryStringDecoder;
private Multimap<String, String> trailingHeaders;

public ServerRequestImpl(String method, Multimap<String, String> headers, String uri, Charset charset, Entity entity) {
public ServerRequestImpl(String method, Multimap<String, String> headers, String uri, Charset charset, Entity entity,
ExecutorService threadPool) {
super(method, headers, uri, entity);

this.queryStringDecoder = new QueryStringDecoder(uri, charset);
this.entityStreamingThreadPool = threadPool;
}

private Multimap<String, String> extractQueryParameters() {
Expand Down Expand Up @@ -77,7 +81,7 @@ public Map<String, Object> options() {

@Override
public rx.Observable<Response> respond(Function<ResponseBuilder, Response> responseBuilder) {
return Observable.just(responseBuilder.apply(new ResponseBuilderImpl()));
return Observable.just(responseBuilder.apply(new ResponseBuilderImpl(entityStreamingThreadPool)));
}

public void setTrailingHeaders(Multimap<String, String> trailingHeaders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class RequestEntity implements Entity {
public class StreamedChunksEntity implements Entity {
private final Observable<byte[]> chunks;
private final Charset charset;

public RequestEntity(Observable<byte[]> chunks, Charset charset) {
public StreamedChunksEntity(Observable<byte[]> chunks, Charset charset) {
this.chunks = chunks;
this.charset = charset;
}
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/chodavarapu/datamill/json/Json.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.chodavarapu.datamill.json;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public interface Json {
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class JsonArray implements ReflectableValue {
public class JsonArray implements Json, ReflectableValue {
final JSONArray array;

public JsonArray() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class JsonObject implements ReflectableValue {
public class JsonObject implements Json, ReflectableValue {
final JSONObject object;

private JsonObject(JSONObject object) {
Expand Down
10 changes: 10 additions & 0 deletions core/src/test/java/org/chodavarapu/datamill/http/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ public void getRequests() throws Exception {
Method.GET, "http://sample.com", ImmutableMap.of(
"Content-Type", "application/json",
"Accept", "application/json"));
verifyConnectionSetup(createClientAndRequest(c -> c.get(
rb -> rb.uri("http://sample.com")
.header("Content-Type", "application/json")
.header("Accept", "application/json")
.queryParameter("test", "value")
.queryParameter("test2", "value%$@")
.build())),
Method.GET, "http://sample.com?test=value&test2=value%25%24%40", ImmutableMap.of(
"Content-Type", "application/json",
"Accept", "application/json"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ public class ClientToServerChannelHandlerTest {
@Mock
private ChannelHandlerContext context;

@Captor
private ArgumentCaptor<LastHttpContent> lastContentCaptor;

@Mock
private Route route;

Expand All @@ -49,6 +46,9 @@ public class ClientToServerChannelHandlerTest {
@Captor
private ArgumentCaptor<HttpObject> responseFragmentsCaptor;

@Captor
private ArgumentCaptor<HttpObject> responseStartCaptor;

private void waitForExecutorToFinishAllTasks(ExecutorService executor) throws Exception {
for (int i = 0; i < 5; i++) {
// We submit an empty task and wait for it so that other tasks submitted ahead of this get executed first
Expand Down Expand Up @@ -154,12 +154,12 @@ public void multipeResponseChunksSent() throws Exception {

waitForExecutorToFinishAllTasks(service);

verify(context, times(2)).write(responseFragmentsCaptor.capture());
verify(context).writeAndFlush(lastContentCaptor.capture());
verify(context).write(responseStartCaptor.capture());
verify(context, times(2)).writeAndFlush(responseFragmentsCaptor.capture());

assertEquals(HttpResponseStatus.OK, ((HttpResponse) responseFragmentsCaptor.getAllValues().get(0)).status());
byte[] bytes = new byte[((HttpContent) responseFragmentsCaptor.getAllValues().get(1)).content().readableBytes()];
((HttpContent) responseFragmentsCaptor.getAllValues().get(1)).content().readBytes(bytes);
assertEquals(HttpResponseStatus.OK, ((HttpResponse) responseStartCaptor.getValue()).status());
byte[] bytes = new byte[((HttpContent) responseFragmentsCaptor.getAllValues().get(0)).content().readableBytes()];
((HttpContent) responseFragmentsCaptor.getAllValues().get(0)).content().readBytes(bytes);
assertArrayEquals("Test Content".getBytes(), bytes);
}
}
Loading

0 comments on commit c75e002

Please sign in to comment.