Skip to content

Commit

Permalink
fix(retry): Ensure teardown happens before resubscription with synchr…
Browse files Browse the repository at this point in the history
…onous observables

Related: #5620

- Resolves an issue where all teardowns would not execute until the result observable was complete if the source was synchronous

BREAKING CHANGE: Removed an undocumented behavior where passing a negative count argument to `retry` would result in an observable that repeats forever.
  • Loading branch information
benlesh committed Aug 6, 2020
1 parent 98356f4 commit 6f90597
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 53 deletions.
19 changes: 19 additions & 0 deletions spec/operators/retry-spec.ts
Expand Up @@ -117,6 +117,25 @@ describe('retry operator', () => {
});
});

it('should always teardown before starting the next cycle, even when synchronous', () => {
const results: any[] = [];
const source = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.error('bad');
return () => {
results.push('teardown');
}
});
const subscription = source.pipe(retry(3)).subscribe({
next: value => results.push(value),
error: (err) => results.push(err)
});

expect(subscription.closed).to.be.true;
expect(results).to.deep.equal([1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'teardown', 1, 2, 'bad', 'teardown'])
});

it('should retry a number of times, then call next handler without error, then retry and error', (done: MochaDone) => {
let index = 0;
let errors = 0;
Expand Down
100 changes: 47 additions & 53 deletions src/internal/operators/retry.ts
@@ -1,9 +1,10 @@
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';

import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
import { MonoTypeOperatorFunction } from '../types';
import { lift } from '../util/lift';
import { Subscription } from '../Subscription';
import { EMPTY } from '../observable/empty';

export interface RetryConfig {
count: number;
Expand Down Expand Up @@ -58,62 +59,55 @@ export interface RetryConfig {
*/
export function retry<T>(count?: number): MonoTypeOperatorFunction<T>;
export function retry<T>(config: RetryConfig): MonoTypeOperatorFunction<T>;
export function retry<T>(configOrCount: number | RetryConfig = -1): MonoTypeOperatorFunction<T> {
export function retry<T>(configOrCount: number | RetryConfig = Infinity): MonoTypeOperatorFunction<T> {
let config: RetryConfig;
if (configOrCount && typeof configOrCount === 'object') {
config = configOrCount as RetryConfig;
config = configOrCount;
} else {
config = {
count: configOrCount as number
count: configOrCount
};
}
return (source: Observable<T>) => lift(source, new RetryOperator(config.count, !!config.resetOnSuccess, source));
}

class RetryOperator<T> implements Operator<T, T> {
constructor(private count: number,
private resetOnSuccess: boolean,
private source: Observable<T>) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new RetrySubscriber(subscriber, this.count, this.resetOnSuccess, this.source));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class RetrySubscriber<T> extends Subscriber<T> {
private readonly initialCount: number;

constructor(destination: Subscriber<any>,
private count: number,
private resetOnSuccess: boolean,
private source: Observable<T>
) {
super(destination);
this.initialCount = this.count;
}

next(value?: T): void {
super.next(value);
if (this.resetOnSuccess) {
this.count = this.initialCount;
}
}
const { count, resetOnSuccess = false } = config;

error(err: any) {
if (!this.isStopped) {
const { source, count } = this;
if (count === 0) {
return super.error(err);
} else if (count > -1) {
this.count = count - 1;
return (source: Observable<T>) => count <= 0 ? EMPTY: lift(source, function (this: Subscriber<T>, source: Observable<T>) {
const subscriber = this;
let soFar = 0;
const subscription = new Subscription();
let innerSub: Subscription | null;
const subscribeNext = () => {
let syncUnsub = false;
innerSub = source.subscribe({
next: (value) => {
if (resetOnSuccess) {
soFar = 0;
}
subscriber.next(value);
},
error: (err) => {
if (soFar++ < count) {
if (innerSub) {
subscription.remove(innerSub);
innerSub.unsubscribe();
subscribeNext();
} else {
syncUnsub = true;
}
} else {
subscriber.error(err);
}
},
complete: () => subscriber.complete(),
});
if (syncUnsub) {
innerSub.unsubscribe();
innerSub = null;
subscribeNext();
} else {
subscription.add(innerSub);
}
source.subscribe(this._unsubscribeAndRecycle());
}
}
}
};
subscribeNext();
return subscription;
})
}

0 comments on commit 6f90597

Please sign in to comment.