diff --git a/src/utilities/observables/Concast.ts b/src/utilities/observables/Concast.ts index 326ae423a1e..5efb77a70f2 100644 --- a/src/utilities/observables/Concast.ts +++ b/src/utilities/observables/Concast.ts @@ -47,6 +47,11 @@ export class Concast extends Observable { // recent message, though possibly at different times in the past. private observers = new Set>(); + // Shadow observers awaiting broadcast messages. These observers are + // subordinated to this.observers, so should not prevent unsubscribing + // from this.sub. + private shadowObservers = new Set>(); + // This property starts off undefined to indicate the initial // subscription has not yet begun, then points to each source // subscription in turn, and finally becomes null after the sources have @@ -95,25 +100,29 @@ export class Concast extends Observable { this.handlers.complete!(); } + private deliverLastMessage (observer: Observer) { + if (this.latest) { + const nextOrError = this.latest[0]; + const method = observer[nextOrError]; + if (method) { + method.call(observer, this.latest[1]); + } + // If the subscription is already closed, and the last message was + // a 'next' message, simulate delivery of the final 'complete' + // message again. + if (this.sub === null && + nextOrError === "next" && + observer.complete) { + observer.complete(); + } + } + } + public addObserver(observer: Observer) { if (!this.observers.has(observer)) { // Immediately deliver the most recent message, so we can always // be sure all observers have the latest information. - if (this.latest) { - const nextOrError = this.latest[0]; - const method = observer[nextOrError]; - if (method) { - method.call(observer, this.latest[1]); - } - // If the subscription is already closed, and the last message was - // a 'next' message, simulate delivery of the final 'complete' - // message again. - if (this.sub === null && - nextOrError === "next" && - observer.complete) { - observer.complete(); - } - } + this.deliverLastMessage(observer); this.observers.add(observer); } } @@ -156,6 +165,7 @@ export class Concast extends Observable { if (this.sub !== null) { this.latest = ["next", result]; iterateObserversSafely(this.observers, "next", result); + iterateObserversSafely(this.shadowObservers, "next", result); } }, @@ -166,6 +176,7 @@ export class Concast extends Observable { this.latest = ["error", error]; this.reject(error); iterateObserversSafely(this.observers, "error", error); + iterateObserversSafely(this.shadowObservers, "error", error); } }, @@ -187,6 +198,7 @@ export class Concast extends Observable { // 'next' message (unless there was an error) immediately // followed by a 'complete' message (see addObserver). iterateObserversSafely(this.observers, "complete"); + iterateObserversSafely(this.shadowObservers, "complete"); } else if (isPromiseLike(value)) { value.then(obs => this.sub = obs.subscribe(this.handlers)); } else { @@ -201,7 +213,7 @@ export class Concast extends Observable { const once = () => { if (!called) { called = true; - this.observers.delete(observer); + this.shadowObservers.delete(observer); callback(); } } @@ -210,7 +222,11 @@ export class Concast extends Observable { error: once, complete: once, }; - this.addObserver(observer); + + // Immediately deliver the most recent message, so we can always + // be sure all observers have the latest information. + this.deliverLastMessage(observer); + this.shadowObservers.add(observer); } // A public way to abort observation and broadcast.