Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shareReplay): add config parameter #4059

Merged
merged 9 commits into from Jan 30, 2019
10 changes: 8 additions & 2 deletions compat/operator/shareReplay.ts
@@ -1,11 +1,17 @@
import { Observable, SchedulerLike } from 'rxjs';
import { shareReplay as higherOrder } from 'rxjs/operators';
import { ShareReplayConfig } from 'rxjs/internal-compatibility';

/**
* @method shareReplay
* @owner Observable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove @owner Observable here. It's not necessary.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

*/
export function shareReplay<T>(this: Observable<T>, bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike):
export function shareReplay<T>(this: Observable<T>, config: ShareReplayConfig): Observable<T>;
export function shareReplay<T>(this: Observable<T>, bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): Observable<T>;
export function shareReplay<T>(this: Observable<T>, configOrBufferSize?: ShareReplayConfig | number, windowTime?: number, scheduler?: SchedulerLike):
Observable<T> {
return higherOrder(bufferSize, windowTime, scheduler)(this) as Observable<T>;
if (configOrBufferSize && typeof configOrBufferSize === 'object') {
return higherOrder(configOrBufferSize as ShareReplayConfig)(this) as Observable<T>;
}
return higherOrder(configOrBufferSize as number | undefined, windowTime, scheduler)(this) as Observable<T>;
}
6 changes: 0 additions & 6 deletions doc/README.md

This file was deleted.

30 changes: 15 additions & 15 deletions spec-dtslint/operators/shareReplay-spec.ts
@@ -1,30 +1,30 @@
import { of, asyncScheduler } from 'rxjs';
import { of, asyncScheduler } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

