Skip to content
Permalink
Browse files

Add cats.Parallel instance for Rerunnable

  • Loading branch information...
felixbr authored and travisbrown committed Sep 22, 2019
1 parent 2865dad commit 5bc8a41c025f371f1d443661a7d4b3cf7449c342
@@ -1,9 +1,22 @@
package io.catbird.util

import cats.{ CoflatMap, Comonad, Eq, MonadError, Monoid, Semigroup }
import cats.{
Applicative,
CoflatMap,
CommutativeApplicative,
Comonad,
Eq,
Monad,
MonadError,
Monoid,
Parallel,
Semigroup,
~>
}
import com.twitter.util.{ Await, Duration, Future, FuturePool, Return, Throw, Try }
import java.lang.Throwable
import scala.Unit

import scala.{ Boolean, Unit }
import scala.annotation.tailrec
import scala.util.{ Either, Left, Right }

@@ -129,14 +142,31 @@ final object Rerunnable extends RerunnableInstances1 {
final val empty: Rerunnable[A] = Rerunnable.rerunnableInstance.pure(A.empty)
}

implicit final val rerunnableParallelInstance: Parallel.Aux[Rerunnable, Rerunnable.Par] =
new Parallel[Rerunnable] {
type F[x] = Rerunnable.Par[x]

final override val applicative: Applicative[Rerunnable.Par] =
rerunnableParCommutativeApplicative

final override val monad: Monad[Rerunnable] =
rerunnableInstance

final override val sequential: Rerunnable.Par ~> Rerunnable =
λ[Rerunnable.Par ~> Rerunnable](Rerunnable.Par.unwrap(_))

final override val parallel: Rerunnable ~> Rerunnable.Par =
λ[Rerunnable ~> Rerunnable.Par](Rerunnable.Par(_))
}

final def rerunnableEq[A](atMost: Duration)(implicit A: Eq[A]): Eq[Rerunnable[A]] =
Eq.by[Rerunnable[A], Future[A]](_.run)(futureEq[A](atMost))

final def rerunnableEqWithFailure[A](atMost: Duration)(implicit A: Eq[A], T: Eq[Throwable]): Eq[Rerunnable[A]] =
Eq.by[Rerunnable[A], Future[A]](_.run)(futureEqWithFailure[A](atMost))
}

private[util] trait RerunnableInstances1 {
private[util] trait RerunnableInstances1 extends RerunnableParallelNewtype {
final def rerunnableComonad(atMost: Duration): Comonad[Rerunnable] =
new RerunnableCoflatMap with Comonad[Rerunnable] {
final def extract[A](x: Rerunnable[A]): A = Await.result(x.run, atMost)
@@ -147,6 +177,58 @@ private[util] trait RerunnableInstances1 {
new RerunnableSemigroup[A]
}

private[util] trait RerunnableParallelNewtype {

type Par[+A] = Par.Type[A]

object Par {
type Type[+A] = Rerunnable[A]

def apply[A](fa: Rerunnable[A]): Type[A] =
fa.asInstanceOf[Type[A]]

def unwrap[A](fa: Type[A]): Rerunnable[A] =
fa.asInstanceOf[Rerunnable[A]]
}

implicit final val rerunnableParCommutativeApplicative: CommutativeApplicative[Par] =
new CommutativeApplicative[Par] {
import Par.unwrap

final override def pure[A](x: A): Rerunnable.Par[A] =
Par(Rerunnable.const(x))
final override def map2[A, B, Z](
fa: Rerunnable.Par[A],
fb: Rerunnable.Par[B]
)(f: (A, B) => Z): Rerunnable.Par[Z] =
// Future.join runs futures in parallel
Par(Rerunnable.fromFuture(Future.join(unwrap(fa).run, unwrap(fb).run)).map(f.tupled))
final override def ap[A, B](ff: Rerunnable.Par[A => B])(fa: Rerunnable.Par[A]): Rerunnable.Par[B] =
map2(ff, fa)(_(_))
final override def product[A, B](fa: Rerunnable.Par[A], fb: Rerunnable.Par[B]): Rerunnable.Par[(A, B)] =
map2(fa, fb)((_, _))
final override def map[A, B](fa: Rerunnable.Par[A])(f: A => B): Rerunnable.Par[B] =
Par(unwrap(fa).map(f))
final override def unit: Rerunnable.Par[scala.Unit] =
Par(Rerunnable.Unit)
}

final def rerunnableParEq[A](atMost: Duration)(implicit A: Eq[A]): Eq[Par[A]] = new Eq[Par[A]] {
import Par.unwrap

final override def eqv(x: Par[A], y: Par[A]): Boolean =
Rerunnable.rerunnableEq(atMost).eqv(unwrap(x), unwrap(y))
}

final def rerunnableParEqWithFailure[A](atMost: Duration)(implicit A: Eq[A], T: Eq[Throwable]): Eq[Par[A]] =
new Eq[Par[A]] {
import Par.unwrap

override def eqv(x: Par[A], y: Par[A]): Boolean =
Rerunnable.rerunnableEqWithFailure(atMost)(A, T).eqv(unwrap(x), unwrap(y))
}
}

private[util] sealed trait RerunnableCoflatMap extends CoflatMap[Rerunnable] {
final def coflatMap[A, B](fa: Rerunnable[A])(f: Rerunnable[A] => B): Rerunnable[B] =
new Rerunnable[B] {
@@ -9,16 +9,26 @@ import cats.laws.discipline._
import cats.laws.discipline.arbitrary._
import cats.{ Comonad, Eq }
import com.twitter.conversions.DurationOps._
import org.scalacheck.Arbitrary
import org.scalatest.funsuite.AnyFunSuite
import org.typelevel.discipline.scalatest.Discipline

class RerunnableSuite extends AnyFunSuite with Discipline with ArbitraryInstances with EqInstances {
implicit def rerunnableEq[A](implicit A: Eq[A]): Eq[Rerunnable[A]] =
Rerunnable.rerunnableEqWithFailure[A](1.second)
implicit val rerunnableComonad: Comonad[Rerunnable] = Rerunnable.rerunnableComonad(1.second)
implicit val rerunnableComonad: Comonad[Rerunnable] =
Rerunnable.rerunnableComonad(1.second)
implicit val rerunnableParEqInt: Eq[Rerunnable.Par[Int]] =
Rerunnable.rerunnableParEqWithFailure(1.second)
implicit val rerunnableParEqInt3: Eq[Rerunnable.Par[(Int, Int, Int)]] =
Rerunnable.rerunnableParEqWithFailure(1.second)
implicit def rerunnableParArbitrary[A](implicit A: Arbitrary[A]): Arbitrary[Rerunnable.Par[A]] =
Arbitrary(A.arbitrary.map(value => Rerunnable.Par(Rerunnable.const(value))))

checkAll("Rerunnable[Int]", MonadErrorTests[Rerunnable, Throwable].monadError[Int, Int, Int])
checkAll("Rerunnable[Int]", ComonadTests[Rerunnable].comonad[Int, Int, Int])
checkAll("Rerunnable[Int]", FunctorTests[Rerunnable](rerunnableComonad).functor[Int, Int, Int])
checkAll("Rerunnable[Int]", MonoidTests[Rerunnable[Int]].monoid)
checkAll("Rerunnable[Int]", ParallelTests[Rerunnable, Rerunnable.Par].parallel[Int, Int])
checkAll("Rerunnable.Par[Int]", CommutativeApplicativeTests[Rerunnable.Par].commutativeApplicative[Int, Int, Int])
}

0 comments on commit 5bc8a41

Please sign in to comment.
You can’t perform that action at this time.