Skip to content

Commit

Permalink
Merge pull request #833 from mijicd/twitter-futures-interop
Browse files Browse the repository at this point in the history
Support Twitter futures
  • Loading branch information
Regis Kuckaertz committed May 9, 2019
2 parents b51fe05 + 709453f commit ce37ecd
Show file tree
Hide file tree
Showing 16 changed files with 192 additions and 55 deletions.
4 changes: 2 additions & 2 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
version = "2.0.0-RC6"
maxColumn = 120
align = most
continuationIndent.defnSite = 2
Expand All @@ -11,5 +12,4 @@ spaces {
}
optIn.annotationNewlines = true

rewrite.rules = [SortImports, RedundantBraces]

rewrite.rules = [SortImports, RedundantBraces]
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
- &scaladoc
stage: microsite
name: "Generate Scaladoc"
script: sbt ++$TRAVIS_SCALA_VERSION coreJVM/doc coreJS/doc interopCatsJVM/doc interopCatsJS/doc interopFutureJVM/doc interopFutureJS/doc interopScalaz7xJVM/doc interopScalaz7xJS/doc interopMonixJVM/doc interopMonixJS/doc interopJavaJVM/doc interopReactiveStreamsJVM/doc
script: sbt ++$TRAVIS_SCALA_VERSION coreJVM/doc coreJS/doc interopCatsJVM/doc interopCatsJS/doc interopFutureJVM/doc interopScalaz7xJVM/doc interopScalaz7xJS/doc interopMonixJVM/doc interopMonixJS/doc interopJavaJVM/doc interopReactiveStreamsJVM/doc interopTwitterJVM/doc
- &microsite
stage: microsite
name: "Generate microsite"
Expand Down
28 changes: 24 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ lazy val root = project
interopCatsJVM,
interopCatsJS,
interopFutureJVM,
// interopMonixJVM,
// interopMonixJS,
interopMonixJVM,
interopMonixJS,
interopScalaz7xJVM,
interopScalaz7xJS,
interopJavaJVM,
interopReactiveStreamsJVM,
// benchmarks,
interopTwitterJVM,
benchmarks,
testkitJVM,
docs
)
Expand Down Expand Up @@ -225,6 +226,16 @@ lazy val interopReactiveStreams = crossProject(JVMPlatform)

lazy val interopReactiveStreamsJVM = interopReactiveStreams.jvm.dependsOn(interopSharedJVM)

lazy val interopTwitter = crossProject(JSPlatform, JVMPlatform)
.in(file("interop-twitter"))
.settings(stdSettings("zio-interop-twitter"))
.settings(
libraryDependencies += "com.twitter" %% "util-core" % "19.4.0"
)
.dependsOn(core % "test->test;compile->compile")

lazy val interopTwitterJVM = interopTwitter.jvm.dependsOn(interopSharedJVM)

lazy val testkit = crossProject(JVMPlatform)
.in(file("testkit"))
.settings(stdSettings("zio-testkit"))
Expand Down Expand Up @@ -281,5 +292,14 @@ lazy val docs = project.module
"org.jsoup" % "jsoup" % "1.11.3" % "provided"
)
)
.dependsOn(coreJVM, interopCatsJVM, interopFutureJVM, interopScalaz7xJVM, interopJavaJVM, interopReactiveStreamsJVM)
.dependsOn(
coreJVM,
interopCatsJVM,
interopFutureJVM,
interopMonixJVM,
interopScalaz7xJVM,
interopJavaJVM,
interopReactiveStreamsJVM,
interopTwitterJVM
)
.enablePlugins(MdocPlugin, DocusaurusPlugin)
27 changes: 12 additions & 15 deletions core/jvm/src/test/scala/scalaz/zio/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,9 @@ class IOSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRuntim
failure = new Exception("expected")
_ <- IO.fail(failure).when(false)
failed <- IO.fail(failure).when(true).either
} yield
(val1 must_=== 0) and
(val2 must_=== 2) and
(failed must beLeft(failure))
} yield (val1 must_=== 0) and
(val2 must_=== 2) and
(failed must beLeft(failure))
)

def testWhenM =
Expand All @@ -175,12 +174,11 @@ class IOSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRuntim
failure = new Exception("expected")
_ <- IO.fail(failure).whenM(conditionFalse)
failed <- IO.fail(failure).whenM(conditionTrue).either
} yield
(val1 must_=== 0) and
(conditionVal1 must_=== 1) and
(val2 must_=== 2) and
(conditionVal2 must_=== 2) and
(failed must beLeft(failure))
} yield (val1 must_=== 0) and
(conditionVal1 must_=== 1) and
(val2 must_=== 2) and
(conditionVal2 must_=== 2) and
(failed must beLeft(failure))
)

