diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java index dc12b25c2..a122dbd63 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java @@ -21,8 +21,7 @@ import io.reactivex.Maybe; import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposables; -import io.reactivex.functions.BiConsumer; -import io.reactivex.functions.Consumer; +import io.reactivex.observers.DisposableMaybeObserver; import java.util.concurrent.atomic.AtomicReference; final class AutoDisposingCompletableObserverImpl implements AutoDisposingCompletableObserver { @@ -43,20 +42,23 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet @Override public void onSubscribe(final Disposable d) { if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, - lifecycle.doOnEvent(new BiConsumer() { - @Override public void accept(Object o, Throwable throwable) throws Exception { + lifecycle.subscribeWith(new DisposableMaybeObserver() { + @Override public void onSuccess(Object o) { callMainSubscribeIfNecessary(d); + AutoDisposingCompletableObserverImpl.this.dispose(); } - }) - .subscribe(new Consumer() { - @Override public void accept(Object o) throws Exception { - dispose(); - } - }, new Consumer() { - @Override public void accept(Throwable e1) throws Exception { - onError(e1); - } - }), getClass())) { + + @Override public void onError(Throwable e) { + callMainSubscribeIfNecessary(d); + AutoDisposingCompletableObserverImpl.this.onError(e); + } + + @Override public void onComplete() { + callMainSubscribeIfNecessary(d); + // Noop - we're unbound now + } + }), + getClass())) { if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) { delegate.onSubscribe(this); } @@ -81,7 +83,7 @@ private void lazyDispose() { } } - /* private */ + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors void callMainSubscribeIfNecessary(Disposable d) { // If we've never actually called the downstream onSubscribe (i.e. requested immediately in // onSubscribe and had a terminal event), we need to still send an empty disposable instance diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java index 5c594cacf..f9bf50501 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java @@ -21,8 +21,7 @@ import io.reactivex.MaybeObserver; import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposables; -import io.reactivex.functions.BiConsumer; -import io.reactivex.functions.Consumer; +import io.reactivex.observers.DisposableMaybeObserver; import java.util.concurrent.atomic.AtomicReference; final class AutoDisposingMaybeObserverImpl implements AutoDisposingMaybeObserver { @@ -43,20 +42,23 @@ final class AutoDisposingMaybeObserverImpl implements AutoDisposingMaybeObser @Override public void onSubscribe(final Disposable d) { if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, - lifecycle.doOnEvent(new BiConsumer() { - @Override public void accept(Object o, Throwable throwable) throws Exception { + lifecycle.subscribeWith(new DisposableMaybeObserver() { + @Override public void onSuccess(Object o) { callMainSubscribeIfNecessary(d); + AutoDisposingMaybeObserverImpl.this.dispose(); } - }) - .subscribe(new Consumer() { - @Override public void accept(Object o) throws Exception { - dispose(); - } - }, new Consumer() { - @Override public void accept(Throwable e1) throws Exception { - onError(e1); - } - }), getClass())) { + + @Override public void onError(Throwable e) { + callMainSubscribeIfNecessary(d); + AutoDisposingMaybeObserverImpl.this.onError(e); + } + + @Override public void onComplete() { + callMainSubscribeIfNecessary(d); + // Noop - we're unbound now + } + }), + getClass())) { if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) { delegate.onSubscribe(this); } @@ -81,7 +83,7 @@ private void lazyDispose() { } } - /* private */ + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors void callMainSubscribeIfNecessary(Disposable d) { // If we've never actually called the downstream onSubscribe (i.e. requested immediately in // onSubscribe and had a terminal event), we need to still send an empty disposable instance diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java index 8f5001ba7..82ddd5373 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java @@ -21,8 +21,7 @@ import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposables; -import io.reactivex.functions.BiConsumer; -import io.reactivex.functions.Consumer; +import io.reactivex.observers.DisposableMaybeObserver; import java.util.concurrent.atomic.AtomicReference; final class AutoDisposingObserverImpl implements AutoDisposingObserver { @@ -43,20 +42,23 @@ final class AutoDisposingObserverImpl implements AutoDisposingObserver { @Override public void onSubscribe(final Disposable d) { if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, - lifecycle.doOnEvent(new BiConsumer() { - @Override public void accept(Object o, Throwable throwable) throws Exception { + lifecycle.subscribeWith(new DisposableMaybeObserver() { + @Override public void onSuccess(Object o) { callMainSubscribeIfNecessary(d); + AutoDisposingObserverImpl.this.dispose(); } - }) - .subscribe(new Consumer() { - @Override public void accept(Object o) throws Exception { - dispose(); - } - }, new Consumer() { - @Override public void accept(Throwable e) throws Exception { - AutoDisposingObserverImpl.this.onError(e); - } - }), getClass())) { + + @Override public void onError(Throwable e) { + callMainSubscribeIfNecessary(d); + AutoDisposingObserverImpl.this.onError(e); + } + + @Override public void onComplete() { + callMainSubscribeIfNecessary(d); + // Noop - we're unbound now + } + }), + getClass())) { if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) { delegate.onSubscribe(this); } @@ -81,7 +83,7 @@ private void lazyDispose() { } } - /* private */ + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors void callMainSubscribeIfNecessary(Disposable d) { // If we've never actually called the downstream onSubscribe (i.e. requested immediately in // onSubscribe and had a terminal event), we need to still send an empty disposable instance diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java index cefebb4fd..3aee9157b 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java @@ -21,8 +21,7 @@ import io.reactivex.SingleObserver; import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposables; -import io.reactivex.functions.BiConsumer; -import io.reactivex.functions.Consumer; +import io.reactivex.observers.DisposableMaybeObserver; import java.util.concurrent.atomic.AtomicReference; final class AutoDisposingSingleObserverImpl implements AutoDisposingSingleObserver { @@ -43,20 +42,23 @@ final class AutoDisposingSingleObserverImpl implements AutoDisposingSingleObs @Override public void onSubscribe(final Disposable d) { if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, - lifecycle.doOnEvent(new BiConsumer() { - @Override public void accept(Object o, Throwable throwable) throws Exception { + lifecycle.subscribeWith(new DisposableMaybeObserver() { + @Override public void onSuccess(Object o) { callMainSubscribeIfNecessary(d); + AutoDisposingSingleObserverImpl.this.dispose(); } - }) - .subscribe(new Consumer() { - @Override public void accept(Object o) throws Exception { - dispose(); - } - }, new Consumer() { - @Override public void accept(Throwable e) throws Exception { - AutoDisposingSingleObserverImpl.this.onError(e); - } - }), getClass())) { + + @Override public void onError(Throwable e) { + callMainSubscribeIfNecessary(d); + AutoDisposingSingleObserverImpl.this.onError(e); + } + + @Override public void onComplete() { + callMainSubscribeIfNecessary(d); + // Noop - we're unbound now + } + }), + getClass())) { if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) { delegate.onSubscribe(this); } @@ -81,7 +83,7 @@ private void lazyDispose() { } } - /* private */ + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors void callMainSubscribeIfNecessary(Disposable d) { // If we've never actually called the downstream onSubscribe (i.e. requested immediately in // onSubscribe and had a terminal event), we need to still send an empty disposable instance diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java index 873b98b0e..45647d581 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java @@ -19,9 +19,8 @@ import com.uber.autodispose.observers.AutoDisposingSubscriber; import io.reactivex.Maybe; import io.reactivex.disposables.Disposable; -import io.reactivex.functions.BiConsumer; -import io.reactivex.functions.Consumer; import io.reactivex.internal.subscriptions.EmptySubscription; +import io.reactivex.observers.DisposableMaybeObserver; import java.util.concurrent.atomic.AtomicReference; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -44,20 +43,23 @@ final class AutoDisposingSubscriberImpl implements AutoDisposingSubscriber @Override public void onSubscribe(final Subscription s) { if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, - lifecycle.doOnEvent(new BiConsumer() { - @Override public void accept(Object o, Throwable throwable) throws Exception { + lifecycle.subscribeWith(new DisposableMaybeObserver() { + @Override public void onSuccess(Object o) { callMainSubscribeIfNecessary(s); + AutoDisposingSubscriberImpl.this.dispose(); } - }) - .subscribe(new Consumer() { - @Override public void accept(Object o) throws Exception { - dispose(); - } - }, new Consumer() { - @Override public void accept(Throwable e) throws Exception { - AutoDisposingSubscriberImpl.this.onError(e); - } - }), getClass())) { + + @Override public void onError(Throwable e) { + callMainSubscribeIfNecessary(s); + AutoDisposingSubscriberImpl.this.onError(e); + } + + @Override public void onComplete() { + callMainSubscribeIfNecessary(s); + // Noop - we're unbound now + } + }), + getClass())) { if (AutoDisposeEndConsumerHelper.setOnce(mainSubscription, s, getClass())) { delegate.onSubscribe(this); } @@ -99,7 +101,7 @@ private void lazyCancel() { } } - /* private */ + @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors void callMainSubscribeIfNecessary(Subscription s) { // If we've never actually started the upstream subscription (i.e. requested immediately in // onSubscribe and had a terminal event), we need to still send an empty subscription instance diff --git a/autodispose/src/main/java/com/uber/autodispose/TestScopeProvider.java b/autodispose/src/main/java/com/uber/autodispose/TestScopeProvider.java index d3e535403..4da06bca1 100644 --- a/autodispose/src/main/java/com/uber/autodispose/TestScopeProvider.java +++ b/autodispose/src/main/java/com/uber/autodispose/TestScopeProvider.java @@ -47,6 +47,16 @@ public static TestScopeProvider create(Maybe delegate) { return new TestScopeProvider(delegate); } + /** + * Creates a new provider that is "unbound", e.g. will emit a completion event to signal that the + * scope is unbound. + * + * @return the created TestScopeProvider + */ + public static TestScopeProvider unbound() { + return create(Maybe.empty()); + } + private final MaybeSubject innerMaybe = MaybeSubject.create(); private TestScopeProvider(Maybe delegate) {