Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mapBatch for Iterant #622

Merged
merged 5 commits into from Mar 30, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions monix-tail/shared/src/main/scala/monix/tail/Iterant.scala
Expand Up @@ -18,6 +18,7 @@
package monix.tail

import java.io.PrintStream

import cats.arrow.FunctionK
import cats.effect.{Async, Effect, Sync, _}
import cats.{Applicative, CoflatMap, Eq, Monoid, MonoidK, Order, Parallel}
Expand All @@ -27,6 +28,7 @@ import monix.execution.misc.NonFatal
import monix.tail.batches.{Batch, BatchCursor}
import monix.tail.internal._
import org.reactivestreams.Publisher

import scala.collection.immutable.LinearSeq
import scala.collection.mutable
import scala.concurrent.duration.{Duration, FiniteDuration}
Expand Down Expand Up @@ -760,6 +762,24 @@ sealed abstract class Iterant[F[_], A] extends Product with Serializable {
final def map[B](f: A => B)(implicit F: Sync[F]): Iterant[F, B] = {
  // The actual traversal lives in the internal `IterantMap` helper;
  // the `Sync` instance is forwarded explicitly.
  IterantMap(this, f)(F)
}

/** Maps every element of the source to a `Batch` and emits the
  * elements of those batches, in order, as one stream.
  *
  * The resulting `Iterant` is made of `NextBatch` nodes.
  *
  * {{{
  *   // Yields 1, 2, 3, 4, 5
  *   Iterant[Task].of(List(1, 2, 3), List(4), List(5)).mapBatch(Batch.fromSeq(_))
  *   // Yields 2, 4, 6
  *   Iterant[Task].of(1, 2, 3).mapBatch(x => Batch(x * 2))
  * }}}
  *
  * @param f is the function that turns each source element into
  *        a batch of output elements
  *
  * @return a new iterant emitting the elements of all the batches
  *         produced by applying `f` to the source
  */
final def mapBatch[B](f: A => Batch[B])(implicit F: Sync[F]): Iterant[F, B] =
  IterantMapBatch(source = this, f = f)(F)

/** Optionally selects the first element.
*
* {{{
Expand Down
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2014-2018 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.tail.internal

import cats.effect.Sync
import cats.syntax.all._
import monix.execution.misc.NonFatal
import monix.tail.Iterant
import monix.tail.Iterant.{Halt, Last, Next, NextBatch, NextCursor, Suspend}
import monix.tail.batches.Batch
import monix.tail.internal.IterantUtils.signalError

import scala.collection.mutable.ArrayBuffer

private[tail] object IterantMapBatch {
/**
* Implementation for `Iterant#mapBatch`
*/
def apply[F[_], A, B](source: Iterant[F, A], f: A => Batch[B])
(implicit F: Sync[F]): Iterant[F, B] = {

def processSeq(ref: NextCursor[F, A]): NextBatch[F, B] = {
val NextCursor(cursor, rest, stop) = ref
val buffer = ArrayBuffer.empty[B]
var toProcess = cursor.recommendedBatchSize
var nextRef: F[Iterant[F, B]] = null.asInstanceOf[F[Iterant[F, B]]]

// protects against infinite cursors
while (toProcess > 0 && cursor.hasNext()) {
val cursorB = f(cursor.next()).cursor()
while (toProcess > 0 && cursorB.hasNext()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When that mapping function gives you a Batch, there's no point in splitting it any further, because the Batch is already built for you and you can just serve it as is in a NextBatch node. All operations need to handle fairness concerns via recommendedBatchSize, so even if you serve bigger batches via this function, it doesn't matter.

The point of mapBatch is to be used in certain cases for optimizing a stream, e.g. for turning an Iterant[F, Array[A]] into an Iterant[F, A]. If we end up splitting those batches, then it becomes too heavy. At that point we might as well work with flatMap directly:

def mapBatch[F[_] : Sync, A](fa: Iterant[F, A])(f: A => Batch[B]): Iterant[F, B] =
  fa.flatMap { a =>
    Iterant.nextBatchS(f(a), Iterant.empty, F.unit)
  }

So here what I see happening is no batched processing at all, e.g.:

val batch = f(cursor.next()).cursor()
val next = 
  if (cursor.hasNext()) NextCursor(cursor, rest.map(loop), stop)
  else rest.map(loop)

NextBatch(batch, next, stop)

buffer += cursorB.next()
toProcess -= 1
}
if (cursorB.hasNext()) {
val next: F[Iterant[F, A]] = if (cursor.hasNext()) F.pure(ref) else rest
nextRef = F.delay(NextCursor(cursorB, next.map(loop), stop))
}
}

val next: F[Iterant[F, B]] =
if (nextRef != null)
nextRef
else if (cursor.hasNext())
F.pure(ref).map(loop)
else
rest.map(loop)

NextBatch(Batch.fromSeq(buffer, cursor.recommendedBatchSize), next, stop)
}

/** Translates one node of the source stream into the matching node
  * of the output stream; tails are translated lazily by mapping
  * `loop` over the `F`-suspended rest.
  *
  * A non-fatal exception thrown while the current node is being
  * handled (for instance by `f`) is passed, together with that node,
  * to `signalError`.
  *
  * @param source is the node of the source stream to translate
  * @return the corresponding node of the output stream
  */
def loop(source: Iterant[F, A]): Iterant[F, B] =
  try source match {
    case Next(head, tail, stop) =>
      // The batch built by `f` is served as is in a `NextBatch` node
      NextBatch[F, B](f(head), tail.map(loop), stop)
    case ref @ NextCursor(_, _, _) =>
      processSeq(ref)
    case NextBatch(batch, rest, stop) =>
      processSeq(NextCursor(batch.cursor(), rest, stop))
    case Suspend(rest, stop) =>
      Suspend[F, B](rest.map(loop), stop)
    case Last(item) =>
      // Handled like `Next`: the batch produced by `f` is emitted
      // directly instead of being copied (and possibly split) by
      // `processSeq`; the stream then ends with an error-free `Halt`
      NextBatch[F, B](f(item), F.delay(Halt[F, B](None)), F.unit)
    case empty @ Halt(_) =>
      // Reused as is; only the static element type changes
      empty.asInstanceOf[Iterant[F, B]]
  } catch {
    case ex if NonFatal(ex) => signalError(source, ex)
  }

source match {
  case Suspend(_, _) =>
    // For this node `loop` only re-wraps the deferred tail; neither
    // `f` nor any cursor is touched, so no extra suspension is needed
    loop(source)
  case Halt(_) =>
    loop(source)
  case _ =>
    // Suspending execution in order to preserve laziness and
    // referential transparency, since the provided function can
    // be side effecting and because processing NextBatch and
    // NextCursor states can have side effects
    val deferred = F.delay(loop(source))
    Suspend(deferred, source.earlyStop)
}
}
}
233 changes: 233 additions & 0 deletions monix-tail/shared/src/test/scala/monix/tail/IterantMapBatchSuite.scala
@@ -0,0 +1,233 @@
/*
* Copyright (c) 2014-2018 by The Monix Project Developers.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package monix.tail

import cats.laws._
import cats.laws.discipline._
import monix.eval.{Coeval, Task}
import monix.execution.cancelables.BooleanCancelable
import monix.execution.exceptions.DummyException
import monix.tail.batches.{Batch, BatchCursor}

import scala.util.Failure

/** Tests for `Iterant#mapBatch`, run against both `Task` and `Coeval`:
  * equivalence with `List.flatMap`, oversized batches, infinite cursors,
  * error handling and early-stop behavior.
  */
