Skip to content
This repository

Rewrite of CoGrouping2 in Typed-API, allows custom joiners #197

Merged
merged 2 commits into from almost 2 years ago

2 participants

P. Oscar Boykin Argyris Zymnis
P. Oscar Boykin
Collaborator

This fixes the efficiency issue with doing operations on Grouped[K,V] before a join.

It also adds a syntax for custom cogrouping (see Grouped.cogroup)

Finally it implements all typed joins in terms of the above (which turned out to require splunking through several weird and underdocumented cascading issues).

...ain/scala/com/twitter/scalding/typed/CoGrouped2.scala
((43 lines not shown))
  43 +
  44 + val reducers = scala.math.max(left.reducers, right.reducers)
  45 + val pipeWithRed = RichPipe.setReducers(newPipe, reducers).project('key, 'value)
  46 + //Construct the new TypedPipe
  47 + TypedPipe.from(pipeWithRed, ('key, 'value))
  48 + }
  49 +
  50 + override def mapValueStream[U](fn: Iterator[R] => Iterator[U]): KeyedList[K,U] = {
  51 + new CoGrouped2[K,V,W,U](left, right, {(k,vit,wit) => fn(joiner(k,vit,wit))})
  52 + }
  53 +}
  54 +
  55 +object Joiner extends java.io.Serializable {
  56 + def getOne[K](k1 : Option[K], k2 : Option[K]) = k1.getOrElse(k2.get)
  57 +
  58 + def tupleAt(idx: Int)(tup: CTuple): CTuple = {
1
Argyris Zymnis Collaborator
azymnis added a note

This feels a bit more generic than Joiner. Maybe we create a CascadingUtils class? Or TupleBase?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
...ain/scala/com/twitter/scalding/typed/CoGrouped2.scala
((103 lines not shown))
  103 + joiner: (K, Iterator[V], () => Iterator[W]) => Iterator[R]) extends CJoiner {
  104 +
  105 + import Joiner._
  106 +
  107 + override def getIterator(jc: JoinerClosure) = {
  108 + // The left one cannot be iterated multiple times on Hadoop:
  109 + val (lkopt, left) = getKeyValue[K](jc.getIterator(0))
  110 + // It's safe to call getIterator more than once on index > 0
  111 + val (rkopt, _) = getKeyValue[K](jc.getIterator(1))
  112 + // Try to get from the right-hand-side
  113 + val goodKey = getOne(lkopt, rkopt)
  114 +
  115 + val rightG = { () => rightGetter(jc.getIterator(1).asScala.map { tupleAt(1) }) }
  116 +
  117 + joiner(goodKey, leftGetter(left), rightG).map { rval =>
  118 + // There always has to be four resuling fields:
2
Argyris Zymnis Collaborator
azymnis added a note

Typo: resulting

Argyris Zymnis Collaborator
azymnis added a note

Maybe add: otherwise the flow planner will throw an exception.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Argyris Zymnis azymnis merged commit b517fd9 into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
7 src/main/scala/com/twitter/scalding/TupleConversions.scala
@@ -43,6 +43,13 @@ trait TupleConversions extends GeneratedConversions {
43 43 }
44 44 }
45 45
  46 + def tupleAt(idx: Int)(tup: CTuple): CTuple = {
  47 + val obj = tup.getObject(idx)
  48 + val res = CTuple.size(1)
  49 + res.set(0, obj)
  50 + res
  51 + }
  52 +
46 53 implicit object TupleEntryConverter extends TupleConverter[TupleEntry] {
47 54 override def apply(tup : TupleEntry) = tup
48 55 override def arity = -1
138 src/main/scala/com/twitter/scalding/TypedPipe.scala
@@ -9,6 +9,7 @@ import cascading.tuple.TupleEntry
9 9 import java.io.Serializable
10 10
11 11 import com.twitter.algebird.{Monoid, Ring}
  12 +import com.twitter.scalding.typed.{Joiner, CoGrouped2}
12 13
13 14 /***************
14 15 ** WARNING: This is a new an experimental API. Expect API breaks. If you want
@@ -178,11 +179,11 @@ trait KeyedList[K,T] {
178 179 */
179 180 def mapValueStream[V](smfn : Iterator[T] => Iterator[V]) : KeyedList[K,V]
180 181 /** This is a special case of mapValueStream, but can be optimized because it doesn't need
181   - * all the values for a given key at once. An unoptimized implementation would be:
  182 + * all the values for a given key at once. An unoptimized implementation is:
182 183 * mapValueStream { _.map { fn } }
183 184 * but for Grouped we can avoid resorting to mapValueStream
184 185 */
185   - def mapValues[V](fn : T => V) : KeyedList[K,V]
  186 + def mapValues[V](fn : T => V) : KeyedList[K,V] = mapValueStream { _.map { fn } }
186 187 /** reduce with fn which must be associative and commutative.
187 188 * Like the above this can be optimized in some Grouped cases.
188 189 */
@@ -258,14 +259,15 @@ object Grouped {
258 259 /** Represents a grouping which is the transition from map to reduce phase in hadoop.
259 260 * Grouping is on a key of type K by ordering Ordering[K].
260 261 */
261   -class Grouped[K,T](pipe : Pipe, ordering : Ordering[K],
262   - streamMapFn : Option[(Iterator[TupleEntry]) => Iterator[T]],
263   - valueSort : Option[(Fields,Boolean)],
264   - reducers : Int = -1)
  262 +class Grouped[K,T](private[scalding] val pipe : Pipe,
  263 + val ordering : Ordering[K],
  264 + streamMapFn : Option[(Iterator[Tuple]) => Iterator[T]],
  265 + private[scalding] val valueSort : Option[(Fields,Boolean)],
  266 + val reducers : Int = -1)
265 267 extends KeyedList[K,T] with Serializable {
266 268
267 269 import Dsl._
268   - protected val groupKey = Grouped.sorting("key", ordering)
  270 + private[scalding] val groupKey = Grouped.sorting("key", ordering)
269 271
270 272 protected def sortIfNeeded(gb : GroupBuilder) : GroupBuilder = {
271 273 valueSort.map { fb =>
@@ -319,7 +321,7 @@ class Grouped[K,T](pipe : Pipe, ordering : Ordering[K],
319 321 //Actually execute the mapValueStream:
320 322 streamMapFn.map { fn =>
321 323 operate[T] {
322   - _.mapStream[TupleEntry,T]('value -> 'value)(fn)(TupleEntryConverter,SingleSetter)
  324 + _.mapStream[Tuple,T]('value -> 'value)(fn)(CTupleConverter,SingleSetter)
323 325 }
324 326 }.getOrElse {
325 327 // This case happens when someone does .groupAll.sortBy { }.write
@@ -352,118 +354,24 @@ class Grouped[K,T](pipe : Pipe, ordering : Ordering[K],
352 354 reduceLeft(fn)
353 355 }
354 356 }
355   - override def mapValueStream[V](nmf : Iterator[T] => Iterator[V]) : Grouped[K,V] = {
356   - val tconv = singleConverter[T]
357   - val newStreamMapFn = streamMapFn.map { _.andThen(nmf) }.orElse {
  357 + private[scalding] lazy val streamMapping : (Iterator[Tuple]) => Iterator[T] = {
  358 + streamMapFn.getOrElse {
358 359 // Set up the initial stream mapping:
359   - Some({(tei : Iterator[TupleEntry]) => nmf(tei.map { te => tconv(te) })})
  360 + {(ti : Iterator[Tuple]) => ti.map { _.getObject(0).asInstanceOf[T] }}
360 361 }
  362 + }
  363 + override def mapValueStream[V](nmf : Iterator[T] => Iterator[V]) : Grouped[K,V] = {
  364 + val newStreamMapFn = Some(streamMapping.andThen(nmf))
361 365 new Grouped[K,V](pipe, ordering, newStreamMapFn, valueSort, reducers)
362 366 }
363 367 // SMALLER PIPE ALWAYS ON THE RIGHT!!!!!!!
364   - def join[W](smaller : Grouped[K,W]) = new InnerCoGrouped2[K,T,W](this, smaller)
365   - def leftJoin[W](smaller : Grouped[K,W]) = new LeftCoGrouped2[K,T,W](this, smaller)
366   - def rightJoin[W](smaller : Grouped[K,W]) = new RightCoGrouped2[K,T,W](this, smaller)
367   - def outerJoin[W](smaller : Grouped[K,W]) = new OuterCoGrouped2[K,T,W](this, smaller)
368   - // TODO: implement blockJoin, joinWithTiny
369   -}
370   -
371   -
372   -/** Represents a result of CoGroup operation on two Grouped pipes.
373   - * users should probably never directly construct them, but instead use the
374   - * (outer/left/right)Join methods of Grouped.
375   - */
376   -class CoGrouped2[K,V,W,Result]
377   - (bigger : Grouped[K,V], bigMode : JoinMode, smaller : Grouped[K,W], smallMode : JoinMode,
378   - conv : ((V,W)) => Result, streamMapFn : Option[(Iterator[(V,W)]) => Iterator[Result]],
379   - reducers : Int = -1)
380   - extends KeyedList[K,Result] with Serializable {
  368 + def cogroup[W,R](smaller: Grouped[K,W])(joiner: (K, Iterator[T], () => Iterator[W]) => Iterator[R])
  369 + : KeyedList[K,R] = new CoGrouped2[K,T,W,R](this, smaller, joiner)
381 370
382   - import Dsl._
383   -
384   - protected def nonNullKey(tup : Tuple) : K = {
385   - val first = tup.getObject(0)
386   - val second = tup.getObject(1)
387   - val idx = if(null == first) 1 else 0
388   - // TODO, POB: I think type erasure is making this TupleGetter[AnyRef]
389   - // And so, some of this *might* break if Cascading starts handling primitives
390   - // better
391   - implicitly[TupleGetter[K]].get(tup, idx)
392   - }
393   -
394   - // resultFields should have the two key fields first
395   - protected def operate[B](op : CoGroupBuilder => GroupBuilder,
396   - resultFields : Fields, finish : Tuple => B) : TypedPipe[(K,B)] = {
397   - // Rename the key and values:
398   - def pipeOf[V1](g : Grouped[K,V1], f : Fields) : Pipe = g.toTypedPipe.toPipe(f)(Tup2Setter)
399   -
400   - // TODO a custom joiner here could combine a prior function on Grouped
401   - val rsmaller = pipeOf(smaller, ('key2, 'value2))
402   - val newPipe = pipeOf(bigger, ('key, 'value)).coGroupBy('key, bigMode) { gb =>
403   - op(gb.coGroup('key2, rsmaller, smallMode)).reducers(reducers)
404   - }.project(resultFields)
405   - new TypedPipe[(K,B)](newPipe, resultFields, { te =>
406   - val tup = te.getTuple
407   - Some(nonNullKey(tup), finish(tup))
408   - })
409   - }
410   - // If you don't reduce, this should be an implicit CoGrouped => TypedPipe
411   - override lazy val toTypedPipe : TypedPipe[(K,Result)] = {
412   - if (streamMapFn.isEmpty) {
413   - // This is the case of no reduce or mapValueStream after the join (common case)
414   - operate({ gb => gb },
415   - ('key, 'key2, 'value, 'value2),
416   - {tup : Tuple => conv((tup.getObject(2).asInstanceOf[V], tup.getObject(3).asInstanceOf[W]))})
417   - }
418   - else {
419   - // We do the mapStream. NOTE conv must be null if we get here
420   - assert(conv == null, "Cannot have non-null conv and streamMapFn")
421   - operate({gb => gb.mapStream(('value,'value2)->('valuer))(streamMapFn.get)},
422   - ('key, 'key2, 'valuer),
423   - {tup : Tuple => tup.getObject(2).asInstanceOf[Result]})
424   - }
425   - }
426   - override def mapValues[B]( f : (Result) => B) : KeyedList[K,B] = {
427   - if (streamMapFn.isEmpty) {
428   - // We can just chain the conv function from (V,W) => Result => B
429   - new CoGrouped2[K,V,W,B](bigger, bigMode, smaller, smallMode,
430   - conv.andThen(f), None, reducers)
431   - }
432   - else {
433   - // we might as well use mapValueStream
434   - mapValueStream { iter => iter.map { f } }
435   - }
436   - }
437   - override def mapValueStream[U](f : Iterator[Result] => Iterator[U]) : KeyedList[K,U] = {
438   - val newStreamMapFn = streamMapFn.map {
439   - // If it is defined, just chain in f after what we already have
440   - _.andThen { f }
441   - }
442   - .orElse {
443   - // This the first mapValueStream call, need to setup initial streamMapFn:
444   - Some( (vwit : Iterator[(V,W)]) => f(vwit.map { conv }) )
445   - }
446   - new CoGrouped2[K,V,W,U](bigger, bigMode, smaller, smallMode,
447   - null, newStreamMapFn, reducers)
448   - }
  371 + def join[W](smaller : Grouped[K,W]) = cogroup(smaller)(Joiner.inner2)
  372 + def leftJoin[W](smaller : Grouped[K,W]) = cogroup(smaller)(Joiner.left2)
  373 + def rightJoin[W](smaller : Grouped[K,W]) = cogroup(smaller)(Joiner.right2)
  374 + def outerJoin[W](smaller : Grouped[K,W]) = cogroup(smaller)(Joiner.outer2)
449 375
450   - def withReducers(red : Int) : CoGrouped2[K,V,W,Result] = {
451   - new CoGrouped2(bigger, bigMode, smaller, smallMode, conv, streamMapFn, red)
452   - }
  376 + // TODO: implement blockJoin, joinWithTiny
453 377 }
454   -
455   -class InnerCoGrouped2[K,V,W](bigger : Grouped[K,V], smaller : Grouped[K,W])
456   - extends CoGrouped2[K,V,W,(V,W)](bigger, InnerJoinMode, smaller, InnerJoinMode,
457   - { in : (V,W) => in }, None)
458   -
459   -class LeftCoGrouped2[K,V,W](bigger : Grouped[K,V], smaller : Grouped[K,W])
460   - extends CoGrouped2[K,V,W,(V,Option[W])](bigger, InnerJoinMode, smaller, OuterJoinMode,
461   - { in : (V,W) => (in._1, Option(in._2))}, None)
462   -
463   -class RightCoGrouped2[K,V,W](bigger : Grouped[K,V], smaller : Grouped[K,W])
464   - extends CoGrouped2[K,V,W,(Option[V],W)](bigger, OuterJoinMode, smaller, InnerJoinMode,
465   - { in : (V,W) => (Option(in._1), in._2)}, None)
466   -
467   -class OuterCoGrouped2[K,V,W](bigger : Grouped[K,V], smaller : Grouped[K,W])
468   - extends CoGrouped2[K,V,W,(Option[V],Option[W])](bigger, OuterJoinMode, smaller, OuterJoinMode,
469   - { in : (V,W) => (Option(in._1), Option(in._2))}, None)
81 src/main/scala/com/twitter/scalding/typed/CoGrouped2.scala
... ... @@ -0,0 +1,81 @@
  1 +/*
  2 +Copyright 2012 Twitter, Inc.
  3 +
  4 +Licensed under the Apache License, Version 2.0 (the "License");
  5 +you may not use this file except in compliance with the License.
  6 +You may obtain a copy of the License at
  7 +
  8 +http://www.apache.org/licenses/LICENSE-2.0
  9 +
  10 +Unless required by applicable law or agreed to in writing, software
  11 +distributed under the License is distributed on an "AS IS" BASIS,
  12 +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 +See the License for the specific language governing permissions and
  14 +limitations under the License.
  15 +*/
  16 +package com.twitter.scalding.typed
  17 +
  18 +import cascading.pipe.CoGroup
  19 +import cascading.pipe.joiner.{Joiner => CJoiner, JoinerClosure}
  20 +import cascading.tuple.{Tuple => CTuple, Fields, TupleEntry}
  21 +
  22 +import com.twitter.scalding._
  23 +
  24 +import scala.collection.JavaConverters._
  25 +
  26 +class CoGrouped2[K,V,W,R](left: Grouped[K,V],
  27 + right: Grouped[K,W],
  28 + joiner: (K, Iterator[V], () => Iterator[W]) => Iterator[R])
  29 + extends KeyedList[K,R] with java.io.Serializable {
  30 +
  31 + override lazy val toTypedPipe : TypedPipe[(K,R)] = {
  32 + // Actually make a new coGrouping:
  33 + assert(left.valueSort == None, "secondary sorting unsupported in CoGrouped2")
  34 + assert(right.valueSort == None, "secondary sorting unsupported in CoGrouped2")
  35 +
  36 + import Dsl._
  37 + val rightGroupKey = RichFields(StringField[K]("key1")(right.ordering, None))
  38 +
  39 + val newPipe = new CoGroup(left.pipe, left.groupKey,
  40 + right.pipe.rename(('key, 'value) -> ('key1, 'value1)),
  41 + rightGroupKey,
  42 + new Joiner2(left.streamMapping, right.streamMapping, joiner))
  43 +
  44 + val reducers = scala.math.max(left.reducers, right.reducers)
  45 + val pipeWithRed = RichPipe.setReducers(newPipe, reducers).project('key, 'value)
  46 + //Construct the new TypedPipe
  47 + TypedPipe.from(pipeWithRed, ('key, 'value))
  48 + }
  49 +
  50 + override def mapValueStream[U](fn: Iterator[R] => Iterator[U]): KeyedList[K,U] = {
  51 + new CoGrouped2[K,V,W,U](left, right, {(k,vit,wit) => fn(joiner(k,vit,wit))})
  52 + }
  53 +}
  54 +
  55 +class Joiner2[K,V,W,R](leftGetter : Iterator[CTuple] => Iterator[V],
  56 + rightGetter: Iterator[CTuple] => Iterator[W],
  57 + joiner: (K, Iterator[V], () => Iterator[W]) => Iterator[R]) extends CJoiner {
  58 +
  59 + import Joiner._
  60 +
  61 + override def getIterator(jc: JoinerClosure) = {
  62 + // The left one cannot be iterated multiple times on Hadoop:
  63 + val (lkopt, left) = getKeyValue[K](jc.getIterator(0))
  64 + // It's safe to call getIterator more than once on index > 0
  65 + val (rkopt, _) = getKeyValue[K](jc.getIterator(1))
  66 + // Try to get from the right-hand-side
  67 + val goodKey = lkopt.orElse(rkopt).get
  68 +
  69 + val rightG = { () => rightGetter(jc.getIterator(1).asScala.map { Dsl.tupleAt(1) }) }
  70 +
  71 + joiner(goodKey, leftGetter(left), rightG).map { rval =>
  72 + // There always has to be four resulting fields
  73 + // or otherwise the flow planner will throw
  74 + val res = CTuple.size(4)
  75 + res.set(0, goodKey)
  76 + res.set(1, rval)
  77 + res
  78 + }.asJava
  79 + }
  80 + override val numJoins = 1
  81 +}
60 src/main/scala/com/twitter/scalding/typed/Joiner.scala
... ... @@ -0,0 +1,60 @@
  1 +/*
  2 +Copyright 2012 Twitter, Inc.
  3 +
  4 +Licensed under the Apache License, Version 2.0 (the "License");
  5 +you may not use this file except in compliance with the License.
  6 +You may obtain a copy of the License at
  7 +
  8 +http://www.apache.org/licenses/LICENSE-2.0
  9 +
  10 +Unless required by applicable law or agreed to in writing, software
  11 +distributed under the License is distributed on an "AS IS" BASIS,
  12 +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 +See the License for the specific language governing permissions and
  14 +limitations under the License.
  15 +*/
  16 +package com.twitter.scalding.typed
  17 +
  18 +import cascading.tuple.{Tuple => CTuple}
  19 +
  20 +import com.twitter.scalding._
  21 +
  22 +import scala.collection.JavaConverters._
  23 +
  24 +object Joiner extends java.io.Serializable {
  25 + // Returns the key from the FIRST tuple. Suitable for a single JoinerClosure
  26 + def getKeyValue[K](tupit: java.util.Iterator[CTuple]): (Option[K], Iterator[CTuple]) = {
  27 + val stupit = tupit.asScala
  28 + if (stupit.isEmpty) {
  29 + (None, stupit)
  30 + }
  31 + else {
  32 + val first = stupit.next
  33 + val key = Some(first.getObject(0).asInstanceOf[K])
  34 + val value = Iterator(Dsl.tupleAt(1)(first))
  35 + (key, value ++ stupit.map { Dsl.tupleAt(1) })
  36 + }
  37 + }
  38 +
  39 + def inner2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
  40 + itv.flatMap { v => itu().map { u => (v,u) } }
  41 + }
  42 + def asOuter[U](it : Iterator[U]) : Iterator[Option[U]] = {
  43 + if(it.isEmpty) {
  44 + Iterator(None)
  45 + }
  46 + else {
  47 + it.map { Some(_) }
  48 + }
  49 + }
  50 + def outer2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
  51 + asOuter(itv).flatMap { v => asOuter(itu()).map { u => (v,u) } }
  52 + }
  53 + def left2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
  54 + itv.flatMap { v => asOuter(itu()).map { u => (v,u) } }
  55 + }
  56 + def right2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
  57 + asOuter(itv).flatMap { v => itu().map { u => (v,u) } }
  58 + }
  59 + // TODO: implement CoGroup3, and inner3, outer3 (probably best to leave the other modes as custom)
  60 +}
2  src/test/scala/com/twitter/scalding/TypedPipeTest.scala
@@ -130,7 +130,7 @@ class TypedPipeJoinCountTest extends Specification {
130 130 noDetailedDiffs() //Fixes an issue with scala 2.9
131 131 import Dsl._
132 132 "A TJoinCountJob" should {
133   - JobTest("com.twitter.scalding.TJoinCountJob")
  133 + JobTest(new com.twitter.scalding.TJoinCountJob(_))
134 134 .source(Tsv("in0",(0,1)), List((0,1),(0,2),(1,1),(1,5),(2,10)))
135 135 .source(Tsv("in1",(0,1)), List((0,10),(1,20),(1,10),(1,30)))
136 136 .sink[(Int,Long)](Tsv("out")) { outbuf =>

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.