Skip to content

Commit

Permalink
Implement the 'Using' operator
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Nov 11, 2013
1 parent a10e0a2 commit bf079c6
Show file tree
Hide file tree
Showing 3 changed files with 294 additions and 0 deletions.
16 changes: 16 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import rx.operators.OperationToObservableIterable;
import rx.operators.OperationToObservableList;
import rx.operators.OperationToObservableSortedList;
import rx.operators.OperationUsing;
import rx.operators.OperationWindow;
import rx.operators.OperationZip;
import rx.operators.SafeObservableSubscription;
Expand Down Expand Up @@ -4594,6 +4595,21 @@ public Observable<TimeInterval<T>> timeInterval(Scheduler scheduler) {
return create(OperationTimeInterval.timeInterval(this, scheduler));
}

/**
* Constructs an observable sequence that depends on a resource object.
*
* @param resourceFactory
* The factory function to obtain a resource object.
* @param observableFactory
* The factory function to obtain an observable sequence that depends on the obtained resource.
* @return
* The observable sequence whose lifetime controls the lifetime of the dependent resource object.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229585(v=vs.103).aspx">MSDN: Observable.Using</a>
*/
public static <T, RESOURCE extends Subscription> Observable<T> using(Func0<RESOURCE> resourceFactory, Func1<RESOURCE, Observable<T>> observableFactory) {
return create(OperationUsing.using(resourceFactory, observableFactory));
}

/**
* Propagates the observable sequence that reacts first.
*
Expand Down
59 changes: 59 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationUsing.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

/**
* Constructs an observable sequence that depends on a resource object.
*/
public class OperationUsing {

public static <T, RESOURCE extends Subscription> OnSubscribeFunc<T> using(
final Func0<RESOURCE> resourceFactory,
final Func1<RESOURCE, Observable<T>> observableFactory) {
return new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> observer) {
Subscription resourceSubscription = Subscriptions.empty();
try {
RESOURCE resource = resourceFactory.call();
if (resource != null) {
resourceSubscription = resource;
}
Observable<T> observable = observableFactory.call(resource);
SafeObservableSubscription subscription = new SafeObservableSubscription();
// Use SafeObserver to guarantee resourceSubscription will
// be unsubscribed.
return subscription.wrap(new CompositeSubscription(
observable.subscribe(new SafeObserver<T>(
subscription, observer)),
resourceSubscription));
} catch (Throwable e) {
resourceSubscription.unsubscribe();
return Observable.<T> error(e).subscribe(observer);
}
}
};
}
}
219 changes: 219 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OperationUsingTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.junit.Assert.fail;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static rx.operators.OperationUsing.using;

import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

public class OperationUsingTest {

@SuppressWarnings("serial")
private static class TestException extends RuntimeException {
}

private static interface Resource extends Subscription {
public String getTextFromWeb();

@Override
public void unsubscribe();
}

@Test
public void testUsing() {
final Resource resource = mock(Resource.class);
when(resource.getTextFromWeb()).thenReturn("Hello world!");

Func0<Resource> resourceFactory = new Func0<Resource>() {
@Override
public Resource call() {
return resource;
}
};

Func1<Resource, Observable<String>> observableFactory = new Func1<Resource, Observable<String>>() {
@Override
public Observable<String> call(Resource resource) {
return Observable.from(resource.getTextFromWeb().split(" "));
}
};

@SuppressWarnings("unchecked")
Observer<String> observer = (Observer<String>) mock(Observer.class);
Observable<String> observable = Observable.create(using(
resourceFactory, observableFactory));
observable.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("Hello");
inOrder.verify(observer, times(1)).onNext("world!");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();

// The resouce should be closed
verify(resource, times(1)).unsubscribe();
}

@Test
public void testUsingWithSubscribingTwice() {
// When subscribe is called, a new resource should be created.
Func0<Resource> resourceFactory = new Func0<Resource>() {
@Override
public Resource call() {
return new Resource() {

boolean first = true;

@Override
public String getTextFromWeb() {
if (first) {
first = false;
return "Hello world!";
}
return "Nothing";
}

@Override
public void unsubscribe() {
}

};
}
};

Func1<Resource, Observable<String>> observableFactory = new Func1<Resource, Observable<String>>() {
@Override
public Observable<String> call(Resource resource) {
return Observable.from(resource.getTextFromWeb().split(" "));
}
};

@SuppressWarnings("unchecked")
Observer<String> observer = (Observer<String>) mock(Observer.class);
Observable<String> observable = Observable.create(using(
resourceFactory, observableFactory));
observable.subscribe(observer);
observable.subscribe(observer);

InOrder inOrder = inOrder(observer);

inOrder.verify(observer, times(1)).onNext("Hello");
inOrder.verify(observer, times(1)).onNext("world!");
inOrder.verify(observer, times(1)).onCompleted();

inOrder.verify(observer, times(1)).onNext("Hello");
inOrder.verify(observer, times(1)).onNext("world!");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test(expected = TestException.class)
public void testUsingWithResourceFactoryError() {
Func0<Subscription> resourceFactory = new Func0<Subscription>() {
@Override
public Subscription call() {
throw new TestException();
}
};

Func1<Subscription, Observable<Integer>> observableFactory = new Func1<Subscription, Observable<Integer>>() {
@Override
public Observable<Integer> call(Subscription subscription) {
return Observable.empty();
}
};

Observable.create(using(resourceFactory, observableFactory))
.toBlockingObservable().last();
}

@Test
public void testUsingWithObservableFactoryError() {
final Action0 unsubscribe = mock(Action0.class);
Func0<Subscription> resourceFactory = new Func0<Subscription>() {
@Override
public Subscription call() {
return Subscriptions.create(unsubscribe);
}
};

Func1<Subscription, Observable<Integer>> observableFactory = new Func1<Subscription, Observable<Integer>>() {
@Override
public Observable<Integer> call(Subscription subscription) {
throw new TestException();
}
};

try {
Observable.create(using(resourceFactory, observableFactory))
.toBlockingObservable().last();
fail("Should throw a TestException when the observableFactory throws it");
} catch (TestException e) {
// Make sure that unsubscribe is called so that users can close
// the resource if some error happens.
verify(unsubscribe, times(1)).call();
}
}

@Test
public void testUsingWithObservableFactoryErrorInOnSubscribe() {
final Action0 unsubscribe = mock(Action0.class);
Func0<Subscription> resourceFactory = new Func0<Subscription>() {
@Override
public Subscription call() {
return Subscriptions.create(unsubscribe);
}
};

Func1<Subscription, Observable<Integer>> observableFactory = new Func1<Subscription, Observable<Integer>>() {
@Override
public Observable<Integer> call(Subscription subscription) {
return Observable.create(new OnSubscribeFunc<Integer>() {
@Override
public Subscription onSubscribe(Observer<? super Integer> t1) {
throw new TestException();
}
});
}
};

try {
Observable.create(using(resourceFactory, observableFactory))
.toBlockingObservable().last();
fail("Should throw a TestException when the observableFactory throws it");
} catch (TestException e) {
// Make sure that unsubscribe is called so that users can close
// the resource if some error happens.
verify(unsubscribe, times(1)).call();
}
}
}

0 comments on commit bf079c6

Please sign in to comment.