Skip to content

Commit

Permalink
Merge pull request ReactiveX#659 from akarnokd/SubjectsFixAdditional
Browse files Browse the repository at this point in the history
Missing fixes from the subject rewrite
  • Loading branch information
benjchristensen committed Dec 23, 2013
2 parents 0c8389c + 8720b0f commit 4ae8b9a
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 6 deletions.
13 changes: 7 additions & 6 deletions rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,13 @@ public void call(Collection<SubjectObserver<? super T>> observers) {

@Override
public void onNext(T v) {
/**
* Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs.
*/
lastNotification.set(new Notification<T>(v));
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
o.onNext(v);
// do not overwrite a terminal notification
// so new subscribers can get them
if (lastNotification.get().isOnNext()) {
lastNotification.set(new Notification<T>(v));
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
o.onNext(v);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public Subscription onSubscribe(Observer<? super T> actualObserver) {
try {
current.terminationLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted waiting for termination.", e);
}
break;
Expand Down
50 changes: 50 additions & 0 deletions rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,5 +185,55 @@ public void testCompletedAfterErrorIsNotSent() {
verify(aObserver, never()).onNext("two");
verify(aObserver, never()).onCompleted();
}
@Test
public void testCompletedAfterErrorIsNotSent2() {
BehaviorSubject<String> subject = BehaviorSubject.create("default");

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
subject.subscribe(aObserver);

subject.onNext("one");
subject.onError(testException);
subject.onNext("two");
subject.onCompleted();

verify(aObserver, times(1)).onNext("default");
verify(aObserver, times(1)).onNext("one");
verify(aObserver, times(1)).onError(testException);
verify(aObserver, never()).onNext("two");
verify(aObserver, never()).onCompleted();

Observer<Object> o2 = mock(Observer.class);
subject.subscribe(o2);
verify(o2, times(1)).onError(testException);
verify(o2, never()).onNext(any());
verify(o2, never()).onCompleted();
}

@Test
public void testCompletedAfterErrorIsNotSent3() {
BehaviorSubject<String> subject = BehaviorSubject.create("default");

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
subject.subscribe(aObserver);

subject.onNext("one");
subject.onCompleted();
subject.onNext("two");
subject.onCompleted();

verify(aObserver, times(1)).onNext("default");
verify(aObserver, times(1)).onNext("one");
verify(aObserver, times(1)).onCompleted();
verify(aObserver, never()).onError(any(Throwable.class));
verify(aObserver, never()).onNext("two");

Observer<Object> o2 = mock(Observer.class);
subject.subscribe(o2);
verify(o2, times(1)).onCompleted();
verify(o2, never()).onNext(any());
verify(aObserver, never()).onError(any(Throwable.class));
}
}

0 comments on commit 4ae8b9a

Please sign in to comment.