object IterantMapBatchSuite extends BaseTestSuite {
  test("Iterant[Task].mapBatch(f) equivalence with List.flatMap(f andThen (_.toList))") { implicit s =>
    check2 { (stream: Iterant[Task, Array[Int]], f: Array[Int] => Long) =>
      val toBatch = f andThen (Batch.apply(_))
      val received = stream.mapBatch(toBatch).toListL
      val expected = stream.toListL.map(_.flatMap(toBatch andThen (_.toList)))
      received <-> expected
    }
  }

  test("Iterant[Task].mapBatch works for functions producing batches bigger than recommendedBatchSize") { implicit s =>
    check2 { (list: List[Int], elem: Int) =>
      val stream = Iterant[Task].nextBatchS(
        Batch.fromSeq(list, batches.defaultBatchSize),
        Task.delay(Iterant[Task].lastS[Int](elem)),
        Task.unit)
      // Every source element expands to twice the default batch size
      val replicate: Int => List[Int] = x => List.fill(batches.defaultBatchSize * 2)(x)

      val received = stream.mapBatch(replicate andThen (Batch.fromSeq(_))).toListL
      val expected = stream.toListL.map(_.flatMap(replicate))

      received <-> expected
    }
  }

  test("Iterant[Task].mapBatch can handle errors") { implicit s =>
    val boom = DummyException("dummy")
    val failed = Iterant[Task].raiseError[Int](boom)
    val mapped = failed.mapBatch(Batch.apply(_))
    assertEquals(failed, mapped)
  }

