Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

Merge pull request #197 from johnynek/feature/custom-joiner

Rewrite of CoGrouping2 in Typed-API, allows custom joiners
  • Loading branch information...
commit b517fd9c8fb6cd9fcaceb2f6c6dd7fe334b3c4c5 2 parents bb8742a + 7cd6277
@azymnis azymnis authored
View
7 src/main/scala/com/twitter/scalding/TupleConversions.scala
@@ -43,6 +43,13 @@ trait TupleConversions extends GeneratedConversions {
}
}
+ def tupleAt(idx: Int)(tup: CTuple): CTuple = {
+ val obj = tup.getObject(idx)
+ val res = CTuple.size(1)
+ res.set(0, obj)
+ res
+ }
+
implicit object TupleEntryConverter extends TupleConverter[TupleEntry] {
override def apply(tup : TupleEntry) = tup
override def arity = -1
View
138 src/main/scala/com/twitter/scalding/TypedPipe.scala
@@ -9,6 +9,7 @@ import cascading.tuple.TupleEntry
import java.io.Serializable
import com.twitter.algebird.{Monoid, Ring}
+import com.twitter.scalding.typed.{Joiner, CoGrouped2}
/***************
** WARNING: This is a new and experimental API. Expect API breaks. If you want
@@ -178,11 +179,11 @@ trait KeyedList[K,T] {
*/
def mapValueStream[V](smfn : Iterator[T] => Iterator[V]) : KeyedList[K,V]
/** This is a special case of mapValueStream, but can be optimized because it doesn't need
- * all the values for a given key at once. An unoptimized implementation would be:
+ * all the values for a given key at once. An unoptimized implementation is:
* mapValueStream { _.map { fn } }
* but for Grouped we can avoid resorting to mapValueStream
*/
- def mapValues[V](fn : T => V) : KeyedList[K,V]
+ def mapValues[V](fn : T => V) : KeyedList[K,V] = mapValueStream { _.map { fn } }
/** reduce with fn which must be associative and commutative.
* Like the above this can be optimized in some Grouped cases.
*/
@@ -258,14 +259,15 @@ object Grouped {
/** Represents a grouping which is the transition from map to reduce phase in hadoop.
* Grouping is on a key of type K by ordering Ordering[K].
*/
-class Grouped[K,T](pipe : Pipe, ordering : Ordering[K],
- streamMapFn : Option[(Iterator[TupleEntry]) => Iterator[T]],
- valueSort : Option[(Fields,Boolean)],
- reducers : Int = -1)
+class Grouped[K,T](private[scalding] val pipe : Pipe,
+ val ordering : Ordering[K],
+ streamMapFn : Option[(Iterator[Tuple]) => Iterator[T]],
+ private[scalding] val valueSort : Option[(Fields,Boolean)],
+ val reducers : Int = -1)
extends KeyedList[K,T] with Serializable {
import Dsl._
- protected val groupKey = Grouped.sorting("key", ordering)
+ private[scalding] val groupKey = Grouped.sorting("key", ordering)
protected def sortIfNeeded(gb : GroupBuilder) : GroupBuilder = {
valueSort.map { fb =>
@@ -319,7 +321,7 @@ class Grouped[K,T](pipe : Pipe, ordering : Ordering[K],
//Actually execute the mapValueStream:
streamMapFn.map { fn =>
operate[T] {
- _.mapStream[TupleEntry,T]('value -> 'value)(fn)(TupleEntryConverter,SingleSetter)
+ _.mapStream[Tuple,T]('value -> 'value)(fn)(CTupleConverter,SingleSetter)
}
}.getOrElse {
// This case happens when someone does .groupAll.sortBy { }.write
@@ -352,118 +354,24 @@ class Grouped[K,T](pipe : Pipe, ordering : Ordering[K],
reduceLeft(fn)
}
}
- override def mapValueStream[V](nmf : Iterator[T] => Iterator[V]) : Grouped[K,V] = {
- val tconv = singleConverter[T]
- val newStreamMapFn = streamMapFn.map { _.andThen(nmf) }.orElse {
+ private[scalding] lazy val streamMapping : (Iterator[Tuple]) => Iterator[T] = {
+ streamMapFn.getOrElse {
// Set up the initial stream mapping:
- Some({(tei : Iterator[TupleEntry]) => nmf(tei.map { te => tconv(te) })})
+ {(ti : Iterator[Tuple]) => ti.map { _.getObject(0).asInstanceOf[T] }}
}
+ }
+ override def mapValueStream[V](nmf : Iterator[T] => Iterator[V]) : Grouped[K,V] = {
+ val newStreamMapFn = Some(streamMapping.andThen(nmf))
new Grouped[K,V](pipe, ordering, newStreamMapFn, valueSort, reducers)
}
// SMALLER PIPE ALWAYS ON THE RIGHT!!!!!!!
- def join[W](smaller : Grouped[K,W]) = new InnerCoGrouped2[K,T,W](this, smaller)
- def leftJoin[W](smaller : Grouped[K,W]) = new LeftCoGrouped2[K,T,W](this, smaller)
- def rightJoin[W](smaller : Grouped[K,W]) = new RightCoGrouped2[K,T,W](this, smaller)
- def outerJoin[W](smaller : Grouped[K,W]) = new OuterCoGrouped2[K,T,W](this, smaller)
- // TODO: implement blockJoin, joinWithTiny
-}
-
-
-/** Represents a result of CoGroup operation on two Grouped pipes.
- * users should probably never directly construct them, but instead use the
- * (outer/left/right)Join methods of Grouped.
- */
-class CoGrouped2[K,V,W,Result]
- (bigger : Grouped[K,V], bigMode : JoinMode, smaller : Grouped[K,W], smallMode : JoinMode,
- conv : ((V,W)) => Result, streamMapFn : Option[(Iterator[(V,W)]) => Iterator[Result]],
- reducers : Int = -1)
- extends KeyedList[K,Result] with Serializable {
+ def cogroup[W,R](smaller: Grouped[K,W])(joiner: (K, Iterator[T], () => Iterator[W]) => Iterator[R])
+ : KeyedList[K,R] = new CoGrouped2[K,T,W,R](this, smaller, joiner)
- import Dsl._
-
- protected def nonNullKey(tup : Tuple) : K = {
- val first = tup.getObject(0)
- val second = tup.getObject(1)
- val idx = if(null == first) 1 else 0
- // TODO, POB: I think type erasure is making this TupleGetter[AnyRef]
- // And so, some of this *might* break if Cascading starts handling primitives
- // better
- implicitly[TupleGetter[K]].get(tup, idx)
- }
-
- // resultFields should have the two key fields first
- protected def operate[B](op : CoGroupBuilder => GroupBuilder,
- resultFields : Fields, finish : Tuple => B) : TypedPipe[(K,B)] = {
- // Rename the key and values:
- def pipeOf[V1](g : Grouped[K,V1], f : Fields) : Pipe = g.toTypedPipe.toPipe(f)(Tup2Setter)
-
- // TODO a custom joiner here could combine a prior function on Grouped
- val rsmaller = pipeOf(smaller, ('key2, 'value2))
- val newPipe = pipeOf(bigger, ('key, 'value)).coGroupBy('key, bigMode) { gb =>
- op(gb.coGroup('key2, rsmaller, smallMode)).reducers(reducers)
- }.project(resultFields)
- new TypedPipe[(K,B)](newPipe, resultFields, { te =>
- val tup = te.getTuple
- Some(nonNullKey(tup), finish(tup))
- })
- }
- // If you don't reduce, this should be an implicit CoGrouped => TypedPipe
- override lazy val toTypedPipe : TypedPipe[(K,Result)] = {
- if (streamMapFn.isEmpty) {
- // This is the case of no reduce or mapValueStream after the join (common case)
- operate({ gb => gb },
- ('key, 'key2, 'value, 'value2),
- {tup : Tuple => conv((tup.getObject(2).asInstanceOf[V], tup.getObject(3).asInstanceOf[W]))})
- }
- else {
- // We do the mapStream. NOTE conv must be null if we get here
- assert(conv == null, "Cannot have non-null conv and streamMapFn")
- operate({gb => gb.mapStream(('value,'value2)->('valuer))(streamMapFn.get)},
- ('key, 'key2, 'valuer),
- {tup : Tuple => tup.getObject(2).asInstanceOf[Result]})
- }
- }
- override def mapValues[B]( f : (Result) => B) : KeyedList[K,B] = {
- if (streamMapFn.isEmpty) {
- // We can just chain the conv function from (V,W) => Result => B
- new CoGrouped2[K,V,W,B](bigger, bigMode, smaller, smallMode,
- conv.andThen(f), None, reducers)
- }
- else {
- // we might as well use mapValueStream
- mapValueStream { iter => iter.map { f } }
- }
- }
- override def mapValueStream[U](f : Iterator[Result] => Iterator[U]) : KeyedList[K,U] = {
- val newStreamMapFn = streamMapFn.map {
- // If it is defined, just chain in f after what we already have
- _.andThen { f }
- }
- .orElse {
- // This is the first mapValueStream call; need to set up the initial streamMapFn:
- Some( (vwit : Iterator[(V,W)]) => f(vwit.map { conv }) )
- }
- new CoGrouped2[K,V,W,U](bigger, bigMode, smaller, smallMode,
- null, newStreamMapFn, reducers)
- }
+ def join[W](smaller : Grouped[K,W]) = cogroup(smaller)(Joiner.inner2)
+ def leftJoin[W](smaller : Grouped[K,W]) = cogroup(smaller)(Joiner.left2)
+ def rightJoin[W](smaller : Grouped[K,W]) = cogroup(smaller)(Joiner.right2)
+ def outerJoin[W](smaller : Grouped[K,W]) = cogroup(smaller)(Joiner.outer2)
- def withReducers(red : Int) : CoGrouped2[K,V,W,Result] = {
- new CoGrouped2(bigger, bigMode, smaller, smallMode, conv, streamMapFn, red)
- }
+ // TODO: implement blockJoin, joinWithTiny
}
-
-class InnerCoGrouped2[K,V,W](bigger : Grouped[K,V], smaller : Grouped[K,W])
- extends CoGrouped2[K,V,W,(V,W)](bigger, InnerJoinMode, smaller, InnerJoinMode,
- { in : (V,W) => in }, None)
-
-class LeftCoGrouped2[K,V,W](bigger : Grouped[K,V], smaller : Grouped[K,W])
- extends CoGrouped2[K,V,W,(V,Option[W])](bigger, InnerJoinMode, smaller, OuterJoinMode,
- { in : (V,W) => (in._1, Option(in._2))}, None)
-
-class RightCoGrouped2[K,V,W](bigger : Grouped[K,V], smaller : Grouped[K,W])
- extends CoGrouped2[K,V,W,(Option[V],W)](bigger, OuterJoinMode, smaller, InnerJoinMode,
- { in : (V,W) => (Option(in._1), in._2)}, None)
-
-class OuterCoGrouped2[K,V,W](bigger : Grouped[K,V], smaller : Grouped[K,W])
- extends CoGrouped2[K,V,W,(Option[V],Option[W])](bigger, OuterJoinMode, smaller, OuterJoinMode,
- { in : (V,W) => (Option(in._1), Option(in._2))}, None)
View
81 src/main/scala/com/twitter/scalding/typed/CoGrouped2.scala
@@ -0,0 +1,81 @@
+/*
+Copyright 2012 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package com.twitter.scalding.typed
+
+import cascading.pipe.CoGroup
+import cascading.pipe.joiner.{Joiner => CJoiner, JoinerClosure}
+import cascading.tuple.{Tuple => CTuple, Fields, TupleEntry}
+
+import com.twitter.scalding._
+
+import scala.collection.JavaConverters._
+
+class CoGrouped2[K,V,W,R](left: Grouped[K,V],
+ right: Grouped[K,W],
+ joiner: (K, Iterator[V], () => Iterator[W]) => Iterator[R])
+ extends KeyedList[K,R] with java.io.Serializable {
+
+ override lazy val toTypedPipe : TypedPipe[(K,R)] = {
+ // Actually make a new coGrouping:
+ assert(left.valueSort == None, "secondary sorting unsupported in CoGrouped2")
+ assert(right.valueSort == None, "secondary sorting unsupported in CoGrouped2")
+
+ import Dsl._
+ val rightGroupKey = RichFields(StringField[K]("key1")(right.ordering, None))
+
+ val newPipe = new CoGroup(left.pipe, left.groupKey,
+ right.pipe.rename(('key, 'value) -> ('key1, 'value1)),
+ rightGroupKey,
+ new Joiner2(left.streamMapping, right.streamMapping, joiner))
+
+ val reducers = scala.math.max(left.reducers, right.reducers)
+ val pipeWithRed = RichPipe.setReducers(newPipe, reducers).project('key, 'value)
+ //Construct the new TypedPipe
+ TypedPipe.from(pipeWithRed, ('key, 'value))
+ }
+
+ override def mapValueStream[U](fn: Iterator[R] => Iterator[U]): KeyedList[K,U] = {
+ new CoGrouped2[K,V,W,U](left, right, {(k,vit,wit) => fn(joiner(k,vit,wit))})
+ }
+}
+
+class Joiner2[K,V,W,R](leftGetter : Iterator[CTuple] => Iterator[V],
+ rightGetter: Iterator[CTuple] => Iterator[W],
+ joiner: (K, Iterator[V], () => Iterator[W]) => Iterator[R]) extends CJoiner {
+
+ import Joiner._
+
+ override def getIterator(jc: JoinerClosure) = {
+ // The left one cannot be iterated multiple times on Hadoop:
+ val (lkopt, left) = getKeyValue[K](jc.getIterator(0))
+ // It's safe to call getIterator more than once on index > 0
+ val (rkopt, _) = getKeyValue[K](jc.getIterator(1))
+ // Use the left key if present, otherwise fall back to the right-hand side
+ val goodKey = lkopt.orElse(rkopt).get
+
+ val rightG = { () => rightGetter(jc.getIterator(1).asScala.map { Dsl.tupleAt(1) }) }
+
+ joiner(goodKey, leftGetter(left), rightG).map { rval =>
+ // There must always be four resulting fields;
+ // otherwise the flow planner will throw
+ val res = CTuple.size(4)
+ res.set(0, goodKey)
+ res.set(1, rval)
+ res
+ }.asJava
+ }
+ override val numJoins = 1
+}
View
60 src/main/scala/com/twitter/scalding/typed/Joiner.scala
@@ -0,0 +1,60 @@
+/*
+Copyright 2012 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package com.twitter.scalding.typed
+
+import cascading.tuple.{Tuple => CTuple}
+
+import com.twitter.scalding._
+
+import scala.collection.JavaConverters._
+
+object Joiner extends java.io.Serializable {
+ // Returns the key from the FIRST tuple. Suitable for a single JoinerClosure
+ def getKeyValue[K](tupit: java.util.Iterator[CTuple]): (Option[K], Iterator[CTuple]) = {
+ val stupit = tupit.asScala
+ if (stupit.isEmpty) {
+ (None, stupit)
+ }
+ else {
+ val first = stupit.next
+ val key = Some(first.getObject(0).asInstanceOf[K])
+ val value = Iterator(Dsl.tupleAt(1)(first))
+ (key, value ++ stupit.map { Dsl.tupleAt(1) })
+ }
+ }
+
+ def inner2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
+ itv.flatMap { v => itu().map { u => (v,u) } }
+ }
+ def asOuter[U](it : Iterator[U]) : Iterator[Option[U]] = {
+ if(it.isEmpty) {
+ Iterator(None)
+ }
+ else {
+ it.map { Some(_) }
+ }
+ }
+ def outer2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
+ asOuter(itv).flatMap { v => asOuter(itu()).map { u => (v,u) } }
+ }
+ def left2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
+ itv.flatMap { v => asOuter(itu()).map { u => (v,u) } }
+ }
+ def right2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
+ asOuter(itv).flatMap { v => itu().map { u => (v,u) } }
+ }
+ // TODO: implement CoGroup3, and inner3, outer3 (probably best to leave the other modes as custom)
+}
View
2  src/test/scala/com/twitter/scalding/TypedPipeTest.scala
@@ -130,7 +130,7 @@ class TypedPipeJoinCountTest extends Specification {
noDetailedDiffs() //Fixes an issue with scala 2.9
import Dsl._
"A TJoinCountJob" should {
- JobTest("com.twitter.scalding.TJoinCountJob")
+ JobTest(new com.twitter.scalding.TJoinCountJob(_))
.source(Tsv("in0",(0,1)), List((0,1),(0,2),(1,1),(1,5),(2,10)))
.source(Tsv("in1",(0,1)), List((0,10),(1,20),(1,10),(1,30)))
.sink[(Int,Long)](Tsv("out")) { outbuf =>
Please sign in to comment.
Something went wrong with that request. Please try again.