Skip to content

Commit

Permalink
Merge pull request twitter#35 from azymnis/develop
Browse files Browse the repository at this point in the history
Split RichPipe join methods into their own trait.
  • Loading branch information
johnynek committed Mar 23, 2012
2 parents 0cc94b5 + 6b5bfce commit 83d0b3b
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 119 deletions.
157 changes: 157 additions & 0 deletions src/main/scala/com/twitter/scalding/JoinAlgorithms.scala
@@ -0,0 +1,157 @@
/*
Copyright 2012 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.scalding

import cascading.tap._
import cascading.scheme._
import cascading.pipe._
import cascading.pipe.assembly._
import cascading.pipe.joiner._
import cascading.flow._
import cascading.operation._
import cascading.operation.aggregator._
import cascading.operation.filter._
import cascading.tuple._
import cascading.cascade._

/*
* Keeps all the logic related to RichPipe joins.
*
*/
trait JoinAlgorithms {
  import RichPipe._

  /** The cascading pipe that every join defined in this trait starts from. */
  def pipe : Pipe

  /**
   * Cross product of this pipe with `tiny`.
   *
   * WARNING! A cross product with even a moderately sized pipe can create ENORMOUS
   * output. The intended use is attaching a constant (e.g. a number, a dictionary or
   * a set) to each row of another pipe; a common case is a groupAll reduced to one row
   * whose result must be sent back out to every element of a pipe.
   *
   * This is built on joinWithTiny, so the tiny pipe is replicated to all Mappers. If it
   * is large, this will blow up. Use at your own risk.
   */
  def crossWithTiny(tiny : Pipe) = {
    // Give every row on both sides the same constant key, so that joining on that key
    // pairs each row of this pipe with each row of `tiny`.
    val taggedTiny = tiny.map(() -> '__joinTiny__) { (u:Unit) => 1 }
    val taggedBig = pipe.map(() -> '__joinBig__) { (u:Unit) => 1 }
    val crossed = taggedBig.joinWithTiny('__joinBig__ -> '__joinTiny__, taggedTiny)
    // The constant keys were only scaffolding for the join: drop them again.
    crossed.discard('__joinBig__, '__joinTiny__)
  }

  /**
   * Renames the fields of `p` listed in `collisions` to temporary names.
   *
   * Returns a triple of: the renamed pipe, the join keys `fields` with every colliding
   * key replaced by its temporary name, and the temporary fields (which the caller
   * should discard once the join is done).
   */
  private def renameCollidingFields(p : Pipe, fields : Fields,
    collisions: Set[Comparable[_]]) : (Pipe, Fields, Fields) = {
    // Temporary name given to a colliding field
    def tempName(field : Comparable[_]) : String = "__temp_join_" + field.toString

    // Fix an ordering once, so original and temporary names line up position by position:
    val colliding = collisions.toList
    val collidingFields = new Fields(colliding : _*)
    val tempFields = new Fields(colliding.map { c => tempName(c) } : _*)
    // Rebuild the join keys: renamed keys get their temporary name, the rest stay as they are.
    val rewrittenKeys = new Fields( asList(fields)
      .map { key =>
        if (collisions(key)) {
          tempName(key)
        }
        else key
      } : _*)
    (p.rename(collidingFields -> tempFields), rewrittenKeys, tempFields)
  }

  /**
   * Joins the first set of keys (in this pipe) to the second set of keys (in `that`)
   * with a CoGroup; `reducers` is forwarded to setReducers.
   *
   * The key names of the two sides must be disjoint UNLESS the joiner is an InnerJoin.
   * For an inner join, duplicated key names are allowed, but the second copy is deleted
   * (as cascading does not allow duplicated field names). Any other joiner with
   * overlapping key names results in an IllegalArgumentException.
   *
   * Avoid going crazy adding more explicit join modes. Instead do for some other join
   * mode with a larger pipe:
   * .then { pipe => other.
   *   joinWithSmaller(('other1, 'other2)->('this1, 'this2), pipe, new FancyJoin)
   * }
   */
  def joinWithSmaller(fs :(Fields,Fields), that : Pipe, joiner : Joiner = new InnerJoin, reducers : Int = -1) = {
    val leftKeys = fs._1
    val rightKeys = fs._2
    // Key names that appear on both sides of the join:
    val shared = asSet(leftKeys).intersect(asSet(rightKeys))
    if (shared.isEmpty) {
      // Common case: the names do not overlap, so a plain CoGroup (which keeps the
      // grouping fields of both sides) is enough.
      setReducers(new CoGroup(assignName(pipe), leftKeys, assignName(that), rightKeys, joiner), reducers)
    }
    else joiner match {
      case _ : InnerJoin =>
        /*
         * An inner join only emits rows whose key is present and equal on both sides, so
         * nothing is lost by dropping one copy of the matching grouping fields. The right
         * hand side is renamed to temporary names, which are discarded after the join.
         */
        val (renamedThat, renamedKeys, tempFields) = renameCollidingFields(that, rightKeys, shared)
        val grouped = setReducers(new CoGroup(assignName(pipe), leftKeys,
          assignName(renamedThat), renamedKeys, joiner), reducers)
        grouped.discard(tempFields)
      case _ =>
        // For every other join mode both copies of the key can carry information.
        throw new IllegalArgumentException("join keys must be disjoint unless you are doing an InnerJoin. Found: " +
          fs.toString + ", which overlap with: " + shared.toString)
    }
  }

  /**
   * Same join as joinWithSmaller, for the case where `that` is the larger pipe: the
   * roles are swapped so that `that` drives the join and this pipe is the smaller side.
   */
  def joinWithLarger(fs : (Fields, Fields), that : Pipe, joiner : Joiner = new InnerJoin, reducers : Int = -1) = {
    val swappedKeys = (fs._2, fs._1)
    that.joinWithSmaller(swappedKeys, pipe, joiner, reducers)
  }

  /** joinWithSmaller with the joiner fixed to a LeftJoin. */
  def leftJoinWithSmaller(fs :(Fields,Fields), that : Pipe, reducers : Int = -1) = {
    val leftJoiner = new LeftJoin
    joinWithSmaller(fs, that, leftJoiner, reducers)
  }

  /** Left join of this pipe with a larger pipe `that`. */
  def leftJoinWithLarger(fs :(Fields,Fields), that : Pipe, reducers : Int = -1) = {
    // The sides are swapped, so "left" from this pipe's point of view becomes "right":
    val swappedKeys = (fs._2, fs._1)
    that.joinWithSmaller(swappedKeys, pipe, new RightJoin, reducers)
  }

  /**
   * This does an asymmetric join, using cascading's "Join". This only runs through
   * this pipe once, and keeps the right hand side pipe in memory (but is spillable).
   *
   * Joins the first set of keys (in this pipe) to the second set of keys (in `that`)
   * with an InnerJoin. Duplicated key names are allowed, but the second copy is deleted
   * (as cascading does not allow duplicated field names).
   *
   * WARNING: this does not work with outer joins, or right joins, only inner and
   * left join versions are given.
   */
  def joinWithTiny(fs :(Fields,Fields), that : Pipe) = {
    val leftKeys = fs._1
    val rightKeys = fs._2
    // Key names that appear on both sides of the join:
    val shared = asSet(leftKeys).intersect(asSet(rightKeys))
    if (shared.isEmpty) {
      new Join(assignName(pipe), leftKeys, assignName(that), rightKeys, new InnerJoin)
    }
    else {
      // Rename the colliding keys on the right, join, then drop the temporary copies.
      val (renamedThat, renamedKeys, tempFields) = renameCollidingFields(that, rightKeys, shared)
      val joined = new Join(assignName(pipe), leftKeys, assignName(renamedThat), renamedKeys, new InnerJoin)
      joined.discard(tempFields)
    }
  }

  /**
   * Left-join counterpart of joinWithTiny, using cascading's "Join" with a LeftJoin.
   * No renaming of colliding key names is attempted here.
   */
  def leftJoinWithTiny(fs :(Fields,Fields), that : Pipe) = {
    // Both pipes go through assignName to avoid cascading name conflicts
    val namedThis = assignName(pipe)
    val namedThat = assignName(that)
    new Join(namedThis, fs._1, namedThat, fs._2, new LeftJoin)
  }
}
120 changes: 1 addition & 119 deletions src/main/scala/com/twitter/scalding/RichPipe.scala
Expand Up @@ -57,7 +57,7 @@ object RichPipe extends FieldConversions with TupleConversions with java.io.Seri
}
}

