Skip to content

Commit

Permalink
fix(shareLatest): properly closing sync observables
Browse files Browse the repository at this point in the history
  • Loading branch information
josepot committed May 7, 2021
1 parent 51148d1 commit c6b4010
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 30 deletions.
43 changes: 22 additions & 21 deletions packages/core/src/internal/share-latest.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Observable, Subscription, Subject, noop } from "rxjs"
import { Observable, Subscription, Subject, noop, Subscriber } from "rxjs"
import { BehaviorObservable } from "./BehaviorObservable"
import { EMPTY_VALUE } from "./empty-value"
import { SUSPENSE } from "../SUSPENSE"
Expand All @@ -10,7 +10,7 @@ const shareLatest = <T>(
teardown = noop,
): BehaviorObservable<T> => {
let subject: Subject<T> | null
let subscription: Subscription | null
let subscription: Subscriber<T> | null
let refCount = 0
let currentValue: T = EMPTY_VALUE
let promise: Promise<T> | null
Expand All @@ -29,15 +29,31 @@ const shareLatest = <T>(

refCount++
let innerSub: Subscription

subscriber.add(() => {
refCount--
innerSub.unsubscribe()
if (refCount === 0) {
currentValue = EMPTY_VALUE
if (subscription) {
subscription.unsubscribe()
}
teardown()
subject = null
subscription = null
promise = null
}
})

if (!subject) {
subject = new Subject<T>()
innerSub = subject.subscribe(subscriber)
subscription = null
subscription = source$.subscribe(
(value) => {
subscription = new Subscriber<T>(
(value: T) => {
subject!.next((currentValue = value))
},
(err) => {
(err: any) => {
const _subject = subject
subscription = null
subject = null
Expand All @@ -49,29 +65,14 @@ const shareLatest = <T>(
subject!.complete()
},
)
if (subscription.closed) subscription = null
source$.subscribe(subscription)
emitIfEmpty()
} else {
innerSub = subject.subscribe(subscriber)
if (currentValue !== EMPTY_VALUE) {
subscriber.next(currentValue)
}
}

return () => {
refCount--
innerSub.unsubscribe()
if (refCount === 0) {
currentValue = EMPTY_VALUE
if (subscription) {
subscription.unsubscribe()
}
teardown()
subject = null
subscription = null
promise = null
}
}
}) as BehaviorObservable<T>

let error: any = EMPTY_VALUE
Expand Down
32 changes: 23 additions & 9 deletions packages/core/src/share-latest.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { TestScheduler } from "rxjs/testing"
import { from, merge, defer } from "rxjs"
import { from, merge, defer, Observable, noop } from "rxjs"
import { shareLatest } from "./"
import { withLatestFrom, startWith, map } from "rxjs/operators"
import { withLatestFrom, startWith, map, take } from "rxjs/operators"

const scheduler = () =>
new TestScheduler((actual, expected) => {
Expand Down Expand Up @@ -75,15 +75,29 @@ describe("shareLatest", () => {

// prettier-ignore
it("should not skip values on a sync source", () => {
scheduler().run(({ expectObservable }) => {
const source = from(['a', 'b', 'c', 'd']) // cold("(abcd|)")
const sub1 = '^';
const expected1 = " (abcd|)"
scheduler().run(({ expectObservable }) => {
const source = from(['a', 'b', 'c', 'd']) // cold("(abcd|)")
const sub1 = '^';
const expected1 = " (abcd|)"

const shared = shareLatest()(source);
const shared = shareLatest()(source);

expectObservable(shared, sub1).toBe(expected1);
expectObservable(shared, sub1).toBe(expected1);
})
})

it("should stop listening to a synchronous observable when unsubscribed", () => {
let sideEffects = 0
const synchronousObservable = new Observable<number>((subscriber) => {
// This will check to see if the subscriber was closed on each loop
// when the unsubscribe hits (from the `take`), it should be closed
for (let i = 0; !subscriber.closed && i < 10; i++) {
sideEffects++
subscriber.next(i)
}
})
synchronousObservable.pipe(shareLatest(), take(3)).subscribe(noop)
expect(sideEffects).toBe(3)
})
})
})
})

0 comments on commit c6b4010

Please sign in to comment.