Skip to content

Commit

Permalink
Merge 07c73d3 into d6d4db5
Browse files Browse the repository at this point in the history
  • Loading branch information
brianchung808 committed Nov 28, 2016
2 parents d6d4db5 + 07c73d3 commit 18601d8
Show file tree
Hide file tree
Showing 15 changed files with 60 additions and 46 deletions.
2 changes: 1 addition & 1 deletion libraries.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ ext {
springBootWeb : 'org.springframework.boot:spring-boot-starter-web:1.3.1.RELEASE',
springBootTest : 'org.springframework.boot:spring-boot-starter-test:1.3.1.RELEASE',

rxJava : 'io.reactivex:rxjava:1.0.14',
rxJava : 'io.reactivex.rxjava2:rxjava:2.0.0',

junit : 'junit:junit:4.12',
mockito : 'org.mockito:mockito-all:1.10.19'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
*/
package io.jmnarloch.spring.boot.rxjava.async;

import io.reactivex.Observable;
import io.reactivex.observers.DisposableObserver;
import org.springframework.web.context.request.async.DeferredResult;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;

/**
* A subscriber that sets the single value produced by the {@link Observable} on the {@link DeferredResult}.
Expand All @@ -27,17 +26,15 @@
* @author Robert Danci
* @see DeferredResult
*/
class DeferredResultSubscriber<T> extends Subscriber<T> implements Runnable {
class DeferredResultObserver<T> extends DisposableObserver<T> implements Runnable {

private final DeferredResult<T> deferredResult;

private final Subscription subscription;

public DeferredResultSubscriber(Observable<T> observable, DeferredResult<T> deferredResult) {
public DeferredResultObserver(Observable<T> observable, DeferredResult<T> deferredResult) {
this.deferredResult = deferredResult;
this.deferredResult.onTimeout(this);
this.deferredResult.onCompletion(this);
this.subscription = observable.subscribe(this);
observable.subscribe(this);
}

@Override
Expand All @@ -51,11 +48,11 @@ public void onError(Throwable e) {
}

@Override
public void onCompleted() {
public void onComplete() {
}

@Override
public void run() {
this.subscription.unsubscribe();
this.dispose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import org.springframework.util.Assert;
import org.springframework.web.context.request.async.DeferredResult;
import rx.Observable;
import io.reactivex.Observable;

import java.util.List;

Expand All @@ -31,7 +31,7 @@ public class ObservableDeferredResult<T> extends DeferredResult<List<T>> {

private static final Object EMPTY_RESULT = new Object();

private final DeferredResultSubscriber<List<T>> subscriber;
private final DeferredResultObserver<List<T>> subscriber;

public ObservableDeferredResult(Observable<T> observable) {
this(null, EMPTY_RESULT, observable);
Expand All @@ -45,6 +45,6 @@ public ObservableDeferredResult(Long timeout, Object timeoutResult, Observable<T
super(timeout, timeoutResult);
Assert.notNull(observable, "observable can not be null");

subscriber = new DeferredResultSubscriber<List<T>>(observable.toList(), this);
subscriber = new DeferredResultObserver<List<T>>(observable.toList().toObservable(), this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import rx.Observable;
import io.reactivex.Observable;

/**
* A specialized {@link SseEmitter} that handles {@link Observable} types. The emitter subscribes to the
Expand All @@ -28,7 +28,7 @@
*/
public class ObservableSseEmitter<T> extends SseEmitter {

private final ResponseBodyEmitterSubscriber<T> subscriber;
private final ResponseBodyEmitterObserver<T> subscriber;

public ObservableSseEmitter(Observable<T> observable) {
this(null, observable);
Expand All @@ -40,6 +40,6 @@ public ObservableSseEmitter(MediaType mediaType, Observable<T> observable) {

public ObservableSseEmitter(Long timeout, MediaType mediaType, Observable<T> observable) {
super(timeout);
this.subscriber = new ResponseBodyEmitterSubscriber<T>(mediaType, observable, this);
this.subscriber = new ResponseBodyEmitterObserver<T>(mediaType, observable, this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
*/
package io.jmnarloch.spring.boot.rxjava.async;

import io.reactivex.Observable;
import io.reactivex.observers.DisposableObserver;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;

import java.io.IOException;

Expand All @@ -29,23 +28,21 @@
*
* @author Jakub Narloch
*/
class ResponseBodyEmitterSubscriber<T> extends Subscriber<T> implements Runnable {
class ResponseBodyEmitterObserver<T> extends DisposableObserver<T> implements Runnable {

private final MediaType mediaType;

private final Subscription subscription;

private final ResponseBodyEmitter responseBodyEmitter;

private boolean completed;

public ResponseBodyEmitterSubscriber(MediaType mediaType, Observable<T> observable, ResponseBodyEmitter responseBodyEmitter) {
public ResponseBodyEmitterObserver(MediaType mediaType, Observable<T> observable, ResponseBodyEmitter responseBodyEmitter) {

this.mediaType = mediaType;
this.responseBodyEmitter = responseBodyEmitter;
this.responseBodyEmitter.onTimeout(this);
this.responseBodyEmitter.onCompletion(this);
this.subscription = observable.subscribe(this);
observable.subscribe(this);
}

@Override
Expand All @@ -66,7 +63,7 @@ public void onError(Throwable e) {
}

@Override
public void onCompleted() {
public void onComplete() {
if(!completed) {
completed = true;
responseBodyEmitter.complete();
Expand All @@ -75,6 +72,6 @@ public void onCompleted() {

@Override
public void run() {
subscription.unsubscribe();
this.dispose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import org.springframework.util.Assert;
import org.springframework.web.context.request.async.DeferredResult;
import rx.Single;
import io.reactivex.Single;

/**
* A specialized {@link DeferredResult} that handles {@link Single} return type.
Expand All @@ -29,7 +29,7 @@ public class SingleDeferredResult<T> extends DeferredResult<T> {

private static final Object EMPTY_RESULT = new Object();

private final DeferredResultSubscriber<T> subscriber;
private final DeferredResultObserver<T> subscriber;

public SingleDeferredResult(Single<T> single) {
this(null, EMPTY_RESULT, single);
Expand All @@ -43,6 +43,6 @@ public SingleDeferredResult(Long timeout, Object timeoutResult, Single<T> single
super(timeout, timeoutResult);
Assert.notNull(single, "single can not be null");

subscriber = new DeferredResultSubscriber<T>(single.toObservable(), this);
subscriber = new DeferredResultObserver<T>(single.toObservable(), this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.springframework.web.method.support.HandlerMethodReturnValueHandler;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import rx.Observable;
import rx.Single;
import io.reactivex.Observable;
import io.reactivex.Single;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler;
import org.springframework.web.method.support.ModelAndViewContainer;
import rx.Observable;
import io.reactivex.Observable;

/**
* A specialized {@link AsyncHandlerMethodReturnValueHandler} that handles {@link Observable} return types.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler;
import org.springframework.web.method.support.ModelAndViewContainer;
import rx.Single;
import io.reactivex.Single;

/**
* A specialized {@link AsyncHandlerMethodReturnValueHandler} that handles {@link Single} return types.
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/jmnarloch/spring/boot/rxjava/Demo.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import rx.Observable;
import io.reactivex.Observable;

import java.util.Date;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import rx.Observable;
import rx.functions.Func1;
import io.reactivex.Observable;
import io.reactivex.functions.Function;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -101,9 +101,9 @@ public ObservableDeferredResult<Object> error() {

@RequestMapping(method = RequestMethod.GET, value = "/timeout")
public ObservableDeferredResult<String> timeout() {
return new ObservableDeferredResult<String>(Observable.timer(1, TimeUnit.MINUTES).map(new Func1<Long, String>() {
return new ObservableDeferredResult<String>(Observable.timer(1, TimeUnit.MINUTES).map(new Function<Long, String>() {
@Override
public String call(Long aLong) {
public String apply(Long aLong) {
return "single value";
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import rx.Observable;
import io.reactivex.Observable;

import java.util.Date;
import java.util.GregorianCalendar;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import rx.Single;
import io.reactivex.Single;

import java.util.Date;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
*/
package io.jmnarloch.spring.boot.rxjava.mvc;

import io.reactivex.Observable;
import io.reactivex.functions.Function;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.IntegrationTest;
Expand All @@ -31,9 +34,12 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import rx.Observable;
import rx.functions.Func1;
import org.springframework.web.method.support.HandlerMethodArgumentResolver;
import org.springframework.web.method.support.HandlerMethodReturnValueHandler;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -63,6 +69,20 @@ public class ObservableReturnValueHandlerTest {
@EnableAutoConfiguration
@RestController
protected static class Application {
private @Autowired RequestMappingHandlerAdapter adapter;

// Needed b/c ObservableJust in rxjava2 implements Callable.
// The Callable handler is the first eligible that is picked up.
@PostConstruct
public void prioritizeCustomReturnValueHandlers () {
ArrayList<HandlerMethodReturnValueHandler> returnValueHandlers =
new ArrayList<>(adapter.getReturnValueHandlers());
List<HandlerMethodReturnValueHandler> customReturnValueHandlers =
adapter.getCustomReturnValueHandlers();
returnValueHandlers.removeAll (customReturnValueHandlers);
returnValueHandlers.addAll (0, customReturnValueHandlers);
adapter.setReturnValueHandlers (returnValueHandlers);
}

@RequestMapping(method = RequestMethod.GET, value = "/empty")
public Observable<Void> empty() {
Expand All @@ -86,9 +106,9 @@ public Observable<Object> error() {

@RequestMapping(method = RequestMethod.GET, value = "/timeout")
public Observable<String> timeout() {
return Observable.timer(1, TimeUnit.MINUTES).map(new Func1<Long, String>() {
return Observable.timer(1, TimeUnit.MINUTES).map(new Function<Long, String>() {
@Override
public String call(Long aLong) {
public String apply(Long aLong) {
return "single value";
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import rx.Single;
import io.reactivex.Single;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand Down

0 comments on commit 18601d8

Please sign in to comment.