Skip to content

Commit

Permalink
Replace manual usage of ExecutorService with RxJava subscribeOn
Browse files Browse the repository at this point in the history
  • Loading branch information
rchodava committed Oct 11, 2016
1 parent 9c88f8a commit c46e81f
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 42 deletions.
Expand Up @@ -76,7 +76,7 @@ public void channelRead(ChannelHandlerContext context, Object message) {
}

bodyStream = ReplaySubject.create();
serverRequest = ServerRequestBuilder.buildServerRequest(request, bodyStream, threadPool);
serverRequest = ServerRequestBuilder.buildServerRequest(request, bodyStream);

processRequest(context, request);

Expand Down
Expand Up @@ -13,28 +13,18 @@
import rx.Observer;
import rx.Subscription;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class ResponseBuilderImpl implements ResponseBuilder {
private final ExecutorService streamingBodyThreadPool;
private final Multimap<String, String> headers = LinkedListMultimap.create();
private Body body;

public ResponseBuilderImpl(ExecutorService threadPool) {
this.streamingBodyThreadPool = threadPool;
}

// Test hook
ResponseBuilderImpl() {
this.streamingBodyThreadPool = null;
}

@Override
public Response badRequest() {
return new ResponseImpl(Status.BAD_REQUEST, headers, body);
Expand Down Expand Up @@ -89,21 +79,14 @@ public Response ok(byte[] content) {
@Override
public ResponseBuilder streamingBodyAsBufferChunks(Func1<Observer<ByteBuffer>, Observable<ByteBuffer>> bodyStreamer) {
Observable<ByteBuffer> chunkStream = Observable.fromEmitter(emitter -> {
Subscription[] subscription = new Subscription[1];

streamingBodyThreadPool.execute(() -> {
subscription[0] = bodyStreamer.call(new PassthroughObserver<>(emitter))
Subscription subscription = bodyStreamer.call(new PassthroughObserver<>(emitter))
.doOnNext(buffer -> emitter.onNext(buffer))
.doOnCompleted(() -> emitter.onCompleted())
.doOnError(e -> emitter.onError(e))
.subscribeOn(Schedulers.io())
.subscribe();
});

emitter.setCancellation(() -> {
if (subscription[0] != null) {
subscription[0].unsubscribe();
}
});
emitter.setCancellation(subscription::unsubscribe);
}, Emitter.BackpressureMode.BUFFER);

this.body = new StreamedChunksBody(chunkStream, Charset.defaultCharset());
Expand Down Expand Up @@ -133,6 +116,7 @@ public ResponseBuilder streamingJson(Func1<Observer<Json>, Observable<Json>> jso
emitter.onCompleted();
})
.doOnError(e -> streamer.onError(e))
.subscribeOn(Schedulers.io())
.subscribe();

emitter.setCancellation(subscription::unsubscribe);
Expand Down
Expand Up @@ -10,7 +10,6 @@
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
Expand All @@ -19,15 +18,14 @@ public final class ServerRequestBuilder {
private ServerRequestBuilder() {
}

public static ServerRequestImpl buildServerRequest(HttpRequest request, Observable<ByteBuffer> bodyStream, ExecutorService threadPool) {
public static ServerRequestImpl buildServerRequest(HttpRequest request, Observable<ByteBuffer> bodyStream) {
Charset messageCharset = HttpUtil.getCharset(request);
return new ServerRequestImpl(
request.method().name(),
buildHeadersMap(request.headers()),
request.uri(),
messageCharset,
new StreamedChunksBody(bodyStream, messageCharset),
threadPool);
new StreamedChunksBody(bodyStream, messageCharset));
}

public static Multimap<String, String> buildHeadersMap(HttpHeaders headers) {
Expand Down
Expand Up @@ -9,25 +9,20 @@

import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class ServerRequestImpl extends AbstractRequestImpl implements ServerRequest {
private final ExecutorService entityStreamingThreadPool;

private Multimap<String, String> queryParameters;
private QueryStringDecoder queryStringDecoder;
private Multimap<String, String> trailingHeaders;

ServerRequestImpl(String method, Multimap<String, String> headers, String uri, Charset charset, Body body,
ExecutorService threadPool) {
ServerRequestImpl(String method, Multimap<String, String> headers, String uri, Charset charset, Body body) {
super(method, headers, uri, body);

this.queryStringDecoder = new QueryStringDecoder(uri, charset);
this.entityStreamingThreadPool = threadPool;
}

private Multimap<String, String> extractQueryParameters() {
Expand Down Expand Up @@ -81,7 +76,7 @@ public Map<String, Object> options() {

@Override
public rx.Observable<Response> respond(Function<ResponseBuilder, Response> responseBuilder) {
return Observable.just(responseBuilder.apply(new ResponseBuilderImpl(entityStreamingThreadPool)));
return Observable.just(responseBuilder.apply(new ResponseBuilderImpl()));
}

public void setTrailingHeaders(Multimap<String, String> trailingHeaders) {
Expand Down
Expand Up @@ -43,9 +43,7 @@ public void responseCodes() {

@Test
public void streamingEntites() {
ExecutorService threadPool = Executors.newSingleThreadExecutor();

ResponseBuilderImpl builder = new ResponseBuilderImpl(threadPool);
ResponseBuilderImpl builder = new ResponseBuilderImpl();

builder.streamingBody(observer -> {
observer.onNext("Test Content ".getBytes());
Expand All @@ -58,9 +56,7 @@ public void streamingEntites() {

@Test
public void streamingJson() {
ExecutorService threadPool = Executors.newSingleThreadExecutor();

ResponseBuilderImpl builder = new ResponseBuilderImpl(threadPool);
ResponseBuilderImpl builder = new ResponseBuilderImpl();

builder.streamingJson(observer -> {
observer.onNext(new JsonObject().put("test", "value"));
Expand Down
Expand Up @@ -21,7 +21,7 @@ public class ServerRequestImplTest {
public void basicRequestProperties() {
ServerRequestImpl request = new ServerRequestImpl(
"GET", ImmutableMultimap.of("header1", "valueh1v1", "header1", "valueh1v2", "header2", "valueh2v1"),
"http://localhost:8080?param1=value1&param2=value2&param2=value3", Charset.defaultCharset(), null, null);
"http://localhost:8080?param1=value1&param2=value2&param2=value3", Charset.defaultCharset(), null);
assertEquals(Method.GET, request.method());
assertEquals("GET", request.rawMethod());
assertEquals("valueh1v1", request.firstHeader("header1").asString());
Expand All @@ -40,7 +40,7 @@ public void requestHeaderProcessingIsCaseInsensitive() {
defaultFullHttpRequest.headers().add("HEADER2", "valueh2v1");

ServerRequestImpl request = ServerRequestBuilder.buildServerRequest(defaultFullHttpRequest,
Observable.empty(), null);
Observable.empty());
assertEquals(Method.GET, request.method());
assertEquals("GET", request.rawMethod());
assertEquals("valueh1v1", request.firstHeader("header1").asString());
Expand All @@ -52,7 +52,7 @@ public void requestHeaderProcessingIsCaseInsensitive() {
defaultFullHttpRequest.headers().add("header1", "valueh1v2");
defaultFullHttpRequest.headers().add("header2", "valueh2v1");

request = ServerRequestBuilder.buildServerRequest(defaultFullHttpRequest, Observable.empty(), null);
request = ServerRequestBuilder.buildServerRequest(defaultFullHttpRequest, Observable.empty());
assertEquals("valueh1v1", request.firstHeader("header1").asString());
assertEquals("valueh2v1", request.firstHeader("header2").asString());
}
Expand Down

0 comments on commit c46e81f

Please sign in to comment.