Skip to content

Commit

Permalink
Merge pull request #247 from BenFradet/java_future
Browse files Browse the repository at this point in the history
JavaFuture <=> TwitterFuture bijection
  • Loading branch information
johnynek committed May 14, 2016
2 parents 2019fcd + b41f652 commit 029e439
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Copyright 2012 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.bijection.twitter_util

import java.util.concurrent.{ Future => JFuture }

import com.twitter.util._

/**
* Base class for converting java futures to twitter futures
*/
abstract class JavaFutureConverter {
def apply[T](javaFuture: JFuture[T]): Future[T]
}

/**
* Converter based on the specified <code>futurePool</code> which will create one thread per
* future possibly limited by the maximum size of the pool.
* To favor if there aren't too many futures to convert and one cares about latency.
*
* @param futurePool future pool used to retrieve the result of every future
* @param mayInterruptIfRunning whether or not the initial java future can be interrupted if it's
* running
*/
class FuturePoolJavaFutureConverter(
futurePool: FuturePool,
mayInterruptIfRunning: Boolean
) extends JavaFutureConverter {
override def apply[T](javaFuture: JFuture[T]): Future[T] = {
val f = futurePool { javaFuture.get() }
val p = Promise.attached(f)
p.setInterruptHandler { case NonFatal(e) =>
if (p.detach()) {
f.raise(e)
javaFuture.cancel(mayInterruptIfRunning)
p.setException(e)
}
}
p
}
}