  test("Iterant[Task].next.mapBatch guards against direct user code errors") { implicit s =>
    val boom = DummyException("dummy")
    var isCanceled = false

    // The early-stop task records that it was run
    val source = Iterant[Task].nextS(1, Task(Iterant[Task].empty[Int]), Task {
      isCanceled = true
    })
    val outcome = source.mapBatch[Int](_ => throw boom).toListL.runAsync

    s.tick()
    assertEquals(outcome.value, Some(Failure(boom)))
    assert(isCanceled, "isCanceled should be true")
  }

  test("Iterant[Task].nextCursor.mapBatch guards against direct user code errors") { implicit s =>
    val boom = DummyException("dummy")
    var isCanceled = false

    // The early-stop task records that it was run
    val source = Iterant[Task].nextCursorS(BatchCursor(1, 2, 3), Task(Iterant[Task].empty[Int]), Task {
      isCanceled = true
    })
    val outcome = source.mapBatch[Int](_ => throw boom).toListL.runAsync

    s.tick()
    assertEquals(outcome.value, Some(Failure(boom)))
    assert(isCanceled, "isCanceled should be true")
  }

  test("Iterant[Task].mapBatch should protect against direct exceptions") { implicit s =>
    check2 { (l: List[Int], idx: Int) =>
      val boom = DummyException("dummy")
      var effect = 0

      // The source must be non-empty so that the mapping function gets called
      val elems = if (l.nonEmpty) l else List(1)
      val iterant = arbitraryListToIterant[Task, Int](elems, idx)
      val counted = (iterant ++ Iterant[Task].of(1, 2)).doOnEarlyStop(Task.eval {
        effect += 1
      })
      val received = counted
        .mapBatch[Int](_ => throw boom)
        .completeL.map(_ => 0)
        .onErrorRecover { case _: DummyException => effect }

      // The early-stop effect is expected to have run exactly once
      received <-> Task.pure(1)
    }
  }

  test("Iterant[Task].mapBatch should protect against broken batches") { implicit s =>
    check1 { (prefix: Iterant[Task, Int]) =>
      val boom = DummyException("dummy")
      val brokenCursor = new ThrowExceptionCursor(boom)
      val error = Iterant[Task].nextCursorS(brokenCursor, Task.now(Iterant[Task].empty[Int]), Task.unit)
      val received = (prefix.onErrorIgnore ++ error).mapBatch(Batch.apply(_))
      val expected = prefix.onErrorIgnore ++ Iterant[Task].haltS[Int](Some(boom))
      received <-> expected
    }
  }

  test("Iterant[Task].mapBatch protects against broken cursors") { implicit s =>
    check1 { (iter: Iterant[Task, Int]) =>
      val boom = DummyException("dummy")
      val suffix = Iterant[Task].nextCursorS[Int](
        new ThrowExceptionCursor(boom),
        Task.now(Iterant[Task].empty),
        Task.unit)
      val stream = iter.onErrorIgnore ++ suffix
      val received = stream.mapBatch(Batch.apply(_))
      val expected = iter.onErrorIgnore.mapBatch(Batch.apply(_)) ++ Iterant[Task].haltS[Int](Some(boom))
      received <-> expected
    }
  }

  test("Iterant[Task].mapBatch should protect against broken generators") { implicit s =>
    check1 { (prefix: Iterant[Task, Int]) =>
      val boom = DummyException("dummy")
      val brokenBatch = new ThrowExceptionBatch(boom)
      val error = Iterant[Task].nextBatchS(brokenBatch, Task.now(Iterant[Task].empty[Int]), Task.unit)
      val received = (prefix.onErrorIgnore ++ error).mapBatch(Batch.apply(_))
      val expected = prefix.onErrorIgnore ++ Iterant[Task].haltS[Int](Some(boom))
      received <-> expected
    }
  }

  test("Iterant[Task].mapBatch suspends side effects") { implicit s =>
    check1 { (stream: Iterant[Task, Int]) =>
      // Applying the operator twice to the same source must give equivalent streams
      val first = stream.mapBatch(Batch.apply(_))
      val second = stream.mapBatch(Batch.apply(_))
      first <-> second
    }
  }

  test("Iterant[Coeval].mapBatch works for infinite cursors") { implicit s =>
    check1 { (el: Int) =>
      val endless = Iterant[Coeval].nextCursorS(
        BatchCursor.continually(el),
        Coeval.now(Iterant[Coeval].empty[Int]),
        Coeval.unit)
      val received = endless.mapBatch(Batch.apply(_)).take(10).toListL
      val expected = Coeval(Stream.continually(el).take(10).toList)

      received <-> expected
    }
  }

