Added SeqBuffer<T> class as a replacement for some of Seq.toList() calls #305

Merged
merged 1 commit into from
Jul 28, 2017

Conversation

tlinkowski

I propose a solution to the problems described in #195 for the following methods:

  • crossSelfJoin() (crossJoin() needs to be adapted by @lukaseder in jOOQ-tools)
  • inner(Self)Join()
  • leftOuter(Self)Join()

For nearly all the remaining methods, the solution is to apply Seq.lazy() to them. I'll create a separate PR for that in order to demonstrate its usefulness, as discussed in #302.


PS. SeqBuffer could be used to create a simpler implementation of Seq.duplicate.
PPS. SeqBuffer could also be used to implement a method Seq.duplicate(int count). Such a method would return a List<Seq<T>> containing count Seqs. However, the only application for such a method that I can think of is implementing #49, which (to be honest) I don't find very useful either.

This was referenced Apr 27, 2017
tlinkowski pushed a commit to tlinkowski/jOOL that referenced this pull request Apr 30, 2017
@tlinkowski
Author

I added a new implementation of Seq.splitAt [#308] that uses the proposed SeqBuffer.

@lukaseder
Member

That looks very interesting, thank you very much for your suggestion. I'll review this later (hopefully in the next 2 days). We'll definitely need a reusable SeqBuffer.

tlinkowski pushed a commit to tlinkowski/jOOL that referenced this pull request May 1, 2017
}

private final Spliterator<T> source;
private final List<T> buffer = new ArrayList<>();
Member

Hmm, this buffer is not thread safe. It may well be that one consumer of a buffered result consumes the buffer while another consumer produces new elements at the same time. E.g., this could happen in your splitAt() implementation suggestion here: b345bbb

v2.map(t -> t.v1)
));
SeqBuffer<T> buffer = SeqBuffer.of(stream);
return tuple(buffer.seq().limit(position), buffer.seq().skip(position));
Member

Much nicer, indeed. Only caveat: Thread safety in the current SeqBuffer implementation

Author

As a side note: do you realize that the current implementation of Seq.splitAt (as well as Seq.partition) is not thread safe? :)

Member

Hmm, splitAt() doesn't look wrong to me, but partition may well be.

Contributor

I was under the impression that Seq wasn't thread safe to begin with. Or was it just that it will not be a parallel stream?

Member

There's currently no guarantee in the API, as there are too many flaws, still. But in principle, the results from methods like splitAt() (which produce several Seq) should be thread safe. Or at least, we should have an option for them to be thread safe.

Perhaps this is a concept that should be reviewed more globally.

Note that thread safety and parallelism aren't the same thing in this context. Stream's parallelism allows for parallel processing of operations like map() or filter(). By keeping operations independent of one another, parallelism can drastically speed up the processing of a stream. Thread safety in our context just means that when two things (e.g. the splitAt() results) are consumed on different threads, we don't want to get wrong results.

But again, perhaps we should implement this more thoroughly, with a specific thread safety flag...
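
To make the distinction concrete, here is a minimal sketch of the parallelism half, using only JDK streams rather than jOOL. The names ParallelMapDemo and squares are made up for illustration. A single pipeline is marked parallel, so its independent map() steps may run on several threads, while the collected list still follows encounter order.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical illustration only; not part of jOOL.
final class ParallelMapDemo {
    // One pipeline whose independent map() steps may be executed by several threads.
    static List<Integer> squares(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .map(i -> i * i)
                .boxed()
                .collect(Collectors.toList());
    }
}
```

Thread safety in the sense used above is a separate concern: it is about two distinct consumers, each on its own thread, that share hidden state such as a buffer.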

Author

Well, Seq.splitAt was directly based on Seq.partition :) And Seq.partition definitely isn't thread-safe because both Seqs write to buffer1 and buffer2 without any synchronization.

Author

@billoneil If I understand it right, I think @lukaseder distinguishes between two things:

  • a single Seq is sequential, so it's not thread safe, and two threads cannot operate on the same Seq
  • there seems to be no restriction on consuming one Seq on one thread and another Seq on another thread, even if they both "derive" from the same Seq (hence the need for thread safety in such cases)
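
The second case can be sketched with plain JDK types. This is an illustration under stated assumptions, not the SeqBuffer from this PR: SharedBuffer, view() and SharedBufferDemo are hypothetical names, and synchronizing every access is just one simple way to make the shared state safe. Two views derive from the same source, each is drained on its own thread, and both should see every element in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical illustration only; this is not the SeqBuffer class from the PR.
final class SharedBuffer<T> {
    private final Spliterator<T> source;
    private final List<T> buffer = new ArrayList<>();
    private boolean exhausted;

    SharedBuffer(Stream<T> stream) {
        this.source = stream.spliterator();
    }

    // Pulls from the source until the element at `index` is buffered or the source ends.
    private synchronized boolean fillUpTo(int index) {
        while (!exhausted && buffer.size() <= index) {
            if (!source.tryAdvance(buffer::add)) {
                exhausted = true;
            }
        }
        return index < buffer.size();
    }

    private synchronized T get(int index) {
        return buffer.get(index);
    }

    // Each view replays the buffered elements from the start, independently of other views.
    Iterator<T> view() {
        return new Iterator<T>() {
            private int position;

            @Override
            public boolean hasNext() {
                return fillUpTo(position);
            }

            @Override
            public T next() {
                if (!fillUpTo(position)) {
                    throw new NoSuchElementException();
                }
                return get(position++);
            }
        };
    }
}

final class SharedBufferDemo {
    // Drains two views of one source on two threads and returns what each thread saw.
    static List<List<Integer>> consumeOnTwoThreads(int n) {
        SharedBuffer<Integer> shared = new SharedBuffer<>(IntStream.range(0, n).boxed());
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        Thread t1 = new Thread(() -> shared.view().forEachRemaining(first::add));
        Thread t2 = new Thread(() -> shared.view().forEachRemaining(second::add));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return Arrays.asList(first, second);
    }
}
```

A single view is still meant for one thread at a time, which matches the first bullet; only the state shared between views is guarded.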

Contributor

Ah I wasn't thinking, makes sense.

@lukaseder lukaseder added this to the Version 0.9.13 milestone May 1, 2017
tlinkowski pushed a commit to tlinkowski/jOOL that referenced this pull request May 2, 2017
Author

@tlinkowski tlinkowski left a comment

@lukaseder, here's my shot at thread-safety of SeqBuffer. Let me know what you think of it.

tlinkowski pushed a commit to tlinkowski/jOOL that referenced this pull request May 2, 2017
if (estimateSize > MAX_ARRAY_SIZE)
throw new IllegalArgumentException("Stream is too long to be buffered: " + estimateSize);

return new ArrayList<>((int) estimateSize);
Member

Hm, no, I don't agree with this. If we create an array list with a large initial capacity, we're back to the original problem that you wanted to avoid: up-front large memory consumption for intermediate buffers. Imagine:

Seq.seq(Collections.nCopies(1000000, "value")).splitAt(999999).v1.limit(1).forEach(System.out::println);

The splitAt() operation would create a large array list only to consume its first element...

Author

Ha, you're right! I'll revert it.

PS. We wouldn't be entirely back to square one because I mainly wanted to avoid up-front processor consumption (i.e. consuming entire Seq) but you're absolutely right that it's best to also avoid up-front memory consumption.
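
The processor side of this can be sketched with JDK types only. The names LazyPullDemo and CountingSource are hypothetical, and the buffer here is a bare ArrayList rather than the SeqBuffer from this PR. The source counts how many elements are requested; reading just the first element through an on-demand buffer should pull a single element instead of consuming the whole source.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;

// Hypothetical illustration only; not part of jOOL.
final class LazyPullDemo {
    // Iterator over `size` copies of "value" that counts how many elements were requested.
    static final class CountingSource implements Iterator<String> {
        private final int size;
        int pulled;

        CountingSource(int size) {
            this.size = size;
        }

        @Override
        public boolean hasNext() {
            return pulled < size;
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            pulled++;
            return "value";
        }
    }

    // Buffers only the first element and reports how many elements the source handed out.
    static int pulledForFirstElement(int size) {
        CountingSource source = new CountingSource(size);
        Spliterator<String> spliterator =
                Spliterators.spliteratorUnknownSize(source, Spliterator.ORDERED);
        // No capacity hint: the list grows on demand instead of reserving room for `size` elements.
        List<String> buffer = new ArrayList<>();
        spliterator.tryAdvance(buffer::add);
        return source.pulled;
    }
}
```

Passing an estimated size to the ArrayList constructor would keep the pull count the same but reserve the full backing array immediately, which is the memory cost discussed above.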

tlinkowski pushed a commit to tlinkowski/jOOL that referenced this pull request May 3, 2017
…stimateSize()" because it introduced unnecessary up-front memory consumption

Reverted from commit f54482e
@lukaseder
Member

Thanks for the fix. Now, I wonder again if it would be possible to squash all commits into one for this PR (GitHub has such a feature). That would make the final review much simpler for me.

[jOOQ#195] Applied SeqBuffer to certain Seq method implementations
[jOOQ#122] Thread-safe Seq.duplicate()
@tlinkowski
Author

Done.

PS. I couldn't find such a feature (available to the PR author) anywhere on GitHub, so I squashed them manually.

@lukaseder
Member

PS. I couldn't find such a feature (available to the PR author) anywhere on GitHub, so I squashed them manually.

Oh, interesting. That was a wrong assumption then, sorry. I can indeed squash and merge, but not squash, review again, then merge... Will send a feature request to GitHub.

Thanks very much!


return tuple(seq(new Duplicate()), seq(new Duplicate()));
SeqBuffer<T> buffer = SeqBuffer.of(stream);
return tuple(buffer.seq(), buffer.seq());
Member

Hmm, I see. Much simpler, sure, but the flip side of this implementation is that if both duplicates are consumed at the same speed, we're wasting a lot of memory for a buffer that might no longer be needed. What are your thoughts on this?

Author

You're absolutely right. On the other hand, the current implementation is thread safe thanks to SeqBuffer, and the previous one wasn't.

All in all, I'd lean toward this simple implementation, but I'd leave an appropriate comment in this method in case someone reports memory-related issues in the future.

@lukaseder lukaseder merged commit 10741dd into jOOQ:master Jul 28, 2017
@lukaseder
Member

Thanks again for this change. I'll review this later in detail, and will let you know.
