Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Twitter futures #833

Merged
merged 22 commits into from
May 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
version = "2.0.0-RC6"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? It's added by metals, isn't it? If possible I'd move that out of the way.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, it is. All hell breaks loose if you remove it :).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sad 😞

maxColumn = 120
align = most
continuationIndent.defnSite = 2
Expand All @@ -11,5 +12,4 @@ spaces {
}
optIn.annotationNewlines = true

rewrite.rules = [SortImports, RedundantBraces]

rewrite.rules = [SortImports, RedundantBraces]
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
- &scaladoc
stage: microsite
name: "Generate Scaladoc"
script: sbt ++$TRAVIS_SCALA_VERSION coreJVM/doc coreJS/doc interopCatsJVM/doc interopCatsJS/doc interopFutureJVM/doc interopFutureJS/doc interopScalaz7xJVM/doc interopScalaz7xJS/doc interopMonixJVM/doc interopMonixJS/doc interopJavaJVM/doc interopReactiveStreamsJVM/doc
script: sbt ++$TRAVIS_SCALA_VERSION coreJVM/doc coreJS/doc interopCatsJVM/doc interopCatsJS/doc interopFutureJVM/doc interopScalaz7xJVM/doc interopScalaz7xJS/doc interopMonixJVM/doc interopMonixJS/doc interopJavaJVM/doc interopReactiveStreamsJVM/doc interopTwitterJVM/doc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is interopFutureJS/doc gone by accident or for purpose? WHy?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was on purpose, as I think there isn't (wasn't?) such interop atm. I can reintroduce it though.

- &microsite
stage: microsite
name: "Generate microsite"
Expand Down
28 changes: 24 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ lazy val root = project
interopCatsJVM,
interopCatsJS,
interopFutureJVM,
// interopMonixJVM,
// interopMonixJS,
interopMonixJVM,
interopMonixJS,
interopScalaz7xJVM,
interopScalaz7xJS,
interopJavaJVM,
interopReactiveStreamsJVM,
// benchmarks,
interopTwitterJVM,
benchmarks,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the reason to have benchmarks commented out?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No idea. It was commented out for a long time now. As far as I can see it's commented out together with Monix during "death of TF" PR :).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember it :D , I wondered why it was already commented before

testkitJVM,
docs
)
Expand Down Expand Up @@ -225,6 +226,16 @@ lazy val interopReactiveStreams = crossProject(JVMPlatform)

lazy val interopReactiveStreamsJVM = interopReactiveStreams.jvm.dependsOn(interopSharedJVM)

lazy val interopTwitter = crossProject(JSPlatform, JVMPlatform)
.in(file("interop-twitter"))
.settings(stdSettings("zio-interop-twitter"))
.settings(
libraryDependencies += "com.twitter" %% "util-core" % "19.4.0"
)
.dependsOn(core % "test->test;compile->compile")

lazy val interopTwitterJVM = interopTwitter.jvm.dependsOn(interopSharedJVM)

lazy val testkit = crossProject(JVMPlatform)
.in(file("testkit"))
.settings(stdSettings("zio-testkit"))
Expand Down Expand Up @@ -281,5 +292,14 @@ lazy val docs = project.module
"org.jsoup" % "jsoup" % "1.11.3" % "provided"
)
)
.dependsOn(coreJVM, interopCatsJVM, interopFutureJVM, interopScalaz7xJVM, interopJavaJVM, interopReactiveStreamsJVM)
.dependsOn(
coreJVM,
interopCatsJVM,
interopFutureJVM,
interopMonixJVM,
interopScalaz7xJVM,
interopJavaJVM,
interopReactiveStreamsJVM,
interopTwitterJVM
)
.enablePlugins(MdocPlugin, DocusaurusPlugin)
27 changes: 12 additions & 15 deletions core/jvm/src/test/scala/scalaz/zio/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,9 @@ class IOSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRuntim
failure = new Exception("expected")
_ <- IO.fail(failure).when(false)
failed <- IO.fail(failure).when(true).either
} yield
(val1 must_=== 0) and
(val2 must_=== 2) and
(failed must beLeft(failure))
} yield (val1 must_=== 0) and
(val2 must_=== 2) and
(failed must beLeft(failure))
)

def testWhenM =
Expand All @@ -175,12 +174,11 @@ class IOSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRuntim
failure = new Exception("expected")
_ <- IO.fail(failure).whenM(conditionFalse)
failed <- IO.fail(failure).whenM(conditionTrue).either
} yield
(val1 must_=== 0) and
(conditionVal1 must_=== 1) and
(val2 must_=== 2) and
(conditionVal2 must_=== 2) and
(failed must beLeft(failure))
} yield (val1 must_=== 0) and
(conditionVal1 must_=== 1) and
(val2 must_=== 2) and
(conditionVal2 must_=== 2) and
(failed must beLeft(failure))
)