def testUnsandbox = {
Expand Down Expand Up @@ -260,11 +258,10 @@ class IOSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRuntim
both <- (ZIO.halt(Cause.Both(Cause.Interrupt, Cause.die(ex))) <> IO.unit).run
thn <- (ZIO.halt(Cause.Then(Cause.Interrupt, Cause.die(ex))) <> IO.unit).run
fail <- (ZIO.fail(ex) <> IO.unit).run
} yield
(plain must_=== Exit.die(ex))
.and(both must_=== Exit.die(ex))
.and(thn must_=== Exit.die(ex))
.and(fail must_=== Exit.succeed(()))
} yield (plain must_=== Exit.die(ex))
.and(both must_=== Exit.die(ex))
.and(thn must_=== Exit.die(ex))
.and(fail must_=== Exit.succeed(()))
}
}

Expand Down
9 changes: 4 additions & 5 deletions core/jvm/src/test/scala/scalaz/zio/QueueSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,10 @@ class QueueSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRun
v1 <- queue.take
v2 <- queue.take
v3 <- queue.take
} yield
(v must_=== Range.inclusive(1, 32).toList)
.and(v1 must_=== 33)
.and(v2 must_=== 34)
.and(v3 must_=== 35)
} yield (v must_=== Range.inclusive(1, 32).toList)
.and(v1 must_=== 33)
.and(v2 must_=== 34)
.and(v3 must_=== 35)
)

def e36 =
Expand Down
9 changes: 5 additions & 4 deletions core/jvm/src/test/scala/scalaz/zio/RefMSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,11 @@ class RefMSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRunt
case Changed => IO.succeed("closed" -> Closed)
}
value2 <- refM.get
} yield
(r1 must beTheSameAs("changed")) and (value1 must beTheSameAs(Changed)) and (r2 must beTheSameAs("closed")) and (value2 must beTheSameAs(
Closed
))
} yield (r1 must beTheSameAs("changed")) and (value1 must beTheSameAs(Changed)) and (r2 must beTheSameAs(
"closed"
)) and (value2 must beTheSameAs(
Closed
))
)

def e11 =
Expand Down
12 changes: 5 additions & 7 deletions core/jvm/src/test/scala/scalaz/zio/stm/STMSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -473,10 +473,9 @@ final class STMSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends Tes
leftV1 <- STM.succeed(1).orElseEither(STM.succeed("No me!")).commit
leftV2 <- STM.succeed(2).orElseEither(STM.fail("No!")).commit
failedV <- STM.fail(-1).orElseEither(STM.fail(-2)).commit.either
} yield
(rightV must beRight(42)) and (leftV1 must beLeft(1)) and (leftV2 must beLeft(2)) and (failedV must beLeft(
-2
))
} yield (rightV must beRight(42)) and (leftV1 must beLeft(1)) and (leftV2 must beLeft(2)) and (failedV must beLeft(
-2
))
)

def e36 =
Expand All @@ -488,9 +487,8 @@ final class STMSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends Tes
_ <- STM.fail("Error!")
} yield ()).commit.either
v <- tvar.get.commit
} yield
(e must be left "Error!") and
(v must_=== 0)
} yield (e must be left "Error!") and
(v must_=== 0)
)

def e37 =
Expand Down
4 changes: 2 additions & 2 deletions docs/interop/monix.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ needs to be available.
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scalaz.zio.{ IO, DefaultRuntime }
import scalaz.zio.interop.monixio._
import scalaz.zio.interop.monix._

