1202: prioritized merge #1205
Conversation
```scala
  priority.compareTo(o.priority)
}

val pq = new PriorityBlockingQueue[PQElem](sources.size)
```
Could we use a normal `PriorityQueue` instead of a blocking one? It seems like the access is synchronized anyway.
Sure, I will look at replacing it and adding necessary synchronization, then adding tests.
Otherwise it looks good?
Sorry, I haven't got a moment to take a close look, but it seems to look good. I think it's not too far from a `mergeMapPrioritized` version. If you'd like to go further:

- `onNext` of the source `Observable` (upstream) would add a new `Observable` to the composite instead of doing it from the starting list
- `RefCountObservable` is used to control the number of active children and their lifecycle
- It would be awesome to support different overflow strategies, but it would require updating `BufferedSubscriber` to allow priority queues. The implementation is in `AbstractBackPressuredBufferedSubscriber` for the Back-Pressure strategy. Unfortunately I don't know if there is a good implementation of a concurrent priority queue out there. I imagine it's too much work, and I don't have capacity to work on it right now, so I suppose we can stay with the current behavior, which is back-pressure with a bufferSize of 0.

If you don't feel like doing it - that's okay, `mergePrioritizedList` is already better than the status quo.

It might also be nice to add a benchmark vs normal merge, but that's just my curiosity and completely optional.
Since updating `BufferedSubscriber` would be lots of work, I kept my implementation as is. I swapped out the Java `PriorityBlockingQueue` for the Scala `mutable.PriorityQueue`, and added tests.

@Avasil will you go ahead and merge this, or is there anything else needed? Thanks.
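The swap described above can be sketched with a small toy; `PQElem` and the lock-guarded wrapper below are illustrative stand-ins under stated assumptions, not the PR's actual internals:

```scala
import scala.collection.mutable

// Illustrative stand-in for the PR's queue element: higher priority wins.
final case class PQElem(priority: Int, value: Int)

object PrioritizedBuffer {
  // mutable.PriorityQueue dequeues the *largest* element first, so
  // ordering by priority yields highest-priority elements first.
  private implicit val ordering: Ordering[PQElem] = Ordering.by(_.priority)

  private val lock = new AnyRef
  private val pq = mutable.PriorityQueue.empty[PQElem]

  // Unlike java.util.concurrent.PriorityBlockingQueue, Scala's
  // mutable.PriorityQueue is not thread-safe, so every access
  // must be guarded by the same lock.
  def offer(e: PQElem): Unit = lock.synchronized(pq.enqueue(e))

  def poll(): Option[PQElem] = lock.synchronized {
    if (pq.isEmpty) None else Some(pq.dequeue())
  }
}
```

With elements of priorities 1 and 3 offered in that order, `poll()` returns the priority-3 element first, matching what the blocking queue provided.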
```scala
 * sources have items available. If items are available from sources with the
 * same priority, the order is undefined.
 */
def mergePrioritizedList[A](sources: Observable[A]*)(priorities: Seq[Int]): Observable[A] = {
```
Very interesting implementation @ctoomey.

I do have one suggestion about this signature ... it's not clear that the priorities should match the sources and there's no reason for it.

How about ...

```scala
def mergePrioritizedList[A](sources: (Int, Observable[A])*): Observable[A]
```

Then you could do ...

```scala
Observable.mergePrioritizedList(
  1 -> observable1,
  2 -> observable2,
  //...
)
```

That way the signature is clearer, and you don't need that runtime assert in the implementation.
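A minimal sketch of why the pair-based signature removes the runtime assert; `Src` here is a toy stand-in for `Observable`, and both function bodies are illustrative, not the monix implementation:

```scala
// Toy stand-in for Observable, just to compare the two shapes.
final case class Src[A](label: String)

// Original shape: two parallel collections that must agree in length,
// which can only be checked at runtime.
def mergePrioritizedListOld[A](sources: Src[A]*)(priorities: Seq[Int]): Seq[(Int, Src[A])] = {
  require(sources.size == priorities.size, "each source needs exactly one priority")
  priorities.zip(sources)
}

// Suggested shape: every source carries its priority by construction,
// so a length mismatch is unrepresentable and the assert disappears.
def mergePrioritizedList[A](sources: (Int, Src[A])*): Seq[(Int, Src[A])] =
  sources.toSeq
```

The pair form also reads naturally at the call site via `priority -> source` syntax.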
Ya, I'd prefer that signature too and in fact had it that way originally, but decided to match the way Akka Streams did it. I'll change it.
CI failed due to an unrelated test.

Another unrelated test failure. Please re-run the CI, thanks.

I'm not sure about

Thanks @Avasil -- looking forward to having this merged and released -- any idea when you'll do the next release?

I wanted to do the next release with #1213 but it might take a while and we could do something earlier.

It'd be great to have a release soon after this PR is merged, if that's not too much trouble.
It's looking good @ctoomey! Thank you for the effort, and sorry for the slow responses, I've been away for the last two weeks.

I'm wondering how strict we should be about prioritizing sources, e.g.

```scala
val source =
  Observable.mergePrioritizedList((2, Observable(1, 1, 1, 1, 1, 1)), (1, Observable(2, 2, 2, 2, 2, 2)))

val f = source.toListL.runToFuture
```

All elements are available immediately for both sources; should it always return List(1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2)?

BTW @AlecZorab Do you think the issue you mentioned is fixed now? It seems like the comment was deleted.
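The strict interpretation in the question above can be modeled with a plain priority queue; this toy drain assumes every element is available up front and is not the actual merge implementation:

```scala
import scala.collection.mutable

// (priority, value) pairs from both sources, interleaved arbitrarily;
// priority 2 outranks priority 1, matching the example above.
val items = Seq(
  (2, 1), (1, 2), (2, 1), (1, 2), (2, 1), (1, 2),
  (2, 1), (1, 2), (2, 1), (1, 2), (2, 1), (1, 2))

val pq = mutable.PriorityQueue.empty[(Int, Int)](Ordering.by[(Int, Int), Int](_._1))
items.foreach(e => pq.enqueue(e))

// dequeueAll drains in priority order: all priority-2 values (the 1s)
// come out before any priority-1 value (the 2s). Ties among equal
// priorities come out in an unspecified order, as the scaladoc warns.
val drained = pq.dequeueAll.map(_._2).toList
```

Under this model the answer is yes: a strict priority drain always emits the six 1s before the six 2s.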
```scala
// MUST BE synchronized by `lock`
def signalOnNext(): Future[Ack] =
  lock.synchronized {
```
Redundant `synchronized` - the comment already mentions it, and it is done in `onNext`.
Fixed
```scala
  if (isDone) Stop else out.onNext(a)
}

def processNext(): Future[Ack] = {
```
needs a comment: // MUST BE synchronized by `lock`
Fixed
```scala
@@ -6056,6 +6056,19 @@ object Observable extends ObservableDeprecatedBuilders {
    }
  }

  /** Given a sequence of priority/observable pairs, combines them
```
Might be good to mention whether higher value is higher priority
Fixed
I'm not sure if the issue I spotted is fixed or not. It looks like

The example that springs to mind: Observable calls

But maybe the system invariants forbid that from happening?
Good question. I thought more about this and how to document it, and came up w/ the following doc update that'll be in the next commit. One improvement this triggered was changing the order of subscribing to the sources from the given order to prioritized order, such that if multiple sources have items immediately available, the highest-priority ones will come through first.

In your example, the proposed ordering is not guaranteed though, as the sequencing of the sources calling
Could we provide the current implementation with an API that would allow us to add customizable back-pressure in a later release?
```scala
case Continue =>
  isDone = true
  out.onComplete()
  completePromises()
```
Regarding the issue:

The key is that so long as there are items the downstream hasn't yet processed, `lastAck` will remain a `Future[Ack]`, and `signalOnComplete` won't call `out.onComplete` until that future completes, i.e., downstream has finished processing all pending items including the last one from the last source.

Take the simplest case of your example: 2 sources w/ 1 item each:

1. Source A calls onNext, item gets sent downstream: lastAck = Future[Ack for A's item]
2. Source A calls onComplete: completedCount = 1
3. Source B calls onNext, item gets queued: lastAck = Future[Ack for A's item].flatMap(Future[Ack for B's item])
4. Source B calls onComplete: completedCount = 2; lastAck.onComplete { ... out.onComplete }

Looking at the contract: https://monix.io/docs/3x/reactive/observers.html#contract

> Back-pressure for onComplete and onError is optional: when calling onComplete or onError you are not required to wait on the Future[Ack] of the previous onNext.

It implies that Source B could call:

```scala
out.onNext(lastElem)
out.onComplete()
```

In that case, we could send the remaining elements from the queue to the downstream instead of completing each promise with `Stop`.
> In that case, we could send the remaining elements from the queue to the downstream instead of completing each promise with `Stop`.

So long as the downstream returns `Continue` and none of the upstreams call `onError`, the downstream will always get all the items emitted by the upstreams. That's what the "should push all items downstream before calling onComplete" test is verifying. Again:

The key is that so long as there are items the downstream hasn't yet processed, `lastAck` will remain a `Future[Ack]`, and `signalOnComplete` won't call `out.onComplete` (or `completePromises`) until that future completes, i.e., downstream has finished processing all pending items including the last one from the last source.
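The lastAck chaining described above can be modeled with plain Scala futures; the scaffolding below is an illustrative sketch, not the PR's code, with downstream's ack for A's item represented by a promise:

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val processed = mutable.ListBuffer.empty[String]
val ackA = Promise[Unit]()  // downstream's pending ack for A's item

// Source A's item is in flight; lastAck completes once downstream acks it.
var lastAck: Future[Unit] = ackA.future.map { _ => processed += "A's item"; () }

// B's item arrives before A's ack: it is queued by extending the chain,
// so it is only processed after A's item.
lastAck = lastAck.map { _ => processed += "B's item"; () }

// signalOnComplete: out.onComplete runs only after the whole chain is done.
val done = lastAck.map { _ => processed += "onComplete"; () }

ackA.success(())            // downstream finally acks A's item
Await.result(done, 1.second)
// processed now reads: A's item, B's item, onComplete, in that order
```

Because each step is chained off the previous future, completion can never overtake a pending item.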
We can't add an extra parameter, if that's what you mean. But should we add a buffered alternative, we can always add an overload, with the same name or with a different name.
Note I'm fine with a

If some buffering is desired however, it would probably be incorrect to add it after the merge, because you can buffer lower-priority items and keep the high-priority items from being processed. So I'd like us to open an issue for a buffered variant too. I'm thinking that each priority can have its own buffer, and then a loop would go and select between them, cycling from high priority to low priority.
... speaking of this, this is how RxJava built a kick-ass

Note: I enabled GitHub Actions, so a "stable" "snapshot" version will be published as soon as this PR is merged.
```scala
      throw e
    }
  }
}
```
Added the above logic to ensure that we complete the upstream onNext promises when the downstream stops or errors early while we still have pending upstream items. Added tests to verify as well.
We could use our `syncOnStopOrFailure` syntax here; it should be more efficient.
Much nicer, thanks.
Created #1227
Great, thanks @alexandru |
So having done some println debugging, I'm convinced that the issue I highlighted isn't an issue any more. The subtlety in the implementation that I missed is in the way the chaining of futures works in

@ctoomey, my apologies for taking so long to understand what you were talking about.
```scala
def unsafeSubscribeFn(subscriber: Subscriber[A]): Cancelable = {
  val onNext = subscriber.onNext(elem)
  onNext.onComplete(_ => subscriber.onComplete())
```
Since it's allowed in the contract, it might be a better test to change this to call `onComplete` immediately, rather than waiting for `onNext` to complete. Having tried it, the test still passes, so it's really just a case of making it a little bit more stringent.
Done
No problem, good to be sure for this kind of thing.

@Avasil will you please merge if there's nothing else? Thanks.

Awesome, I've merged into

And how do we go about keeping consistency between series; should we open a PR from this branch and target 4.x?
* 1202: prioritized merge
* 1202: prioritized merge
* 1202: prioritized merge
* Fix unrelated InputStreamObservableSuite bug that failed build
* 1202: prioritized merge
* 1202: prioritized merge
* 1202: prioritized merge
* 1202: prioritized merge
* 1202: prioritized merge
* 1202: prioritized merge
* 1202: prioritized merge
* 1202: prioritized merge
* 1202: prioritized merge
It didn't, I introduced a parameter in our secrets, called
Unfortunately we have to start making incompatible changes, and maintaining two branches is inevitable, because the 4.x release will not be stable enough to release for a couple of months. So yes, new developments that are compatible with Monix 3.x should target the

```
git cherry-pick 1a5b9b1b41eaa2a1e955302e741a6daab1ff73f6
```

In this case it was clear, no PRs needed, I just committed it. The author could do that too, in which case a PR would be needed. And a PR would also be needed in case of conflicts.
We have version
@alexandru the
@ctoomey sorry, I updated JCTools, and in the process I ended up "shading" it, but forgot to "aggregate" it, so it didn't get published. We're fixing it. |
@ctoomey try
Thanks @alexandru, looks good now. |
Incomplete draft for #1202 for review/discussion. Tests to follow as appropriate.