Skip to content

Commit

Permalink
Refactor: format code
Browse files Browse the repository at this point in the history
  • Loading branch information
TapanVaishnav committed Oct 31, 2019
1 parent 39f63e2 commit 6dc9a95
Showing 1 changed file with 4 additions and 2 deletions.
Expand Up @@ -71,6 +71,9 @@ private[reactive] final class MapParallelUnorderedObservable[A, B](
private[this] val semaphore = AsyncSemaphore(parallelism)
// Buffer with the supplied overflow strategy.
private[this] val buffer = BufferedSubscriber[B](out, overflowStrategy, MultiProducer)
// everything gets canceled at once
private[this] val cancelComposite = Task.eval(composite.cancel())

// Flag indicating whether a final event was called, after which
// nothing else can happen. It's a very light protection, as
// access to it is concurrent and not synchronized
Expand All @@ -79,8 +82,6 @@ private[reactive] final class MapParallelUnorderedObservable[A, B](
// coming from the `buffer` - this indicates that the downstream
// no longer wants any events, so we must cancel
private[this] var lastAck: Ack = Continue
// everything gets canceled at once
private[this] val cancelComposite = Task.eval(composite.cancel())

private def process(elem: A) = {
// For protecting against user code, without violating the
Expand Down Expand Up @@ -115,6 +116,7 @@ private[reactive] final class MapParallelUnorderedObservable[A, B](
self.onError(ex)
}
)

ref.doOnCancel(cancelComposite)
}

Expand Down

0 comments on commit 6dc9a95

Please sign in to comment.