it('should infer correctly', () => {
const o = of('foo', 'bar', 'baz').pipe(shareReplay()); // $ExpectType Observable<string>
it('should accept an individual bufferSize parameter', () => {
const o = of(1, 2, 3).pipe(shareReplay(1)); // $ExpectType Observable<number>
});

it('should support a bufferSize', () => {
const o = of('foo', 'bar', 'baz').pipe(shareReplay(6)); // $ExpectType Observable<string>
it('should accept individual bufferSize and windowTime parameters', () => {
const o = of(1, 2, 3).pipe(shareReplay(1, 2)); // $ExpectType Observable<number>
});

it('should support a windowTime', () => {
const o = of('foo', 'bar', 'baz').pipe(shareReplay(6, 4)); // $ExpectType Observable<string>
it('should accept individual bufferSize, windowTime and scheduler parameters', () => {
const o3 = of(1, 2, 3).pipe(shareReplay(1, 2, asyncScheduler)); // $ExpectType Observable<number>
});

it('should support a scheduler', () => {
const o = of('foo', 'bar', 'baz').pipe(shareReplay(6, 4, asyncScheduler)); // $ExpectType Observable<string>
it('should accept a bufferSize config parameter', () => {
const o = of(1, 2, 3).pipe(shareReplay({ bufferSize: 1, refCount: true })); // $ExpectType Observable<number>
});

it('should enforce type of bufferSize', () => {
const o = of('foo', 'bar', 'baz').pipe(shareReplay('abc')); // $ExpectError
it('should accept bufferSize and windowTime config parameters', () => {
const o = of(1, 2, 3).pipe(shareReplay({ bufferSize: 1, windowTime: 2, refCount: true })); // $ExpectType Observable<number>
});

it('should enforce type of windowTime', () => {
const o = of('foo', 'bar', 'baz').pipe(shareReplay(5, 'abc')); // $ExpectError
it('should accept bufferSize, windowTime and scheduler config parameters', () => {
const o = of(1, 2, 3).pipe(shareReplay({ bufferSize: 1, windowTime: 2, scheduler: asyncScheduler, refCount: true })); // $ExpectType Observable<number>
});

it('should enforce type of scheduler', () => {
const o = of('foo', 'bar', 'baz').pipe(shareReplay(5, 3, 'abc')); // $ExpectError
it('should require a refCount config parameter', () => {
const o = of(1, 2, 3).pipe(shareReplay({ bufferSize: 1 })); // $ExpectError
});
53 changes: 39 additions & 14 deletions spec/operators/shareReplay-spec.ts
Expand Up @@ -163,20 +163,6 @@ describe('shareReplay operator', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should not restart if refCount hits 0 due to unsubscriptions', () => {
const results: number[] = [];
const source = interval(10, rxTestScheduler).pipe(
take(10),
shareReplay(1)
);
const subs = source.subscribe(x => results.push(x));
rxTestScheduler.schedule(() => subs.unsubscribe(), 35);
rxTestScheduler.schedule(() => source.subscribe(x => results.push(x)), 54);

rxTestScheduler.flush();
expect(results).to.deep.equal([0, 1, 2, 4, 5, 6, 7, 8, 9]);
});

it('when no windowTime is given ReplaySubject should be in _infiniteTimeWindow mode', () => {
const spy = sinon.spy(rxTestScheduler, 'now');

Expand All @@ -187,6 +173,45 @@ describe('shareReplay operator', () => {
expect(spy, 'ReplaySubject should not call scheduler.now() when no windowTime is given').to.be.not.called;
});

it('should not restart due to unsubscriptions if refCount is false', () => {
const source = cold('a-b-c-d-e-f-g-h-i-j');
const sub1 = '^------!';
const expected1 = 'a-b-c-d-';
const sub2 = '-----------^-------';
const expected2 = '-----------fg-h-i-j';

const shared = source.pipe(shareReplay({ bufferSize: 1, refCount: false }));

expectObservable(shared, sub1).toBe(expected1);
expectObservable(shared, sub2).toBe(expected2);
});

it('should restart due to unsubscriptions if refCount is true', () => {
const source = cold('a-b-c-d-e-f-g-h-i-j');
const sub1 = '^------!';
const expected1 = 'a-b-c-d-';
const sub2 = '-----------^------------------';
const expected2 = '-----------a-b-c-d-e-f-g-h-i-j';

const shared = source.pipe(shareReplay({ bufferSize: 1, refCount: true }));

expectObservable(shared, sub1).toBe(expected1);
expectObservable(shared, sub2).toBe(expected2);
});

it('should default to refCount being false', () => {
const source = cold('a-b-c-d-e-f-g-h-i-j');
const sub1 = '^------!';
const expected1 = 'a-b-c-d-';
const sub2 = '-----------^-------';
const expected2 = '-----------fg-h-i-j';

const shared = source.pipe(shareReplay(1));

expectObservable(shared, sub1).toBe(expected1);
expectObservable(shared, sub2).toBe(expected2);
});

it('should not break lift() composability', (done: MochaDone) => {
class MyCustomObservable<T> extends Observable<T> {
lift<R>(operator: Operator<T, R>): Observable<R> {
Expand Down
1 change: 1 addition & 0 deletions src/internal-compatibility/index.ts
Expand Up @@ -23,6 +23,7 @@ export { SubscribeOnObservable } from '../internal/observable/SubscribeOnObserva
export { Timestamp } from '../internal/operators/timestamp';
export { TimeInterval } from '../internal/operators/timeInterval';
export { GroupedObservable } from '../internal/operators/groupBy';
export { ShareReplayConfig } from '../internal/operators/shareReplay';
export { ThrottleConfig, defaultThrottleConfig } from '../internal/operators/throttle';

export { rxSubscriber } from '../internal/symbol/rxSubscriber';
Expand Down
46 changes: 36 additions & 10 deletions src/internal/operators/shareReplay.ts
Expand Up @@ -4,6 +4,13 @@ import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { Subscriber } from '../Subscriber';

export interface ShareReplayConfig {
bufferSize?: number;
windowTime?: number;
refCount: boolean;
scheduler?: SchedulerLike;
}

/**
* Share source and replay specified number of emissions on subscription.
*
Expand Down Expand Up @@ -46,18 +53,36 @@ import { Subscriber } from '../Subscriber';
* @method shareReplay
* @owner Observable
*/
export function shareReplay<T>(config: ShareReplayConfig): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(
bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.POSITIVE_INFINITY,
configOrBufferSize?: ShareReplayConfig | number,
windowTime?: number,
scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => source.lift(shareReplayOperator(bufferSize, windowTime, scheduler));
let config: ShareReplayConfig;
if (configOrBufferSize && typeof configOrBufferSize === 'object') {
config = configOrBufferSize as ShareReplayConfig;
} else {
config = {
bufferSize: configOrBufferSize as number | undefined,
windowTime,
refCount: false,
scheduler
};
}
return (source: Observable<T>) => source.lift(shareReplayOperator(config));
}

function shareReplayOperator<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike) {
let subject: ReplaySubject<T>;
function shareReplayOperator<T>({
bufferSize = Number.POSITIVE_INFINITY,
windowTime = Number.POSITIVE_INFINITY,
refCount: useRefCount,
scheduler
}: ShareReplayConfig) {
let subject: ReplaySubject<T> | undefined;
let refCount = 0;
let subscription: Subscription;
let subscription: Subscription | undefined;
let hasError = false;
let isComplete = false;

Expand All @@ -80,13 +105,14 @@ function shareReplayOperator<T>(bufferSize?: number, windowTime?: number, schedu
}

const innerSub = subject.subscribe(this);

return () => {
this.add(() => {
refCount--;
innerSub.unsubscribe();
if (subscription && refCount === 0 && isComplete) {
if (subscription && !isComplete && useRefCount && refCount === 0) {
subscription.unsubscribe();
subscription = undefined;
subject = undefined;
}
};
});
};
}