Feature/cogroup builder #52

Merged
merged 5 commits into from Apr 6, 2012

Conversation

Projects
None yet
2 participants
Contributor

azymnis commented Apr 5, 2012

Adds a CoGroupBuilder which is used internally to implement joins.

This can also be used for star joins, etc.

@johnynek johnynek commented on an outdated diff Apr 5, 2012

src/main/scala/com/twitter/scalding/JoinAlgorithms.scala
@@ -38,6 +38,10 @@ trait JoinAlgorithms {
def pipe : Pipe
+ def coGroupBy(f : Fields, j : JoinMode = InnerJoinMode)(builder : CoGroupBuilder => GroupBuilder) : Pipe = {
@johnynek

johnynek Apr 5, 2012

Collaborator

Need a comment that this should be called on the larger of the two pipes. Also mention that this is useful mostly for star-join situations or cases where you need aggregation after the cogroup, otherwise joinWithSmaller/joinWithTiny should be preferred (as more compact examples of the same thing).

@johnynek johnynek commented on an outdated diff Apr 5, 2012

src/main/scala/com/twitter/scalding/CoGroupBuilder.scala
+ *
+ */
+class CoGroupBuilder(groupFields : Fields, joinMode : JoinMode) extends GroupBuilder(groupFields) {
+ protected var coGroups : List[(Fields, Pipe, JoinMode)] = Nil
+
+ // Joins (cogroups) with pipe p on fields f.
+ // Make sure that pipe p is smaller than the left side pipe, otherwise this
+ // might take a while.
+ def coGroup(f : Fields, p : Pipe, j : JoinMode = InnerJoinMode) = {
+ coGroups ::= (f, RichPipe.assignName(p), j)
+ this
+ }
+
+ override def schedule(name : String, pipe : Pipe) : Pipe = {
+ assert(!sortBy.isDefined, "cannot use a sortBy when doing a coGroup")
+ val fields = (groupFields :: coGroups.map{ _._1 }).toArray
@johnynek

johnynek Apr 5, 2012

Collaborator

What if coGroups is empty? CoGroup doesn't make a lot of sense. I think we should:

assert(!coGroups.isEmpty, "coGroupBy requires at least one other pipe to .coGroup")

@johnynek johnynek commented on an outdated diff Apr 5, 2012

src/main/scala/com/twitter/scalding/CoGroupBuilder.scala
+ */
+class CoGroupBuilder(groupFields : Fields, joinMode : JoinMode) extends GroupBuilder(groupFields) {
+ protected var coGroups : List[(Fields, Pipe, JoinMode)] = Nil
+
+ // Joins (cogroups) with pipe p on fields f.
+ // Make sure that pipe p is smaller than the left side pipe, otherwise this
+ // might take a while.
+ def coGroup(f : Fields, p : Pipe, j : JoinMode = InnerJoinMode) = {
+ coGroups ::= (f, RichPipe.assignName(p), j)
+ this
+ }
+
+ override def schedule(name : String, pipe : Pipe) : Pipe = {
+ assert(!sortBy.isDefined, "cannot use a sortBy when doing a coGroup")
+ val fields = (groupFields :: coGroups.map{ _._1 }).toArray
+ val pipes = (pipe :: coGroups.map{ _._2 }).toArray
@johnynek

johnynek Apr 5, 2012

Collaborator

Shouldn't RichPipe.assignName(pipe) be put here instead of joinWithSmaller?

@johnynek johnynek commented on an outdated diff Apr 5, 2012

src/main/scala/com/twitter/scalding/JoinAlgorithms.scala
val intersection = asSet(fs._1).intersect(asSet(fs._2))
if (intersection.size == 0) {
// Common case: no intersection in names: just CoGroup, which duplicates the grouping fields:
- setReducers(new CoGroup(assignName(pipe), fs._1, assignName(that), fs._2, joiner), reducers)
+ assignName(pipe).coGroupBy(fs._1, joiners._1) {
@johnynek

johnynek Apr 5, 2012

Collaborator

I think the assignName needs to be moved inside CoGroupBuilder. We want .coGroupBy to work for people that don't think about pipe names.

@johnynek johnynek commented on an outdated diff Apr 5, 2012

src/main/scala/com/twitter/scalding/JoinAlgorithms.scala
/*
* Since it is an inner join, we only output if the key is present an equal in both sides.
* For this (common) case, it doesn't matter if we drop one of the matching grouping fields.
* So, we rename the right hand side to temporary names, then discard them after the operation
*/
val (renamedThat, newJoinFields, temp) = renameCollidingFields(that, fs._2, intersection)
- setReducers(new CoGroup(assignName(pipe), fs._1,
- assignName(renamedThat), newJoinFields, joiner), reducers)
- .discard(temp)
+ assignName(pipe).coGroupBy(fs._1, joiners._1) {
@johnynek

johnynek Apr 5, 2012

Collaborator

Move assignName to inside CoGroupBuilder.schedule

@johnynek johnynek commented on an outdated diff Apr 5, 2012

src/main/scala/com/twitter/scalding/CoGroupBuilder.scala
+
+ // Joins (cogroups) with pipe p on fields f.
+ // Make sure that pipe p is smaller than the left side pipe, otherwise this
+ // might take a while.
+ def coGroup(f : Fields, p : Pipe, j : JoinMode = InnerJoinMode) = {
+ coGroups ::= (f, RichPipe.assignName(p), j)
+ this
+ }
+
+ override def schedule(name : String, pipe : Pipe) : Pipe = {
+ assert(!sortBy.isDefined, "cannot use a sortBy when doing a coGroup")
+ val fields = (groupFields :: coGroups.map{ _._1 }).toArray
+ val pipes = (pipe :: coGroups.map{ _._2 }).toArray
+ val joinModes = (joinMode :: coGroups.map{ _._3 }).map{ _.booleanValue }.toArray
+ val mixedJoiner = new MixedJoin(joinModes)
+ val cg : Pipe = new CoGroup(pipes, fields, null, mixedJoiner)
@johnynek

johnynek Apr 5, 2012

Collaborator

Add a TODO to move here the automatic renaming of fields in inner joins and drop the duplicated fields as we do in joinWithSmaller/Tiny now.

@johnynek johnynek added a commit that referenced this pull request Apr 6, 2012

@johnynek johnynek Merge pull request #52 from azymnis/feature/cogroup_builder
Feature/cogroup builder
727562f

@johnynek johnynek merged commit 727562f into twitter:develop Apr 6, 2012

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment