Skip to content

Commit

Permalink
Allow response entities that are streamed asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
rchodava committed Apr 10, 2016
1 parent 6ed87e7 commit be92910
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package org.chodavarapu.datamill.http;

import rx.Observer;

import java.util.function.Consumer;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
Expand All @@ -14,6 +18,7 @@ public interface ResponseBuilder {
Response ok();
Response ok(String content);
Response ok(byte[] content);
ResponseBuilder streamingEntity(Consumer<Observer<byte[]>> entityStreamer);
Response unauthorized();
Response unauthorized(String content);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void channelRead(ChannelHandlerContext context, Object message) {
}

entityStream = ReplaySubject.create();
serverRequest = ServerRequestBuilder.buildServerRequest(request, entityStream);
serverRequest = ServerRequestBuilder.buildServerRequest(request, entityStream, threadPool);

processRequest(context, request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,38 @@

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import org.chodavarapu.datamill.http.Entity;
import org.chodavarapu.datamill.http.Response;
import org.chodavarapu.datamill.http.ResponseBuilder;
import org.chodavarapu.datamill.http.Status;
import org.chodavarapu.datamill.values.StringValue;
import rx.Observer;
import rx.subjects.PublishSubject;

import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class ResponseBuilderImpl implements ResponseBuilder {
private final ExecutorService streamingEntityThreadPool;
private final Multimap<String, String> headers = LinkedListMultimap.create();
private Entity entity;

public ResponseBuilderImpl(ExecutorService threadPool) {
this.streamingEntityThreadPool = threadPool;
}

// Test hook
ResponseBuilderImpl() {
this.streamingEntityThreadPool = null;
}

@Override
public Response badRequest() {
return new ResponseImpl(Status.BAD_REQUEST, headers);
return new ResponseImpl(Status.BAD_REQUEST, headers, entity);
}

@Override
Expand All @@ -31,7 +49,7 @@ public <T> ResponseBuilder header(String name, T value) {

@Override
public Response internalServerError() {
return new ResponseImpl(Status.INTERNAL_SERVER_ERROR, headers);
return new ResponseImpl(Status.INTERNAL_SERVER_ERROR, headers, entity);
}

@Override
Expand All @@ -46,12 +64,12 @@ public Response noContent() {

@Override
public Response notFound() {
return new ResponseImpl(Status.NOT_FOUND, headers);
return new ResponseImpl(Status.NOT_FOUND, headers, entity);
}

@Override
public Response ok() {
return new ResponseImpl(Status.OK, headers);
return new ResponseImpl(Status.OK, headers, entity);
}

@Override
Expand All @@ -64,9 +82,19 @@ public Response ok(byte[] content) {
return new ResponseImpl(Status.OK, headers, new BytesEntity(content));
}

@Override
public ResponseBuilder streamingEntity(Consumer<Observer<byte[]>> entityStreamer) {
PublishSubject<byte[]> entitySubject = PublishSubject.create();

streamingEntityThreadPool.execute(() -> entityStreamer.accept(entitySubject));

this.entity = new StreamedChunksEntity(entitySubject, Charset.defaultCharset());
return this;
}

@Override
public Response unauthorized() {
return new ResponseImpl(Status.UNAUTHORIZED, headers);
return new ResponseImpl(Status.UNAUTHORIZED, headers, entity);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@

import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class ServerRequestBuilder {
public static ServerRequestImpl buildServerRequest(HttpRequest request, Observable<byte[]> entityStream) {
public static ServerRequestImpl buildServerRequest(HttpRequest request, Observable<byte[]> entityStream, ExecutorService threadPool) {
Charset messageCharset = HttpUtil.getCharset(request);
return new ServerRequestImpl(
request.method().name(),
buildHeadersMap(request.headers()),
request.uri(),
messageCharset,
new RequestEntity(entityStream, messageCharset));
new StreamedChunksEntity(entityStream, messageCharset),
threadPool);
}

public static Multimap<String, String> buildHeadersMap(HttpHeaders headers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,29 @@
import io.netty.handler.codec.http.QueryStringDecoder;
import org.chodavarapu.datamill.http.*;
import org.chodavarapu.datamill.values.Value;
import rx.*;
import rx.Observable;

import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class ServerRequestImpl extends AbstractRequestImpl implements ServerRequest {
private final ExecutorService entityStreamingThreadPool;

private Multimap<String, String> queryParameters;
private QueryStringDecoder queryStringDecoder;
private Multimap<String, String> trailingHeaders;

public ServerRequestImpl(String method, Multimap<String, String> headers, String uri, Charset charset, Entity entity) {
public ServerRequestImpl(String method, Multimap<String, String> headers, String uri, Charset charset, Entity entity,
ExecutorService threadPool) {
super(method, headers, uri, entity);

this.queryStringDecoder = new QueryStringDecoder(uri, charset);
this.entityStreamingThreadPool = threadPool;
}

private Multimap<String, String> extractQueryParameters() {
Expand Down Expand Up @@ -77,7 +81,7 @@ public Map<String, Object> options() {

@Override
public rx.Observable<Response> respond(Function<ResponseBuilder, Response> responseBuilder) {
return Observable.just(responseBuilder.apply(new ResponseBuilderImpl()));
return Observable.just(responseBuilder.apply(new ResponseBuilderImpl(entityStreamingThreadPool)));
}

public void setTrailingHeaders(Multimap<String, String> trailingHeaders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class RequestEntity implements Entity {
public class StreamedChunksEntity implements Entity {
private final Observable<byte[]> chunks;
private final Charset charset;

public RequestEntity(Observable<byte[]> chunks, Charset charset) {
public StreamedChunksEntity(Observable<byte[]> chunks, Charset charset) {
this.chunks = chunks;
this.charset = charset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class ServerRequestImplTest {
public void basicRequestProperties() {
ServerRequestImpl request = new ServerRequestImpl(
"GET", ImmutableMultimap.of("header1", "valueh1v1", "header1", "valueh1v2", "header2", "valueh2v1"),
"http://localhost:8080?param1=value1&param2=value2&param2=value3", Charset.defaultCharset(), null);
"http://localhost:8080?param1=value1&param2=value2&param2=value3", Charset.defaultCharset(), null, null);
assertEquals(Method.GET, request.method());
assertEquals("GET", request.rawMethod());
assertEquals("valueh1v1", request.firstHeader("header1").asString());
Expand Down

0 comments on commit be92910

Please sign in to comment.