class RichPipe(val pipe : Pipe) extends java.io.Serializable {
class RichPipe(val pipe : Pipe) extends java.io.Serializable with JoinAlgorithms {
import RichPipe._

// Rename the current pipe
Expand All @@ -70,25 +70,6 @@ class RichPipe(val pipe : Pipe) extends java.io.Serializable {
new Each(pipe, fields, new Identity(fields))
}

/*
* WARNING! doing a cross product with even a moderate sized pipe can
* create ENORMOUS output. The use-case here is attaching a constant (e.g.
* a number or a dictionary or set) to each row in another pipe.
* A common use-case comes from a groupAll and reduction to one row,
* then you want to send the results back out to every element in a pipe
*
* This uses joinWithTiny, so tiny pipe is replicated to all Mappers. If it
* is large, this will blow up. Get it: be foolish here and LOSE IT ALL!
*
* Use at your own risk.
*/
def crossWithTiny(tiny : Pipe) = {
  // Give every row on both sides the same constant key, so that joining on that key
  // pairs each row of this pipe with each row of `tiny`.
  val taggedTiny = tiny.map(() -> '__joinTiny__) { (u:Unit) => 1 }
  val taggedBig = map(() -> '__joinBig__) { (u:Unit) => 1 }
  val crossed = taggedBig.joinWithTiny('__joinBig__ -> '__joinTiny__, taggedTiny)
  // The constant keys were only scaffolding for the join: drop them again.
  crossed.discard('__joinBig__, '__joinTiny__)
}

//Discard the given fields, and keep the rest
//Kind of the opposite of the previous.
def discard(f : Fields) = {
  // A NoOp applied to `f` with the SWAP output selector.
  val noOp = new NoOp
  new Each(pipe, f, noOp, Fields.SWAP)
}
Expand Down Expand Up @@ -215,105 +196,6 @@ class RichPipe(val pipe : Pipe) extends java.io.Serializable {

def debug = {
  // Wrap the pipe in an Each that applies cascading's Debug operation.
  val debugOp = new Debug()
  new Each(pipe, debugOp)
}

// Rename the collisions and return the pipe and the new names, and the fields to discard
private def renameCollidingFields(pipe : Pipe, fields : Fields,
  collisions: Set[Comparable[_]]) : (Pipe, Fields, Fields) = {
  // Temporary name given to a colliding field
  def tempName(field : Comparable[_]) : String = "__temp_join_" + field.toString

  // Fix an ordering once, so original and temporary names line up position by position:
  val colliding = collisions.toList
  val collidingFields = new Fields(colliding : _*)
  val tempFields = new Fields(colliding.map { c => tempName(c) } : _*)
  // Rebuild the join keys: renamed keys get their temporary name, the rest stay as they are.
  val rewrittenKeys = new Fields( asList(fields)
    .map { key =>
      if (collisions(key)) {
        tempName(key)
      }
      else key
    } : _*)
  // Result: renamed pipe, rewritten join keys, and the temporary fields to discard later.
  (pipe.rename(collidingFields -> tempFields), rewrittenKeys, tempFields)
}
/**
* joins the first set of keys in the first pipe to the second set of keys in the second pipe.
* All keys must be unique UNLESS it is an inner join, then duplicated join keys are allowed, but
* the second copy is deleted (as cascading does not allow duplicated field names).
*
* Avoid going crazy adding more explicit join modes. Instead do for some other join
* mode with a larger pipe:
* .then { pipe => other.
* joinWithSmaller(('other1, 'other2)->('this1, 'this2), pipe, new FancyJoin)
* }
*/
def joinWithSmaller(fs :(Fields,Fields), that : Pipe, joiner : Joiner = new InnerJoin, reducers : Int = -1) = {
  val leftKeys = fs._1
  val rightKeys = fs._2
  // Key names that appear on both sides of the join:
  val shared = asSet(leftKeys).intersect(asSet(rightKeys))
  if (shared.isEmpty) {
    // Common case: the names do not overlap, so a plain CoGroup (which keeps the
    // grouping fields of both sides) is enough.
    setReducers(new CoGroup(assignName(pipe), leftKeys, assignName(that), rightKeys, joiner), reducers)
  }
  else joiner match {
    case _ : InnerJoin =>
      /*
       * An inner join only emits rows whose key is present and equal on both sides, so
       * nothing is lost by dropping one copy of the matching grouping fields. The right
       * hand side is renamed to temporary names, which are discarded after the join.
       */
      val (renamedThat, renamedKeys, tempFields) = renameCollidingFields(that, rightKeys, shared)
      val grouped = setReducers(new CoGroup(assignName(pipe), leftKeys,
        assignName(renamedThat), renamedKeys, joiner), reducers)
      grouped.discard(tempFields)
    case _ =>
      // For every other join mode both copies of the key can carry information.
      throw new IllegalArgumentException("join keys must be disjoint unless you are doing an InnerJoin. Found: " +
        fs.toString + ", which overlap with: " + shared.toString)
  }
}

def joinWithLarger(fs : (Fields, Fields), that : Pipe, joiner : Joiner = new InnerJoin, reducers : Int = -1) = {
  // Swap the roles: `that` drives the join and this pipe becomes the smaller side.
  val swappedKeys = (fs._2, fs._1)
  that.joinWithSmaller(swappedKeys, this.pipe, joiner, reducers)
}

def leftJoinWithSmaller(fs :(Fields,Fields), that : Pipe, reducers : Int = -1) = {
  // Same as joinWithSmaller, with the joiner fixed to a LeftJoin.
  val leftJoiner = new LeftJoin
  joinWithSmaller(fs, that, leftJoiner, reducers)
}

def leftJoinWithLarger(fs :(Fields,Fields), that : Pipe, reducers : Int = -1) = {
  // The sides are swapped, so "left" from this pipe's point of view becomes "right":
  val swappedKeys = (fs._2, fs._1)
  that.joinWithSmaller(swappedKeys, this.pipe, new RightJoin, reducers)
}

/**
* This does an asymmetric join, using cascading's "Join". This only runs through
* this pipe once, and keeps the right hand side pipe in memory (but is spillable).
*
* joins the first set of keys in the first pipe to the second set of keys in the second pipe.
* All keys must be unique UNLESS it is an inner join, then duplicated join keys are allowed, but
* the second copy is deleted (as cascading does not allow duplicated field names).
*
* WARNING: this does not work with outer joins, or right joins, only inner and
* left join versions are given.
*/
def joinWithTiny(fs :(Fields,Fields), that : Pipe) = {
  val leftKeys = fs._1
  val rightKeys = fs._2
  // Key names that appear on both sides of the join:
  val shared = asSet(leftKeys).intersect(asSet(rightKeys))
  if (shared.isEmpty) {
    new Join(assignName(pipe), leftKeys, assignName(that), rightKeys, new InnerJoin)
  }
  else {
    // Rename the colliding keys on the right, join, then drop the temporary copies.
    val (renamedThat, renamedKeys, tempFields) = renameCollidingFields(that, rightKeys, shared)
    val joined = new Join(assignName(pipe), leftKeys, assignName(renamedThat), renamedKeys, new InnerJoin)
    joined.discard(tempFields)
  }
}

def leftJoinWithTiny(fs :(Fields,Fields), that : Pipe) = {
  // Both pipes go through assignName to avoid cascading name conflicts
  val namedThis = assignName(pipe)
  val namedThat = assignName(that)
  new Join(namedThis, fs._1, namedThat, fs._2, new LeftJoin)
}

def write(outsource : Source)(implicit flowDef : FlowDef, mode : Mode) = {
outsource.write(pipe)(flowDef, mode)
pipe
Expand Down

0 comments on commit 83d0b3b

Please sign in to comment.