Extracting Future queuing logic from AsyncBase #685

Merged
merged 3 commits into from Oct 4, 2016

3 participants

@jnievelt
Contributor
jnievelt commented Sep 2, 2016 edited

Adding FutureQueue, a structure that essentially takes in (S, Future[T]) and gives back (S, Try[T]).

I didn't mean to totally gut AsyncBase, but it's mostly become a shim integrating its own interface (apply, tick), the OperationContainer interface (execute, executeTick), and its FutureQueue.

Making this private for now.

Adding a few more tests while I'm at it. Cleaning up Node.scala style since it does that automatically.

The next step would be to integrate this with AggregatingOutputCollector.
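To make the (S, Future[T]) in, (S, Try[T]) out shape concrete, here is a minimal sketch. It is not the code in this PR: it assumes scala.concurrent.Future rather than com.twitter.util.Future, leaves out the maxWaitingFutures and maxWaitingTime limits, and the names SimpleFutureQueue and dequeueAll are hypothetical.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try

// Hypothetical, simplified stand-in for FutureQueue (assumption: Scala
// standard-library futures, no limits on outstanding futures).
class SimpleFutureQueue[S, T](implicit ec: ExecutionContext) {
  // Completed results land here, paired with the state they were added with.
  private val responses = new ConcurrentLinkedQueue[(S, Try[T])]()

  // Register a future; its outcome is enqueued whenever it completes,
  // whether it succeeded or failed.
  def add(state: S, fut: Future[T]): Unit =
    fut.onComplete { result => responses.add((state, result)) }

  // Remove and return every result that has completed so far, oldest first.
  def dequeueAll(): Seq[(S, Try[T])] = {
    val out = Vector.newBuilder[(S, Try[T])]
    var next = responses.poll()
    while (next != null) {
      out += next
      next = responses.poll()
    }
    out.result()
  }
}
```

The caller never blocks on an individual Future in this sketch; it only polls for whatever has finished, and the state value is what lets it match each result to its input.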

Joe Nievelt Extracting Future queuing logic from AsyncBase
47eeac6
@johnynek johnynek and 1 other commented on an outdated diff Sep 6, 2016
...twitter/summingbird/online/executor/FutureQueue.scala
+ val p = Promise[Unit]()
+ fs.foreach { f =>
+ f.ensure {
+ // Note that since we are only decrementing we can cross 0 only
+ // once (unless we decrement more than 2^32 times).
+ if (count.decrementAndGet() == 0) {
+ p.setValue(())
+ }
+ }
+ }
+ p
+ }
+ }
+}
+
+private[summingbird] class FutureQueue[S, T](
@johnynek
johnynek Sep 6, 2016 Collaborator

can we comment the type parameters?

@pankajroark
pankajroark Sep 6, 2016 edited Contributor

Can you also add a doc comment describing what the semantics are? Especially around blocking and how State relates to Future.
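For readers following the diff context in this thread: the excerpt completes a Promise once an atomic counter, decremented as each future finishes, reaches zero. A rough equivalent with scala.concurrent is sketched below. The helper name whenAllDone and the empty-input handling are assumptions, and onComplete stands in for the Twitter Future's ensure.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ ExecutionContext, Future, Promise }

object CountdownSketch {
  // Returns a future that completes once every future in `fs` has finished,
  // regardless of success or failure. Hypothetical helper, not from the PR.
  def whenAllDone(fs: Seq[Future[_]])(implicit ec: ExecutionContext): Future[Unit] =
    if (fs.isEmpty) Future.successful(())
    else {
      val count = new AtomicInteger(fs.size)
      val p = Promise[Unit]()
      fs.foreach { f =>
        // onComplete fires for both outcomes, like `ensure` in the diff.
        f.onComplete { _ =>
          // The counter is only ever decremented, so it reaches 0 exactly once.
          if (count.decrementAndGet() == 0) p.success(())
        }
      }
      p.future
    }
}
```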

@johnynek johnynek and 1 other commented on an outdated diff Sep 6, 2016
...twitter/summingbird/online/executor/FutureQueue.scala
+ p.setValue(())
+ }
+ }
+ }
+ p
+ }
+ }
+}
+
+private[summingbird] class FutureQueue[S, T](
+ maxWaitingFutures: MaxWaitingFutures,
+ maxWaitingTime: MaxFutureWaitTime,
+ maxEmitPerExec: MaxEmitPerExecute) {
+ @transient protected lazy val logger: Logger = LoggerFactory.getLogger(getClass)
+
+ private[executor] lazy val outstandingFutures = Queue.linkedNonBlocking[Future[Unit]]
@johnynek
johnynek Sep 6, 2016 Collaborator

can this just be a regular private?

@jnievelt
jnievelt Sep 6, 2016 Contributor

Tests are currently inspecting this. It might make sense to modify/eliminate that test (I've broken it for now anyway, I think). Otherwise it can be private.

@johnynek johnynek and 1 other commented on an outdated diff Sep 6, 2016
...twitter/summingbird/online/executor/FutureQueue.scala
+
+private[summingbird] class FutureQueue[S, T](
+ maxWaitingFutures: MaxWaitingFutures,
+ maxWaitingTime: MaxFutureWaitTime,
+ maxEmitPerExec: MaxEmitPerExecute) {
+ @transient protected lazy val logger: Logger = LoggerFactory.getLogger(getClass)
+
+ private[executor] lazy val outstandingFutures = Queue.linkedNonBlocking[Future[Unit]]
+ private lazy val numPendingOutstandingFutures = new AtomicInteger(0)
+ private lazy val responses = Queue.linkedNonBlocking[(S, Try[T])]
+
+ def addAll(iter: TraversableOnce[(S, Future[T])]): Unit = {
+ val addedSize = iter.foldLeft(0) {
+ case (size, (state, fut)) =>
+ val responded =
+ fut
@johnynek
johnynek Sep 6, 2016 edited Collaborator

what about using fut.transform { t => responses.put((state, t)); Future.const(t) } instead of onSuccess and onFailure?

@jnievelt
jnievelt Sep 6, 2016 Contributor

I can use Future#respond, but it won't be quite as clean as that because we're also converting between Twitter & Scala Try.
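The single-callback idea discussed here can be sketched with the Scala standard library, where Future#transform already works on scala.util.Try, so no Twitter-to-Scala conversion is needed. This is an illustration only, not the PR's code; recordOutcome is a hypothetical name and the queue type is an assumption.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try

object SingleCallbackSketch {
  // Record the outcome of `fut` (success or failure) with one callback
  // instead of separate onSuccess/onFailure handlers, and pass the
  // original result through unchanged.
  def recordOutcome[S, T](
    state: S,
    fut: Future[T],
    responses: ConcurrentLinkedQueue[(S, Try[T])]
  )(implicit ec: ExecutionContext): Future[T] =
    fut.transform { t =>
      responses.add((state, t))
      t
    }
}
```

With Twitter futures, the same shape needs an extra step to turn Return/Throw into Success/Failure before the put, which is the wrinkle mentioned above.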

@johnynek johnynek commented on an outdated diff Sep 6, 2016
...twitter/summingbird/online/executor/FutureQueue.scala
+ }
+ }
+}
+
+private[summingbird] class FutureQueue[S, T](
+ maxWaitingFutures: MaxWaitingFutures,
+ maxWaitingTime: MaxFutureWaitTime,
+ maxEmitPerExec: MaxEmitPerExecute) {
+ @transient protected lazy val logger: Logger = LoggerFactory.getLogger(getClass)
+
+ private[executor] lazy val outstandingFutures = Queue.linkedNonBlocking[Future[Unit]]
+ private lazy val numPendingOutstandingFutures = new AtomicInteger(0)
+ private lazy val responses = Queue.linkedNonBlocking[(S, Try[T])]
+
+ def addAll(iter: TraversableOnce[(S, Future[T])]): Unit = {
+ val addedSize = iter.foldLeft(0) {
@johnynek
johnynek Sep 6, 2016 Collaborator

what about:

val addedSize = iter.map { case (state, fut) =>
  val responded = ...
  if (addOutstandingFuture(responded)) 1 else 0
}.sum 
@johnynek
johnynek Sep 6, 2016 Collaborator

or, if we care about perf:

var addedSize = 0
iter.foreach { case (state, fut) =>
  ...
  if ( ) addedSize += 1
}
@johnynek johnynek commented on an outdated diff Sep 6, 2016
...twitter/summingbird/online/executor/FutureQueue.scala
+ }
+ }
+
+ if (outstandingFutures.size > maxWaitingFutures.get) {
+ /*
+ * This can happen on large key expansion.
+ * May indicate maxWaitingFutures is too low.
+ */
+ logger.debug(
+ "Exceeded maxWaitingFutures({}), put {} futures", maxWaitingFutures.get, addedSize
+ )
+ }
+ }
+
+ def addAllFuture(state: S, iterFut: Future[TraversableOnce[(S, Future[T])]]): Unit =
+ addOutstandingFuture(
@johnynek
johnynek Sep 6, 2016 Collaborator

Can we use transform here as well? I'm not so crazy about the .onSuccess/.onFailure pair while we are making changes.

@johnynek johnynek commented on an outdated diff Sep 6, 2016
...twitter/summingbird/online/executor/FutureQueue.scala
+ p
+ }
+ }
+}
+
+private[summingbird] class FutureQueue[S, T](
+ maxWaitingFutures: MaxWaitingFutures,
+ maxWaitingTime: MaxFutureWaitTime,
+ maxEmitPerExec: MaxEmitPerExecute) {
+ @transient protected lazy val logger: Logger = LoggerFactory.getLogger(getClass)
+
+ private[executor] lazy val outstandingFutures = Queue.linkedNonBlocking[Future[Unit]]
+ private lazy val numPendingOutstandingFutures = new AtomicInteger(0)
+ private lazy val responses = Queue.linkedNonBlocking[(S, Try[T])]
+
+ def addAll(iter: TraversableOnce[(S, Future[T])]): Unit = {
@johnynek
johnynek Sep 6, 2016 Collaborator

I'm not sure we should have this method. Why not:

def add(s: S, v: Future[T]): Boolean =

then when we need to call addAll instead do: val addedSize = iter.count { case (s, fut) => add(s, fut) } and optionally log there?
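The counting variants proposed in these threads (foldLeft, map then sum, and count over a Boolean-returning add) give the same number. A small sketch, assuming a hypothetical Recorder whose add reports true only for the items it keeps (here, even numbers):

```scala
import scala.collection.mutable.ArrayBuffer

object CountAddedSketch {
  // Hypothetical stand-in for a queue whose `add` reports whether the
  // item was actually kept.
  final class Recorder {
    val kept: ArrayBuffer[Int] = ArrayBuffer.empty[Int]
    def add(n: Int): Boolean =
      if (n % 2 == 0) { kept += n; true } else false
  }

  // Three equivalent ways to count successful adds over a one-pass input.
  def viaFold(r: Recorder, items: Iterator[Int]): Int =
    items.foldLeft(0) { (size, n) => if (r.add(n)) size + 1 else size }

  def viaMapSum(r: Recorder, items: Iterator[Int]): Int =
    items.map { n => if (r.add(n)) 1 else 0 }.sum

  def viaCount(r: Recorder, items: Iterator[Int]): Int =
    items.count(r.add)
}
```

The count form is the shortest, and it still calls add once for every element, so the side effect of queuing is preserved.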

@pankajroark pankajroark commented on an outdated diff Sep 6, 2016
...twitter/summingbird/online/executor/FutureQueue.scala
+ size
+ }
+ }
+
+ if (outstandingFutures.size > maxWaitingFutures.get) {
+ /*
+ * This can happen on large key expansion.
+ * May indicate maxWaitingFutures is too low.
+ */
+ logger.debug(
+ "Exceeded maxWaitingFutures({}), put {} futures", maxWaitingFutures.get, addedSize
+ )
+ }
+ }
+
+ def addAllFuture(state: S, iterFut: Future[TraversableOnce[(S, Future[T])]]): Unit =
@pankajroark
pankajroark Sep 6, 2016 edited Contributor

Can you document the state contract here? It looks like "state" is only used when putting a failure response on the queue. Looking at FutureQueue as a standalone abstraction, what is the reasoning behind this API design? Perhaps we should rename the parameter to reflect how it's used.

@johnynek johnynek and 1 other commented on an outdated diff Sep 8, 2016
...ter/summingbird/online/executor/FutureQueueLaws.scala
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.{ Seconds, Span }
+import scala.util.{ Failure, Success }
+
+case class NonNegativeShort(get: Short) {
+ require(get >= 0)
+}
+
+class FutureQueueLaws extends Properties("FutureQueue") with Eventually {
+ def genTry[T](implicit arb: Arbitrary[T]): Gen[Try[T]] = Gen.oneOf(arb.arbitrary.map(Return(_)), Arbitrary.arbitrary[java.lang.Exception].map(Throw(_)))
+ implicit def arbTry[T: Arbitrary] = Arbitrary(genTry[T])
+
+ implicit val arbNonNegativeShort: Arbitrary[NonNegativeShort] = Arbitrary(
+ Arbitrary.arbitrary[Short].filter { _ >= 0 }.map { NonNegativeShort }
+ )
+ def twitterToScala[T](t: Try[T]) = t match {
@johnynek
johnynek Sep 8, 2016 Collaborator

I've seen this written like 100 times. I finally got it into util, but we might not want to upgrade the dependency just to get it (we are having some GC regressions in some of the latest finagles).

@jnievelt
jnievelt Sep 10, 2016 Contributor

Looks like we already pick up bijection-util so I can just use what we have in UtilBijections.

Joe Nievelt PR feedback
32afb82
@jnievelt
Contributor

These comments should now be addressed. On review, there is something of a dependency on Queue, so maybe the dream of moving this out of summingbird is a bit far-fetched?

I'm thinking about moving it up into the online package next to Queue. I have mixed feelings about leaving it private vs. not.

@johnynek johnynek and 1 other commented on an outdated diff Sep 10, 2016
...twitter/summingbird/online/executor/FutureQueue.scala
@@ -141,6 +188,11 @@ private[summingbird] class FutureQueue[S, T](
}
}
+ /**
+ * Retrieve any completed results, along with their associated states.
+ *
+ * Returns up to maxEmitPerExec items.
+ */
def dequeue: TraversableOnce[(S, Try[T])] = {
@johnynek
johnynek Sep 10, 2016 Collaborator

just a question, can't this be Iterable[(S, Try[T])]? .take returns something iterable, right? This seems to limit the caller to only go through it once, but in fact, we materialize it into memory already.

@jnievelt
jnievelt Sep 12, 2016 Contributor

Yeah I guess Queue is set to return Seq; seems reasonable to do the same here.
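The practical difference behind this question: a TraversableOnce return type only promises callers a single traversal (an Iterator is the typical example), while a Seq or Iterable that is already in memory can be walked repeatedly. A small illustration with hypothetical helper names:

```scala
object TraverseOnceSketch {
  // An Iterator is single-pass: the first sum consumes it, so the second
  // traversal sees nothing.
  def sumTwice(items: Iterator[Int]): (Int, Int) = (items.sum, items.sum)

  // A Seq is materialized and gives the same answer on every traversal.
  def sumTwiceSeq(items: Seq[Int]): (Int, Int) = (items.sum, items.sum)
}
```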

@johnynek johnynek commented on an outdated diff Sep 10, 2016
...twitter/summingbird/online/executor/FutureQueue.scala
+ case Return(iter) => addAll(iter)
+ case Throw(ex) => responses.put((state, Failure(ex)))
+ }.unit
+ )
+
+ private def addOutstandingFuture(fut: Future[Unit]): Boolean =
+ if (!fut.isDefined) {
+ numPendingOutstandingFutures.incrementAndGet
+ val ensured = fut.ensure(numPendingOutstandingFutures.decrementAndGet)
+ outstandingFutures.put(ensured)
+ true
+ } else {
+ false
+ }
+
+ private def forceExtraFutures() {
@johnynek
johnynek Sep 10, 2016 Collaborator

can we make this forceExtraFutures(): Unit = { instead? I think that is the preferred Scala style now.
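For reference, the two spellings side by side. Procedure syntax (a method body with no result type and no `=`) was later deprecated in Scala 2.13 and dropped in Scala 3, so the explicit form is the one that keeps compiling. The object and member names below are hypothetical.

```scala
object ProcedureSyntaxSketch {
  private var forced = 0

  // Procedure syntax, as in the diff (shown as a comment only):
  //   def forceExtra() { forced += 1 }

  // Explicit result type and `=`, the style requested in review.
  def forceExtra(): Unit = {
    forced += 1
  }

  def timesForced: Int = forced
}
```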

@johnynek johnynek commented on an outdated diff Sep 10, 2016
...twitter/summingbird/online/executor/FutureQueue.scala
+ outstandingFutures.putAll(pending)
+ }
+ } else {
+ // only dequeueAll if there's bang for the buck
+ if (outstandingFutures.size >= FutureQueue.OutstandingFuturesDequeueRatio * pendingFuturesCount) {
+ outstandingFutures.dequeueAll(_.isDefined)
+ }
+ }
+ }
+
+ /**
+ * Retrieve any completed results, along with their associated states.
+ *
+ * Returns up to maxEmitPerExec items.
+ */
+ def dequeue: TraversableOnce[(S, Try[T])] = {
@johnynek
johnynek Sep 10, 2016 Collaborator

can we return Iterable[(S, Try[T])] here?

@johnynek johnynek commented on an outdated diff Sep 10, 2016
...twitter/summingbird/online/executor/FutureQueue.scala
+ if (outstandingFutures.size >= FutureQueue.OutstandingFuturesDequeueRatio * pendingFuturesCount) {
+ outstandingFutures.dequeueAll(_.isDefined)
+ }
+ }
+ }
+
+ /**
+ * Retrieve any completed results, along with their associated states.
+ *
+ * Returns up to maxEmitPerExec items.
+ */
+ def dequeue: TraversableOnce[(S, Try[T])] = {
+ // don't let too many futures build up
+ forceExtraFutures()
+ // Take all results that have been placed for writing to the network
+ responses.take(maxEmitPerExec.get)
@johnynek
johnynek Sep 10, 2016 Collaborator

can we not take maxEmitPerExec as a class param and instead take it as a method arg here:

def dequeue(max: Int): Iterable[(S, Try[T])] = ...

This simplifies the state of the class and allows the caller to have better control (no need for a uniform max).
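A sketch of what a per-call cap looks like, using a hypothetical ResponseBuffer backed by a java.util.concurrent.ConcurrentLinkedQueue (the PR itself uses summingbird's Queue, which is not reproduced here):

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical buffer: the limit is chosen by the caller on each dequeue
// rather than fixed once in the constructor.
final class ResponseBuffer[A] {
  private val responses = new ConcurrentLinkedQueue[A]()

  def put(a: A): Unit = {
    responses.add(a)
    ()
  }

  // Remove and return at most `max` items, oldest first.
  def dequeue(max: Int): Seq[A] = {
    val out = Vector.newBuilder[A]
    var taken = 0
    var empty = false
    while (taken < max && !empty) {
      // poll() returns null when the queue is empty; Option(...) maps that to None.
      Option(responses.poll()) match {
        case Some(item) =>
          out += item
          taken += 1
        case None =>
          empty = true
      }
    }
    out.result()
  }
}
```

The class holds one less piece of configuration, and two callers with different emit budgets can share the same buffer.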

@johnynek johnynek commented on an outdated diff Sep 10, 2016
...twitter/summingbird/online/executor/FutureQueue.scala
+ *
+ * Minimally, this queue accepts a state value along with a Future. Once
+ * the Future completes, the result can be retrieved, along with the state,
+ * via the dequeue method.
+ *
+ * To support batching, inputs can also be TraversableOnce[(S, Future[T])]
+ *
+ * To support additional asynchronous behavior, a state value along with a
+ * Future[TraversableOnce[(S, Future[T])]] can be inserted. On failure, the
+ * outer state value is returned with the Failure. On success, the inner
+ * items are queued up to be inserted when they complete.
+ */
+private[summingbird] class FutureQueue[S, T](
+ maxWaitingFutures: MaxWaitingFutures,
+ maxWaitingTime: MaxFutureWaitTime,
+ maxEmitPerExec: MaxEmitPerExecute) {
@johnynek
johnynek Sep 10, 2016 Collaborator

can we remove this param and pass it to dequeue?

@johnynek johnynek commented on an outdated diff Sep 10, 2016
...twitter/summingbird/online/executor/FutureQueue.scala
+ logger.debug(
+ "Exceeded maxWaitingFutures({}), put {} futures", maxWaitingFutures.get, addedSize
+ )
+ }
+
+ addedSize
+ }
+
+ /**
+ * Queue a collection Future. On failure, the state can be retrieved
+ * with the failure. On success, the results are queued via addAll.
+ *
+ * The Future given here counts against the maxWaitingFutures, so calls
+ * to this method may cause a wait on dequeue.
+ */
+ def addAllFuture(state: S, iterFut: Future[TraversableOnce[(S, Future[T])]]): Unit =
@johnynek
johnynek Sep 10, 2016 Collaborator

this seems like a really weird method. I think in the original, with S = Seq[S1] then what you actually have is state is the union of all states, where each part is separate in TraversableOnce.

So, really, I think you have something like, there is a Monoid[S] and the sum of all the TraversableOnce[(S, _)] is the state.

So, I don't see how this can be general without it.

In the general case, I think we might want something like Seq[S], Future[Seq[Future[T]]] so we have a sequence in the same order of states and Ts. Is that right?

@johnynek johnynek and 1 other commented on an outdated diff Sep 10, 2016
...m/twitter/summingbird/online/executor/AsyncBase.scala
- private def finishExecute(fIn: Future[TraversableOnce[(Seq[S], Future[TraversableOnce[O]])]]) = {
- addOutstandingFuture(handleSuccess(fIn).unit)
+ private def finishExecute(states: Seq[S], fIn: Future[TraversableOnce[(Seq[S], Future[TraversableOnce[O]])]]) = {
@johnynek
johnynek Sep 10, 2016 Collaborator

actually, it looks like Seq[S] is really an Option, is that right? Can we simplify this?

@johnynek
johnynek Sep 10, 2016 Collaborator

actually, only the first Seq[S] is an option, so that breaks the uniformity of the typing in FutureQueue.

I wonder if addAllFuture is so specialized that it should stay in this class. I don't see how any user can use it generally without a big coincidence in the FutureQueue implementation. I think it should stay here, and then FutureQueue becomes very clear and simple.

@jnievelt
jnievelt Sep 12, 2016 Contributor

Sure, I like that approach too.

@jnievelt
jnievelt Sep 12, 2016 Contributor

On review, the main thing we lose is the presence of the outer Future on the queue (as something counting against the max that we can wait on, etc.). Maybe this is a reasonable tradeoff though.

@pankajroark pankajroark and 1 other commented on an outdated diff Sep 12, 2016
...twitter/summingbird/online/executor/FutureQueue.scala
+
+ // Track the futures that may be outstanding. We may need to wait on some
+ // of them in case the actual outstanding count gets too high.
+ private lazy val outstandingFutures = Queue.linkedNonBlocking[Future[Unit]]
+ // Track the number of actually outstanding futures
+ private[executor] lazy val numPendingOutstandingFutures = new AtomicInteger(0)
+ // When futures complete, they deposit their results here.
+ private lazy val responses = Queue.linkedNonBlocking[(S, Try[T])]
+
+ private val tryBijection = UtilBijections.twitter2ScalaTry[T]
+
+ /**
+ * Add a Future with state onto the queue. This can eventually be
+ * dequeued as (S, Try[T]) when the Future completes.
+ *
+ * Returns true if the Future is not yet complete. When false, the
@pankajroark
pankajroark Sep 12, 2016 Contributor

The meaning of a false return value seems confusing. Why would the caller of add care whether that future counts towards the maxWaitingFutures limit or not? Because execution is asynchronous, the user has no control over when futures finish and thus can't do anything about it when encountering a false value.

For addOutstandingFuture the return value of false is ok, since that is internal to this class and false more directly indicates that we could not add to the list of outstanding futures. But I don't see much value in returning that as the return value of add.

@jnievelt
jnievelt Sep 12, 2016 Contributor

Fair point. I was a little worried anyway that folks would confuse this with "item was queued or not" semantics. Making this a Unit method simplifies that; I'll make this a private internal method and add a public wrapper for it.
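The resulting shape, a private Boolean-returning method behind a public Unit wrapper, might look like the sketch below. It assumes scala.concurrent.Future in place of the Twitter Future, and PendingTracker, track and numPending are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ ExecutionContext, Future }

// Hypothetical tracker for futures that are still running.
final class PendingTracker(implicit ec: ExecutionContext) {
  private val pending = new AtomicInteger(0)

  // Internal: true when the future was still running and is now counted.
  // An already-completed future is not counted at all.
  private def track(fut: Future[_]): Boolean =
    if (!fut.isCompleted) {
      pending.incrementAndGet()
      // Decrement on completion, whatever the outcome.
      fut.onComplete { _ => pending.decrementAndGet() }
      true
    } else {
      false
    }

  // Public: callers cannot act on the Boolean, so it is not exposed.
  def add(fut: Future[_]): Unit = {
    track(fut)
    ()
  }

  def numPending: Int = pending.get
}
```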

Joe Nievelt Reworking per PR feedback
da0fe97
@jnievelt
Contributor
jnievelt commented Oct 4, 2016

Any remaining concerns on this one? I've verified that the current diff works fine with my planned changes to AggregatorOutputCollector.

@johnynek
Collaborator
johnynek commented Oct 4, 2016

+1 this is really great work, @jnievelt

I think the FutureQueue is really a general rate-limiter on S => Future[T] now.

@jnievelt jnievelt merged commit 4a0d92e into develop Oct 4, 2016

2 checks passed

continuous-integration/travis-ci/pr The Travis CI build passed
continuous-integration/travis-ci/push The Travis CI build passed
@johnynek johnynek deleted the jnievelt/future-queue branch Oct 4, 2016