Skip to content

Commit

Permalink
Iterant, version 2 (#683)
Browse files Browse the repository at this point in the history
* WIP: introduce Scope to Iterant encoding

* Encode resource using Ref; fix Scope concat.

* Change encoding

* Update to Cats-Effect 1.0.0-RC2

* Fix onErrorHandle and attempt

* 18/19 MapBatch

* Fixed attempt/onErrorHandleWith

* IterantFoldLeft progress

Signed-off-by: Piotr Gawryś <pgawrys2@gmail.com>

* Restore `Iterant#map` tests

* Fix Iterant#zipWithIndex

* Restore `Iterant#collect` tests

* Fix Iterant#drop

* Fix Iterant#bufferSliding and batched

* fix IterantDropWhile

* Fix Iterant#filter

* Fix Iterant#liftMap

* Restore tests for Iterant#mapEval

* Fix Iterant#skipSuspendL

* Fix Iterant#headOptionL

* Fix Iterant#take and Iterant#scan

* Restore fromIndexedSeq, fromIterable, fromList

Signed-off-by: Avasil <pgawrys2@gmail.com>

* Remove Iterant#skipSuspendL

* Fix `guarantee` with `Iterant#take`

* Fix Iterant#distinctUntilChanged{ByKey}

* Fix IterantDropWhile

* Fix Iterant#dropLast

* Fix Iterant#foldWhileLeft{Eval}L

* Fix Iterant#intersperse

* Fix Iterant#reduce

* Fix IterantTakeWhile

* Fix IterantTakeWhileWithIndex

* Fix IterantDropWhileWithIndex

* Remove Iterant's DoOnEarlyStop / DoOnFinish tests

* Fix IterantDump

* Restore Iterant FromSeq, FromStateAction, Ranges suites

* IterantDump handle concat/scope like suspend

* Introduce the visitor pattern

* Fix IterantTakeLast

* Refactor Iterant.foldLeft to use Visitor

* Restore Iterant#scanMap suite

* Refactor Iterant.map, add benchmark

* Restore Iterant#scanEval

* Optimize Concat

* Cleanup junk from Iterant.concat

* Fix IterantRepeat

* Refactor Iterant#collect

* Refactor Iterant.completeL

* Refactor Iterant#distinctUntilChanged

* Fixed Iterant#drop, refactored Iterant#distinctUntilChanged

* Remove stack from Iterant.drop implementation

* Remove stack from Iterant.distinctUntilChanged implementation

* Refactor Iterant.dropLast

* Refactor Iterant.dropWhile, Iterant.filter

* Refactor Iterant.dropWhileWithIndex + dump

* Fix Iterant.foldRightL

* Refactor Iterant.foldWhileLeft

* Refactor Iterant.mapBatch

* Refactor Iterant.mapEval

* Refactor Iterant.liftMap

* Add AndThen tests

* Remove AndThen for now

* Refactor Iterant.attempt and onErrorHandleWith

* Iterant.reduce refactoring

* Refactor IterantScanEval

* Refactor Iterant.headOptionL

* Iterant.switchIfEmpty fix

* Refactor Iterant.tail

* Refactor Iterant.take

* Refactor Iterant.takeLast

* Refactor Iterant.takeEveryNth

* Refactor Iterant.takeWhile

* Refactor Iterant.takeWhileWithIndex

* Change Iterant.Scope to Iterant.Resource w/ new encoding

* Fix Iterant.attempt + onErrorHandleWith

* Fix Iterant.foldWhileLeftL

* Fix Iterant folds

* Fix Iterant.dump test

* Rename Resource back to Scope

* Reintroduce stack usage in Iterant.foldLeftL

* Fix Iterant.completeL + foldLeftL + foldWhileLeftL

* Refactor Iterant.headOptionL

* Refactor Iterant.foldRight

* Fix Iterant.zip, zipMap and parZipMap, parZip

* Remove junk

* Fix Iterant.interleave

* Fix Iterant.interleave

* Fix Iterant.toReactivePublisher

* Fix Iterant.toReactivePublisher

* Partial fromReactiveStream implementation

* Change Iterant.fromReactivePublisher

* Fix tests

* Fix tests, builders

* Fix Mima issues

* Fix Scala 2.12.6 warning
  • Loading branch information
alexandru committed Jul 4, 2018
1 parent b9dc4a0 commit e98d64d
Show file tree
Hide file tree
Showing 122 changed files with 6,270 additions and 4,819 deletions.
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2014-2018 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.benchmarks

import java.util.concurrent.TimeUnit
import monix.eval.Coeval
import monix.tail.Iterant
import org.openjdk.jmh.annotations._

/** To do comparative benchmarks between versions:
*
* benchmarks/run-benchmark IterantMapFoldBenchmark
*
* This will generate results in `benchmarks/results`.
*
* Or to run the benchmark from within SBT:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 monix.benchmarks.IterantMapFoldBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread".
* Please note that benchmarks should be usually executed at least in
* 10 iterations (as a rule of thumb), but more is better.
*/
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
class IterantMapFoldBenchmark {
import IterantMapFoldBenchmark._

@Benchmark
def granular(): Long =
granularRef.map(_ + 1).foldLeftL(0L)(_ + _).value

@Benchmark
def batched(): Long =
batchedRef.map(_ + 1).foldLeftL(0L)(_ + _).value
}

object IterantMapFoldBenchmark {
val size = 1000

val granularRef: Iterant[Coeval, Int] = {
var stream = Iterant[Coeval].empty[Int]
var idx = 0
while (idx < size) {
stream = idx +: stream
idx += 1
}
stream
}

val batchedRef: Iterant[Coeval, Int] = {
var stream = Iterant[Coeval].empty[Int]
var idx = 0
while (idx < size) {
val rest = stream
stream = rest ++ Iterant[Coeval].range(0, 10)
idx += 1
}
stream
}
}
22 changes: 15 additions & 7 deletions build.sbt
Expand Up @@ -12,7 +12,9 @@ addCommandAlias("ci-js", ";clean ;coreJS/test:compile ;coreJS/test")
addCommandAlias("release", ";project monix ;+clean ;+package ;+publishSigned ;sonatypeReleaseAll")

val catsVersion = "1.1.0"
val catsEffectVersion = "1.0.0-RC2"
// Hash version is safe, containing a laws fix over 1.0.0-RC2:
// https://github.com/typelevel/cats-effect/pull/277
val catsEffectVersion = "1.0.0-RC2-93ac33d"
val jcToolsVersion = "2.1.1"
val reactiveStreamsVersion = "1.0.2"
val scalaTestVersion = "3.0.4"
Expand Down Expand Up @@ -44,8 +46,8 @@ lazy val warnUnusedImport = Seq(

lazy val sharedSettings = warnUnusedImport ++ Seq(
organization := "io.monix",
scalaVersion := "2.12.4",
crossScalaVersions := Seq("2.11.12", "2.12.4"),
scalaVersion := "2.12.6",
crossScalaVersions := Seq("2.11.12", "2.12.6"),

scalacOptions ++= Seq(
// warnings
Expand All @@ -57,7 +59,9 @@ lazy val sharedSettings = warnUnusedImport ++ Seq(
"-language:implicitConversions",
"-language:experimental.macros",
// possibly deprecated options
"-Ywarn-inaccessible"
"-Ywarn-inaccessible",
// absolutely necessary for Iterant
"-Ypartial-unification"
),

// Force building with Java 8
Expand Down Expand Up @@ -297,6 +301,8 @@ def mimaSettings(projectName: String) = Seq(
exclude[IncompatibleResultTypeProblem]("monix.eval.Task.foreach"),
// Breakage - changed type
exclude[IncompatibleResultTypeProblem]("monix.execution.Cancelable.empty"),
// Breackage — made CompositeException final
exclude[FinalClassProblem]("monix.execution.exceptions.CompositeException"),
// Breakage — extra implicit param
exclude[DirectMissingMethodProblem]("monix.eval.TaskInstancesLevel0.catsEffect"),
exclude[DirectMissingMethodProblem]("monix.eval.instances.CatsConcurrentEffectForTask.this"),
Expand Down Expand Up @@ -359,7 +365,9 @@ def mimaSettings(projectName: String) = Seq(
exclude[IncompatibleResultTypeProblem]("monix.execution.internal.collection.ArrayStack.clone"),
exclude[MissingTypesProblem]("monix.execution.internal.collection.ArrayStack"),
exclude[DirectMissingMethodProblem]("monix.eval.internal.TaskCancellation#RaiseCancelable.this"),
exclude[MissingClassProblem]("monix.eval.internal.TaskBracket$ReleaseRecover")
exclude[MissingClassProblem]("monix.eval.internal.TaskBracket$ReleaseRecover"),
exclude[MissingClassProblem]("monix.eval.instances.ParallelApplicative$"),
exclude[MissingClassProblem]("monix.eval.instances.ParallelApplicative")
)
)

Expand Down Expand Up @@ -442,14 +450,14 @@ lazy val tailCommon =

lazy val tailJVM = project.in(file("monix-tail/jvm"))
.configure(profile)
.dependsOn(evalJVM % "compile->compile; test->test")
.dependsOn(evalJVM % "test->test")
.dependsOn(executionJVM)
.settings(tailCommon)

lazy val tailJS = project.in(file("monix-tail/js"))
.enablePlugins(ScalaJSPlugin)
.configure(profile)
.dependsOn(evalJS % "compile->compile; test->test")
.dependsOn(evalJS % "test->test")
.dependsOn(executionJS)
.settings(scalaJSSettings)
.settings(tailCommon)
Expand Down
Expand Up @@ -17,7 +17,7 @@

package monix.eval.internal

import monix.eval.Task.{Async, Context, FlatMap, Map}
import monix.eval.Task.{Async, Context, ContextSwitch, FlatMap, Map}
import monix.eval.{Callback, Task}

import scala.annotation.tailrec
Expand Down Expand Up @@ -48,6 +48,7 @@ private[eval] object ForkedRegister {
case Async(_: ForkedRegister[_], _, _, _) => true
case FlatMap(other, _) => detect(other, limit - 1)
case Map(other, _, _) => detect(other, limit - 1)
case ContextSwitch(other, _, _) => detect(other, limit - 1)
case _ => false
} else {
false
Expand Down
Expand Up @@ -195,7 +195,7 @@ trait ArbitraryInstancesBase extends monix.execution.ArbitraryInstances {
implicit def arbitraryPfExToA[A](implicit A: Arbitrary[A]): Arbitrary[PartialFunction[Throwable, A]] =
Arbitrary {
val fun = implicitly[Arbitrary[Int => A]]
for (f <- fun.arbitrary) yield PartialFunction((t: Throwable) => f(t.hashCode()))
for (f <- fun.arbitrary) yield { case (t: Throwable) => f(t.hashCode()) }
}

implicit def arbitraryCoevalToLong[A, B](implicit A: Arbitrary[A], B: Arbitrary[B]): Arbitrary[Coeval[A] => B] =
Expand All @@ -216,7 +216,9 @@ trait ArbitraryInstancesBase extends monix.execution.ArbitraryInstances {
implicit def equalityCoeval[A](implicit A: Eq[A]): Eq[Coeval[A]] =
new Eq[Coeval[A]] {
def eqv(lh: Coeval[A], rh: Coeval[A]): Boolean = {
Eq[Try[A]].eqv(lh.runTry(), rh.runTry())
val lht = lh.runTry()
val rht = rh.runTry()
Eq[Try[A]].eqv(lht, rht)
}
}

Expand Down
69 changes: 69 additions & 0 deletions monix-eval/shared/src/test/scala/monix/eval/TestUtils.scala
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2014-2018 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.eval

import java.io.{ByteArrayOutputStream, PrintStream}
import scala.util.control.NonFatal

/**
* INTERNAL API — test utilities.
*/
trait TestUtils {

/**
* Silences `System.err`, only printing the output in case exceptions are
* thrown by the executed `thunk`.
*/
def silenceSystemErr[A](thunk: => A): A = synchronized {
// Silencing System.err
val oldErr = System.err
val outStream = new ByteArrayOutputStream()
val fakeErr = new PrintStream(outStream)
System.setErr(fakeErr)
try {
val result = thunk
System.setErr(oldErr)
result
} catch {
case NonFatal(e) =>
System.setErr(oldErr)
// In case of errors, print whatever was caught
fakeErr.close()
val out = outStream.toString("utf-8")
if (out.nonEmpty) oldErr.println(out)
throw e
}
}

/**
* Catches `System.err` output, for testing purposes.
*/
def catchSystemErr(thunk: => Unit): String = synchronized {
val oldErr = System.err
val outStream = new ByteArrayOutputStream()
val fakeErr = new PrintStream(outStream)
System.setErr(fakeErr)
try {
thunk
} finally {
System.setErr(oldErr)
fakeErr.close()
}
outStream.toString("utf-8")
}
}
Expand Up @@ -19,7 +19,8 @@ package monix.eval

import cats.Applicative
import cats.laws.discipline.ApplicativeTests
import monix.eval.instances.{CatsParallelForTask, ParallelApplicative}
import monix.eval.instances.CatsParallelForTask
import monix.execution.internal.ParallelApplicative

object TypeClassLawsForParallelApplicativeSuite extends BaseLawsSuite {
implicit val ap: Applicative[Task] =
Expand Down
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2014-2018 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.execution
package internal

import scala.concurrent.ExecutionContext

/**
* INTERNAL API — implements [[UncaughtExceptionReporter.default]].
*/
private[execution] object DefaultUncaughtExceptionReporter
extends UncaughtExceptionReporter {

def reportFailure(e: Throwable): Unit =
logger(e)

private[this] lazy val logger =
ExecutionContext.defaultReporter
}
Expand Up @@ -19,7 +19,6 @@ package monix.execution.internal

import monix.execution.exceptions.CompositeException
import monix.execution.schedulers.CanBlock

import scala.concurrent.Awaitable
import scala.concurrent.duration.Duration

Expand Down Expand Up @@ -96,17 +95,22 @@ private[monix] object Platform {
rest.filter(_ ne first).toList match {
case Nil => first
case nonEmpty =>
CompositeException(first :: nonEmpty)
first match {
case CompositeException(errors) =>
val list = errors.toList
CompositeException(list ::: nonEmpty)
case _ =>
CompositeException(first :: nonEmpty)
}
}

/** Useful utility that combines an `Either` result, which is what
/**
* Useful utility that combines an `Either` result, which is what
* `MonadError#attempt` returns.
*/
def composeErrors(first: Throwable, second: Either[Throwable, _]): Throwable =
second match {
case Left(e2) if e2 ne first =>
CompositeException(List(first, e2))
case _ =>
first
case Left(e2) => composeErrors(first, e2)
case _ => first
}
}
Expand Up @@ -44,4 +44,4 @@ private[execution] class StandardContext(reporter: UncaughtExceptionReporter)
}

private[execution] object StandardContext
extends StandardContext(UncaughtExceptionReporter.LogExceptionsToStandardErr)
extends StandardContext(UncaughtExceptionReporter.default)
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2014-2018 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.execution
package internal

/**
* INTERNAL API — implements [[UncaughtExceptionReporter.default]].
*/
private[execution] object DefaultUncaughtExceptionReporter
extends UncaughtExceptionReporter {

def reportFailure(e: Throwable): Unit =
Thread.getDefaultUncaughtExceptionHandler match {
case null => e.printStackTrace()
case h => h.uncaughtException(Thread.currentThread(), e)
}
}
Expand Up @@ -18,12 +18,14 @@
package monix.execution.schedulers

import java.util.concurrent.{Executors, ScheduledExecutorService}
import monix.execution.UncaughtExceptionReporter.LogExceptionsToStandardErr
import monix.execution.UncaughtExceptionReporter

private[schedulers] object Defaults {
/** Internal. Provides the `Scheduler.DefaultScheduledExecutor` instance. */
/**
* Internal. Provides the `Scheduler.DefaultScheduledExecutor` instance.
*/
lazy val scheduledExecutor: ScheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
ThreadFactoryBuilder("monix-scheduler", LogExceptionsToStandardErr, daemonic = true)
ThreadFactoryBuilder("monix-scheduler", UncaughtExceptionReporter.default, daemonic = true)
)
}

0 comments on commit e98d64d

Please sign in to comment.