diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 73a75fa4799..960de28ad68 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -86,6 +86,7 @@ import rx.operators.OperationToObservableIterable; import rx.operators.OperationToObservableList; import rx.operators.OperationToObservableSortedList; +import rx.operators.OperationUsing; import rx.operators.OperationWindow; import rx.operators.OperationZip; import rx.operators.SafeObservableSubscription; @@ -4774,6 +4775,21 @@ public Observable> timeInterval(Scheduler scheduler) { return create(OperationTimeInterval.timeInterval(this, scheduler)); } + /** + * Constructs an observable sequence that depends on a resource object. + * + * @param resourceFactory + * The factory function to obtain a resource object. + * @param observableFactory + * The factory function to obtain an observable sequence that depends on the obtained resource. + * @return + * The observable sequence whose lifetime controls the lifetime of the dependent resource object. + * @see MSDN: Observable.Using + */ + public static Observable using(Func0 resourceFactory, Func1> observableFactory) { + return create(OperationUsing.using(resourceFactory, observableFactory)); + } + /** * Propagates the observable sequence that reacts first. * diff --git a/rxjava-core/src/main/java/rx/operators/OperationUsing.java b/rxjava-core/src/main/java/rx/operators/OperationUsing.java new file mode 100644 index 00000000000..dd0cc38d65f --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationUsing.java @@ -0,0 +1,59 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +/** + * Constructs an observable sequence that depends on a resource object. + */ +public class OperationUsing { + + public static OnSubscribeFunc using( + final Func0 resourceFactory, + final Func1> observableFactory) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + Subscription resourceSubscription = Subscriptions.empty(); + try { + RESOURCE resource = resourceFactory.call(); + if (resource != null) { + resourceSubscription = resource; + } + Observable observable = observableFactory.call(resource); + SafeObservableSubscription subscription = new SafeObservableSubscription(); + // Use SafeObserver to guarantee resourceSubscription will + // be unsubscribed. + return subscription.wrap(new CompositeSubscription( + observable.subscribe(new SafeObserver( + subscription, observer)), + resourceSubscription)); + } catch (Throwable e) { + resourceSubscription.unsubscribe(); + return Observable. error(e).subscribe(observer); + } + } + }; + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationUsingTest.java b/rxjava-core/src/test/java/rx/operators/OperationUsingTest.java new file mode 100644 index 00000000000..42d31cea9a8 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationUsingTest.java @@ -0,0 +1,219 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static rx.operators.OperationUsing.using; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +public class OperationUsingTest { + + @SuppressWarnings("serial") + private static class TestException extends RuntimeException { + } + + private static interface Resource extends Subscription { + public String getTextFromWeb(); + + @Override + public void unsubscribe(); + } + + @Test + public void testUsing() { + final Resource resource = mock(Resource.class); + when(resource.getTextFromWeb()).thenReturn("Hello world!"); + + Func0 resourceFactory = new Func0() { + @Override + public Resource call() { + return resource; + } + }; + + Func1> observableFactory = new Func1>() { + @Override + public Observable call(Resource resource) { + return Observable.from(resource.getTextFromWeb().split(" ")); + } + }; + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + Observable observable = Observable.create(using( + resourceFactory, observableFactory)); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("Hello"); + inOrder.verify(observer, times(1)).onNext("world!"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + + // The resouce should be closed + verify(resource, times(1)).unsubscribe(); + } + + @Test + public void testUsingWithSubscribingTwice() { + // When subscribe is called, a new resource should be created. + Func0 resourceFactory = new Func0() { + @Override + public Resource call() { + return new Resource() { + + boolean first = true; + + @Override + public String getTextFromWeb() { + if (first) { + first = false; + return "Hello world!"; + } + return "Nothing"; + } + + @Override + public void unsubscribe() { + } + + }; + } + }; + + Func1> observableFactory = new Func1>() { + @Override + public Observable call(Resource resource) { + return Observable.from(resource.getTextFromWeb().split(" ")); + } + }; + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + Observable observable = Observable.create(using( + resourceFactory, observableFactory)); + observable.subscribe(observer); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + inOrder.verify(observer, times(1)).onNext("Hello"); + inOrder.verify(observer, times(1)).onNext("world!"); + inOrder.verify(observer, times(1)).onCompleted(); + + inOrder.verify(observer, times(1)).onNext("Hello"); + inOrder.verify(observer, times(1)).onNext("world!"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test(expected = TestException.class) + public void testUsingWithResourceFactoryError() { + Func0 resourceFactory = new Func0() { + @Override + public Subscription call() { + throw new TestException(); + } + }; + + Func1> observableFactory = new Func1>() { + @Override + public Observable call(Subscription subscription) { + return Observable.empty(); + } + }; + + Observable.create(using(resourceFactory, observableFactory)) + .toBlockingObservable().last(); + } + + @Test + public void testUsingWithObservableFactoryError() { + final Action0 unsubscribe = mock(Action0.class); + Func0 resourceFactory = new Func0() { + @Override + public Subscription call() { + return Subscriptions.create(unsubscribe); + } + }; + + Func1> observableFactory = new Func1>() { + @Override + public Observable call(Subscription subscription) { + throw new TestException(); + } + }; + + try { + Observable.create(using(resourceFactory, observableFactory)) + .toBlockingObservable().last(); + fail("Should throw a TestException when the observableFactory throws it"); + } catch (TestException e) { + // Make sure that unsubscribe is called so that users can close + // the resource if some error happens. + verify(unsubscribe, times(1)).call(); + } + } + + @Test + public void testUsingWithObservableFactoryErrorInOnSubscribe() { + final Action0 unsubscribe = mock(Action0.class); + Func0 resourceFactory = new Func0() { + @Override + public Subscription call() { + return Subscriptions.create(unsubscribe); + } + }; + + Func1> observableFactory = new Func1>() { + @Override + public Observable call(Subscription subscription) { + return Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer t1) { + throw new TestException(); + } + }); + } + }; + + try { + Observable.create(using(resourceFactory, observableFactory)) + .toBlockingObservable().last(); + fail("Should throw a TestException when the observableFactory throws it"); + } catch (TestException e) { + // Make sure that unsubscribe is called so that users can close + // the resource if some error happens. + verify(unsubscribe, times(1)).call(); + } + } +}