  test("Iterant[Coeval].mapBatch triggers early stop on exception") { _ =>
    check1 { (iter: Iterant[Coeval, Int]) =>
      val cancelable = BooleanCancelable()
      val boom = DummyException("dummy")
      val suffix = Iterant[Coeval].nextCursorS[Int](
        new ThrowExceptionCursor(boom),
        Coeval.now(Iterant[Coeval].empty),
        Coeval.unit)
      val stream = (iter.onErrorIgnore ++ suffix).doOnEarlyStop(Coeval.eval(cancelable.cancel()))

      intercept[DummyException] {
        stream.mapBatch(Batch.apply(_)).toListL.value
      }
      // The property holds when the early-stop effect has been triggered
      cancelable.isCanceled
    }
  }

  test("Iterant[Coeval].mapBatch can handle errors") { implicit s =>
    val boom = DummyException("dummy")
    val failed = Iterant[Coeval].raiseError[Int](boom)
    val mapped = failed.mapBatch(Batch.apply(_))
    assertEquals(failed, mapped)
  }

  test("Iterant[Coeval].next.mapBatch guards against direct user code errors") { _ =>
    val boom = DummyException("dummy")
    var isCanceled = false

    // The early-stop effect records that it was run
    val source = Iterant[Coeval].nextS(1, Coeval(Iterant[Coeval].empty[Int]), Coeval {
      isCanceled = true
    })
    val outcome = source.mapBatch[Int](_ => throw boom).toListL.runTry

    assertEquals(outcome, Failure(boom))
    assert(isCanceled, "isCanceled should be true")
  }

  test("Iterant[Coeval].nextCursor.mapBatch guards against direct user code errors") { _ =>
    val boom = DummyException("dummy")
    var isCanceled = false

    // The early-stop effect records that it was run
    val source = Iterant[Coeval].nextCursorS(BatchCursor(1, 2, 3), Coeval(Iterant[Coeval].empty[Int]), Coeval {
      isCanceled = true
    })
    val outcome = source.mapBatch[Int](_ => throw boom).toListL.runTry

    assertEquals(outcome, Failure(boom))
    assert(isCanceled, "isCanceled should be true")
  }

  test("Iterant[Coeval].mapBatch should protect against direct exceptions") { implicit s =>
    check2 { (l: List[Int], idx: Int) =>
      val boom = DummyException("dummy")
      // The source must be non-empty so that the mapping function gets called
      val elems = if (l.nonEmpty) l else List(1)
      val iterant = arbitraryListToIterant[Coeval, Int](elems, idx)
      val received = (iterant ++ Iterant[Coeval].now(1)).mapBatch[Int](_ => throw boom)
      val expected = Iterant[Coeval].haltS[Int](Some(boom))
      received <-> expected
    }
  }

  test("Iterant[Coeval].mapBatch should protect against broken batches") { implicit s =>
    check1 { (prefix: Iterant[Coeval, Int]) =>
      val boom = DummyException("dummy")
      val brokenCursor: BatchCursor[Int] = new ThrowExceptionCursor(boom)
      val error = Iterant[Coeval].nextCursorS(brokenCursor, Coeval.now(Iterant[Coeval].empty[Int]), Coeval.unit)
      val received = (prefix ++ error).mapBatch(Batch.apply(_))
      val expected = prefix ++ Iterant[Coeval].haltS[Int](Some(boom))
      received <-> expected
    }
  }

  test("Iterant[Coeval].mapBatch should protect against broken generators") { implicit s =>
    check1 { (prefix: Iterant[Coeval, Int]) =>
      val boom = DummyException("dummy")
      val brokenBatch: Batch[Int] = new ThrowExceptionBatch(boom)
      val error = Iterant[Coeval].nextBatchS(brokenBatch, Coeval.now(Iterant[Coeval].empty[Int]), Coeval.unit)
      val received = (prefix ++ error).mapBatch(Batch.apply(_))
      val expected = prefix ++ Iterant[Coeval].haltS[Int](Some(boom))
      received <-> expected
    }
  }

  test("Iterant[Coeval].mapBatch preserves the source earlyStop") { implicit s =>
    var effect = 0
    val stop = Coeval.eval(effect += 1)
    val source = Iterant[Coeval].nextCursorS(BatchCursor(1, 2, 3), Coeval.now(Iterant[Coeval].empty[Int]), stop)
    val mapped = source.mapBatch(Batch.apply(_))
    // Running the mapped stream's early-stop must run the source's `stop` once
    mapped.earlyStop.value
    assertEquals(effect, 1)
  }
}