def testUnsandbox = {
Expand Down Expand Up @@ -260,11 +258,10 @@ class IOSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRuntim
both <- (ZIO.halt(Cause.Both(Cause.Interrupt, Cause.die(ex))) <> IO.unit).run
thn <- (ZIO.halt(Cause.Then(Cause.Interrupt, Cause.die(ex))) <> IO.unit).run
fail <- (ZIO.fail(ex) <> IO.unit).run
} yield
(plain must_=== Exit.die(ex))
.and(both must_=== Exit.die(ex))
.and(thn must_=== Exit.die(ex))
.and(fail must_=== Exit.succeed(()))
} yield (plain must_=== Exit.die(ex))
.and(both must_=== Exit.die(ex))
.and(thn must_=== Exit.die(ex))
.and(fail must_=== Exit.succeed(()))
}
}

Expand Down
9 changes: 4 additions & 5 deletions core/jvm/src/test/scala/scalaz/zio/QueueSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,10 @@ class QueueSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRun
v1 <- queue.take
v2 <- queue.take
v3 <- queue.take
} yield
(v must_=== Range.inclusive(1, 32).toList)
.and(v1 must_=== 33)
.and(v2 must_=== 34)
.and(v3 must_=== 35)
} yield (v must_=== Range.inclusive(1, 32).toList)
.and(v1 must_=== 33)
.and(v2 must_=== 34)
.and(v3 must_=== 35)
)

def e36 =
Expand Down
9 changes: 5 additions & 4 deletions core/jvm/src/test/scala/scalaz/zio/RefMSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,11 @@ class RefMSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRunt
case Changed => IO.succeed("closed" -> Closed)
}
value2 <- refM.get
} yield
(r1 must beTheSameAs("changed")) and (value1 must beTheSameAs(Changed)) and (r2 must beTheSameAs("closed")) and (value2 must beTheSameAs(
Closed
))
} yield (r1 must beTheSameAs("changed")) and (value1 must beTheSameAs(Changed)) and (r2 must beTheSameAs(
"closed"
)) and (value2 must beTheSameAs(
Closed
))
)

def e11 =
Expand Down
12 changes: 5 additions & 7 deletions core/jvm/src/test/scala/scalaz/zio/stm/STMSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -473,10 +473,9 @@ final class STMSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends Tes
leftV1 <- STM.succeed(1).orElseEither(STM.succeed("No me!")).commit
leftV2 <- STM.succeed(2).orElseEither(STM.fail("No!")).commit
failedV <- STM.fail(-1).orElseEither(STM.fail(-2)).commit.either
} yield
(rightV must beRight(42)) and (leftV1 must beLeft(1)) and (leftV2 must beLeft(2)) and (failedV must beLeft(
-2
))
} yield (rightV must beRight(42)) and (leftV1 must beLeft(1)) and (leftV2 must beLeft(2)) and (failedV must beLeft(
-2
))
)

def e36 =
Expand All @@ -488,9 +487,8 @@ final class STMSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends Tes
_ <- STM.fail("Error!")
} yield ()).commit.either
v <- tvar.get.commit
} yield
(e must be left "Error!") and
(v must_=== 0)
} yield (e must be left "Error!") and
(v must_=== 0)
)

def e37 =
Expand Down
4 changes: 2 additions & 2 deletions docs/interop/monix.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ needs to be available.
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scalaz.zio.{ IO, DefaultRuntime }
import scalaz.zio.interop.monixio._
import scalaz.zio.interop.monix._

