New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mapBatch for Iterant #622

Merged
merged 5 commits into from Mar 30, 2018

Conversation

Projects
None yet
2 participants
@Avasil
Collaborator

Avasil commented Mar 25, 2018

Closes #617

@codecov

This comment has been minimized.

codecov bot commented Mar 25, 2018

Codecov Report

Merging #622 into master will increase coverage by 0.02%.
The diff coverage is 94.11%.

@@            Coverage Diff             @@
##           master     #622      +/-   ##
==========================================
+ Coverage   90.89%   90.92%   +0.02%     
==========================================
  Files         377      378       +1     
  Lines        9984    10001      +17     
  Branches     1879     1887       +8     
==========================================
+ Hits         9075     9093      +18     
+ Misses        909      908       -1
@Avasil

This comment has been minimized.

Collaborator

Avasil commented Mar 25, 2018

btw I've noticed bugs with following:

val stream = Iterant[Task].nextBatchS(Batch.fromSeq(List(elem, elem), 2), Task.delay(Iterant[Task].lastS(elem)), Task.unit)
val f: Int => List[Int] = List.fill(3)(_)

so I'm trying to fix it right now

// protects against infinite cursors
while (toProcess > 0 && cursor.hasNext()) {
val cursorB = f(cursor.next()).cursor()
while (toProcess > 0 && cursorB.hasNext()) {

This comment has been minimized.

@alexandru

alexandru Mar 26, 2018

Member

When that mapping function gives you a Batch, there's no point in splitting it any further, because the Batch is already built for you and you can just serve it as is in a NextBatch node. All operations need to handle fairness concerns via recommendedBatchSize, so even if you serve bigger batches via this function, it doesn't matter.

The point of mapBatch is to be used in certain cases for optimizing a stream, e.g. for turning an Iterant[F, Array[A]] into an Iterant[F, A]. If we end up splitting those batches, then it becomes too heavy. At that point we might as well work with flatMap directly:

def mapBatch[F[_] : Sync, A](fa: Iterant[F, A])(f: A => Batch[B]): Iterant[F, B] =
  fa.flatMap { a =>
    Iterant.nextBatchS(f(a), Iterant.empty, F.unit)
  }

So here what I see happening is no batched processing at all, e.g..

val batch = f(cursor.next()).cursor()
val next = 
  if (cursor.hasNext()) NextCursor(cursor, rest.map(loop), stop)
  else rest.map(loop)

NextBatch(batch, next, stop)
def processBatch(ref: NextCursor[F, A]): NextBatch[F, B] = {
val NextCursor(cursor, rest, stop) = ref
val batch = if(cursor.hasNext()) f(cursor.next()) else Batch[B]()

This comment has been minimized.

@Avasil

Avasil Mar 26, 2018

Collaborator

@alexandru
I've changed it to what you suggested but I'm not sure about this case. Is it ok to handle it like this?
Alternatively I could return Suspend when I get empty cursor

This comment has been minimized.

@alexandru

alexandru Mar 29, 2018

Member

Sorry for not replying, I'm busy these days at work.

A Suspend is better than an empty NextBatch. Although it's not a major issue, since they should be equivalent.

The rest looks pretty good. Do this small change and it's ready to go.

@alexandru alexandru merged commit a90f686 into monix:master Mar 30, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details

@Avasil Avasil deleted the Avasil:iterant-mapBatch branch Sep 8, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment