Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lint suppressions and use custom observers in lifecycle subscriptions #108

Merged
merged 4 commits into from Oct 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,8 +21,7 @@
import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;

final class AutoDisposingCompletableObserverImpl implements AutoDisposingCompletableObserver {
Expand All @@ -43,20 +42,23 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
@Override public void accept(Object o, Throwable throwable) throws Exception {
lifecycle.subscribeWith(new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposingCompletableObserverImpl.this.dispose();
}
})
.subscribe(new Consumer<Object>() {
@Override public void accept(Object o) throws Exception {
dispose();
}
}, new Consumer<Throwable>() {
@Override public void accept(Throwable e1) throws Exception {
onError(e1);
}
}), getClass())) {

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingCompletableObserverImpl.this.onError(e);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
// Noop - we're unbound now
}
}),
getClass())) {
if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) {
delegate.onSubscribe(this);
}
Expand All @@ -81,7 +83,7 @@ private void lazyDispose() {
}
}

/* private */
@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Disposable d) {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
Expand Down
Expand Up @@ -21,8 +21,7 @@
import io.reactivex.MaybeObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;

final class AutoDisposingMaybeObserverImpl<T> implements AutoDisposingMaybeObserver<T> {
Expand All @@ -43,20 +42,23 @@ final class AutoDisposingMaybeObserverImpl<T> implements AutoDisposingMaybeObser

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
@Override public void accept(Object o, Throwable throwable) throws Exception {
lifecycle.subscribeWith(new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposingMaybeObserverImpl.this.dispose();
}
})
.subscribe(new Consumer<Object>() {
@Override public void accept(Object o) throws Exception {
dispose();
}
}, new Consumer<Throwable>() {
@Override public void accept(Throwable e1) throws Exception {
onError(e1);
}
}), getClass())) {

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingMaybeObserverImpl.this.onError(e);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
// Noop - we're unbound now
}
}),
getClass())) {
if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) {
delegate.onSubscribe(this);
}
Expand All @@ -81,7 +83,7 @@ private void lazyDispose() {
}
}

/* private */
@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Disposable d) {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
Expand Down
Expand Up @@ -21,8 +21,7 @@
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;

final class AutoDisposingObserverImpl<T> implements AutoDisposingObserver<T> {
Expand All @@ -43,20 +42,23 @@ final class AutoDisposingObserverImpl<T> implements AutoDisposingObserver<T> {

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
@Override public void accept(Object o, Throwable throwable) throws Exception {
lifecycle.subscribeWith(new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposingObserverImpl.this.dispose();
}
})
.subscribe(new Consumer<Object>() {
@Override public void accept(Object o) throws Exception {
dispose();
}
}, new Consumer<Throwable>() {
@Override public void accept(Throwable e) throws Exception {
AutoDisposingObserverImpl.this.onError(e);
}
}), getClass())) {

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingObserverImpl.this.onError(e);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
// Noop - we're unbound now
}
}),
getClass())) {
if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) {
delegate.onSubscribe(this);
}
Expand All @@ -81,7 +83,7 @@ private void lazyDispose() {
}
}

/* private */
@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Disposable d) {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
Expand Down
Expand Up @@ -21,8 +21,7 @@
import io.reactivex.SingleObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;

final class AutoDisposingSingleObserverImpl<T> implements AutoDisposingSingleObserver<T> {
Expand All @@ -43,20 +42,23 @@ final class AutoDisposingSingleObserverImpl<T> implements AutoDisposingSingleObs

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
@Override public void accept(Object o, Throwable throwable) throws Exception {
lifecycle.subscribeWith(new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposingSingleObserverImpl.this.dispose();
}
})
.subscribe(new Consumer<Object>() {
@Override public void accept(Object o) throws Exception {
dispose();
}
}, new Consumer<Throwable>() {
@Override public void accept(Throwable e) throws Exception {
AutoDisposingSingleObserverImpl.this.onError(e);
}
}), getClass())) {

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingSingleObserverImpl.this.onError(e);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
// Noop - we're unbound now
}
}),
getClass())) {
if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) {
delegate.onSubscribe(this);
}
Expand All @@ -81,7 +83,7 @@ private void lazyDispose() {
}
}

/* private */
@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Disposable d) {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
Expand Down
Expand Up @@ -19,9 +19,8 @@
import com.uber.autodispose.observers.AutoDisposingSubscriber;
import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.subscriptions.EmptySubscription;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand All @@ -44,20 +43,23 @@ final class AutoDisposingSubscriberImpl<T> implements AutoDisposingSubscriber<T>

@Override public void onSubscribe(final Subscription s) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
@Override public void accept(Object o, Throwable throwable) throws Exception {
lifecycle.subscribeWith(new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(s);
AutoDisposingSubscriberImpl.this.dispose();
}
})
.subscribe(new Consumer<Object>() {
@Override public void accept(Object o) throws Exception {
dispose();
}
}, new Consumer<Throwable>() {
@Override public void accept(Throwable e) throws Exception {
AutoDisposingSubscriberImpl.this.onError(e);
}
}), getClass())) {

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(s);
AutoDisposingSubscriberImpl.this.onError(e);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(s);
// Noop - we're unbound now
}
}),
getClass())) {
if (AutoDisposeEndConsumerHelper.setOnce(mainSubscription, s, getClass())) {
delegate.onSubscribe(this);
}
Expand Down Expand Up @@ -99,7 +101,7 @@ private void lazyCancel() {
}
}

/* private */
@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Subscription s) {
// If we've never actually started the upstream subscription (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty subscription instance
Expand Down
Expand Up @@ -47,6 +47,16 @@ public static TestScopeProvider create(Maybe<?> delegate) {
return new TestScopeProvider(delegate);
}

/**
* Creates a new provider that is "unbound", e.g. will emit a completion event to signal that the
* scope is unbound.
*
* @return the created TestScopeProvider
*/
public static TestScopeProvider unbound() {
return create(Maybe.empty());
}

private final MaybeSubject<Object> innerMaybe = MaybeSubject.create();

private TestScopeProvider(Maybe<?> delegate) {
Expand Down