Skip to content

Commit

Permalink
fix(shareReplay): no longer misses synchronous values from source
Browse files Browse the repository at this point in the history
* fix(shareReplay): subscribe to subject before source subscription happens

* test(shareReplay): improve test on sync source subscription

Co-authored-by: Víctor Oliva <olivarra1@gmail.com>
  • Loading branch information
2 people authored and benlesh committed Jun 29, 2020
1 parent 3bb2f7f commit d2f6ac7
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
23 changes: 21 additions & 2 deletions spec/operators/shareReplay-spec.ts
@@ -1,9 +1,9 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { shareReplay, mergeMapTo, retry, take } from 'rxjs/operators';
import { shareReplay, mergeMapTo, retry } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { Observable, interval, Operator, Observer, of } from 'rxjs';
import { Observable, Operator, Observer, of, from } from 'rxjs';

declare function asDiagram(arg: string): Function;
declare const rxTestScheduler: TestScheduler;
Expand Down Expand Up @@ -243,4 +243,23 @@ describe('shareReplay operator', () => {
done();
});
});

it('should not skip values on a sync source', () => {
const a = from(['a', 'b', 'c', 'd']);
// We would like for the previous line to read like this:
//
// const a = cold('(abcd|)');
//
// However, that would synchronously emit multiple values at frame 0,
// but it's not synchronous upon-subscription.
// TODO: revisit once https://github.com/ReactiveX/rxjs/issues/5523 is fixed

const x = cold( 'x-------x');
const expected = '(abcd)--d';

const shared = a.pipe(shareReplay(1));
const result = x.pipe(mergeMapTo(shared));
expectObservable(result).toBe(expected);
});

});
5 changes: 4 additions & 1 deletion src/internal/operators/shareReplay.ts
Expand Up @@ -91,9 +91,11 @@ function shareReplayOperator<T>({

return function shareReplayOperation(this: Subscriber<T>, source: Observable<T>) {
refCount++;
let innerSub: Subscription;
if (!subject || hasError) {
hasError = false;
subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
innerSub = subject.subscribe(this);
subscription = source.subscribe({
next(value) { subject.next(value); },
error(err) {
Expand All @@ -106,9 +108,10 @@ function shareReplayOperator<T>({
subject.complete();
},
});
} else {
innerSub = subject.subscribe(this);
}

const innerSub = subject.subscribe(this);
this.add(() => {
refCount--;
innerSub.unsubscribe();
Expand Down

0 comments on commit d2f6ac7

Please sign in to comment.