
ConcurrentChannel, Iterant.channel, Iterant#consume #778

Merged
merged 32 commits into from Nov 21, 2018

@alexandru (Member) commented Nov 17, 2018

Summary of changes:

  1. add monix.catnap.ConcurrentChannel
  2. optimize monix.catnap.ConcurrentQueue and monix.catnap.AsyncQueue
  3. add Iterant.channel and Iterant#consume

I'm really happy to present monix.catnap.ConcurrentChannel, a data type for modelling complex producer-consumer communication pipelines. It is lower level than a streaming data type, but more flexible in the configurations it allows:

final class ConcurrentChannel[F[_], E, A](...)(implicit F: Async[F], timer: Timer[F]) 
  extends ProducerF[F, E, A] with ChannelF[F, E, A] {

  def push(a: A): F[Boolean]

  def pushMany(seq: Iterable[A]): F[Boolean]

  def halt(e: E): F[Unit]

  def consume: Resource[F, ConsumerF[F, E, A]]

  def consumeWithConfig(config: ConsumerF.Config): Resource[F, ConsumerF[F, E, A]]

  def awaitConsumers(n: Int): F[Boolean]
}

The ProducerF interface represents the write side of the channel:

trait ProducerF[F[_], E, A] extends Serializable {
  def push(a: A): F[Boolean]

  def pushMany(seq: Iterable[A]): F[Boolean]

  def halt(e: E): F[Unit]

  def awaitConsumers(n: Int): F[Boolean]
}

And the ConsumerF is the read side of the channel:

trait ConsumerF[F[_], E, A] extends Serializable {
  def pull: F[Either[E, A]]

  def pullMany(minLength: Int, maxLength: Int): F[Either[E, Seq[A]]]
}
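To make the pull protocol concrete, here is a plain-Scala mock (hypothetical, single-threaded, purely for illustration; `MockConsumer` and its simplified `pullMany` semantics are assumptions, not the monix implementation): `Right(a)` is a regular element, while `Left(e)` is the final halt event, repeated on every pull once the channel is closed.

```scala
// Hypothetical mock of the ConsumerF pull protocol (illustration only,
// not the monix implementation).
final class MockConsumer[E, A](elems: List[A], haltEvent: E) {
  private var remaining = elems

  // Right(a) is a regular element; once exhausted, the final
  // halt event is returned (and repeated) as Left(e)
  def pull(): Either[E, A] = remaining match {
    case a :: rest => remaining = rest; Right(a)
    case Nil       => Left(haltEvent)
  }

  // Simplified pullMany: returns up to maxLength elements, or the
  // halt event if fewer than minLength are available
  def pullMany(minLength: Int, maxLength: Int): Either[E, Seq[A]] = {
    val (batch, rest) = remaining.splitAt(maxLength)
    if (batch.length >= minLength) { remaining = rest; Right(batch) }
    else Left(haltEvent)
  }
}

val c = new MockConsumer(List(1, 2, 3), "complete")
println(c.pullMany(1, 2)) // Right(List(1, 2))
println(c.pull())         // Right(3)
println(c.pull())         // Left(complete)
```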

object ConsumerF {

  final case class Config(
    capacity: Option[BufferCapacity] = None,
    consumerType: Option[ChannelType.ConsumerSide] = None,
    padding: Option[PaddingStrategy] = None
  )
}

And there's also a ChannelF interface describing things that can be consumed, like ConcurrentChannel or Iterant.toChannel:

trait ChannelF[F[_], E, A] extends Serializable {  

  def consume: Resource[F, ConsumerF[F, E, A]]

  @UnsafeProtocol
  def consumeWithConfig(config: ConsumerF.Config): Resource[F, ConsumerF[F, E, A]]
}

With the default configuration, or when configured appropriately, it is safe (i.e. thread-safe) to:

  • push events from multiple producers, from multiple threads, concurrently
  • pull events from multiple workers, sharing the same ConsumerF, from multiple threads, concurrently

For performance reasons, the same configuration options exist as for ConcurrentQueue. Thus:

  • the write side can be configured to be either MultiProducer or SingleProducer
  • the read side (on consume) can be configured to be either MultiConsumer or SingleConsumer

In terms of performance, the ConcurrentChannel kicks ass, just like ConcurrentQueue.

Versus ConcurrentQueue

The ConcurrentChannel resembles ConcurrentQueue; however, it also handles these concerns:

  1. has the ability to signal a final event, via halt, to all current and future consumers, closing the channel and disallowing other events afterwards
  2. has the ability to do broadcasting / multicasting, by being able to connect multiple consumers, via multiple calls to consume, that will receive the same messages; note the channel doesn't have a shared queue, rather each call to consume will create a ConsumerF with its own queue

These two abilities are really powerful, and ConcurrentChannel can in fact act as a "Subject" for Iterant, that is, a push-based abstraction that we can use to build Iterant streams; see below.

Iterant

This channel opens up a lot of possibilities for enhancing Iterant. And thus we now have:

sealed abstract class Iterant[F[_], A] {
  // ...

  def pushToChannel(channel: Producer[F, A])(implicit F: Sync[F]): F[Unit]

  def consume(implicit F: Concurrent[F], cs: ContextShift[F]): Resource[F, Consumer[F, A]]

  def consumeWithConfig(config: ConsumerF.Config)
    (implicit F: Concurrent[F], cs: ContextShift[F]): Resource[F, Consumer[F, A]] 
}

object Iterant {
  // Handy alias
  type Consumer[F[_], A] = monix.catnap.ConsumerF[F, Option[Throwable], A]
  // Handy alias
  type Producer[F[_], A] = monix.catnap.ProducerF[F, Option[Throwable], A]
  // Handy alias
  type Channel[F[_], A] = monix.catnap.ChannelF[F, Option[Throwable], A]

  def fromConsumer[F[_], A](consumer: Consumer[F, A], maxBatchSize: Int = 256)
    (implicit F: Async[F]): Iterant[F, A]

  def fromResource[F[_], A](r: Resource[F, A])(implicit F: Sync[F]): Iterant[F, A]

  def fromChannel[F[_], A](
    channel: Channel[F, A],
    bufferCapacity: BufferCapacity = Bounded(256),
    maxBatchSize: Int = 256)
    (implicit F: Async[F]): Iterant[F, A]

  def channel[F[_], A](
    bufferCapacity: BufferCapacity = Bounded(256),
    maxBatchSize: Int = 256,
    producerType: ChannelType.ProducerSide = MultiProducer)
    (implicit F: Concurrent[F], cs: ContextShift[F]): F[(Producer[F, A], Iterant[F, A])]
}

Or in other words:

  • via Iterant.channel, we can get a push-based ProducerF, connected to an Iterant
  • given a ConsumerF, we can build an Iterant out of it
  • and via Iterant#consume we can get a ConsumerF value for consuming any Iterant

This basically gives Iterant superpowers 😉

Examples

Full example for ConcurrentChannel, for doing async logging:

import cats.implicits._
import cats.effect._
import monix.catnap._
import monix.execution.Scheduler.global
  
// For being able to do IO.start
implicit val cs = global.contextShift[IO]
// We need a `Timer` for this to work
implicit val timer = global.timer[IO]

// Completion event
sealed trait Complete
object Complete extends Complete

def logLines(consumer: ConsumerF[IO, Complete, String], index: Int): IO[Unit] =
  consumer.pull.flatMap {
    case Right(message) =>
      IO(println(s"Worker $index: $message"))
        // continue loop
        .flatMap(_ => logLines(consumer, index))
    case Left(Complete) =>
      IO(println(s"Worker $index is done!"))
  }
  
for {
  channel <- ConcurrentChannel[IO].of[Complete, String]
  // Workers 1 & 2, sharing the load between them
  task_1_2 = channel.consume.use { ref =>
    (logLines(ref, 1), logLines(ref, 2)).parMapN((_, _) => ())
  }
  consumers_1_2 <- task_1_2.start // fiber
  // Workers 3 & 4, receiving the same events as workers 1 & 2,
  // but sharing the load between them
  task_3_4 = channel.consume.use { ref =>
    (logLines(ref, 3), logLines(ref, 4)).parMapN((_, _) => ())
  }
  consumers_3_4 <- task_3_4.start // fiber
  // Pushing some samples
  _ <- channel.push("Hello, ")
  _ <- channel.push("World!")
  // Signal there are no more events
  _ <- channel.halt(Complete)
  // Await for the completion of the consumers
  _ <- consumers_1_2.join
  _ <- consumers_3_4.join
} yield ()

Example for producing a stream:

import cats.implicits._
import cats.effect.Sync
    
sealed trait Complete
object Complete extends Complete
    
def range[F[_]](from: Int, until: Int, increment: Int = 1)
  (channel: ConcurrentChannel[F, Complete, Int])
  (implicit F: Sync[F]): F[Unit] = {
    
  if (from != until)
    channel.push(from).flatMap {
      case true =>
        range(from + increment, until, increment)(channel)
      case false =>
        F.unit // we need to stop
    }
  else // we're done, close the channel
    channel.halt(Complete)
}

Or via pushMany:

def range[F[_]](from: Int, until: Int, increment: Int = 1)
  (channel: ConcurrentChannel[F, Complete, Int])
  (implicit F: Sync[F]): F[Unit] = {

  channel.pushMany(Range(from, until, increment)).flatMap {
    case true =>
      channel.halt(Complete)
    case false =>
      F.unit // was already halted, do nothing else
  }
}
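The `case false` branches above rely on the push/halt contract: once the channel is halted, push and pushMany return false and the producer should stop. A plain-Scala mock of that contract (hypothetical, single-threaded, for illustration only; the real channel is concurrent and effectful):

```scala
// Hypothetical mock of the ProducerF push/halt contract
// (illustration only, not the monix implementation).
final class MockChannel[E, A] {
  private var closed: Option[E] = None
  private val buffer = scala.collection.mutable.ListBuffer.empty[A]

  // push returns false once the channel has been halted,
  // which is why producer loops stop on `case false`
  def push(a: A): Boolean =
    if (closed.isDefined) false
    else { buffer += a; true }

  // halt closes the channel; subsequent halts are no-ops
  def halt(e: E): Unit =
    if (closed.isEmpty) closed = Some(e)

  def contents: List[A] = buffer.toList
}

val ch = new MockChannel[String, Int]
println(ch.push(1))   // true: accepted
ch.halt("complete")
println(ch.push(2))   // false: channel is closed
println(ch.contents)  // List(1)
```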

So let's say we define a fold function that works with ConsumerF:

def foldConsumer[F[_], A, S](c: ConsumerF[F, Option[Throwable], A], seed: S)(f: (S, A) => S)
  (implicit F: Sync[F]): F[S] = {

  c.pull.flatMap {
    case Right(a) => foldConsumer(c, f(seed, a))(f)
    case Left(None) => F.pure(seed)
    case Left(Some(e)) => F.raiseError(e)
  }
}

We could now use something like this to consume an Iterant stream:

Iterant[Task].range(0, 10000).consume.use { consumer =>
  foldConsumer(consumer, 0L)(_ + _)
}

And we can also work with an input/output pair, via Iterant.channel:

Iterant[Task].channel[Int].flatMap { case (producer, stream) =>
  val write = for {
    _ <- producer.push(1)
    _ <- producer.push(2)
    _ <- producer.push(3)
    _ <- producer.halt(None)
  } yield ()

  for {
    // Writing to channel
    _   <- (producer.awaitConsumers(1) *> write).start
    // Reading from stream (should see the events published to channel)
    sum <- stream.foldLeftL(0L)(_ + _)
  } yield sum
}


@alexandru alexandru added this to the 3.0.0 milestone Nov 17, 2018


@codecov commented Nov 18, 2018

Codecov Report

Merging #778 into master will decrease coverage by 0.03%.
The diff coverage is 83.59%.

@@            Coverage Diff             @@
##           master     #778      +/-   ##
==========================================
- Coverage   90.49%   90.46%   -0.04%     
==========================================
  Files         419      426       +7     
  Lines       11908    12308     +400     
  Branches     2183     2264      +81     
==========================================
+ Hits        10776    11134     +358     
- Misses       1132     1174      +42


@alexandru alexandru merged commit cdd3406 into monix:master Nov 21, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed