
Add generic TypedPipe optimization rules #1724

Merged · 11 commits into develop · Oct 3, 2017

Conversation

johnynek (Collaborator):

An example of optimization rules and how they can simplify and factor our implementation.

This also helps motivate #1718

cc @non @piyushnarang @ianoc


/**
* In map-reduce settings, Merge is almost free in two contexts:
* 1. the final write

Collaborator:

I'm not sure what the gain of this rule is in an MR context. I think deferring the merge is only useful for filters, maybe? Since if you have two steps

M -> R1
M -> R2

and do some (R1 ++ R2).filter(...), we know it should be good. But pushing flatMaps and maps in there isn't as clear a win to me.
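
To make sure I'm reading it right, the rewrite I'm picturing is roughly the sketch below (illustrative names only, not the exact rule in this patch):

import com.twitter.scalding.typed.TypedPipe

// A minimal sketch of deferring a merge past a filter, assuming pipes r1 and r2
// and a predicate fn: the filter is pushed into each branch so the merge moves
// closer to the sink.
def deferMergePastFilter[A](r1: TypedPipe[A], r2: TypedPipe[A], fn: A => Boolean): TypedPipe[A] =
  // (r1 ++ r2).filter(fn) is rewritten to:
  r1.filter(fn) ++ r2.filter(fn)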

johnynek (Author):

As I mentioned in the comment, if you can defer the merge until a groupBy, scalding can skip doing any work. In Tez, if not, that is actually an extra materialization. On MR, cascading applies this same optimization under the hood (but, for some reason, not with Tez).

Also, clearly, for writes it is a win.

Note, each tuple is only on one side or the other, so by pushing the merge lower (closer to the sinks) in the graph, we don't do more work. So it is hard to see how it could hurt, either.

Collaborator:

Which side do the functions go to in a groupBy? This might change how things get planned on MR negatively.

I.e.

M -> R1
M -> R2
(R1 ++ R2).flatMap { huge data cubing function}.sumByKey

If the flatMap is pushed through the merge, it's more likely it'll end up on the reducers and you'll write a lot more data. This isn't entirely under the control of scalding in general, of course, but this sort of rule could change how the DAG is exposed to cascading and cause odd regressions?

Ah, so the notion here is that if you merge two sources, say from disk, then do some map operations and a groupBy, under Tez cascading will introduce an extra step to do the merge before the map operations?

For writes, is it a win under normal MR?

Given that only one set of mappers or reducers will write to any sink, won't the effects be sort of random depending on data sizes?

M -> R(f1,f2) ------> MergeMapperToSink
M -> R2(f1,f2) ---/

M -> R ------> MergeMapperToSink(f1,f2)
M -> R2 ---/

Which one of these is better largely depends on the shape of f1 and f2, no? (I.e., input and output data types, how well they serialize/compress, whether they expand the data, or similar.)

An example would be something like writing ngrams from multiple sources; in this case you'd want the merge to be late, since you possibly write more tuples than you had in either of the previous steps. (Not so common, granted, but I did it recently so it comes to mind.)
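
As a rough illustration of that ngram case (hypothetical code, just to show where the expansion happens):

// Each line expands into many n-grams, so wherever this flatMap runs is where
// the data blows up; pushing it into both branches before the merge means both
// of the earlier steps write the expanded output.
def ngrams(line: String, n: Int): Iterator[String] =
  line.split("\\s+").sliding(n).map(_.mkString(" "))

// (src1 ++ src2).flatMap(ngrams(_, 3))                     // expansion after the merge
// src1.flatMap(ngrams(_, 3)) ++ src2.flatMap(ngrams(_, 3)) // expansion in each branch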

Collaborator:

@ianoc - yeah I think I ran into the merge + map + groupBy issue in cascading 3 - cwensel/cascading#59. From what I remember there was an extra merge node.


/**
* This is an optimization we didn't do in scalding 0.17 and earlier
* because .toTypedPipe on the group totally hid the structure from

Collaborator:

Very nice! This is a great optimization to get in.

ianoc (Collaborator) commented Sep 25, 2017:

I presume our existing test coverage validates that these are fine changes to make from a correctness POV. Do you plan to merge this? If so, any thoughts on whether we need tests to show that the rules do what they say they do?

johnynek (Collaborator, Author):

@ianoc I think we should have tests for any rule.

To do this, we probably want to back all rules out of the default cascading planner so it just does a literal translation of scalding -> cascading. Then the test is: before and after applying the rule, the result is the same after we plan it to cascading and run it.

So, this is just here to show a set of rules to motivate merging #1718, which I think is the high-priority thing.
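
Roughly, the test shape I have in mind (a sketch only; optimize and runToList here are stand-ins for however we end up applying a single rule and planning-and-running a pipe, not existing APIs):

import com.twitter.scalding.typed.TypedPipe

// Sketch: a rule must not change the results, only the graph.
def checkRule[A: Ordering](
  pipe: TypedPipe[A],
  optimize: TypedPipe[A] => TypedPipe[A], // applies the rule under test
  runToList: TypedPipe[A] => List[A]      // literal plan to cascading, then run
): Unit = {
  val before = runToList(pipe).sorted
  val after = runToList(optimize(pipe)).sorted
  assert(before == after, "optimization rule changed the results")
}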

johnynek (Collaborator, Author) commented Sep 26, 2017 via email.

piyushnarang (Collaborator) left a comment:

I haven't checked out these changes and run them, so maybe this is a trivial question: do we log the various optimization rules that get applied? I'm guessing it might be useful for users to see what set of optimization rules got applied to their Scalding job / execution graph. (We could follow this up in a future PR, as we don't have it as of today.)

@@ -334,11 +334,218 @@ object OptimizationRules {
Binary(recurse(hj.left), rightLit,
{ (ltp: TypedPipe[(K, V)], rtp: TypedPipe[(K, V2)]) =>
rtp match {
-              case ReduceStepPipe(hg: HashJoinable[K, V2]) =>
+              case ReduceStepPipe(hg: HashJoinable[K @unchecked, V2 @unchecked]) =>

Collaborator:

Is the @unchecked something we can move to the start of the case?

//
/////////////////////////////

object ComposeFlatMap extends PartialRule[TypedPipe] {

Collaborator:

Wondering if it would be a good idea to include a short snippet of code as a comment at the top of each rule, showing which scenario it gets applied in?
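
For ComposeFlatMap, for example, I'd imagine something like the snippet below (my guess at the scenario, assuming the rule fuses consecutive flatMaps):

import com.twitter.scalding.typed.TypedPipe

// Before: two flatMap nodes in the graph.
def before[A, B, C](a: TypedPipe[A], f: A => Iterable[B], g: B => Iterable[C]): TypedPipe[C] =
  a.flatMap(f).flatMap(g)

// After: one flatMap node with the functions composed.
def after[A, B, C](a: TypedPipe[A], f: A => Iterable[B], g: B => Iterable[C]): TypedPipe[C] =
  a.flatMap { x => f(x).flatMap(g) }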

* This is arguably not a great idea, but scalding has always
* done it to minimize accidental map-reduce steps
*/
object IgnoreNoOpGroup extends PartialRule[TypedPipe] {

Collaborator:

This is something that has tripped up a lot of first-time users of scalding. Should we add a warning log line when this rule is being applied, to say that we're ignoring the group?
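
For reference, the case I mean is something like this (a sketch, assuming I'm reading the rule right):

import com.twitter.scalding.typed.TypedPipe

// The group is never followed by any reduce operation, so it is treated as a
// no-op and planned as if it were just `pipe`: no shuffle happens.
def noOpGroup[K: Ordering, V](pipe: TypedPipe[(K, V)]): TypedPipe[(K, V)] =
  pipe.group.toTypedPipe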

johnynek (Author):

Adding a warning log line is not a bad idea in general. I'm not sure of the right place to do that. I kind of hate to do it in the optimizer, since you might not even be running the result. Let me think about it.



johnynek changed the title from "[WIP] Add generic TypedPipe optimization rules" to "Add generic TypedPipe optimization rules" on Oct 1, 2017.
johnynek (Collaborator, Author) commented Oct 1, 2017:

@ianoc can you take another look?

I have added tests of correctness, which I think are reasonable tests. They don't actually test that something about the graph got better. I want to do that too eventually, but maybe not in the first version. Something like: either the number of map-reduce steps is <= the original, or the total DAG depth is <= the original depth. Cascading doesn't make it trivial to do that kind of test, I don't think (I have to print the DAG to a dot file and then parse it, I think).
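
Eventually something with this shape (a sketch only; stepCount is a stand-in for however we would count planned map-reduce steps from the cascading flow):

import com.twitter.scalding.typed.TypedPipe

// Sketch: the optimized graph should never plan to more steps than the original.
def neverWorse[A](
  pipe: TypedPipe[A],
  optimize: TypedPipe[A] => TypedPipe[A],
  stepCount: TypedPipe[A] => Int // count of planned map-reduce steps
): Unit =
  assert(stepCount(optimize(pipe)) <= stepCount(pipe), "optimization added map-reduce steps")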

johnynek changed the base branch from oscar/use-dagon to develop on October 1, 2017, 19:16.
type JoinFn[K, V, U, R] = (K, Iterator[V], Iterable[U]) => Iterator[R]
type HashJoinFn[K, V, U, R] = (K, V, Iterable[U]) => Iterator[R]

def toCogroupJoiner2[K, V, U, R](hashJoiner: (K, V, Iterable[U]) => Iterator[R]): JoinFn[K, V, U, R] =

Collaborator:

Can you add a comment on this method? (Motivation as much as anything -- not sure why we would normally want to turn a hash join into a JoinFn.)

Also, I think we can use the HashJoinFn[K, V, U, R] type alias as the type of the arg to this method.
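
(For my own understanding, I'd guess the shape is roughly the following, reusing the aliases above -- just a guess, not necessarily the actual implementation:)

object JoinSketch {
  type JoinFn[K, V, U, R] = (K, Iterator[V], Iterable[U]) => Iterator[R]
  type HashJoinFn[K, V, U, R] = (K, V, Iterable[U]) => Iterator[R]

  // Lift a per-value hash-join function into a cogroup-style join by iterating
  // the streamed (left) side.
  def toCogroupJoiner2[K, V, U, R](hashJoiner: HashJoinFn[K, V, U, R]): JoinFn[K, V, U, R] =
    { (k, vs, us) => vs.flatMap { v => hashJoiner(k, v, us) } }
}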

johnynek (Author):

Will do these in the follow-up, if that's okay.

We never use this method, in fact, but I didn't want to remove it in the interest of being nice to users that may be using it.

Collaborator:

Yep, totes. And it makes sense to leave or kill it, for the obvious reasons to do either.

}

/**
* a.onComplete(f).onComplete(g) == a.onComplete { () => f(); g() }

Collaborator:

When/where do we use an onComplete? (I.e., given it's a side effect, do we need to ensure all of these are run / do try/error handling around each invocation?)

johnynek (Author):

I don't think we ever do. Users at Twitter used this to make some call to an API after they finished doing some side-effecting map or reduce. Other users have asked for it. I kind of hate the API now, since I think it is an anti-pattern, but alas, I don't want to remove it (and we can optimize it, since we can move it to the very end of a map phase, commuting with other map operations).
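
The kind of move I mean, roughly (a sketch of what the optimizer would be allowed to do, not a rule included in this PR):

import com.twitter.scalding.typed.TypedPipe

// The side effect only needs to run once the map phase finishes, so it can
// slide past other map-side operations toward the end of the phase:
def pushOnCompleteLater[A, B](p: TypedPipe[A], f: () => Unit, g: A => B): TypedPipe[B] =
  // p.onComplete(f).map(g) can be rewritten as:
  p.map(g).onComplete(f)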

travisbrown (Contributor):

@johnynek I'm not terribly familiar with the Cascading APIs but wouldn't Flow#getFlowStats give you the info you need for that test without having to parse the DOT file?

ianoc (Collaborator) commented Oct 3, 2017:

This LGTM.

We don't have code coverage for this project, right? It might be nice to know how many of the rules our generated DAG gives us coverage for. (Not a merge blocker.)

johnynek merged commit 1a52975 into develop on Oct 3, 2017.