
1202: prioritized merge #1205

Merged: 14 commits, Aug 11, 2020
@@ -6056,6 +6056,18 @@ object Observable extends ObservableDeprecatedBuilders {
}
}

/** Given an observable sequence and associated priorities, combines them
  * into a new observable, preferring higher-priority sources when multiple
  * sources have items available.
  */
def mergePrioritizedList[A](sources: Observable[A]*)(priorities: Seq[Int]): Observable[A] = {
Member:

Very interesting implementation @ctoomey.

I do have one suggestion about this signature: it's not obvious that the priorities must line up with the sources one-to-one, and there's no reason to require that.

How about ...

def mergePrioritizedList[A](sources: (Int, Observable[A])*): Observable[A]

Then you could do ...

Observable.mergePrioritizedList(
  1 -> observable1,
  2 -> observable2,
  //...
)

That way the signature is clearer, and you don't need that runtime assert in the implementation.
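
A minimal sketch of how the tupled variant could delegate to the internal builder from this diff (illustrative only, not the committed implementation):

def mergePrioritizedList[A](sources: (Int, Observable[A])*): Observable[A] =
  if (sources.isEmpty) Observable.empty
  else {
    // Split the pairs back into the two sequences the internal builder expects.
    val (priorities, observables) = sources.unzip
    new MergePrioritizedListObservable[A](observables, priorities)
  }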

Contributor Author:

Yeah, I'd prefer that signature too, and in fact had it that way originally, but decided to match the way Akka Streams did it. I'll change it.

  if (sources.isEmpty) {
    Observable.empty
  } else {
    new MergePrioritizedListObservable[A](sources, priorities)
  }
}
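
For reference, a minimal call of the Seq-based signature above (source1 and source2 are illustrative placeholders):

// Each source is paired positionally with a priority from the Seq.
val merged: Observable[Int] =
  Observable.mergePrioritizedList(source1, source2)(Seq(2, 1))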

/** Given a list of source Observables, emits all of the items from
  * the first of these Observables to emit an item or to complete,
  * and cancels the rest.
@@ -0,0 +1,157 @@
/*
* Copyright (c) 2014-2020 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.reactive.internal.builders

import java.util.concurrent.PriorityBlockingQueue

import monix.execution.Ack.{Continue, Stop}
import monix.execution.cancelables.CompositeCancelable
import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive.Observable
import monix.reactive.observers.Subscriber

import scala.concurrent.{Future, Promise}
import scala.jdk.CollectionConverters._
import scala.util.Success

private[reactive] final class MergePrioritizedListObservable[A](sources: Seq[Observable[A]], priorities: Seq[Int])
  extends Observable[A] {
  require(sources.size == priorities.size, "sources.size != priorities.size")

  override def unsafeSubscribeFn(out: Subscriber[A]): Cancelable = {
    import out.scheduler

    val numberOfObservables = sources.size

    val lock = new AnyRef
    var isDone = false

    // NOTE: We use arrays and other mutable structures here to be as performant as possible.

    // MUST BE synchronized by `lock`
    var lastAck = Continue: Future[Ack]

    // Pairs each queued item with the promise that will complete its upstream ack.
    case class PQElem(data: A, promise: Promise[Ack], priority: Int) extends Comparable[PQElem] {
      override def compareTo(o: PQElem): Int =
        priority.compareTo(o.priority)
    }

    val pq = new PriorityBlockingQueue[PQElem](sources.size)
Collaborator:

Could we use a normal PriorityQueue instead of a blocking one? It seems like the access is synchronized anyway.

Contributor Author (@ctoomey, Jun 27, 2020):

Sure, I will look at replacing it and adding necessary synchronization, then adding tests.

Otherwise it looks good?

Collaborator:

Sorry, I haven't got a moment to take a close look, but it seems to look good.

I think it's not too far from a mergeMapPrioritized version, if you'd like to go further:

  • onNext of the source Observable (upstream) would add a new Observable to the composite, instead of taking them from the starting list
  • RefCountObservable is used to control the number of active children and their lifecycle
  • It would be awesome to support different overflow strategies, but that would require updating BufferedSubscriber to allow priority queues. The implementation for the back-pressure strategy is in AbstractBackPressuredBufferedSubscriber. Unfortunately I don't know of a good concurrent priority queue implementation out there. I imagine it's too much work, and I don't have capacity to work on it right now, so I suppose we can stay with the current behavior, which is back-pressure with a bufferSize of 0.

If you don't feel like doing it, that's okay; mergePrioritizedList is already better than the status quo.

It might also be nice to add a benchmark vs normal merge, but that's just my curiosity and completely optional.

Contributor Author:

Since updating BufferedSubscriber would be a lot of work, I kept my implementation as is. I swapped out the Java PriorityBlockingQueue for the Scala mutable.PriorityQueue and added tests.
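
A rough sketch of that swap, assuming all queue access stays guarded by `lock` (the explicit Ordering is illustrative):

import scala.collection.mutable

// mutable.PriorityQueue is a max-heap: dequeue() returns the largest
// element under the given Ordering, so higher priority values win.
val pq = mutable.PriorityQueue.empty[PQElem](Ordering.by(_.priority))

// MUST BE synchronized by `lock`
def processNext(): Future[Ack] = {
  val e = pq.dequeue()
  val fut = rawOnNext(e.data)
  e.promise.completeWith(fut)
  fut
}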


    // MUST BE synchronized by `lock`
    var completedCount = 0

    // MUST BE synchronized by `lock`
    def rawOnNext(a: A): Future[Ack] = {
      if (isDone) Stop else out.onNext(a)
    }
Contributor Author:

Added the above logic to ensure that we complete the upstream onNext promises when the downstream stops or errors early while we still have pending upstream items. Added tests to verify as well.

Collaborator:

We could use our syncOnStopOrFailure syntax here; it should be more efficient.
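
If I recall the Ack extensions correctly, that would look roughly like this (a sketch, assuming syncOnStopOrFailure(cb: Option[Throwable] => Unit) from monix.execution.Ack's AckExtensions):

// MUST BE synchronized by `lock`
def rawOnNext(a: A): Future[Ack] =
  if (isDone) Stop
  else out.onNext(a).syncOnStopOrFailure(_ => completePromises())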

Contributor Author:

Much nicer, thanks.


    // MUST BE synchronized by `lock`
    def processNext(): Future[Ack] = {
Collaborator:

needs a comment: // MUST BE synchronized by `lock`

Contributor Author:

Fixed

      val e = pq.remove()
      val fut = rawOnNext(e.data)
      e.promise.completeWith(fut)
      fut
    }

    // MUST BE synchronized by `lock`
    def signalOnNext(): Future[Ack] =
      lock.synchronized {
Collaborator:

Redundant synchronized: the comment already mentions it, and it is done in onNext.

Contributor Author:

Fixed

        lastAck = lastAck match {
          case Continue => processNext()
          case Stop => Stop
          case async =>
            async.flatMap {
              // async execution, we have to re-sync
              case Continue => lock.synchronized(processNext())
              case Stop => Stop
            }
        }

        lastAck
      }

    def signalOnError(ex: Throwable): Unit =
      lock.synchronized {
        if (!isDone) {
          isDone = true
          out.onError(ex)
          lastAck = Stop
          completePromises()
        }
      }

    def signalOnComplete(): Unit =
      lock.synchronized {
        completedCount += 1

        if (completedCount == numberOfObservables && !isDone) {
          lastAck match {
            case Continue =>
              isDone = true
              out.onComplete()
              completePromises()
Collaborator:

Regarding the issue:

The key is that so long as there are items the downstream hasn't yet processed, lastAck will remain a Future[Ack], and signalOnComplete won't call out.onComplete until that future completes, i.e., downstream has finished processing all pending items including the last one from the last source.

Take the simplest case of your example: two sources with one item each:

Source A calls onNext, item gets sent downstream: lastAck = Future[Ack for A's item]
Source A calls onComplete: completedCount = 1
Source B calls onNext, item gets queued: lastAck = Future[Ack for A's item].flatMap(Future[Ack for B's item])
Source B calls onComplete: completedCount = 2; lastAck.onComplete { ... out.onComplete }

Looking at the contract: https://monix.io/docs/3x/reactive/observers.html#contract

  1. Back-pressure for onComplete and onError is optional: when calling onComplete or onError you are not required to wait on the Future[Ack] of the previous onNext.

It implies that Source B could call:

out.onNext(lastElem)
out.onComplete()

In that case, we could send remaining elements from the queue to the downstream instead of completing each promise with Stop.
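
A hedged sketch of that alternative: when the last source completes, drain whatever is still queued before signaling onComplete, instead of completing the pending promises with Stop (drainThenComplete is hypothetical; syncOnContinue is from the Ack extensions):

// MUST BE synchronized by `lock`
def drainThenComplete(): Unit =
  if (pq.isEmpty) {
    isDone = true
    out.onComplete()
  } else {
    // Emit the highest-priority queued item, then keep draining once
    // the downstream acks with Continue.
    processNext().syncOnContinue(lock.synchronized(drainThenComplete()))
  }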

Contributor Author:

In that case, we could send remaining elements from the queue to the downstream instead of completing each promise with Stop.

So long as the downstream returns Continue and none of the upstreams call onError, the downstream will always get all the items emitted by the upstreams. That's what the "should push all items downstream before calling onComplete" test is verifying. Again:

The key is that so long as there are items the downstream hasn't yet processed, lastAck will remain a Future[Ack], and signalOnComplete won't call out.onComplete (or completePromises) until that future completes, i.e., downstream has finished processing all pending items including the last one from the last source.
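
A test-style sketch of that guarantee, using monix's TestScheduler and the Seq-based signature from this diff (illustrative only):

import monix.execution.Ack.Continue
import monix.execution.schedulers.TestScheduler
import monix.reactive.Observable

implicit val s: TestScheduler = TestScheduler()
val received = scala.collection.mutable.Buffer.empty[Int]
var completed = false

Observable
  .mergePrioritizedList(Observable.now(1), Observable.now(2))(Seq(1, 2))
  .subscribe(
    { i => received += i; Continue },
    e => throw e,
    () => completed = true
  )

s.tick()
// All queued items reach the downstream before onComplete fires.
assert(received.sorted == Seq(1, 2) && completed)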

            case Stop =>
              () // do nothing
            case async =>
              async.onComplete {
                case Success(Continue) =>
                  lock.synchronized {
                    if (!isDone) {
                      isDone = true
                      out.onComplete()
                      completePromises()
                    }
                  }
                case _ =>
                  () // do nothing
              }
          }

          lastAck = Stop
        }
      }

    // Completes all still-pending upstream promises with Stop so no producer is left hanging.
    def completePromises(): Unit = {
      pq.iterator().asScala.foreach(e => e.promise.complete(Success(Stop)))
    }

    val composite = CompositeCancelable()

    sources.zip(priorities).foreach { pair =>
      composite += pair._1.unsafeSubscribeFn(new Subscriber[A] {
        implicit val scheduler: Scheduler = out.scheduler

        def onNext(elem: A): Future[Ack] = {
          if (isDone) {
            Stop
          } else {
            pq.add(PQElem(elem, Promise(), pair._2))
            signalOnNext()
          }
        }

        def onError(ex: Throwable): Unit =
          signalOnError(ex)

        def onComplete(): Unit =
          signalOnComplete()
      })
    }
    composite
  }
}