
[#296] Added chunked(long), chunked(Predicate<T>) #320

Closed
wants to merge 1 commit

Conversation

billoneil
Contributor

@billoneil billoneil commented Jul 25, 2017

#296
I have two of the implementations working and they should be truly lazy.

  • chunked(long chunkSize)
  • chunked(Predicate predicate)
  • chunked(Predicate predicate, boolean excludeDelimiter) ?
  • chunkedWithIndex(BiPredicate<T, Long> biPredicate)

I'm not happy with how the code turned out. The two iterators are a little dependent on each other. Maybe a better approach would be to make a top-level peeking iterator instead of hacking it like I did.
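For illustration, the chunked(long chunkSize) variant can be truly lazy by pulling at most chunkSize elements from the source per call to next(). A minimal sketch over a plain java.util.Iterator (names are illustrative, not the PR's actual code):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Illustrative sketch only: lazily groups a source iterator into chunks
// of up to chunkSize elements. The source is consumed only as chunks are
// requested, so it also works on unbounded sources.
final class ChunkedIterator<T> implements Iterator<List<T>> {
    private final Iterator<T> source;
    private final long chunkSize;

    ChunkedIterator(Iterator<T> source, long chunkSize) {
        if (chunkSize <= 0)
            throw new IllegalArgumentException("chunkSize must be positive");
        this.source = source;
        this.chunkSize = chunkSize;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public List<T> next() {
        if (!source.hasNext())
            throw new NoSuchElementException();
        List<T> chunk = new ArrayList<>();
        for (long i = 0; i < chunkSize && source.hasNext(); i++)
            chunk.add(source.next());
        return chunk;
    }
}
```

The last chunk may be shorter than chunkSize, matching the usual semantics of size-based partitioning.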

@billoneil
Contributor Author

Refactored it a bit to clean it up. The peeking iterator seems to work. Guava's version special-cases remove(); I'm not sure that's necessary if it's not publicly exposed.

@lukaseder
Member

Thank you very much for your suggestion. I'll review shortly.

@billoneil
Contributor Author

I realized some naming is off: takeWhileIterator should probably be takeUntil.

/**
* Created by billoneil on 7/26/17.
*/
public class Iterators {
Member

Can you please make this class package-private

Contributor Author

I meant to make these package-private oops.


import java.util.Iterator;

public interface PeekingIterator<E> extends Iterator<E> {
Member

Same here. This is an internal type and shouldn't be public, in my opinion.

Member

In fact, I wonder if this needs to be an interface, given we only have a single implementation so far...
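A single concrete class would indeed suffice here; a minimal peeking wrapper (similar in spirit to Guava's PeekingIterator, but a hypothetical sketch rather than this PR's code) might look like:

```java
import java.util.Iterator;

// Minimal sketch of a peeking wrapper: one concrete class, no interface.
// peek() buffers one element without consuming it from the caller's view.
final class Peeking<E> implements Iterator<E> {
    private final Iterator<E> delegate;
    private E peeked;
    private boolean hasPeeked;

    Peeking(Iterator<E> delegate) {
        this.delegate = delegate;
    }

    E peek() {
        if (!hasPeeked) {
            peeked = delegate.next(); // throws NoSuchElementException if exhausted
            hasPeeked = true;
        }
        return peeked;
    }

    @Override
    public boolean hasNext() {
        return hasPeeked || delegate.hasNext();
    }

    @Override
    public E next() {
        if (hasPeeked) {
            hasPeeked = false;
            E result = peeked;
            peeked = null; // allow GC of the buffered element
            return result;
        }
        return delegate.next();
    }
}
```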

* <p>
* <code><pre>
* // ((1,1,2), (2), (3,4))
* Seq.of(1, 1, 2, 2, 3, 4).chunked(n -> n % 2 == 0)
Member

@lukaseder lukaseder Jul 28, 2017

Hmm, I think we should exclude the chunk boundaries by default if using a predicate, and include them only explicitly. When including, the question is whether the boundary should belong to:

  1. The ending chunk ((1, 1, 2), (2), (3, 4), ())
  2. The beginning chunk ((1, 1), (2), (2, 3), (4))
  3. Both chunks (?)
  4. No chunk

Given that there is no clear preference between 1/2 and maybe even 3, the default must be 4. So, an enum might be a reasonable additional argument for this, where "no chunk" would be the default.

Contributor Author

I was also thinking an enum would be the appropriate solution for inclusion / exclusion. It makes sense to work for all four.

> 2. The beginning chunk ((1, 1), (2), (2, 3), (4))

Should we assume the beginning chunk starts true? Or should this actually return ((2), (2, 3), (4)), skipping anything before the first chunk?

> 4. No chunk

Would this be excluding the matched element? ((1, 1), (), (3))

Member

> Should we assume the beginning chunk starts true? Or should this actually return ((2), (2, 3), (4)), skipping anything before the first chunk?

Egh... :) OK, so in the no chunk case, we get:

  • 1 chunk for 0 true evaluations of the predicate
  • 2 chunks for 1 true evaluation of the predicate
  • 3 chunks for 2 true evaluations of the predicate

For example:

// ((1, 1, 2, 2, 3, 4))
Seq.of(1, 1, 2, 2, 3, 4).chunked(i -> false);

// ((),(),(),(),(),(),())
Seq.of(1, 1, 2, 2, 3, 4).chunked(i -> true);

I think that options 1-3 should adhere to this logic. My example 1 was wrong, there should be a trailing empty chunk. Will fix the comment.

Member

Hence:

// ((1),(1),(2),(2),(3),(4),())
Seq.of(1, 1, 2, 2, 3, 4).chunked(i -> true, BEFORE);
// ((),(1),(1),(2),(2),(3),(4))
Seq.of(1, 1, 2, 2, 3, 4).chunked(i -> true, AFTER);
// ((1),(1, 1),(1, 2),(2, 2),(2, 3),(3, 4),(4))
Seq.of(1, 1, 2, 2, 3, 4).chunked(i -> true, BOTH);

Still a bit undecided about the BOTH part, although it could be useful and it would be consistent.
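These BEFORE/AFTER semantics can be pinned down with a small eager sketch over lists (the Boundary enum name is hypothetical, and a real implementation would be lazy, but the chunking rule is the same: n matches of the predicate yield n + 1 chunks):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Eager sketch to pin down the BEFORE/AFTER semantics discussed above.
// The enum name Boundary is hypothetical; the PR may choose another.
final class Chunker {
    enum Boundary { BEFORE, AFTER }

    static <T> List<List<T>> chunked(List<T> input, Predicate<T> isBoundary, Boundary mode) {
        List<List<T>> chunks = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T t : input) {
            if (isBoundary.test(t)) {
                if (mode == Boundary.BEFORE) {
                    current.add(t);          // boundary ends the current chunk
                    chunks.add(current);
                    current = new ArrayList<>();
                } else {
                    chunks.add(current);     // boundary starts the next chunk
                    current = new ArrayList<>();
                    current.add(t);
                }
            } else {
                current.add(t);
            }
        }
        chunks.add(current); // n boundary matches yield n + 1 chunks
        return chunks;
    }
}
```

With i -> true and BEFORE, this produces the seven chunks ((1),(1),(2),(2),(3),(4),()) from the example above; AFTER produces the leading empty chunk instead.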

Contributor Author

We could always ignore BOTH for now and easily add it later if requested since it will be an enum.

Member

Yes, let's do that.

}

@Test(expected = IllegalArgumentException.class)
public void testCountingPredicateThrowsIllegalArg() {
Member

I see, this is probably the reason why you made the class public...

  1. If the tests are placed in the same package, you can access package-private classes.
  2. I'm not too much of a fan of testing internals with unit tests. The internals will be tested transitively by testing the chunked API. With tests on internals, refactoring those internals will be much harder...

Contributor Author

I think they are in the same package, so it should have been package-private. I must have missed it when I cleaned up the first implementation.

> I'm not too much of a fan of testing internals with unit tests. The internals will be tested transitively by testing the chunked API. With tests on internals, refactoring those internals will be much harder...

👍 Totally agree. I wasn't sure of your preference, so I opted for more testing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm all in for more testing, but high-level tests will never break because the API won't change again. Low-level tests are a bit of a maintenance burden.

return takeWhileIterator(peeking(iterator), predicate);
}

static <E> Iterator<E> takeWhileIterator(PeekingIterator<E> iterator, Predicate<E> predicate) {
Member

I have a feeling that we keep writing this iterator :) Perhaps, can this be refactored with existing code?

Contributor Author

I was trying to use Seq.take*, but it closed the stream after the predicate matched, and we need it to stay open. I will look into a better way to refactor this.

Member

I meant to say: Perhaps Seq.take should use this new iterator. In that case, we should open up several tickets and do things one step at a time...

@billoneil
Contributor Author

I will first look into making the TakeIterator and reusing it, then come back to this after. I'll make a new issue and PR once it's ready.

@lukaseder
Member

> I will first look into making the TakeIterator and reusing it, then come back to this after. I'll make a new issue and PR once it's ready.

Great, looking forward to this. Note: If it's too complex, we don't have to do it. I just thought if you do create new classes and API, it would be nice if they're reusable. Otherwise, I might prefer inlining them where they're really needed.

@lukaseder
Member

Btw, there's also some work by @tlinkowski pending review. He's implemented a SeqBuffer. It goes in a similar direction as this TakeIterator: #305

Maybe, have a look at that first, prior to delving into a refactoring...?

@billoneil
Contributor Author

Hmm, the SeqBuffer doesn't seem like it would work on an infinite-sized Seq; the iterator would. I'll explore some more over the weekend.

@billoneil
Contributor Author

My original use case for this is the ability to stream something like a very large file from S3 into a Seq, lazily batch N records, process each batch of N records, and stream the results to a new S3 file. Ideally it should work on a file of any size, so I have been working under the assumption of infinitely sized streams.
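That pipeline can be sketched independently of Seq: consume a potentially unbounded Stream in fixed-size batches and hand each batch to a processing callback. All names here are illustrative; process stands in for "write a batch to the new S3 file":

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Illustrative sketch: lazily batches a stream into lists of batchSize
// and hands each batch to a callback; memory use is bounded by one batch.
final class BatchProcessor {
    static <T> int forEachBatch(Stream<T> records, int batchSize, Consumer<List<T>> process) {
        List<T> batch = new ArrayList<>(batchSize);
        int batches = 0;
        Iterator<T> it = records.iterator();
        while (it.hasNext()) {
            batch.add(it.next());
            if (batch.size() == batchSize) {
                process.accept(List.copyOf(batch)); // hand off a full batch
                batch.clear();
                batches++;
            }
        }
        if (!batch.isEmpty()) {                     // trailing partial batch
            process.accept(List.copyOf(batch));
            batches++;
        }
        return batches;
    }
}
```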

@tlinkowski

I think that for this (#296) and for Seq.duplicate to work optimally, SeqBuffer would need to support the following things:

  • SeqBuffer.shutdown() or some other method - after calling this method no more calls to seq() would be legal (similarly to Java's ExecutorService.submit())
  • field SeqBuffer.activeSpliterators inside SeqBuffer that would store all created BufferSpliterators that haven't yet reached the end of iteration
  • field SeqBuffer.startIndex that would store the index corresponding to the first element in List<T> buffer
  • then, upon each call to SeqBuffer.BufferSpliterator.tryAdvance(), SeqBuffer would be notified by the BufferSpliterator, and if this SeqBuffer has been shut down, it would:
    - find minimal nextIndex among all activeSpliterators
    - remove all redundant elements from List<T> buffer
    - and use this minimal index as a new startIndex
  • additionally, List<T> buffer could be converted to LinkedList upon call to shutdown() to improve removal performance

It's not trivial but it seems doable. What do you think, @billoneil, @lukaseder?

@billoneil
Contributor Author

I see two use cases when splitting a Seq

  1. You know the bounds and want to be able to operate on both returned Seqs.
     • It would be totally reasonable to buffer the entire Seq (SeqBuffer) into memory and have logic so that both returned Seqs can be read concurrently (even if they are backed by a single Seq).
  2. You don't know the bounds and are splitting for some type of chunking.
     • In this case you would be forced to read both Seqs sequentially. If you try to read the second before the first is fully consumed, you can either throw an error or forward the first Seq all the way to completion.

They are both very valid but different use cases. I don't know the best way to express that in an API, since they have different contracts. Maybe allow flags like Seq.buffered vs. Seq.lazy as a second argument, or different methods for the different implementations? Or maybe the API just always picks one way.

One of the reasons I liked the iterator approach is that it's lazy by default, and if you want the values in memory you can always call .toList() on either or both Seqs. The issue is what happens if you call .toList() on the second before the first?

This also just reminded me of something when I first brought this up.

Maybe the chunked method should return Seq<List<E>>, because what happens if someone does the following?

Seq<Seq<Integer>> seqs = ...
seqs.map(s -> s.limit(5))

This would mess up sequential streams unless, under the hood, it ignored the limit and traversed the whole iterator.
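The difference can be illustrated with a plain iterator: if each chunk is materialised as a List before being handed out, truncating one chunk (the analogue of s.limit(5)) cannot desynchronise the shared source, because the source has already been advanced past the whole chunk. A hypothetical sketch:

```java
import java.util.Iterator;
import java.util.List;

// With chunks materialised as List<E>, truncating one chunk (the analogue
// of s.limit(5)) does not affect where the next chunk starts, because the
// shared source iterator was already advanced past the full chunk.
final class MaterialisedChunks {
    static List<Integer> demo() {
        Iterator<Integer> source = List.of(1, 2, 3, 4, 5, 6).iterator();
        // Materialise the first chunk of three elements eagerly.
        List<Integer> first = List.of(source.next(), source.next(), source.next());
        List<Integer> truncated = first.subList(0, 2); // "limit(2)" on the chunk
        // The next chunk still starts at 4, unaffected by the truncation.
        return List.of(truncated.size(), source.next());
    }
}
```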

@billoneil
Contributor Author

I'm going to hold off until this is discussed a bit more.

@billoneil
Contributor Author

@lukaseder @tlinkowski Either of you have any thoughts here?

@tlinkowski

@billoneil Well, I'm under the impression that the modified SeqBuffer I described above (supporting a shutdown method) would absolutely suffice to implement what you described under point no. 2. (BTW, you posted almost the same thing twice - you could delete your first post.)

What's more, the advantage of implementing chunked using such a modified SeqBuffer is that you can read the resulting Seqs in any order you like! The only penalty for reading them in a "strange" order are potential performance problems (e.g. if you start reading "from the end", SeqBuffer will have to buffer the entire source Seq into memory). However, if you read these returned "chunk" Seqs in order, SeqBuffer won't actually have to buffer anything! This is because SeqBuffer will know (because it will have been shut down) that the item you've just read from the source Seq won't be accessible from any other Seq. In such a scenario, of course, anyone can call anything on the resulting Seqs (e.g. the seq.limit(5) that you mentioned) and it doesn't break anything - all calls just forward to SeqBuffer.BufferSpliterator, which (together with SeqBuffer) takes care of what needs to be buffered and what can be discarded.

One more thought about the modified SeqBuffer implementation - to construct such chunked Seqs we couldn't just use SeqBuffer.seq().skip(n) because this would yield SeqBuffer.BufferSpliterators with nextIndex = 0, which in turn would lead to a situation where SeqBuffer cannot discard anything until the Spliterators of all the Seqs have been advanced. So we'd need to add some extra method (like SeqBuffer.seqFrom(n)) that would create a SeqBuffer.BufferSpliterator with nextIndex = n.

@billoneil
Contributor Author

> The only penalty for reading them in a "strange" order are potential performance problems (e.g. if you start reading "from the end", SeqBuffer will have to buffer the entire source Seq into memory)

This sounds reasonable as long as it's documented.

> However, if you read these returned "chunk" Seqs in order, SeqBuffer won't actually have to buffer anything!

I might have missed this part when I read the source the first time. This does indeed seem like it can cover all use cases.

> In such scenario, of course, anyone can call anything on the resulting Seqs (e.g. seq.limit(5) that you mentioned) and it doesn't break anything - all calls just forward to SeqBuffer.BufferSpliterator, which (together with SeqBuffer) takes care of what needs to be buffered and what can be discarded

I think this would also work and it just needs to be documented since it could be doing extra work under the hood than expected.

@tlinkowski

@billoneil You haven't missed anything in the source code :) I might have been imprecise, but please note that in my most recent comment I was referring to my previous comment, which contains only a proposal for adapting SeqBuffer to do what is needed to efficiently implement chunked. But it's just an idea - there's no implementation yet, although I'd gladly provide one if @lukaseder confirms that he likes the idea and is willing to have it implemented.

@lukaseder
Member

The PR got closed by GitHub due to a recent rename of the main branch.
