implementation for Iterant#dropLast #509

Merged
merged 9 commits into monix:master from Avasil:Iterant.dropLast on Jan 11, 2018

Conversation

@Avasil
Collaborator

Avasil commented Jan 9, 2018

Fixes #495.

There are a lot of mutable things, but it should maintain referential transparency, if I'm not missing anything.

I would particularly appreciate feedback on the data structures used. I tried to make it reasonably efficient, but I might have overcomplicated it. :D

@codecov

codecov bot commented Jan 9, 2018

Codecov Report

Merging #509 into master will increase coverage by 0.05%.
The diff coverage is 100%.

@@            Coverage Diff             @@
##           master     #509      +/-   ##
==========================================
+ Coverage   89.98%   90.04%   +0.05%     
==========================================
  Files         357      358       +1     
  Lines        9295     9338      +43     
  Branches     1787     1792       +5     
==========================================
+ Hits         8364     8408      +44     
+ Misses        931      930       -1

Avasil added some commits Jan 9, 2018

while (toProcess > 0 && cursor.hasNext()) {
queue.enqueue(cursor.next())
toProcess -= 1
if (queueLength >= toDrop && bufferLength < limit) {

@alexandru

alexandru Jan 10, 2018

Member

I don't think bufferLength can ever go over limit, since you already guard for toProcess > 0. I might be mistaken, but I think you can get rid of both bufferLength and limit.

@Avasil

Avasil Jan 10, 2018

Collaborator

I think you're right. About 15 minutes ago I realized I had bugs related to infinite cursors, which made this irrelevant, but I forgot to get rid of it. I'll do it in the evening, thanks!

}
}
source match {

@alexandru

alexandru Jan 10, 2018

Member

Well, if you're going to suspend the whole thing, you don't need a match.

*/
def apply[F[_], A](source: Iterant[F, A], n: Int)(implicit F: Sync[F]): Iterant[F, A] = {
def processCursor(toDrop: Int, length: Int, queue: mutable.Queue[A],

@alexandru

alexandru Jan 10, 2018

Member

Using a mutable data structure, if properly encapsulated, is fine in Monix's codebase 😀

However, I try to avoid optimizations without some proof that they work. If this particular optimization only brings marginal improvements, then we shouldn't do it, because an immutable Queue is more reasonable.

So you can either revert to an immutable Queue for now, or make a JMH test in benchmarks that proves a mutable queue is better for a stream of 100000 events where you're dropping the last 1, 10, or 50 items, let's say.

Don't commit the benchmark; it's just something to report back that this is indeed worth it.

@Avasil

Avasil Jan 10, 2018

Collaborator

I generally prefer to follow the same rule, but I feel like operating on immutable.Queue is pretty awkward, although it might be better with an implementation similar to how State works.

Below are the results from the benchmarks, but please tell me if I committed some benchmarking faux pas; I know there are a lot of pitfalls that can make results misleading.

Results:

jmh:run -i 50 -wi 10 -f 2 -t 1 monix.benchmarks.IterantDropLastBenchmark

mutable.Queue
[info] Benchmark                         Mode  Cnt   Score    Error  Units
[info] IterantDropLastBenchmark.drop1   thrpt  100  73,519 ± 4,664  ops/s
[info] IterantDropLastBenchmark.drop10  thrpt  100  79,834 ± 1,388  ops/s
[info] IterantDropLastBenchmark.drop50  thrpt  100  76,795 ± 3,076  ops/s

immutable.Queue
[info] Benchmark                         Mode  Cnt    Score   Error  Units
[info] IterantDropLastBenchmark.drop1   thrpt  100  113,273 ± 2,272  ops/s
[info] IterantDropLastBenchmark.drop10  thrpt  100  117,181 ± 1,738  ops/s
[info] IterantDropLastBenchmark.drop50  thrpt  100  119,765 ± 1,839  ops/s

Assuming those results are accurate... well, that's a good lesson. :D I was sure all those extra tuple allocations in immutable.Queue would show up in the performance numbers. I guess immutable.Queue is just better implemented?

Benchmark code:

import java.util.concurrent.TimeUnit

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.tail.Iterant
import org.openjdk.jmh.annotations._

import scala.concurrent.Await
import scala.concurrent.duration.Duration

@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
class IterantDropLastBenchmark {

  @Benchmark
  def drop1(): List[Int] = IterantDropLastBenchmark.test(100000, 1)

  @Benchmark
  def drop10(): List[Int] = IterantDropLastBenchmark.test(100000, 10)

  @Benchmark
  def drop50(): List[Int] = IterantDropLastBenchmark.test(100000, 50)

}

object IterantDropLastBenchmark {
  def test(size: Int, toDrop: Int): List[Int] = {
    val list = List.range(1, size)
    val stream = Iterant[Task].fromList(list)

    Await.result(stream.dropLast(toDrop).toListL.runAsync, Duration.Inf)
  }
}

This is the code for Iterant#dropLast using immutable.Queue (I have left it as a mutable/immutable hybrid monster to make the comparison fairer):

package monix.tail.internal

import cats.effect.Sync
import cats.syntax.all._
import monix.execution.misc.NonFatal
import monix.tail.Iterant
import monix.tail.Iterant.{Halt, Last, Next, NextBatch, NextCursor, Suspend}
import monix.tail.batches.{Batch, BatchCursor}

import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.collection.mutable

private[tail] object IterantDropLast {
  /**
    * Implementation for `Iterant#dropLast`
    */
  def apply[F[_], A](source: Iterant[F, A], n: Int)(implicit F: Sync[F]): Iterant[F, A] = {

    def processCursor(toDrop: Int, length: Int, queue: Queue[A],
                      ref: NextCursor[F, A]): Iterant[F, A] = {
      val NextCursor(cursor, rest, stop) = ref
      val limit = cursor.recommendedBatchSize
      val buffer = mutable.Buffer[A]()
      var queueLength = length

      @tailrec
      def queueLoop(queue: Queue[A], toProcess: Int): Queue[A] = {
        if (!(toProcess > 0 && cursor.hasNext())) {
          queue
        } else {
          val newQueue = queue.enqueue(cursor.next())
          if (queueLength >= toDrop) {
            val (a, nextQueue) = newQueue.dequeue
            buffer.append(a)
            queueLoop(nextQueue, toProcess - 1)
          } else {
            queueLength += 1
            queueLoop(newQueue, toProcess - 1)
          }
        }
      }
      val finalQueue = queueLoop(queue, limit)

      val next: F[Iterant[F, A]] = if (cursor.hasNext()) F.pure(ref) else rest
      NextBatch(Batch.fromSeq(buffer), next.map(loop(toDrop, queueLength, finalQueue)), stop)
    }

    def finalCursor(toDrop: Int, length: Int, queue: Queue[A]): Iterant[F, A] = {
      var queueLength = length
      val buffer = mutable.Buffer[A]()

      @tailrec
      def queueLoop(queue: Queue[A]): Queue[A] = {
        if (queueLength <= toDrop) {
          queue
        } else {
          val (a, nextQueue) = queue.dequeue
          buffer.append(a)
          queueLength -= 1
          queueLoop(nextQueue)
        }
      }
      queueLoop(queue)

      val cursor = BatchCursor.fromSeq(buffer)
      NextCursor(cursor, F.pure(Halt(None)), F.unit)
    }

    def loop(toDrop: Int, length: Int, queue: Queue[A])
            (source: Iterant[F, A]): Iterant[F, A] = {
      try if (toDrop <= 0) source else source match {
        case Next(elem, rest, stop) =>
          val newQueue = queue.enqueue(elem)
          if (length >= toDrop) {
            val (next, nextQueue) = newQueue.dequeue
            Next(next, rest.map(loop(toDrop, length, nextQueue)), stop)
          }
          else Suspend(rest.map(loop(toDrop, length + 1, newQueue)), stop)
        case ref@NextCursor(_, _, _) =>
          processCursor(toDrop, length, queue, ref)
        case NextBatch(batch, rest, stop) =>
          processCursor(toDrop, length, queue, NextCursor(batch.cursor(), rest, stop))
        case Suspend(rest, stop) =>
          Suspend(rest.map(loop(toDrop, length, queue)), stop)
        case Last(item) =>
          finalCursor(toDrop, length + 1, queue.enqueue(item))
        case Halt(None) =>
          finalCursor(toDrop, length, queue)
        case halt@Halt(Some(_)) =>
          halt
      } catch {
        case ex if NonFatal(ex) =>
          val stop = source.earlyStop
          Suspend(stop.map(_ => Halt(Some(ex))), stop)
      }
    }

    // Suspending execution, because pushing into our queue
    // is side-effecting
    Suspend(F.delay(loop(n, 0, Queue.empty[A])(source)), source.earlyStop)
  }
}
@alexandru

Member

alexandru commented Jan 10, 2018

Thanks @Avasil, see my comments.

@Avasil

Collaborator

Avasil commented Jan 10, 2018

Btw @alexandru, I'm encountering weird behaviour in tests.

If I add this to IterantDropLastSuite:

  test("Iterant.dropLast works for NextCursor") { implicit s =>
    check2 { (list: List[Int], nr: Int) =>
      val stream = Iterant[Task].nextCursorS(BatchCursor(list: _*), Task.now(Iterant[Task].empty[Int]), Task.unit).onErrorIgnore
      val n = if (nr == 0) 0 else math.abs(math.abs(nr) % 20)

      val stream1 = stream.dropLast(n).toListL
      val stream2 = stream.toListL.map(_.dropRight(n))

      stream1.foreach(x => println(s"s1 $x"))
      stream2.foreach(x => println(s"s2 $x"))

      stream1 <-> stream2
    }
  }

it passes, but the println output is:

s2 List()
s1 List(1297675174, 624554472, -905296189, 2147483647, 837085907)
s2 List()
s1 List(-761977668, -2047144408, -1, -2147483648, -1, -2147483648, 1, -2147483648, 0, -2147483648, 2147483647, 220996074, -308602378, 0, 1, 1, 0, -1, 1, 2137626787)
s2 List()

which should fail.
If I remove those foreach lines from the test, it does fail.

@alexandru

Member

alexandru commented Jan 10, 2018

@Avasil make sure you suspend execution for side effects; if evaluating a stream once changes the outcome of its second evaluation, you probably have unsuspended side effects.
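
A minimal sketch of the failure mode being described, with hypothetical helper names (the constructors are the same ones the PR's code imports from monix.tail.Iterant):

import cats.effect.Sync
import monix.tail.Iterant
import monix.tail.Iterant.{Halt, NextCursor, Suspend}
import monix.tail.batches.BatchCursor

// A BatchCursor is mutable: a node built eagerly around one cursor is
// consumed by the first evaluation and looks empty on the second.
def builtOnce[F[_], A](seq: Seq[A])(implicit F: Sync[F]): Iterant[F, A] = {
  val cursor = BatchCursor.fromSeq(seq) // shared mutable state!
  NextCursor(cursor, F.pure(Halt(None): Iterant[F, A]), F.unit)
}

// Deferring the side-effecting construction inside F.delay (a Suspend
// node) re-runs it on every evaluation, so the stream stays pure.
def builtPerEvaluation[F[_], A](seq: Seq[A])(implicit F: Sync[F]): Iterant[F, A] =
  Suspend(F.delay(builtOnce(seq)), F.unit)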

@alexandru

Member

alexandru commented Jan 10, 2018

Speaking of which, a test like stream <-> stream would be good to guard against this.

@Avasil

Collaborator

Avasil commented Jan 10, 2018

That explains why the second list is always empty; I will investigate in this direction.

@Avasil

Collaborator

Avasil commented Jan 11, 2018

@alexandru
I have investigated it a bit more, and it seems to be a bigger problem, unless this is the desired behavior for BatchCursor.

Tests like this:

  test("Iterant.takeLast suspends side effects for NextCursor") { implicit s =>
    check2 { (list: List[Int], nr: Int) =>
      val n = if (nr == 0) 0 else math.abs(math.abs(nr) % 20)
      val stream = Iterant[Task].nextCursorS(BatchCursor(list: _*), Task.now(Iterant[Task].empty[Int]), Task.unit)
        .onErrorIgnore

      val s1 = stream.takeLast(n).toListL

      s1 <-> s1
    }
  }

fail not only for my Iterant.dropLast implementation, but also for Iterant.takeLast, Iterant.map, and probably everything else. Is that how it is supposed to be?

@alexandru

Member

alexandru commented Jan 11, 2018

Btw, what did you decide about the mutable versus immutable Queue? Are we going forward with the immutable one or not?

@Avasil

Collaborator

Avasil commented Jan 11, 2018

@alexandru I wasn't sure about this Iterant behavior and was worried I had buggy code.

According to the benchmarks there is no performance reason to choose the mutable Queue, so I'll go with the immutable one as you suggested. Unless I made some mistake in the benchmark?

@alexandru

Member

alexandru commented Jan 11, 2018

Going with the immutable one is the right approach.

Queues are special anyway. Mutable data structures have considerable performance benefits when backed by arrays, but the mutable queue you've used is probably not backed by arrays. An efficient implementation would be some sort of ArrayList. And in our case, given that the size of the queue is fixed (n + 1), you can do better performance-wise with a "circular buffer", which uses a fixed array internally, with no other heap allocations.
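
A minimal sketch of that idea, with hypothetical names (not the DropHeadOnOverflowQueue mentioned below): a fixed array of n slots acts as the drop-last window, and once the window is full, each push evicts and emits the oldest element, so the last n elements are never emitted.

import scala.reflect.ClassTag

// Hypothetical circular buffer for dropLast(n); assumes n > 0
// (dropLast(0) is the identity and needs no buffering).
final class DropLastBuffer[A: ClassTag](n: Int) {
  private[this] val array = new Array[A](n)
  private[this] var index = 0 // next write position (the oldest slot once full)
  private[this] var size = 0  // number of elements currently stored

  /** Pushes `a`; returns the element that may now be emitted, if any. */
  def push(a: A): Option[A] = {
    val evicted =
      if (size == n) Some(array(index)) // full: the oldest element leaves the window
      else { size += 1; None }          // still filling: emit nothing yet
    array(index) = a
    index = (index + 1) % n
    evicted
  }
}

At end-of-stream the buffer holds exactly the last n elements, which are simply discarded.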

If you'd like to play, we actually have one in DropHeadOnOverflowQueue — it's a simple thing really, should be standard.

But I wouldn't sweat it; performance improvements should come as a result of benchmarking, which is freaking annoying, and I'm perfectly happy with an implementation based on an immutable Queue right now.

I'm leaving the choice to you btw, you're the owner 😀

@Avasil

Collaborator

Avasil commented Jan 11, 2018

@alexandru I'd love to play with something like that, even if just to learn something new, but IMO it is better for the project if I focus my "resources" on helping with the 3.0.0 release, which seems pretty close seeing how many people have come to contribute!

@alexandru

Member

alexandru commented Jan 11, 2018

Thanks @Avasil, looking good.

@alexandru alexandru merged commit 4b19ec4 into monix:master Jan 11, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed

@Avasil Avasil deleted the Avasil:Iterant.dropLast branch Jan 21, 2018
