Skip to content

Rewrite of CoGrouping2 in Typed-API, allows custom joiners #197

Merged
merged 2 commits into from Oct 2, 2012

2 participants

@johnynek
johnynek commented Oct 2, 2012

This fixes the efficiency issue with doing operations on Grouped[K,V] before a join.

It also adds a syntax for custom cogrouping (see Grouped.cogroup)

Finally it implements all typed joins in terms of the above (which turned out to require splunking through several weird and underdocumented cascading issues).

@azymnis azymnis commented on an outdated diff Oct 2, 2012
...ain/scala/com/twitter/scalding/typed/CoGrouped2.scala
+
+ val reducers = scala.math.max(left.reducers, right.reducers)
+ val pipeWithRed = RichPipe.setReducers(newPipe, reducers).project('key, 'value)
+ //Construct the new TypedPipe
+ TypedPipe.from(pipeWithRed, ('key, 'value))
+ }
+
+ override def mapValueStream[U](fn: Iterator[R] => Iterator[U]): KeyedList[K,U] = {
+ new CoGrouped2[K,V,W,U](left, right, {(k,vit,wit) => fn(joiner(k,vit,wit))})
+ }
+}
+
+object Joiner extends java.io.Serializable {
+ def getOne[K](k1 : Option[K], k2 : Option[K]) = k1.getOrElse(k2.get)
+
+ def tupleAt(idx: Int)(tup: CTuple): CTuple = {
@azymnis
Twitter, Inc. member
azymnis added a note Oct 2, 2012

This feels a bit more generic than Joiner. Maybe we create a CascadingUtils class? Or TupleBase?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@azymnis azymnis commented on an outdated diff Oct 2, 2012
...ain/scala/com/twitter/scalding/typed/CoGrouped2.scala
+ joiner: (K, Iterator[V], () => Iterator[W]) => Iterator[R]) extends CJoiner {
+
+ import Joiner._
+
+ override def getIterator(jc: JoinerClosure) = {
+ // The left one cannot be iterated multiple times on Hadoop:
+ val (lkopt, left) = getKeyValue[K](jc.getIterator(0))
+ // It's safe to call getIterator more than once on index > 0
+ val (rkopt, _) = getKeyValue[K](jc.getIterator(1))
+ // Try to get from the right-hand-side
+ val goodKey = getOne(lkopt, rkopt)
+
+ val rightG = { () => rightGetter(jc.getIterator(1).asScala.map { tupleAt(1) }) }
+
+ joiner(goodKey, leftGetter(left), rightG).map { rval =>
+ // There always has to be four resuling fields:
@azymnis
Twitter, Inc. member
azymnis added a note Oct 2, 2012

Typo: resulting

@azymnis
Twitter, Inc. member
azymnis added a note Oct 2, 2012

Maybe add: otherwise the flow planner will throw an exception.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@azymnis azymnis merged commit b517fd9 into twitter:develop Oct 2, 2012

1 check passed

Details default The Travis build passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.