Skip to content
Permalink
Browse files

Add cats.Parallel instance for Future

  • Loading branch information...
felixbr authored and travisbrown committed Jul 17, 2019
1 parent 22c62ba commit c617540e0f66ac65c91d423e2107c9cb10f7923f
@@ -1,8 +1,21 @@
package io.catbird.util

import cats.{ CoflatMap, Comonad, Eq, MonadError, Monoid, Semigroup }
import cats.{
Applicative,
CoflatMap,
CommutativeApplicative,
Comonad,
Eq,
Monad,
MonadError,
Monoid,
Parallel,
Semigroup,
~>
}
import com.twitter.util.{ Await, Duration, Future, Try }
import java.lang.Throwable

import scala.Boolean
import scala.util.{ Either, Left, Right }

@@ -43,9 +56,22 @@ trait FutureInstances extends FutureInstances1 {

final def futureEqWithFailure[A](atMost: Duration)(implicit A: Eq[A], T: Eq[Throwable]): Eq[Future[A]] =
Eq.by[Future[A], Future[Try[A]]](_.liftToTry)(futureEq[Try[A]](atMost))

implicit final val twitterFutureParallelInstance: Parallel[Future, FuturePar] =
new Parallel[Future, FuturePar] {
final override val applicative: Applicative[FuturePar] =
futureParCommutativeApplicative

final override val monad: Monad[Future] =
twitterFutureInstance

final override val sequential: FuturePar ~> Future = λ[FuturePar ~> Future](FuturePar.unwrap(_))

final override val parallel: Future ~> FuturePar = λ[Future ~> FuturePar](FuturePar(_))
}
}

private[util] trait FutureInstances1 {
private[util] trait FutureInstances1 extends FutureParallelNewtype {
final def futureComonad(atMost: Duration): Comonad[Future] =
new FutureCoflatMap with Comonad[Future] {
final def extract[A](x: Future[A]): A = Await.result(x, atMost)
@@ -58,6 +84,46 @@ private[util] trait FutureInstances1 {
}
}

private[util] trait FutureParallelNewtype {

type FuturePar[+A] = FuturePar.Type[A]

object FuturePar extends internal.Newtype1[Future]

implicit final val futureParCommutativeApplicative: CommutativeApplicative[FuturePar] =
new CommutativeApplicative[FuturePar] {
import FuturePar.{ unwrap, apply => par }

final override def pure[A](x: A): FuturePar[A] =
par(Future.value(x))
final override def map2[A, B, Z](fa: FuturePar[A], fb: FuturePar[B])(f: (A, B) => Z): FuturePar[Z] =
par(Future.join(unwrap(fa), unwrap(fb)).map(f.tupled)) // Future.join runs futures in parallel
final override def ap[A, B](ff: FuturePar[A => B])(fa: FuturePar[A]): FuturePar[B] =
map2(ff, fa)(_(_))
final override def product[A, B](fa: FuturePar[A], fb: FuturePar[B]): FuturePar[(A, B)] =
map2(fa, fb)((_, _))
final override def map[A, B](fa: FuturePar[A])(f: A => B): FuturePar[B] =
par(unwrap(fa).map(f))
final override def unit: FuturePar[scala.Unit] =
par(Future.Unit)
}

final def futureParEq[A](atMost: Duration)(implicit A: Eq[A]): Eq[FuturePar[A]] = new Eq[FuturePar[A]] {
import FuturePar.unwrap

final override def eqv(x: FuturePar[A], y: FuturePar[A]): Boolean =
futureEq(atMost).eqv(unwrap(x), unwrap(y))
}

final def futureParEqWithFailure[A](atMost: Duration)(implicit A: Eq[A], T: Eq[Throwable]): Eq[FuturePar[A]] =
new Eq[FuturePar[A]] {
import FuturePar.unwrap

final override def eqv(x: FuturePar[A], y: FuturePar[A]): Boolean =
futureEqWithFailure(atMost)(A, T).eqv(unwrap(x), unwrap(y))
}
}

private[util] sealed abstract class FutureCoflatMap extends CoflatMap[Future] {
final def coflatMap[A, B](fa: Future[A])(f: Future[A] => B): Future[B] = Future(f(fa))
}
@@ -0,0 +1,24 @@
package io.catbird.util.internal

/** INTERNAL API — Newtype encoding for types with one type parameter.
*
* The `Newtype1` abstract class indirection is needed for Scala 2.10,
* otherwise we could just define these types straight on the
* companion object. In Scala 2.10 definining these types
* straight on the companion object yields an error like:
* ''"only classes can have declared but undefined members"''.
*
* Inspired by
* [[https://github.com/alexknvl/newtypes alexknvl/newtypes]].
*/
private[util] abstract class Newtype1[F[_]] { self =>
type Base
trait Tag extends scala.Any
type Type[+A] <: Base with Tag

def apply[A](fa: F[A]): Type[A] =
fa.asInstanceOf[Type[A]]

def unwrap[A](fa: Type[A]): F[A] =
fa.asInstanceOf[F[A]]
}
@@ -10,6 +10,7 @@ import cats.laws.discipline._
import cats.laws.discipline.arbitrary._
import com.twitter.conversions.DurationOps._
import com.twitter.util.Future
import org.scalacheck.Arbitrary
import org.scalatest.FunSuite
import org.typelevel.discipline.scalatest.Discipline

@@ -21,10 +22,16 @@ class FutureSuite extends FunSuite with Discipline with FutureInstances with Arb
implicit val eqFutureEitherUnit: Eq[Future[Either[Throwable, Unit]]] = futureEqWithFailure(1.second)
implicit val eqFutureEitherInt: Eq[Future[Either[Throwable, Int]]] = futureEqWithFailure(1.second)
implicit val comonad: Comonad[Future] = futureComonad(1.second)
implicit val eqFutureParInt: Eq[FuturePar[Int]] = futureParEqWithFailure(1.second)
implicit val eqFutureParInt3: Eq[FuturePar[(Int, Int, Int)]] = futureParEqWithFailure(1.second)
implicit def arbFuturePar[A](implicit A: Arbitrary[A]): Arbitrary[FuturePar[A]] =
Arbitrary(A.arbitrary.map(value => FuturePar(Future.value(value))))

checkAll("Future[Int]", MonadErrorTests[Future, Throwable].monadError[Int, Int, Int])
checkAll("Future[Int]", ComonadTests[Future].comonad[Int, Int, Int])
checkAll("Future[Int]", FunctorTests[Future](comonad).functor[Int, Int, Int])
checkAll("Future[Int]", SemigroupTests[Future[Int]](twitterFutureSemigroup[Int]).semigroup)
checkAll("Future[Int]", MonoidTests[Future[Int]].monoid)
checkAll("Future[Int]", ParallelTests[Future, FuturePar].parallel[Int, Int])
checkAll("FuturePar[Int]", CommutativeApplicativeTests[FuturePar].commutativeApplicative[Int, Int, Int])
}

0 comments on commit c617540

Please sign in to comment.
You can’t perform that action at this time.