Skip to content
Permalink
Browse files

fix(race): ignore latter sources after first complete or error (#4809)

* test(race): add failing tests

* fix(race): ignore sources after first complete/error

Closes #4808

BREAKING CHANGE: `race()` will no longer subscribe to subsequent observables if a provided source synchronously errors or completes. This means side effects that might have occurred during subscription in those rare cases will no longer occur.
  • Loading branch information...
cartant authored and benlesh committed Jun 4, 2019
1 parent 362d1d4 commit f31c3df01b524126ec8e7760e89395ea2fd3717d
Showing with 33 additions and 1 deletion.
  1. +23 −1 spec/operators/race-spec.ts
  2. +10 −0 src/internal/observable/race.ts
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { NEVER, of, race as staticRace, timer, defer, Observable } from 'rxjs';
import { EMPTY, NEVER, of, race as staticRace, timer, defer, Observable, throwError } from 'rxjs';
import { race, mergeMap, map, finalize, startWith } from 'rxjs/operators';

/** @test {race} */
@@ -184,6 +184,28 @@ describe('race operator', () => {
expect(onSubscribe.called).to.be.false;
});

it('should ignore latter observables if a former one completes immediately', () => {
const onComplete = sinon.spy();
const onSubscribe = sinon.spy();
const e1 = EMPTY; // Wins the race
const e2 = defer(onSubscribe); // Should be ignored

e1.pipe(race(e2)).subscribe({ complete: onComplete });
expect(onComplete.calledWithExactly()).to.be.true;
expect(onSubscribe.called).to.be.false;
});

it('should ignore latter observables if a former one errors immediately', () => {
const onError = sinon.spy();
const onSubscribe = sinon.spy();
const e1 = throwError('kaboom'); // Wins the race
const e2 = defer(onSubscribe); // Should be ignored

e1.pipe(race(e2)).subscribe({ error: onError });
expect(onError.calledWithExactly('kaboom')).to.be.true;
expect(onSubscribe.called).to.be.false;
});

it('should unsubscribe former observables if a latter one emits immediately', () => {
const onNext = sinon.spy();
const onUnsubscribe = sinon.spy();
@@ -137,4 +137,14 @@ export class RaceSubscriber<T> extends OuterSubscriber<T, T> {

this.destination.next(innerValue);
}

notifyComplete(innerSub: InnerSubscriber<T, T>): void {
this.hasFirst = true;
super.notifyComplete(innerSub);
}

notifyError(error: any, innerSub: InnerSubscriber<T, T>): void {
this.hasFirst = true;
super.notifyError(error, innerSub);
}
}

0 comments on commit f31c3df

Please sign in to comment.
You can’t perform that action at this time.