Skip to content

Commit

Permalink
Fix cleanup observer blocking unsubscribe.
Browse files Browse the repository at this point in the history
  • Loading branch information
Javier authored and benjamn committed Oct 14, 2020
1 parent f45e0f6 commit c6d58db
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions src/utilities/observables/Concast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ export class Concast<T> extends Observable<T> {
// recent message, though possibly at different times in the past.
private observers = new Set<Observer<T>>();

// Shadow observers awaiting broadcast messages. These observers are
// subordinated to this.observers and should not prevent unsubscribing
// from this.sub.
private shadowObservers = new Set<Observer<T>>();

// This property starts off undefined to indicate the initial
// subscription has not yet begun, then points to each source
// subscription in turn, and finally becomes null after the sources have
Expand Down Expand Up @@ -95,25 +100,29 @@ export class Concast<T> extends Observable<T> {
this.handlers.complete!();
}

private deliverLastMessage (observer: Observer<T>) {
if (this.latest) {
const nextOrError = this.latest[0];
const method = observer[nextOrError];
if (method) {
method.call(observer, this.latest[1]);
}
// If the subscription is already closed, and the last message was
// a 'next' message, simulate delivery of the final 'complete'
// message again.
if (this.sub === null &&
nextOrError === "next" &&
observer.complete) {
observer.complete();
}
}
}

public addObserver(observer: Observer<T>) {
if (!this.observers.has(observer)) {
// Immediately deliver the most recent message, so we can always
// be sure all observers have the latest information.
if (this.latest) {
const nextOrError = this.latest[0];
const method = observer[nextOrError];
if (method) {
method.call(observer, this.latest[1]);
}
// If the subscription is already closed, and the last message was
// a 'next' message, simulate delivery of the final 'complete'
// message again.
if (this.sub === null &&
nextOrError === "next" &&
observer.complete) {
observer.complete();
}
}
this.deliverLastMessage(observer);
this.observers.add(observer);
}
}
Expand Down Expand Up @@ -156,6 +165,7 @@ export class Concast<T> extends Observable<T> {
if (this.sub !== null) {
this.latest = ["next", result];
iterateObserversSafely(this.observers, "next", result);
iterateObserversSafely(this.shadowObservers, "next", result);
}
},

Expand All @@ -166,6 +176,7 @@ export class Concast<T> extends Observable<T> {
this.latest = ["error", error];
this.reject(error);
iterateObserversSafely(this.observers, "error", error);
iterateObserversSafely(this.shadowObservers, "error", error);
}
},

Expand All @@ -187,6 +198,7 @@ export class Concast<T> extends Observable<T> {
// 'next' message (unless there was an error) immediately
// followed by a 'complete' message (see addObserver).
iterateObserversSafely(this.observers, "complete");
iterateObserversSafely(this.shadowObservers, "complete");
} else if (isPromiseLike(value)) {
value.then(obs => this.sub = obs.subscribe(this.handlers));
} else {
Expand All @@ -201,7 +213,7 @@ export class Concast<T> extends Observable<T> {
const once = () => {
if (!called) {
called = true;
this.observers.delete(observer);
this.shadowObservers.delete(observer);
callback();
}
}
Expand All @@ -210,7 +222,11 @@ export class Concast<T> extends Observable<T> {
error: once,
complete: once,
};
this.addObserver(observer);

// Immediately deliver the most recent message, so we can always
// be sure all observers have the latest information.
this.deliverLastMessage(observer);
this.shadowObservers.add(observer);
}

// A public way to abort observation and broadcast.
Expand Down

0 comments on commit c6d58db

Please sign in to comment.