object UnsafeExample extends DefaultRuntime {
def main(args: Array[String]): Unit = {
Expand Down Expand Up @@ -71,7 +71,7 @@ def fromCoeval[A](coeval: eval.Coeval[A]): Task[A]
```scala
import monix.eval.Coeval
import scalaz.zio.{ IO, DefaultRuntime }
import scalaz.zio.interop.monixio._
import scalaz.zio.interop.monix._

object UnsafeExample extends DefaultRuntime {
def main(args: Array[String]): Unit = {
Expand Down
31 changes: 31 additions & 0 deletions docs/interop/twitter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
id: interop_twitter
title: "Twitter"
---

`interop-twitter` module provides capability to convert Twitter `Future` into ZIO `Task`.

### Example

```scala
import com.twitter.util.Future
import scalaz.zio.{ App, Task }
import scalaz.zio.console._
import scalaz.zio.interop.twitter._

object Example extends App {
def run(args: List[String]) = {
val program =
for {
_ <- putStrLn("Hello! What is your name?")
name <- getStrLn
greeting <- Task.fromTwitterFuture(Task(greet(name)))
_ <- putStrLn(greeting)
} yield ()

program.fold(_ => 1, _ => 0)
}

private def greet(name: String): Future[String] = Future.value(s"Hello, $name!")
}
```
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package scalaz.zio.interop

import monix.eval
import monix.execution.Scheduler
import _root_.monix.eval
import _root_.monix.execution.Scheduler
import org.specs2.concurrent.ExecutionEnv
import scalaz.zio.Exit.Cause.Fail
import scalaz.zio.{ FiberFailure, IO, TestRuntime }
import scalaz.zio.interop.monixio._
import scalaz.zio.interop.monix._

class MonixSpec(implicit ee: ExecutionEnv) extends TestRuntime {
def is = s2"""
Expand Down
8 changes: 4 additions & 4 deletions interop-monix/shared/src/main/scala/zio/interop/monix.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package scalaz.zio.interop

import monix.eval
import monix.execution.Scheduler
import _root_.monix.eval
import _root_.monix.execution.Scheduler
import scalaz.zio.{ IO, Task, UIO }

object monixio {
object monix {
implicit class IOObjOps(private val obj: IO.type) extends AnyVal {
def fromTask[A](task: eval.Task[A])(implicit scheduler: Scheduler): Task[A] =
Task.fromFuture(_ => task.runToFuture)
Expand All @@ -13,7 +13,7 @@ object monixio {
Task.fromTry(coeval.runTry())
}

implicit class IOThrowableOps[A](private val io: Task[A]) extends AnyVal {
implicit class TaskOps[A](private val io: Task[A]) extends AnyVal {
def toTask: UIO[eval.Task[A]] =
io.fold(eval.Task.raiseError, eval.Task.now)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2017-2019 John A. De Goes and the ZIO Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package scalaz.zio.interop

import com.twitter.util.{ Future, FutureCancelledException, Return, Throw }
import scalaz.zio.{ Task, UIO }

package object twitter {
implicit class TaskObjOps(private val obj: Task.type) extends AnyVal {
final def fromTwitterFuture[A](future: Task[Future[A]]): Task[A] =
future.flatMap { f =>
Task.effectAsyncInterrupt { cb =>
f.respond {
case Return(a) => cb(Task.succeed(a))
case Throw(e) => cb(Task.fail(e))
}

Left(UIO(f.raise(new FutureCancelledException)))
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package scalaz.zio.interop

import java.util.concurrent.atomic.AtomicInteger

import com.twitter.util.{ Duration => TwitterDuration, Future, JavaTimer }
import org.specs2.concurrent.ExecutionEnv
import scalaz.zio.{ FiberFailure, Task, TestRuntime }
import scalaz.zio.Exit.Cause.Fail
import scalaz.zio.duration.Duration
import scalaz.zio.interop.twitter._

import scala.concurrent.duration._

class TwitterSpec(implicit ee: ExecutionEnv) extends TestRuntime {
def is =
"Twitter spec".title ^ s2"""
`Task.fromTwitterFuture` must
return failing `Task` if future failed. $propagateFailures
return successful `Task` if future succeeded. $propagateResults
ensure future is interrupted together with task. $propagateInterrupts
"""

private def propagateFailures = {
val error = new Exception
val future = Task(Future.exception[Int](error))
val task = Task.fromTwitterFuture(future)

unsafeRun(task) must throwAn(FiberFailure(Fail(error)))
}

private def propagateResults = {
val value = 10
val future = Task(Future.value(value))
val task = Task.fromTwitterFuture(future)

unsafeRun(task) ==== value
}

private def propagateInterrupts = {
implicit val timer = new JavaTimer(true)

val value = new AtomicInteger(0)
val futureDelay = TwitterDuration.fromMilliseconds(300)
val future = Task(Future.sleep(futureDelay).map(_ => value.incrementAndGet()))

val taskTimeout = Duration.fromScala(100.millis)
val task = Task.fromTwitterFuture(future).timeout(taskTimeout)

unsafeRun(task) must beNone

MILLISECONDS.sleep(500)

value.get() ==== 0
}
}
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ addSbtPlugin("com.dwijnand" % "sbt-travisci" %
addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.27")
addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "0.6.0")
addSbtPlugin("com.47deg" % "sbt-microsites" % "0.9.0" exclude ("org.scalameta", "mdoc"))
addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.6.0-RC4")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.0")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")
addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ object ArbitraryStream {
for {
it <- Arbitrary.arbitrary[List[T]]
n <- Gen.choose(0, it.size)
} yield
ZStream.unfoldM((n, it)) {
case (_, Nil) | (0, _) =>
IO.fail("fail-case")
case (n, head :: rest) => IO.succeed(Some((head, (n - 1, rest))))
}
} yield ZStream.unfoldM((n, it)) {
case (_, Nil) | (0, _) =>
IO.fail("fail-case")
case (n, head :: rest) => IO.succeed(Some((head, (n - 1, rest))))
}
}
3 changes: 2 additions & 1 deletion website/sidebars.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
"interop/interop_monix",
"interop/interop_scalaz7x",
"interop/interop_scalaz_8",
"interop/interop_reactivestreams"
"interop/interop_reactivestreams",
"interop/interop_twitter"
]
},
"usecases-sidebar": {
Expand Down

0 comments on commit ce37ecd

Please sign in to comment.