Skip to content

Commit

Permalink
Bump up version of rxjava to get .fromEmitter
Browse files Browse the repository at this point in the history
Re-implement streaming entities using fromEmitter
  • Loading branch information
rchodava committed Oct 11, 2016
1 parent 19b738b commit 599e7b6
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
import foundation.stack.datamill.http.Status;
import foundation.stack.datamill.json.Json;
import foundation.stack.datamill.values.StringValue;
import rx.Emitter;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Func1;
import rx.subjects.ReplaySubject;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -88,46 +87,45 @@ public Response ok(byte[] content) {

@Override
public ResponseBuilder streamingBodyAsBufferChunks(Func1<Observer<ByteBuffer>, Observable<ByteBuffer>> bodyStreamer) {
ReplaySubject<ByteBuffer> bodySubject = ReplaySubject.create();

Subscription[] bodyStreamerSubscription = new Subscription[1];

Observable<ByteBuffer> disposingSubject = Observable.using(() -> null,
__ -> bodySubject,
__ -> {
if (bodyStreamerSubscription[0] != null && !bodyStreamerSubscription[0].isUnsubscribed()) {
bodyStreamerSubscription[0].unsubscribe();
}
});

streamingBodyThreadPool.execute(() ->
bodyStreamerSubscription[0] = bodyStreamer.call(bodySubject)
.doOnNext(buffer -> bodySubject.onNext(buffer))
.doOnCompleted(() -> bodySubject.onCompleted())
.subscribe());

this.body = new StreamedChunksBody(disposingSubject, Charset.defaultCharset());
Observable<ByteBuffer> chunkStream = Observable.fromEmitter(emitter -> {
streamingBodyThreadPool.execute(() -> {
bodyStreamer.call(new PassthroughObserver<>(emitter))
.doOnNext(buffer -> emitter.onNext(buffer))
.doOnError(e -> emitter.onError(e))
.doOnCompleted(() -> emitter.onCompleted())
.subscribe();
});
}, Emitter.BackpressureMode.BUFFER);

this.body = new StreamedChunksBody(chunkStream, Charset.defaultCharset());
return this;
}

@Override
public ResponseBuilder streamingBody(Func1<Observer<byte[]>, Observable<byte[]>> bodyStreamer) {
return streamingBodyAsBufferChunks(body -> bodyStreamer.call(new DelegatingObserver<byte[], ByteBuffer>(body) {
@Override
protected ByteBuffer map(byte[] bytes) {
return ByteBuffer.wrap(bytes);
}
}).map(bytes -> ByteBuffer.wrap(bytes)));
return streamingBodyAsBufferChunks(body -> bodyStreamer.call(
new DelegatingObserver<byte[], ByteBuffer>(body) {
@Override
protected ByteBuffer map(byte[] bytes) {
return ByteBuffer.wrap(bytes);
}
}).map(bytes -> ByteBuffer.wrap(bytes)));
}

@Override
public ResponseBuilder streamingJson(Func1<Observer<Json>, Observable<Json>> jsonStreamer) {
return streamingBody(body -> {
JsonStreamer streamer = new JsonStreamer(body);
jsonStreamer.call(streamer).subscribe(streamer);

return Observable.empty();
});
return streamingBody(body ->
Observable.fromEmitter(emitter -> {
JsonStreamer streamer = new JsonStreamer(emitter);
jsonStreamer.call(streamer)
.doOnNext(json -> streamer.onNext(json))
.doOnCompleted(() -> {
emitter.onNext("]".getBytes());
emitter.onCompleted();
})
.doOnError(e -> streamer.onError(e))
.subscribe();
}, Emitter.BackpressureMode.BUFFER));
}

@Override
Expand Down Expand Up @@ -155,6 +153,28 @@ public Response conflict(String content) {
return new ResponseImpl(Status.CONFLICT, headers, new ValueBody(new StringValue(content)));
}

private static class PassthroughObserver<T> implements Observer<T> {
private final Observer<T> target;

PassthroughObserver(Observer<T> target) {
this.target = target;
}

@Override
public void onNext(T t) {
target.onNext(t);
}

@Override
public void onError(Throwable e) {
target.onError(e);
}

@Override
public void onCompleted() {
}
}

private static abstract class DelegatingObserver<S, T> implements Observer<S> {
private final Observer<T> target;

Expand Down Expand Up @@ -191,7 +211,7 @@ private static class JsonStreamer implements Observer<Json> {

@Override
public void onNext(Json json) {
String out = first ? "[" : ",";;
String out = first ? "[" : ",";
first = false;

body.onNext((out + json.toString()).getBytes());
Expand All @@ -204,8 +224,6 @@ public void onError(Throwable e) {

@Override
public void onCompleted() {
body.onNext("]".getBytes());
body.onCompleted();
}
}
}
2 changes: 1 addition & 1 deletion parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.1.3</version>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
Expand Down

0 comments on commit 599e7b6

Please sign in to comment.