Skip to content

Commit

Permalink
Fix #793: switchMap should wait for last child to complete (#918)
Browse files Browse the repository at this point in the history
  • Loading branch information
oleg-py authored and Avasil committed Jul 5, 2019
1 parent 1064841 commit 01818d5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 24 deletions.
Expand Up @@ -42,6 +42,8 @@ private[reactive] final class SwitchMapObservable[A, B](source: Observable[A], f
private[this] var activeChildIndex: Int = -1
// MUST BE synchronized by `self`
private[this] var upstreamIsDone: Boolean = false
// MUST BE synchronized by `self`
private[this] var lastChildIsDone: Boolean = false

def onNext(elem: A): Ack = self.synchronized {
if (upstreamIsDone) Stop
Expand All @@ -61,15 +63,24 @@ private[reactive] final class SwitchMapObservable[A, B](source: Observable[A], f
activeChild := childObservable.unsafeSubscribeFn(new Observer[B] {
def onNext(elem: B) =
self.synchronized {
if (upstreamIsDone || myChildIndex != activeChildIndex)
if (myChildIndex != activeChildIndex)
Stop
else {
ack = out.onNext(elem).syncOnStopOrFailure(_ => cancelFromDownstream())
ack
}
}

def onComplete(): Unit = ()
def onComplete(): Unit = self.synchronized {
if (myChildIndex == activeChildIndex) {
if (upstreamIsDone) {
activeChildIndex = -1
out.onComplete()
} else {
lastChildIsDone = true
}
}
}
def onError(ex: Throwable): Unit =
self.synchronized {
if (myChildIndex == activeChildIndex)
Expand Down Expand Up @@ -104,9 +115,10 @@ private[reactive] final class SwitchMapObservable[A, B](source: Observable[A], f
def onComplete(): Unit = self.synchronized {
if (!upstreamIsDone) {
upstreamIsDone = true
activeChildIndex = -1
activeChild.cancel()
out.onComplete()
if (lastChildIsDone) {
activeChildIndex = -1
out.onComplete()
}
}
}
})
Expand Down
Expand Up @@ -86,24 +86,14 @@ object SwitchMapSuite extends BaseOperatorSuite {
assertEquals(r2.value.get, r1.value.get)
}

test("switchMap should cancel child after stream has ended") { implicit s =>
val source = Observable.now(1L).switchMap { x =>
Observable.intervalWithFixedDelay(1.second, 1.second).map(_ + x)
}

var total = 0L
source.unsafeSubscribeFn(new Observer.Sync[Long] {
def onNext(elem: Long): Ack = {
total += elem
Continue
}

def onError(ex: Throwable): Unit = throw ex
def onComplete(): Unit = ()
})

s.tick()
assertEquals(total, 0)
assert(s.state.tasks.isEmpty, "tasks.isEmpty")
test("Observable.unit.switchMap(_ => a) <-> a") { implicit s =>
val expectedCount = 100
val size = Observable.unit
.switchMap(_ => Observable.interval(1.second).take(expectedCount))
.countL
.runToFuture

s.tick(1.day)
assertEquals(size.value.get.get, expectedCount)
}
}

0 comments on commit 01818d5

Please sign in to comment.