object UnsafeExample extends DefaultRuntime {
def main(args: Array[String]): Unit = {
Expand Down Expand Up @@ -71,7 +71,7 @@ def fromCoeval[A](coeval: eval.Coeval[A]): Task[A]
```scala
import monix.eval.Coeval
import scalaz.zio.{ IO, DefaultRuntime }
import scalaz.zio.interop.monixio._
import scalaz.zio.interop.monix._

object UnsafeExample extends DefaultRuntime {
def main(args: Array[String]): Unit = {
Expand Down
31 changes: 31 additions & 0 deletions docs/interop/twitter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
id: interop_twitter
mijicd marked this conversation as resolved.
Show resolved Hide resolved
title: "Twitter"
---

`interop-twitter` module provides capability to convert Twitter `Future` into ZIO `Task`.

### Example

```scala
import com.twitter.util.Future
import scalaz.zio.{ App, Task }
import scalaz.zio.console._
import scalaz.zio.interop.twitter._

object Example extends App {
def run(args: List[String]) = {
val program =
for {
_ <- putStrLn("Hello! What is your name?")
name <- getStrLn
greeting <- Task.fromTwitterFuture(Task(greet(name)))
_ <- putStrLn(greeting)
} yield ()

program.fold(_ => 1, _ => 0)
}

private def greet(name: String): Future[String] = Future.value(s"Hello, $name!")
}
```
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package scalaz.zio.interop

import monix.eval
import monix.execution.Scheduler
import _root_.monix.eval
import _root_.monix.execution.Scheduler
import org.specs2.concurrent.ExecutionEnv
import scalaz.zio.Exit.Cause.Fail
import scalaz.zio.{ FiberFailure, IO, TestRuntime }
import scalaz.zio.interop.monixio._
import scalaz.zio.interop.monix._

class MonixSpec(implicit ee: ExecutionEnv) extends TestRuntime {
def is = s2"""
Expand Down
8 changes: 4 additions & 4 deletions interop-monix/shared/src/main/scala/zio/interop/monix.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package scalaz.zio.interop

import monix.eval
import monix.execution.Scheduler
import _root_.monix.eval
import _root_.monix.execution.Scheduler
import scalaz.zio.{ IO, Task, UIO }

object monixio {
object monix {
implicit class IOObjOps(private val obj: IO.type) extends AnyVal {
def fromTask[A](task: eval.Task[A])(implicit scheduler: Scheduler): Task[A] =
Task.fromFuture(_ => task.runToFuture)
Expand All @@ -13,7 +13,7 @@ object monixio {
Task.fromTry(coeval.runTry())
}

implicit class IOThrowableOps[A](private val io: Task[A]) extends AnyVal {
implicit class TaskOps[A](private val io: Task[A]) extends AnyVal {
def toTask: UIO[eval.Task[A]] =
io.fold(eval.Task.raiseError, eval.Task.now)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2017-2019 John A. De Goes and the ZIO Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package scalaz.zio.interop

import com.twitter.util.{ Future, FutureCancelledException, Return, Throw }
import scalaz.zio.{ Task, UIO }

package object twitter {
implicit class TaskObjOps(private val obj: Task.type) extends AnyVal {
final def fromTwitterFuture[A](future: Task[Future[A]]): Task[A] =
future.flatMap { f =>
Task.effectAsyncInterrupt { cb =>
f.respond {
case Return(a) => cb(Task.succeed(a))
case Throw(e) => cb(Task.fail(e))
}

Left(UIO(f.raise(new FutureCancelledException)))
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package scalaz.zio.interop

import java.util.concurrent.atomic.AtomicInteger

import com.twitter.util.{ Duration => TwitterDuration, Future, JavaTimer }
import org.specs2.concurrent.ExecutionEnv
import scalaz.zio.{ FiberFailure, Task, TestRuntime }
import scalaz.zio.Exit.Cause.Fail
import scalaz.zio.duration.Duration
import scalaz.zio.interop.twitter._

import scala.concurrent.duration._

class TwitterSpec(implicit ee: ExecutionEnv) extends TestRuntime {
def is =
"Twitter spec".title ^ s2"""
`Task.fromTwitterFuture` must
return failing `Task` if future failed. $propagateFailures
return successful `Task` if future succeeded. $propagateResults
ensure future is interrupted together with task. $propagateInterrupts
"""

private def propagateFailures = {
val error = new Exception
val future = Task(Future.exception[Int](error))
val task = Task.fromTwitterFuture(future)

unsafeRun(task) must throwAn(FiberFailure(Fail(error)))
}

private def propagateResults = {
val value = 10
val future = Task(Future.value(value))
val task = Task.fromTwitterFuture(future)

unsafeRun(task) ==== value
}

private def propagateInterrupts = {
implicit val timer = new JavaTimer(true)

val value = new AtomicInteger(0)
val futureDelay = TwitterDuration.fromMilliseconds(300)
val future = Task(Future.sleep(futureDelay).map(_ => value.incrementAndGet()))

val taskTimeout = Duration.fromScala(100.millis)
val task = Task.fromTwitterFuture(future).timeout(taskTimeout)

unsafeRun(task) must beNone

MILLISECONDS.sleep(500)

value.get() ==== 0
}
}
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ addSbtPlugin("com.dwijnand" % "sbt-travisci" %
addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.27")
addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "0.6.0")
addSbtPlugin("com.47deg" % "sbt-microsites" % "0.9.0" exclude ("org.scalameta", "mdoc"))
addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.6.0-RC4")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.0")
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")
addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ object ArbitraryStream {
for {
it <- Arbitrary.arbitrary[List[T]]
n <- Gen.choose(0, it.size)
} yield
ZStream.unfoldM((n, it)) {
case (_, Nil) | (0, _) =>
IO.fail("fail-case")
case (n, head :: rest) => IO.succeed(Some((head, (n - 1, rest))))
}
} yield ZStream.unfoldM((n, it)) {
case (_, Nil) | (0, _) =>
IO.fail("fail-case")
case (n, head :: rest) => IO.succeed(Some((head, (n - 1, rest))))
}
}
3 changes: 2 additions & 1 deletion website/sidebars.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
"interop/interop_monix",
"interop/interop_scalaz7x",
"interop/interop_scalaz_8",
"interop/interop_reactivestreams"
"interop/interop_reactivestreams",
"interop/interop_twitter"
]
},
"usecases-sidebar": {
Expand Down