Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce the Consumer type #184

Merged
merged 4 commits into from Jun 29, 2016

Conversation

Projects
None yet
3 participants
@alexandru
Copy link
Member

commented Jun 27, 2016

There are 2 API-breaking changes:

  • SyncObserver moves to Observer.Sync (trait becomes part of the Observer companion)
  • SyncSubscriber moves to Subscriber.Sync (trait becomes part of the Subscriber companion)
  • types with the deprecation warning will be provided

But the major addition is to introduce the Consumer type. The Consumer type is a factory of subscribers. The problems we are trying to solve:

  1. Observer (and Subscriber) instances are stateful and error prone - for example, by contract, with an observer instance you can subscribe to a single data-source
  2. there's no standard way to describe observers that will produce a final result - sure, with observables we could foldLeft and then runAsyncGetFirst or something along those lines, but those operators are usually meant for pure functions and there is no standard way to describe consumers that consume some streams and then finally produce a result - like for example a consumer that writes into a NIO async file channel and then when the stream is finished to return some statistics, like the number of bytes written to disk
  3. and you can't compose these subscribers very well - for example if you have a subscriber instance you can't feed it into something that can build a load-balancer that processes things in parallel

Meet the Consumer. The Consumer is a factory of Subscriber instances with benefits:

// For processing sums in parallel, useless of course, but can become 
// really helpful for logic sprinkled with I/O bound stuff
val consumer = Consumer
  .loadBalance(parallelism=10, Consumer.foldLeft(Coeval(0L))(_ + _))
  .map(_.sum)

val observable: Observable[Int] = ???
// Our consumer turns our observable into a Task processing sums, w00t!
val task: Task[Long] = observable.runWith(consumer)

// Consume the whole stream and get the result
task.runAsync.foreach(println)

alexandru added some commits Jun 27, 2016

Fix Consumer.loadBalance, move SyncObserver into Observer.Sync, SyncS…
…ubscriber into Subscriber.Sync, add tests

@alexandru alexandru changed the title Introduce the Consumer type Work-in-Progress: Introduce the Consumer type Jun 27, 2016

@alexandru

This comment has been minimized.

Copy link
Member Author

commented Jun 27, 2016

Guys, the star of this release is Consumer.scala - the other files changed are either tests or the API cosmetic issues and aren't that important. The review is about Consumer.scala.

@aoprisan

This comment has been minimized.

Copy link
Contributor

commented Jun 28, 2016

There is a failing test, not sure why: monix.tckTests.PublisherTest. See travis-ci/push.

@alexandru

This comment has been minimized.

Copy link
Member Author

commented Jun 28, 2016

@aoprisan it's probably just a timeout due to the system being overloaded with stuff being executed in parallel by SBT, the tckTests are a little fragile and I couldn't for the life of me disable parallel execution in SBT, not for a lack of trying. On my localhost if I want to run all the tests I have to do sbt -mem 4096 otherwise it's going to crash with an out-of-memory-exception due to the Scala.js SBT runner being a memory hog.

@aoprisan

This comment has been minimized.

Copy link
Contributor

commented Jun 28, 2016

I had a look on the PR and it looks great. To be honest, I have to revisit it as I am not sure I fully got all that is going on there :).

@alexandru alexandru changed the title Work-in-Progress: Introduce the Consumer type Introduce the Consumer type Jun 29, 2016

@alexandru alexandru merged commit 5baf85c into master Jun 29, 2016

0 of 2 checks passed

continuous-integration/travis-ci/pr The Travis CI build is in progress
Details
continuous-integration/travis-ci/push The Travis CI build is in progress
Details

@alexandru alexandru deleted the consumer branch Jun 29, 2016

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.