/**
* Converter based on a [[Timer]] which will create a task which will check every
* <code>checkFrequency</code> if the java future is completed, the threading model is the one
* used by the specified <code>timer</code> which is often a thread pool of size 1.
* To favor if there are a lot of futures to convert and one cares less about the latency induced
* by <code>checkFrequency</code>.
* <code>checkFrequency</code> needs to be a multiple of the timer implementation's granularity
* which is often 1ms.
*
* @param timer timer used to schedule a task which will check if the java future is done
* @param checkFrequency frequency at which the java future will be checked for completion
* @param mayInterruptIfRunning whether or not the initial java future can be interrupted if it's
* running
*/
class TimerJavaFutureConverter(
timer: Timer,
checkFrequency: Duration,
mayInterruptIfRunning: Boolean
) extends JavaFutureConverter {
override def apply[T](javaFuture: JFuture[T]): Future[T] = {
val p = Promise[T]
lazy val task: TimerTask = timer.schedule(checkFrequency) {
if (javaFuture.isDone) {
p.updateIfEmpty(Try(javaFuture.get()))
task.cancel()
}
}
require(task != null)
p.setInterruptHandler { case NonFatal(e) =>
task.cancel()
p.updateIfEmpty(Throw(e))
javaFuture.cancel(mayInterruptIfRunning)
}
p
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ limitations under the License.

package com.twitter.bijection.twitter_util

import com.twitter.bijection.{ AbstractBijection, Bijection, ImplicitBijection }
import java.util.concurrent.{ Future => JavaFuture }

import com.twitter.bijection._
import com.twitter.io.Buf
import com.twitter.util.{ Future => TwitterFuture, Try => TwitterTry, Promise => TwitterPromise, Return, Throw, FuturePool }

Expand All @@ -40,7 +42,7 @@ trait UtilBijections {
implicit def futureBijection[A, B](implicit bij: ImplicitBijection[A, B]): Bijection[TwitterFuture[A], TwitterFuture[B]] =
new AbstractBijection[TwitterFuture[A], TwitterFuture[B]] {
override def apply(fa: TwitterFuture[A]) = fa.map(bij(_))
override def invert(fb: TwitterFuture[B]) = fb.map(bij.invert(_))
override def invert(fb: TwitterFuture[B]) = fb.map(bij.invert)
}

/**
Expand All @@ -50,7 +52,7 @@ trait UtilBijections {
implicit def futureScalaBijection[A, B](implicit bij: ImplicitBijection[A, B], executor: ExecutionContext): Bijection[ScalaFuture[A], ScalaFuture[B]] =
new AbstractBijection[ScalaFuture[A], ScalaFuture[B]] {
override def apply(fa: ScalaFuture[A]) = fa.map(bij(_))
override def invert(fb: ScalaFuture[B]) = fb.map(bij.invert(_))
override def invert(fb: ScalaFuture[B]) = fb.map(bij.invert)
}

/**
Expand Down Expand Up @@ -78,6 +80,45 @@ trait UtilBijections {
}
}

/**
* Injection from twitter futures to java futures.
* Will throw when inverting back from java future to twitter future if the java future is not
* done.
*/
def twitter2JavaFutureInjection[A]: Injection[TwitterFuture[A], JavaFuture[A]] = {
new AbstractInjection[TwitterFuture[A], JavaFuture[A]] {
override def apply(f: TwitterFuture[A]): JavaFuture[A] =
f.toJavaFuture.asInstanceOf[JavaFuture[A]]

override def invert(f: JavaFuture[A]): ScalaTry[TwitterFuture[A]] =
Inversion.attemptWhen(f)(_.isDone)(jf => TwitterFuture(jf.get()))
}
}

/**
* Bijection between java futures and twitter futures.
* An implicit [[JavaFutureConverter]] is needed, two strategies are available out of the box:
* - [[FuturePoolJavaFutureConverter]] which is based on a [[FuturePool]] and which will
* create one thread per future. To favor if there aren't too many futures to convert and one
* cares about latency.
* - [[TimerJavaFutureConverter]] which is based on a [[com.twitter.util.Timer]] which will
* create a task which will check every <code>checkFrequency</code> if the java future is
* completed, one thread will be used for every conversion. To favor if there are a lot of
* futures to convert and one cares less about the latency induced by
* <code>checkFrequency</code>.
*/
implicit def twitter2JavaFutureBijection[A](
implicit converter: JavaFutureConverter
): Bijection[TwitterFuture[A], JavaFuture[A]] = {
new AbstractBijection[TwitterFuture[A], JavaFuture[A]] {
override def apply(f: TwitterFuture[A]): JavaFuture[A] =
f.toJavaFuture.asInstanceOf[JavaFuture[A]]

override def invert(f: JavaFuture[A]): TwitterFuture[A] =
converter(f)
}
}

/**
* Bijection between twitter and scala style Trys
*/
Expand All @@ -102,7 +143,7 @@ trait UtilBijections {
implicit def tryBijection[A, B](implicit bij: ImplicitBijection[A, B]): Bijection[TwitterTry[A], TwitterTry[B]] =
new AbstractBijection[TwitterTry[A], TwitterTry[B]] {
override def apply(fa: TwitterTry[A]) = fa.map(bij(_))
override def invert(fb: TwitterTry[B]) = fb.map(bij.invert(_))
override def invert(fb: TwitterTry[B]) = fb.map(bij.invert)
}

/**
Expand All @@ -112,7 +153,7 @@ trait UtilBijections {
implicit def tryScalaBijection[A, B](implicit bij: ImplicitBijection[A, B]): Bijection[ScalaTry[A], ScalaTry[B]] =
new AbstractBijection[ScalaTry[A], ScalaTry[B]] {
override def apply(fa: ScalaTry[A]) = fa.map(bij(_))
override def invert(fb: ScalaTry[B]) = fb.map(bij.invert(_))
override def invert(fb: ScalaTry[B]) = fb.map(bij.invert)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,79 +16,109 @@ limitations under the License.

package com.twitter.bijection.twitter_util

import com.twitter.bijection.{ CheckProperties, BaseProperties, Bijection }
import com.twitter.bijection.{ CheckProperties, BaseProperties }
import com.twitter.io.Buf
import com.twitter.util.{ Future => TwitterFuture, Try => TwitterTry, Await => TwitterAwait }
import com.twitter.util.{ Future => TwitterFuture, Try => TwitterTry, Await => TwitterAwait, FuturePool, JavaTimer }
import java.lang.{ Integer => JInt, Long => JLong }
import java.util.concurrent.{ Future => JavaFuture, Callable, FutureTask }
import org.scalacheck.Arbitrary
import org.scalatest.{ PropSpec, MustMatchers }
import org.scalatest.prop.PropertyChecks
import org.scalatest.BeforeAndAfterAll

import org.scalacheck.Prop.forAll
import scala.concurrent.{ Future => ScalaFuture, Await => ScalaAwait }
import scala.concurrent.duration.Duration
import scala.util.{ Try => ScalaTry }
import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global

class UtilBijectionLaws extends CheckProperties with BaseProperties {
class UtilBijectionLaws extends CheckProperties with BaseProperties with BeforeAndAfterAll {
import UtilBijections._

protected def toOption[T](f: TwitterFuture[T]): Option[T] = TwitterTry(TwitterAwait.result(f)).toOption
protected def toOption[T](f: TwitterFuture[T]): Option[T] =
TwitterTry(TwitterAwait.result(f)).toOption

protected def toOption[T](f: ScalaFuture[T]): Option[T] = TwitterTry(ScalaAwait.result(f, Duration.Inf)).toOption
protected def toOption[T](f: ScalaFuture[T]): Option[T] =
TwitterTry(ScalaAwait.result(f, Duration.Inf)).toOption

implicit def futureArb[T: Arbitrary] = arbitraryViaFn[T, TwitterFuture[T]] { TwitterFuture.value(_) }
protected def toOption[T](f: JavaFuture[T]): Option[T] =
TwitterTry(f.get()).toOption

implicit def futureArb[T: Arbitrary] = arbitraryViaFn[T, TwitterFuture[T]] { TwitterFuture.value }
implicit def scalaFutureArb[T: Arbitrary] = arbitraryViaFn[T, ScalaFuture[T]] { future(_) }
implicit def javaFutureArb[T: Arbitrary] = arbitraryViaFn[T, JavaFuture[T]] { t =>
val f = new FutureTask[T](new Callable[T] {
override def call(): T = t
})
f.run()
f
}
implicit def tryArb[T: Arbitrary] = arbitraryViaFn[T, TwitterTry[T]] { TwitterTry(_) }
implicit def scalaTryArb[T: Arbitrary] = arbitraryViaFn[T, ScalaTry[T]] { ScalaTry(_) }

implicit val jIntArb = arbitraryViaBijection[Int, JInt]
implicit val jLongArb = arbitraryViaBijection[Long, JLong]
implicit val bufArb: Arbitrary[Buf] = arbitraryViaFn[Array[Byte], Buf](Buf.ByteArray.Owned.apply)

implicit protected def futureEq[T: Equiv]: Equiv[TwitterFuture[T]] = Equiv.fromFunction { (f1, f2) =>
Equiv[Option[T]].equiv(toOption(f1), toOption(f2))
}
implicit protected def futureEq[T: Equiv]: Equiv[TwitterFuture[T]] =
Equiv.fromFunction { (f1, f2) => Equiv[Option[T]].equiv(toOption(f1), toOption(f2)) }

implicit protected def scalaFutureEq[T: Equiv]: Equiv[ScalaFuture[T]] = Equiv.fromFunction { (f1, f2) =>
Equiv[Option[T]].equiv(toOption(f1), toOption(f2))
}
implicit protected def scalaFutureEq[T: Equiv]: Equiv[ScalaFuture[T]] =
Equiv.fromFunction { (f1, f2) => Equiv[Option[T]].equiv(toOption(f1), toOption(f2)) }

implicit protected def javaFutureEq[T: Equiv]: Equiv[JavaFuture[T]] =
Equiv.fromFunction { (f1, f2) => Equiv[Option[T]].equiv(toOption(f1), toOption(f2)) }

type FromMap = Map[Int, Long]
type ToMap = Map[JInt, JLong]

property("round trips com.twitter.util.Future[Map[Int, String]] -> com.twitter.util.Future[JInt, JLong]") {
property("round trips TwitterFuture[Map[Int, String]] <-> Twitter.Future[JInt, JLong]") {
isBijection[TwitterFuture[FromMap], TwitterFuture[ToMap]]
}

property("round trips scala.concurrent.Future[Map[Int, String]] -> scala.concurrent.Future[JInt, JLong]") {
property("round trips ScalaFuture[Map[Int, String]] <-> ScalaFuture[JInt, JLong]") {
isBijection[ScalaFuture[FromMap], ScalaFuture[ToMap]]
}

property("round trips com.twitter.util.Try[Map[Int, String]] -> com.twitter.util.Try[Map[JInt, JLong]]") {
property("round trips TwitterTry[Map[Int, String]] <-> TwitterTry[Map[JInt, JLong]]") {
isBijection[TwitterTry[FromMap], TwitterTry[ToMap]]
}

property("round trips scala.util.Try[Map[Int, String]] -> scala.util.Try[Map[JInt, JLong]]") {
property("round trips ScalaTry[Map[Int, String]] <-> ScalaTry[Map[JInt, JLong]]") {
isBijection[ScalaTry[FromMap], ScalaTry[ToMap]]
}

property("round trips com.twitter.util.Try[Map[JInt, JLong]] -> scala.util.Try[Map[JInt, JLong]]") {
property("round trips TwitterTry[Map[JInt, JLong]] <-> ScalaTry[Map[JInt, JLong]]") {
isBijection[TwitterTry[ToMap], ScalaTry[ToMap]]
}

property("round trips com.twitter.util.Future[Map[JInt, JLong]] -> scala.concurrent.Future[Map[JInt, JLong]]") {
property("round trips TwitterFuture[Map[JInt, JLong]] <-> ScalaFuture[Map[JInt, JLong]]") {
isBijection[TwitterFuture[ToMap], ScalaFuture[ToMap]]
}

property("round trips shared com.twitter.io.Buf -> Array[Byte]") {
property("round trips TwitterFuture[Map[JInt, JLong]] <-> JavaFuture[Map[JInt, JLong]] " +
"using FuturePool") {
implicit val converter = new FuturePoolJavaFutureConverter(FuturePool.unboundedPool, true)
isBijection[TwitterFuture[ToMap], JavaFuture[ToMap]]
}

property("round trips TwitterFuture[Map[JInt, JLong]] <-> JavaFuture[Map[JInt, JLong]] " +
"using Timer") {
implicit val converter =
new TimerJavaFutureConverter(new JavaTimer, com.twitter.util.Duration.fromSeconds(1), true)
isBijection[TwitterFuture[ToMap], JavaFuture[ToMap]]
}

property("TwitterFuture[Map[JInt, JLong]] -> JavaFuture[Map[JInt, JLong]]") {
isInjection[TwitterFuture[ToMap], JavaFuture[ToMap]](
futureArb[ToMap], twitter2JavaFutureInjection, javaFutureArb[ToMap], futureEq, javaFutureEq)
}

property("round trips shared com.twitter.io.Buf <-> Array[Byte]") {
import Shared.byteArrayBufBijection

isBijection[Array[Byte], Buf]
}

property("round trips owned com.twitter.io.Buf -> Array[Byte]") {
property("round trips owned com.twitter.io.Buf <-> Array[Byte]") {
import Owned.byteArrayBufBijection

isBijection[Array[Byte], Buf]
Expand Down

0 comments on commit 029e439

Please sign in to comment.