From f73714e8d5168af9338f5f3d231611bbca20c435 Mon Sep 17 00:00:00 2001 From: Pavel Chlupacek Date: Thu, 4 Jan 2018 18:33:52 +0100 Subject: [PATCH] Added special handling of Uncons and CloseScope in case of interrupt --- build.sbt | 14 ++++---- core/jvm/src/test/scala/fs2/Pipe2Spec.scala | 10 +++++- .../src/main/scala/fs2/internal/Algebra.scala | 32 ++++++++++++++++--- 3 files changed, 44 insertions(+), 12 deletions(-) diff --git a/build.sbt b/build.sbt index 06bafd10a4..7ee7056497 100644 --- a/build.sbt +++ b/build.sbt @@ -27,7 +27,7 @@ lazy val commonSettings = Seq( "-language:postfixOps", "-Ypartial-unification" ) ++ - (if (scalaBinaryVersion.value startsWith "2.12") + (if (scalaBinaryVersion.value.startsWith("2.12")) List( "-Xlint", "-Xfatal-warnings", @@ -35,7 +35,7 @@ lazy val commonSettings = Seq( "-Ywarn-value-discard", "-Ywarn-unused-import" ) - else Nil) ++ (if (scalaBinaryVersion.value startsWith "2.11") + else Nil) ++ (if (scalaBinaryVersion.value.startsWith("2.11")) List("-Xexperimental") else Nil), // 2.11 needs -Xexperimental to enable SAM conversion @@ -93,7 +93,7 @@ lazy val scaladocSettings = Seq( "-implicits-sound-shadowing", "-implicits-show-all" ), - scalacOptions in (Compile, doc) ~= { _ filterNot { _ == "-Xfatal-warnings" } }, + scalacOptions in (Compile, doc) ~= { _.filterNot { _ == "-Xfatal-warnings" } }, autoAPIMappings := true ) @@ -101,9 +101,9 @@ lazy val publishingSettings = Seq( publishTo := { val nexus = "https://oss.sonatype.org/" if (version.value.trim.endsWith("SNAPSHOT")) - Some("snapshots" at nexus + "content/repositories/snapshots") + Some("snapshots".at(nexus + "content/repositories/snapshots")) else - Some("releases" at nexus + "service/local/staging/deploy/maven2") + Some("releases".at(nexus + "service/local/staging/deploy/maven2")) }, credentials ++= (for { username <- Option(System.getenv().get("SONATYPE_USERNAME")) @@ -281,7 +281,7 @@ lazy val benchmarkMacros = project .settings(noPublish) .settings( name := "fs2-benchmark-macros", - addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.patch), + addCompilerPlugin(("org.scalamacros" % "paradise" % "2.1.1").cross(CrossVersion.patch)), libraryDependencies += scalaOrganization.value % "scala-reflect" % scalaVersion.value ) @@ -294,7 +294,7 @@ lazy val benchmark = project name := "fs2-benchmark" ) .settings( - addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.patch), + addCompilerPlugin(("org.scalamacros" % "paradise" % "2.1.1").cross(CrossVersion.patch)), libraryDependencies += scalaOrganization.value % "scala-reflect" % scalaVersion.value ) .enablePlugins(JmhPlugin) diff --git a/core/jvm/src/test/scala/fs2/Pipe2Spec.scala b/core/jvm/src/test/scala/fs2/Pipe2Spec.scala index 03da8fc255..410c7323ec 100644 --- a/core/jvm/src/test/scala/fs2/Pipe2Spec.scala +++ b/core/jvm/src/test/scala/fs2/Pipe2Spec.scala @@ -215,8 +215,15 @@ class Pipe2Spec extends Fs2Spec { "interrupt (4)" in { // tests the interruption of the constant stream with flatMap combinator + // for { i <- 0 until 1000} yield { val interrupt = - mkScheduler.flatMap { _.sleep_[IO](20.millis) }.compile.drain.attempt + mkScheduler + .flatMap { + _.sleep_[IO](20.millis) + } + .compile + .drain + .attempt Stream .constant(true) .covary[IO] @@ -227,6 +234,7 @@ class Pipe2Spec extends Fs2Spec { .compile .drain .unsafeRunSync + // } } "interrupt (5)" in { diff --git a/core/shared/src/main/scala/fs2/internal/Algebra.scala b/core/shared/src/main/scala/fs2/internal/Algebra.scala index fbce12adaa..afd2f9fc8e 100644 --- a/core/shared/src/main/scala/fs2/internal/Algebra.scala +++ b/core/shared/src/main/scala/fs2/internal/Algebra.scala @@ -159,7 +159,7 @@ private[fs2] object Algebra { } case alg: Effectful[F, O, r] => - F.flatMap(compileShared(scope, alg)) { + F.flatMap(compileEffect(scope, alg)) { case (scope, r) => compileUncons(scope, f(r), chunkSize, maxSteps) } @@ -216,19 +216,43 @@ private[fs2] object Algebra { } case alg: Effectful[F, O, _] => - F.flatMap(compileShared(scope, alg)) { + F.flatMap(compileEffect(scope, alg)) { case (scope, r) => compileFoldLoop(scope, acc, g, f(r)) } } - case Some(rsn) => compileFoldLoop(scope, acc, g, f(Left(rsn))) + case Some(int: Interrupted) => + fx match { + case uncons: Algebra.Uncons[F, x, O] => + F.flatMap( + F.attempt( + compileUncons(scope, + uncons.s.asHandler(int), + uncons.chunkSize, + uncons.maxSteps))) { + case Right((scope, u)) => + compileFoldLoop(scope, acc, g, f(Right(u))) + case Left(err) => compileFoldLoop(scope, acc, g, f(Left(err))) + } + + case c: Algebra.CloseScope[F, O] => + F.flatMap(c.toClose.close) { result => + F.flatMap(c.toClose.openAncestor) { scopeAfterClose => + compileFoldLoop(scopeAfterClose, acc, g, f(Left(int))) + } + } + + case other => compileFoldLoop(scope, acc, g, f(Left(int))) + } + case Some(rsn) => + compileFoldLoop(scope, acc, g, f(Left(rsn))) } case e => sys.error("FreeC.ViewL structure must be Pure(a), Fail(e), or Bind(Eval(fx),k), was: " + e) } - def compileShared[F[_], O]( + def compileEffect[F[_], O]( scope: CompileScope[F], eff: Effectful[F, O, _] )(implicit F: Sync[F]): F[(CompileScope[F], Either[Throwable, Any])] =