Enforce invariant: mapGroup iterators all nonempty #1072
@@ -146,7 +150,7 @@ case class IdentityReduce[K, V1](
   UnsortedIdentityReduce(keyOrdering, mapped.filterKeys(fn), reducers)

 override def mapGroup[V3](fn: (K, Iterator[V1]) => Iterator[V3]) =
-  IteratorMappedReduce(keyOrdering, mapped, fn, reducers)
+  IteratorMappedReduce(keyOrdering, mapped, Grouped.addEmptyGuard(fn), reducers)
Could these three really cause a problem? These are on identity reduces, which means the keys could not have been removed, I thought. Did you have a test that failed without them?
Yes. I included a test that fails without this change.
On Oct 1, 2014 8:30 PM, "P. Oscar Boykin" notifications@github.com wrote:
In scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala:
@@ -146,7 +150,7 @@ case class IdentityReduce[K, V1](
   UnsortedIdentityReduce(keyOrdering, mapped.filterKeys(fn), reducers)

 override def mapGroup[V3](fn: (K, Iterator[V1]) => Iterator[V3]) =
-  IteratorMappedReduce(keyOrdering, mapped, fn, reducers)
+  IteratorMappedReduce(keyOrdering, mapped, Grouped.addEmptyGuard(fn), reducers)
could these three really cause a problem? These are on Identity reduces,
which mean the keys could not have been removed, I thought. Did you have a
test that failed without them?
Reply to this email directly or view it on GitHub
https://github.com/twitter/scalding/pull/1072/files#r18317280.
To elaborate, my test breaks without the fix on IdentityReduce, but I think it is prudent to add the guard everywhere, including UnsortedIdentityReduce and IdentityValueSortedReduce, to enforce the mapGroup invariant that the iterators are nonempty.
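For readers following along, here is a minimal sketch of what a guard like this might look like. The body below is an assumption based on the snippet discussed in this thread (`if (step1.nonEmpty) fn(k, step1) else Iterator.empty`), not necessarily the code that was merged; `EmptyGuardSketch` is a hypothetical name used only for illustration.

```scala
object EmptyGuardSketch {
  // Hypothetical stand-in for Grouped.addEmptyGuard; the merged implementation may differ.
  // Wraps fn so that it is only ever called with a nonempty iterator.
  def addEmptyGuard[K, V1, V2](
    fn: (K, Iterator[V1]) => Iterator[V2]): (K, Iterator[V1]) => Iterator[V2] =
    (key, iter) =>
      // nonEmpty only checks hasNext, so it does not consume any element
      if (iter.nonEmpty) fn(key, iter) else Iterator.empty
}
```

With this wrapper, a function that would otherwise emit output for any input, such as `(k, _) => Iterator(k)`, produces nothing for a key whose values have all been removed.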
Let's merge with the comment added back. Thanks for fixing this issue. Was going to bite us again, I guess.
yep, it came up in one of our jobs. Added comment and refactored to make it clearer.
-  // Only pass non-Empty iterators to subsequent functions
-  if (step1.nonEmpty) fn(k, step1) else Iterator.empty
-}
+val newReduce = { (k: K, iter: Iterator[V1]) => fn(k, localRed(k, iter)) }
I don't think this works actually. Consider the case where localRed(k, iter) == Iterator.empty, but fn(k, _) == Iterator(k). The newReduce will not behave as expected. I think your previous patch is correct: If any function maps to Iterator.empty, that is the end of the line.
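The counterexample can be made concrete with a small sketch. The `localRed` and `fn` below are hypothetical stand-ins chosen to match the case described (a reduce step that yields an empty iterator and a downstream function that emits the key regardless of input); `ComposeSketch` and the `String`/`Int` types are illustration only.

```scala
object ComposeSketch {
  // Hypothetical reduce step that drops every value for the key.
  val localRed: (String, Iterator[Int]) => Iterator[Int] =
    (_, iter) => iter.filter(_ => false)

  // Hypothetical downstream function that emits the key no matter what it receives.
  val fn: (String, Iterator[Int]) => Iterator[String] =
    (k, _) => Iterator(k)

  // Plain composition: fn still runs on the empty iterator and revives the key.
  val unguarded: (String, Iterator[Int]) => Iterator[String] =
    (k, iter) => fn(k, localRed(k, iter))

  // Guarded composition: an empty intermediate result ends the chain.
  val guarded: (String, Iterator[Int]) => Iterator[String] =
    (k, iter) => {
      val step1 = localRed(k, iter)
      if (step1.nonEmpty) fn(k, step1) else Iterator.empty
    }
}
```

Under these assumptions, `unguarded("a", Iterator(1, 2))` yields the key `"a"` even though every value was dropped, while `guarded("a", Iterator(1, 2))` yields nothing.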
Not sure I follow... This refactor makes it so the invariant on
mapGroupNonEmptyIters is that fn(_, Empty) == Empty always
On Mon, Oct 6, 2014 at 12:30 PM, P. Oscar Boykin notifications@github.com wrote:
In scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala:
   // don't make a closure
   val localRed = reduceFn
-  val newReduce = { (k: K, iter: Iterator[V1]) =>
-    val step1 = localRed(k, iter)
-    // Only pass non-Empty iterators to subsequent functions
-    if (step1.nonEmpty) fn(k, step1) else Iterator.empty
-  }
+  val newReduce = { (k: K, iter: Iterator[V1]) => fn(k, localRed(k, iter)) }
I don't think this works actually. Consider the case where localRed(k,
iter) == Iterator.empty, but fn(k, _) == Iterator(k). The newReduce will
not behave as expected. I think your previous patch is correct: If any
function maps to Iterator.empty, that is the end of the line.
Reply to this email directly or view it on GitHub
https://github.com/twitter/scalding/pull/1072/files#r18467483.
I'll undo my refactor, put in the comments, and update the pull request.
Can you re-run the build? The error looks like just a timeout not being able to get started on running "test matrix tutorials"
squashed commits to force it to rebuild...
This time even more tests failed.. your testing harness appears broken to me.
Previously not all fixes in mapGroup were covered
Looks good to me. Travis is indeed having some issues (with some frequency).
Enforce invariant: mapGroup iterators all nonempty
This change didn't make it into 0.12.0rc5. When will 0.12.0rc6 be released?
@isnotinvain any ideas on when we should publish a new version? This fix was pretty important (we did hit it at Twitter, and I thought I fixed it, but this was a much more comprehensive solution).
Looks like it is now in 0.12.0. Thanks :)
On Tue, Nov 25, 2014 at 8:24 PM, P. Oscar Boykin notifications@github.com
Yes! Sorry I didn't ping back on this thread.
On Thu, Dec 4, 2014 at 10:32 AM, Adam Poswolsky notifications@github.com
Oscar Boykin :: @posco :: http://twitter.com/posco
np... just glad it's in :)
On Thu, Dec 4, 2014 at 3:43 PM, P. Oscar Boykin notifications@github.com