Add Observable.mapParallelOrdered #558

Merged
merged 13 commits into from Nov 1, 2018

Conversation

4 participants
@Avasil
Collaborator

Avasil commented Jan 22, 2018

Closes #329

This is my first attempt.
Tests are the same as for Observable#mapParallelUnordered, minus backpressuring on the buffer, and I've added tests for parallel processing and order preservation.
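As a stdlib-only illustration of the ordering property being tested (not the actual Monix test code), elements can complete out of order in parallel while the results are still emitted in input order:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Earlier elements take longer, so they complete *last*...
val input = List(1, 2, 3, 4)
val futures = input.map { i =>
  Future {
    Thread.sleep((5 - i) * 50L)
    i * 10
  }
}

// ...but Future.sequence still yields results in input order.
val results = Await.result(Future.sequence(futures), 10.seconds)
assert(results == List(10, 20, 30, 40))
println(results.mkString(", "))
```

The operator under discussion has to provide the same guarantee while also bounding how many elements are in flight.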

It passes all of the test cases, which looks good, but it's quite likely I got lost somewhere in proper cancelation, resource cleanup, etc., despite basing the implementation on Observable#mapParallelUnordered. :)

Things to consider:

  • After my experience with the default mutable.Queue in #509, I think we might need to use something else to get the easiest performance boost. Right now it's probably best to have something working at all, but I think this operator will be used a lot, so we will have to tune its performance sooner or later.
  • The Semaphore is released as soon as the computation is complete, not after getting the ack from downstream. This is because there might be a situation where we have parallelism = 1000 and 999 tasks are idle while the first element is still doing some work, "wasting" time. I think this can lead to higher memory consumption but better throughput, so that's the tradeoff. It should probably be limited with some kind of BlockingQueue so it doesn't consume memory forever if the oldest task never finishes.
    EDIT: I came to the conclusion I can't do that; it would violate backpressure, so I can only release the semaphore after feeding the element downstream (or canceling it).
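The discipline described in the EDIT can be sketched with a plain java.util.concurrent.Semaphore (hypothetical names, and a synchronous stand-in for the real async code): the permit is acquired before processing and released only after the element has been fed downstream, so at most `parallelism` elements are in flight end-to-end.

```scala
import java.util.concurrent.Semaphore

val parallelism = 4
val semaphore = new Semaphore(parallelism)
var acked = List.empty[Int]

// Stand-in for subscriber.onNext plus the downstream ack.
def sendDownstream(result: Int): Unit =
  acked = acked :+ result

def process(elem: Int): Unit = {
  semaphore.acquire() // backpressures the upstream
  try {
    val result = elem * 2  // the (stand-in) parallel computation
    sendDownstream(result) // feed the element downstream first...
  } finally {
    semaphore.release()    // ...and only then release the permit
  }
}

(1 to 3).foreach(process)
assert(acked == List(2, 4, 6))
println(acked)
```

Releasing the permit on computation completion instead (the rejected variant) would let an unbounded number of finished-but-unemitted results pile up behind a slow head element, which is exactly the backpressure violation mentioned above.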

I need to come back to this code after I clear my head, but in the meantime I'm opening the PR so you can see my progress. :)

@codecov

codecov bot commented Jan 22, 2018

Codecov Report

Merging #558 into master will decrease coverage by 0.01%.
The diff coverage is 87.95%.

@@            Coverage Diff             @@
##           master     #558      +/-   ##
==========================================
- Coverage   90.34%   90.32%   -0.02%     
==========================================
  Files         414      415       +1     
  Lines       11691    11774      +83     
  Branches     2154     2149       -5     
==========================================
+ Hits        10562    10635      +73     
- Misses       1129     1139      +10
@alexandru

Member

alexandru commented Jan 30, 2018

Sorry for not reviewing this @Avasil, this is a difficult one.

@Avasil

Collaborator

Avasil commented Jan 31, 2018

@alexandru
No worries, I don't expect it to go through very quickly; there is probably a lot of work ahead and it's important to get this right.

I will come back to it during/after the weekend to fix the one bug I mentioned (should be easy to fix) and whatever else I find.

@Avasil Avasil changed the title from WIP: Add Observable.mapParallelOrdered to Add Observable.mapParallelOrdered Feb 22, 2018

@alexandru

Sorry for procrastinating for so long on this one @Avasil.

This implementation is a little complicated, consider that you're using at the same time:

  1. AsyncSemaphore
  2. BufferedSubscriber
  3. ConcurrentLinkedQueue
  4. ReentrantLock

The logic isn't necessarily incorrect; it seems to be doing the right thing, but I hope you don't mind if I procrastinate some more 🙂

Btw, this operation is one of those issues where if you come up with an original implementation that has reasonable performance, you could probably write a successful academic paper about it 🙂

package monix.reactive.internal.operators
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.locks.ReentrantLock

@alexandru

alexandru Mar 17, 2018

Member

You're using java.util.concurrent.ConcurrentLinkedQueue and java.util.concurrent.locks.ReentrantLock in code that is shared between the JVM and JS.

I'm wondering why the tests passed. Maybe Scala.js has dummies for them. But it's not a good idea.

@Avasil

Avasil Mar 17, 2018

Collaborator

I'm not too familiar with Scala.js - should I only rely on monix primitives / implement something new if needed?

@alexandru

alexandru Mar 17, 2018

Member

For ConcurrentLinkedQueue I'm pretty sure we have an alias or wrapper somewhere in monix.execution that you can use.

In the case of ReentrantLock, I have no idea what the Scala.js version is doing. It should be a dummy. I guess that if the tests pass, then it's not bad. I was surprised to see it in shared code; I guess Scala.js has evolved its Java library.

So you can leave it, but I was just mentioning it because if you introduce it in shared code, you need to understand what the Scala.js implementation is doing, to verify that it's OK.

@Avasil

Avasil Apr 27, 2018

Collaborator

Leaving a note about Scala.js:

Reentrant lock:
https://github.com/scala-js/scala-js/blob/master/javalib/src/main/scala/java/util/concurrent/locks/ReentrantLock.scala

So it just returns true for tryLock every time, which sounds safe to use.

ConcurrentLinkedQueue:
https://github.com/scala-js/scala-js/blob/master/javalib/src/main/scala/java/util/concurrent/ConcurrentLinkedQueue.scala

This is a bit more complicated, but glancing through the code it looks like an ordinary Queue.
I've found Monix's ConcurrentQueue but it's private[buffer].

BTW @alexandru

I'm trying to see how I can "merge" or remove any of those structures:

  1. AsyncSemaphore - I need to limit incoming tasks.
  2. BufferedSubscriber - I call subscriber.onNext concurrently.
  3. ConcurrentLinkedQueue - orders elements; it needs synchronized access because while one thread is checking the queue head for completion, new elements can arrive and they should be added to the queue.
  4. ReentrantLock - I use it to ensure that only one thread at a time checks the queue; I only use tryLock, so it shouldn't be much overhead.

I'm trying to figure out if I can get rid of BufferedSubscriber, since I call onNext from only one place, but no luck so far - it's failing a few tests.

Other than that, I have only one desperate idea, which is implementing a custom semi-bounded thread-safe Queue that could track completion of its items and only allow up to N not-completed elements, then backpressuring on it instead of the AsyncSemaphore :D But even if that makes any sense, I don't think I have the capacity to do it soon, and we probably want this operator in Monix 3.0.0 - a lot of people are requesting it.
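A minimal sketch of how points 3 and 4 above can cooperate (hypothetical Slot/drain names, and a single-threaded demo of an inherently concurrent pattern): each element enqueues a slot in arrival order, completion fills the slot, and the completing task tryLocks and drains every completed slot from the head.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable.ListBuffer

// A slot is enqueued on arrival and filled on completion.
final class Slot[A] {
  @volatile var result: Option[A] = None
}

val queue = new ConcurrentLinkedQueue[Slot[Int]]()
val lock = new ReentrantLock
val downstream = new ListBuffer[Int]

def onTaskComplete(slot: Slot[Int], value: Int): Unit = {
  slot.result = Some(value)
  drain()
}

def drain(): Unit =
  if (lock.tryLock()) { // only one drainer at a time; losers just skip,
    try {               // the winner will see their slot at the head
      var continue = true
      while (continue) {
        val head = queue.peek()
        if (head != null && head.result.isDefined) {
          queue.poll()
          downstream += head.result.get
        } else continue = false
      }
    } finally lock.unlock()
    // the real operator must re-check after unlock, to cover the race
    // where another slot completes between the last peek and the unlock
  }

val s1 = new Slot[Int]; val s2 = new Slot[Int]; val s3 = new Slot[Int]
queue.add(s1); queue.add(s2); queue.add(s3)
onTaskComplete(s2, 20) // head (s1) not done yet: nothing emitted
assert(downstream.isEmpty)
onTaskComplete(s1, 10) // head done: drains 10, then 20
assert(downstream.toList == List(10, 20))
onTaskComplete(s3, 30)
assert(downstream.toList == List(10, 20, 30))
```

Using tryLock instead of lock means a completing task never blocks behind the drainer, at the cost of the re-check noted in the comment.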

...ala/monix/reactive/internal/operators/MapParallelOrderedObservable.scala
@Avasil

Collaborator

Avasil commented Mar 17, 2018

Thanks for the comments @alexandru (and @wogan!) - I also need to come back to this with a fresh head and see how I can simplify it.

@alexandru

Member

alexandru commented Sep 25, 2018

I've been delaying this PR for months.

@Avasil please merge with master and re-test. Do you feel confident in the implementation?

I will take it for a spin, but I'd appreciate a re-review from you, maybe you get some fresh ideas.

@alexandru alexandru added this to the 3.0.0-RC2 milestone Sep 25, 2018

@Avasil

Collaborator

Avasil commented Sep 25, 2018

@alexandru
I will do it in the next couple of days, update and let you know. :)

@alexandru

Member

alexandru commented Sep 25, 2018

👍 sounds good

Avasil added some commits Oct 5, 2018

Merge branch 'master' into Observable#mapParallelOrdered
# Conflicts:
#	monix-reactive/shared/src/main/scala/monix/reactive/Observable.scala
@Avasil

Collaborator

Avasil commented Oct 6, 2018

@alexandru
I've merged in the latest changes and added more comments; hopefully it will make review easier.
Unfortunately, I didn't manage to come up with a better design, though I feel like this one should work. :) The only improvement I'm still trying to get working is getting rid of BufferedSubscriber.

Most of the stuff is the same as the unordered version; the difference is sendDownstreamOrdered with the while loop and the queue. I'll benchmark it to see if it's not 10x slower than the unordered variant, but a review in the meantime would be much appreciated!

@alexandru

Member

alexandru commented Oct 31, 2018

We can merge it for the next release I think.

But please re-merge master.

Avasil added some commits Nov 1, 2018

Merge branch 'master' into Observable#mapParallelOrdered
# Conflicts:
#	monix-execution/shared/src/main/scala/monix/execution/misc/AsyncQueue.scala
@Avasil

Collaborator

Avasil commented Nov 1, 2018

Thanks @alexandru, I've re-merged with master. I was playing with getting rid of BufferedSubscriber in the implementation; I think it's doable, but I didn't get anywhere at the time. Fortunately, tuning the internals later should be possible without breaking compatibility. :)

@alexandru

Member

alexandru commented Nov 1, 2018

Indeed, we can tune the internals later; at this point it just needs to be correct.

@alexandru alexandru merged commit 0466b84 into monix:master Nov 1, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed