diff --git a/sqlbrite/src/androidTest/java/com/squareup/sqlbrite/QueryObservableTest.java b/sqlbrite/src/androidTest/java/com/squareup/sqlbrite/QueryObservableTest.java new file mode 100644 index 00000000..5d5ac6ca --- /dev/null +++ b/sqlbrite/src/androidTest/java/com/squareup/sqlbrite/QueryObservableTest.java @@ -0,0 +1,64 @@ +package com.squareup.sqlbrite; + +import android.database.Cursor; +import android.database.MatrixCursor; +import android.support.test.runner.AndroidJUnit4; + +import com.squareup.sqlbrite.SqlBrite.Query; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import rx.Observable; +import rx.functions.Func1; +import rx.observers.TestSubscriber; + +import static com.google.common.truth.Truth.assertThat; + +@RunWith(AndroidJUnit4.class) +public final class QueryObservableTest { + + @Test public void mapToListThrowsFromQueryRun() { + TestSubscriber testSubscriber = new TestSubscriber<>(); + + new QueryObservable(Observable.just(new Query() { + @Override public Cursor run() { + throw new IllegalStateException("test exception"); + } + })).mapToList(new Func1() { + @Override public Object call(Cursor cursor) { + throw new AssertionError("Must not be called"); + } + }).subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoValues(); + assertThat(testSubscriber.getOnErrorEvents()).hasSize(1); + + IllegalStateException expected = (IllegalStateException) testSubscriber.getOnErrorEvents().get(0); + assertThat(expected).hasMessage("test exception"); + } + + @Test public void mapToListThrowsFromMapFunction() { + TestSubscriber testSubscriber = new TestSubscriber<>(); + + new QueryObservable(Observable.just(new Query() { + @Override public Cursor run() { + MatrixCursor cursor = new MatrixCursor(new String[]{"col1"}); + cursor.addRow(new Object[]{"value1"}); + return cursor; + } + })).mapToList(new Func1() { + @Override public Object call(Cursor cursor) { + throw new IllegalStateException("test exception"); + } + }).subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoValues(); + assertThat(testSubscriber.getOnErrorEvents()).hasSize(1); + + IllegalStateException expected = (IllegalStateException) testSubscriber.getOnErrorEvents().get(0); + assertThat(expected).hasMessage("test exception"); + } +} diff --git a/sqlbrite/src/main/java/com/squareup/sqlbrite/QueryObservable.java b/sqlbrite/src/main/java/com/squareup/sqlbrite/QueryObservable.java index 4a5af1d6..bb5ced05 100644 --- a/sqlbrite/src/main/java/com/squareup/sqlbrite/QueryObservable.java +++ b/sqlbrite/src/main/java/com/squareup/sqlbrite/QueryObservable.java @@ -8,6 +8,8 @@ import java.util.List; import rx.Observable; import rx.Subscriber; +import rx.exceptions.Exceptions; +import rx.exceptions.OnErrorThrowable; import rx.functions.Func1; /** An {@link Observable} of {@link Query} which offers query-specific convenience operators. */ @@ -41,17 +43,22 @@ public final Observable> mapToList(@NonNull final Func1 m public Subscriber call(final Subscriber> subscriber) { return new Subscriber(subscriber) { @Override public void onNext(Query query) { - Cursor cursor = query.run(); - List items = new ArrayList<>(cursor.getCount()); try { - while (cursor.moveToNext() && !subscriber.isUnsubscribed()) { - items.add(mapper.call(cursor)); + Cursor cursor = query.run(); + List items = new ArrayList<>(cursor.getCount()); + try { + while (cursor.moveToNext() && !subscriber.isUnsubscribed()) { + items.add(mapper.call(cursor)); + } + } finally { + cursor.close(); } - } finally { - cursor.close(); - } - if (!subscriber.isUnsubscribed()) { - subscriber.onNext(items); + if (!subscriber.isUnsubscribed()) { + subscriber.onNext(items); + } + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + onError(OnErrorThrowable.addValueAsLastCause(e, query)); } }