Skip to content

Commit

Permalink
#3 Implementing Monix Task Demo.
Browse files Browse the repository at this point in the history
  • Loading branch information
gvolpe committed Jun 16, 2017
1 parent 90eb3b7 commit 0253880
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 68 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,19 @@ Stream(message).covary[F] through jsonEncode[F, Person] to publisher

If you want your program to run forever with automatic error recovery you can choose to run your program in a loop that will restart every certain amount of specified time. An useful StreamLoop object that you can use to achieve this is provided by the library.

So, for the program defined above, this would be an example of a resilient app that restarts every 3 seconds in case of failure (only implemented for **cats.effect.IO** for now):
So, for the program defined above, this would be an example of a resilient app that restarts every 3 seconds in case of failure:

```scala
import com.github.gvolpe.fs2rabbit.StreamLoop
import scala.concurrent.duration._

implicit val appR = fs2.Scheduler.fromFixedDaemonPool(2, "restarter")
implicit val appS = IOEffectScheduler // or MonixEffectScheduler if using Monix Task

StreamLoop.run(() => program, 3.seconds)
```

See the [Demo](https://github.com/gvolpe/fs2-rabbit/tree/master/src/main/scala/com/github/gvolpe/fs2rabbit/example/Demo.scala) included in the library for more.
See the [examples](https://github.com/gvolpe/fs2-rabbit/tree/master/examples/src/main/scala/com/github/gvolpe/fs2rabbit/examples) to learn more!

## LICENSE

Expand Down
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ val CoreDependencies: Seq[ModuleID] = Seq(
)

val ExamplesDependencies: Seq[ModuleID] = Seq(
"ch.qos.logback" % "logback-classic" % "1.1.3" % "runtime"
"io.monix" %% "monix" % "3.0.0-7a337f9",
// "org.scalaz" %% "scalaz-concurrent" % "7.2.13",
"ch.qos.logback" % "logback-classic" % "1.1.3" % "runtime"
)

lazy val root = project.in(file("."))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ import scala.concurrent.duration.FiniteDuration
import scala.language.higherKinds

trait EffectScheduler[F[_]] {
def schedule[A](body: F[A], delay: FiniteDuration)(implicit ec: ExecutionContext, s: Scheduler): F[A]
def schedule[A](effect: F[A], delay: FiniteDuration)(implicit ec: ExecutionContext, s: Scheduler): F[A]
def unsafeRunSync(effect: F[Unit]): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ class StreamLoopSpec extends FlatSpecLike with Matchers {
implicit val s = fs2.Scheduler.fromFixedDaemonPool(2, "restarter")

implicit val es = new EffectScheduler[IO] {
override def schedule[A](body: IO[A], delay: FiniteDuration)
override def schedule[A](effect: IO[A], delay: FiniteDuration)
(implicit ec: ExecutionContext, s: Scheduler) = {
IO.async[Unit] { cb => s.scheduleOnce(delay)(cb(Right(()))) }.flatMap(_ => body)
IO.async[Unit] { cb => s.scheduleOnce(delay)(cb(Right(()))) }.flatMap(_ => effect)
}
override def unsafeRunSync(effect: IO[Unit]) = effect.unsafeRunSync()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.github.gvolpe.fs2rabbit.examples

import cats.effect.Effect
import com.github.gvolpe.fs2rabbit.Fs2Rabbit.{createAckerConsumer, createConnectionChannel, createPublisher, declareQueue}
import com.github.gvolpe.fs2rabbit.Fs2Utils.asyncF
import com.github.gvolpe.fs2rabbit.{EffectScheduler, StreamLoop}
import com.github.gvolpe.fs2rabbit.json.Fs2JsonEncoder.jsonEncode
import com.github.gvolpe.fs2rabbit.model._
import fs2.{Pipe, Stream}

import scala.concurrent.ExecutionContext
import scala.language.higherKinds

class GenericDemo[F[_]](implicit F: Effect[F], ES: EffectScheduler[F]) {

implicit val appS = scala.concurrent.ExecutionContext.Implicits.global
implicit val appR = fs2.Scheduler.fromFixedDaemonPool(2, "restarter")

val queueName: QueueName = "test"
val routingKey: RoutingKey = "test"

def logPipe: Pipe[F, AmqpEnvelope, AckResult] = { streamMsg =>
for {
amqpMsg <- streamMsg
_ <- asyncF[F, Unit](println(s"Consumed: $amqpMsg"))
} yield Ack(amqpMsg.deliveryTag)
}

val program = () => for {
connAndChannel <- createConnectionChannel[F]()
(_, channel) = connAndChannel
_ <- declareQueue[F](channel, queueName)
(acker, consumer) = createAckerConsumer[F](channel, queueName)
publisher = createPublisher[F](channel, "", routingKey)
result <- new Flow(consumer, acker, logPipe, publisher).flow
} yield result

StreamLoop.run(program)

}

class Flow[F[_]](consumer: StreamConsumer[F],
acker: StreamAcker[F],
logger: Pipe[F, AmqpEnvelope, AckResult],
publisher: StreamPublisher[F])
(implicit ec: ExecutionContext, F: Effect[F]) {

import io.circe.generic.auto._

case class Address(number: Int, streetName: String)
case class Person(id: Long, name: String, address: Address)

val simpleMessage = AmqpMessage("Hey!", AmqpProperties(None, None, Map("demoId" -> LongVal(123), "app" -> StringVal("fs2RabbitDemo"))))
val classMessage = AmqpMessage(Person(1L, "Sherlock", Address(212, "Baker St")), AmqpProperties.empty)

val flow: Stream[F, Unit] =
Stream(
Stream(simpleMessage).covary[F] to publisher,
Stream(classMessage).covary[F] through jsonEncode[F, Person] to publisher,
consumer through logger to acker
).join(3)

}
Original file line number Diff line number Diff line change
@@ -1,62 +1,5 @@
package com.github.gvolpe.fs2rabbit.examples

import cats.effect.IO
import com.github.gvolpe.fs2rabbit.Fs2Rabbit._
import com.github.gvolpe.fs2rabbit.Fs2Utils._
import com.github.gvolpe.fs2rabbit.StreamLoop
import com.github.gvolpe.fs2rabbit.model._
import com.github.gvolpe.fs2rabbit.json.Fs2JsonEncoder._
import fs2.{Pipe, Stream}

import scala.concurrent.ExecutionContext

object IODemo extends App {

implicit val appS = scala.concurrent.ExecutionContext.Implicits.global
implicit val appR = fs2.Scheduler.fromFixedDaemonPool(2, "restarter")

val queueName: QueueName = "test"
val routingKey: RoutingKey = "test"

def logPipe: Pipe[IO, AmqpEnvelope, AckResult] = { streamMsg =>
for {
amqpMsg <- streamMsg
_ <- asyncF[IO, Unit](println(s"Consumed: $amqpMsg"))
} yield Ack(amqpMsg.deliveryTag)
}

val program = () => for {
connAndChannel <- createConnectionChannel[IO]()
(_, channel) = connAndChannel
_ <- declareQueue[IO](channel, queueName)
(acker, consumer) = createAckerConsumer[IO](channel, queueName)
publisher = createPublisher[IO](channel, "", routingKey)
result <- new Flow(consumer, acker, logPipe, publisher).flow
} yield result

StreamLoop.run(program)

}

class Flow(consumer: StreamConsumer[IO],
acker: StreamAcker[IO],
logger: Pipe[IO, AmqpEnvelope, AckResult],
publisher: StreamPublisher[IO])
(implicit ec: ExecutionContext) {

import io.circe.generic.auto._

case class Address(number: Int, streetName: String)
case class Person(id: Long, name: String, address: Address)

val simpleMessage = AmqpMessage("Hey!", AmqpProperties(None, None, Map("demoId" -> LongVal(123), "app" -> StringVal("fs2RabbitDemo"))))
val classMessage = AmqpMessage(Person(1L, "Sherlock", Address(212, "Baker St")), AmqpProperties.empty)

val flow: Stream[IO, Unit] =
Stream(
Stream(simpleMessage).covary[IO] to publisher,
Stream(classMessage).covary[IO] through jsonEncode[IO, Person] to publisher,
consumer through logger to acker
).join(3)

}
object IODemo extends GenericDemo[IO] with App
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.github.gvolpe.fs2rabbit.examples

import monix.eval.Task

object MonixTaskDemo extends GenericDemo[Task] with App
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.github.gvolpe.fs2rabbit

import com.github.gvolpe.fs2rabbit.examples.scheduler.IOEffectScheduler
import com.github.gvolpe.fs2rabbit.examples.scheduler.{IOEffectScheduler, MonixEffectScheduler}

package object examples {

implicit val iOEffectScheduler = IOEffectScheduler
implicit val iOEffectScheduler = IOEffectScheduler
implicit val monixEffectScheduler = MonixEffectScheduler

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import scala.concurrent.duration.FiniteDuration

object IOEffectScheduler extends EffectScheduler[IO] {

override def schedule[A](body: IO[A], delay: FiniteDuration)
override def schedule[A](effect: IO[A], delay: FiniteDuration)
(implicit ec: ExecutionContext, s: Scheduler) = {
IO.async[Unit] { cb => s.scheduleOnce(delay)(cb(Right(()))) }.flatMap(_ => body)
IO.async[Unit] { cb => s.scheduleOnce(delay)(cb(Right(()))) }.flatMap(_ => effect)
}

override def unsafeRunSync(effect: IO[Unit]) = effect.unsafeRunSync()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.github.gvolpe.fs2rabbit.examples.scheduler

import com.github.gvolpe.fs2rabbit.EffectScheduler
import fs2.Scheduler
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._

object MonixEffectScheduler extends EffectScheduler[Task] {

override def schedule[A](effect: Task[A], delay: FiniteDuration)
(implicit ec: ExecutionContext, s: Scheduler) = {
effect.delayExecution(delay)
}

override def unsafeRunSync(effect: Task[Unit]) = {
Await.result(effect.runAsync, 1.second)
}
}

0 comments on commit 0253880

Please sign in to comment.