Permalink
Browse files

Merge pull request #98 from johnynek/feature/tpipe-stream

Feature/tpipe stream
  • Loading branch information...
2 parents 4341a90 + 6bc1239 commit 3e20d9242740e439bf884833df5558ff16d366b5 @azymnis azymnis committed Jun 12, 2012
@@ -121,7 +121,7 @@ class TypedPipe[T](inpipe : Pipe, fields : Fields, flatMapFn : (TupleEntry) => I
// TODO due to type erasure, I'm fairly sure this is not using the primitive TupleGetters
// Note, lazy val pipe returns a single count tuple with an object of type T in position 0
val gpipe = pipe.mapTo(0 -> ('key, 'value)) { (t : T) => (g(t), t)}
- new Grouped[K,T](gpipe, ord, None)
+ Grouped.fromKVPipe(gpipe, ord)
}
def ++[U >: T](other : TypedPipe[U]) : TypedPipe[U] = {
TypedPipe.from(pipe ++ other.pipe, 0)(singleConverter[U])
@@ -161,14 +161,20 @@ class MappedOrdering[B,T](fn : (T) => B, ord : Ordering[B])
trait KeyedList[K,T] {
// These are the fundamental operations
def toTypedPipe : TypedPipe[(K,T)]
+ /** Operate on a Stream[T] of all the values for each key at one time.
+ * Avoid accumulating the whole list in memory if you can. Prefer reduce.
+ */
+ def mapValueStream[V](smfn : Iterable[T] => Iterable[V]) : KeyedList[K,V]
+ /** This is a special case of mapValueStream, but can be optimized because it doesn't need
+ * all the values for a given key at once. An unoptimized implementation would be:
+ * mapValueStream { _.map { fn } }
+ * but for Grouped we can avoid resorting to mapValueStream
+ */
def mapValues[V](fn : T => V) : KeyedList[K,V]
- def reduce(fn : (T,T) => T) : TypedPipe[(K,T)]
- // TODO these can be unified with mapValueStream
- // def mapValueStream[V](Iterable[T] => Iterable[V]) : KeyedList[K,V]
- // foldLeft = mapValueStream { _.foldLeft( )( ) }
- def foldLeft[B](z : B)(fn : (B,T) => B) : TypedPipe[(K,B)]
- // scanLeft = mapValueStream { _.scanLeft( )( ) }
- def scanLeft[B](z : B)(fn : (B,T) => B) : TypedPipe[(K,B)]
+ /** reduce with fn which must be associative and commutative.
+ * Like the above this can be optimized in some Grouped cases.
+ */
+ def reduce(fn : (T,T) => T) : TypedPipe[(K,T)] = reduceLeft(fn)
// The rest of these methods are derived from above
def sum(implicit monoid : Monoid[T]) = reduce(monoid.plus)
@@ -179,6 +185,29 @@ trait KeyedList[K,T] {
def forall(fn : T => Boolean) : TypedPipe[(K,Boolean)] = {
mapValues { fn(_) }.product
}
+ def foldLeft[B](z : B)(fn : (B,T) => B) : TypedPipe[(K,B)] = {
+ mapValueStream { stream => Some(stream.foldLeft(z)(fn)) }
+ .toTypedPipe
+ }
+ def scanLeft[B](z : B)(fn : (B,T) => B) : KeyedList[K,B] = {
+ mapValueStream { _.scanLeft(z)(fn) }
+ }
+ // Similar to reduce but always on the reduce-side (never optimized to mapside),
+ // and named for the scala function. fn need not be associative and/or commutative.
+ // Makes sense when you want to reduce, but in a particular sorted order.
+ // the old value comes in on the left.
+ def reduceLeft( fn : (T,T) => T) : TypedPipe[(K,T)] = {
+ mapValueStream[T] { stream =>
+ if (stream.isEmpty) {
+ // We have to guard this case, as cascading seems to give empty streams on occasions
+ None
+ }
+ else {
+ Some(stream.reduceLeft(fn))
+ }
+ }
+ .toTypedPipe
+ }
def size : TypedPipe[(K,Long)] = mapValues { x => 1L }.sum
def toList : TypedPipe[(K,List[T])] = mapValues { List(_) }.sum
def toSet : TypedPipe[(K,Set[T])] = mapValues { Set(_) }.sum
@@ -196,65 +225,109 @@ trait KeyedList[K,T] {
}
}
+object Grouped {
+ // Make a new Grouped from a pipe with two fields: 'key, 'value
+ def fromKVPipe[K,V](pipe : Pipe, ordering : Ordering[K])
+ (implicit conv : TupleConverter[V]) : Grouped[K,V] = {
+ new Grouped[K,V](pipe, ordering, None, None, -1)
+ }
+ def valueSorting[T](implicit ord : Ordering[T]) : Fields = sorting("value", ord)
+
+ def sorting[T](key : String, ord : Ordering[T]) : Fields = {
+ val f = new Fields(key)
+ f.setComparator(key, ord)
+ f
+ }
+}
/** Represents a grouping which is the transition from map to reduce phase in hadoop.
* Grouping is on a key of type K by ordering Ordering[K].
*/
-class Grouped[K,T](val pipe : Pipe, ordering : Ordering[K], sortfn : Option[Ordering[T]] = None,
- reducers : Int = -1) extends KeyedList[K,T] with Serializable {
+class Grouped[K,T](val pipe : Pipe, ordering : Ordering[K],
+ streamMapFn : Option[(Iterable[TupleEntry]) => Iterable[T]],
+ valueSort : Option[(Fields,Boolean)],
+ reducers : Int = -1)
+ extends KeyedList[K,T] with Serializable {
import Dsl._
- protected val groupKey = {
- val f = new Fields("key")
- f.setComparator("key", ordering)
- f
- }
+ protected val groupKey = Grouped.sorting("key", ordering)
+
protected def sortIfNeeded(gb : GroupBuilder) : GroupBuilder = {
- sortfn.map { cmp =>
- val f = new Fields("value")
- f.setComparator("value", cmp)
- gb.sortBy(f)
+ valueSort.map { fb =>
+ val gbSorted = gb.sortBy(fb._1)
+ if (fb._2) gbSorted.reverse else gbSorted
}.getOrElse(gb)
}
- // Here only for KeyedList, probably never useful
- def toTypedPipe : TypedPipe[(K,T)] = {
- TypedPipe.from(pipe, ('key, 'value))(implicitly[TupleConverter[(K,T)]])
- }
- def mapValues[V](fn : T => V) : Grouped[K,V] = {
- new Grouped(pipe.map('value -> 'value)(fn)(singleConverter[T], SingleSetter),
- ordering, None, reducers)
- }
def withSortOrdering(so : Ordering[T]) : Grouped[K,T] = {
- new Grouped[K,T](pipe, ordering, Some(so), reducers)
+ // Set the sorting with unreversed
+ assert(valueSort.isEmpty, "Can only call withSortOrdering once")
+ assert(streamMapFn.isEmpty, "Cannot sort after a mapValueStream")
+ val newValueSort = Some(Grouped.valueSorting(so)).map { f => (f,false) }
+ new Grouped(pipe, ordering, None, newValueSort, reducers)
}
def withReducers(red : Int) : Grouped[K,T] = {
- new Grouped[K,T](pipe, ordering, sortfn, red)
+ new Grouped(pipe, ordering, streamMapFn, valueSort, red)
}
def sortBy[B](fn : (T) => B)(implicit ord : Ordering[B]) : Grouped[K,T] = {
withSortOrdering(new MappedOrdering(fn, ord))
}
def sortWith(lt : (T,T) => Boolean) : Grouped[K,T] = {
withSortOrdering(new LtOrdering(lt))
}
- def reverse : Grouped[K,T] = new Grouped(pipe, ordering, sortfn.map { _.reverse }, reducers)
+ def reverse : Grouped[K,T] = {
+ assert(streamMapFn.isEmpty, "Cannot reverse after mapValueStream")
+ val newValueSort = valueSort.map { f => (f._1, !(f._2)) }
+ new Grouped(pipe, ordering, None, newValueSort, reducers)
+ }
protected def operate[T1](fn : GroupBuilder => GroupBuilder) : TypedPipe[(K,T1)] = {
val reducedPipe = pipe.groupBy(groupKey) { gb =>
fn(sortIfNeeded(gb)).reducers(reducers)
}
TypedPipe.from(reducedPipe, ('key, 'value))(implicitly[TupleConverter[(K,T1)]])
}
-
- // If there is no ordering, this operation is pushed map-side
- def reduce(fn : (T,T) => T) : TypedPipe[(K,T)] = {
- operate[T] { _.reduce[T]('value -> 'value)(fn)(SingleSetter, singleConverter[T]) }
+ // Here are the required KeyedList methods:
+ override lazy val toTypedPipe : TypedPipe[(K,T)] = {
+ if (streamMapFn.isEmpty) {
+ // There was no reduce AND no mapValueStream, no need to groupBy:
+ TypedPipe.from(pipe, ('key, 'value))(implicitly[TupleConverter[(K,T)]])
+ }
+ else {
+ //Actually execute the mapValueStream:
+ operate[T] { _.mapStream[TupleEntry,T]('value -> 'value)(streamMapFn.get)(TupleEntryConverter,SingleSetter)
+ }
+ }
}
- // Ordered traversal of the data
- def foldLeft[B](z : B)(fn : (B,T) => B) : TypedPipe[(K,B)] = {
- operate[B] { _.foldLeft[B,T]('value -> 'value)(z)(fn)(SingleSetter, singleConverter[T]) }
+ override def mapValues[V](fn : T => V) : Grouped[K,V] = {
+ if(valueSort.isEmpty && streamMapFn.isEmpty) {
+ // We have no sort defined yet, so we should operate on the pipe so we can sort by V after
+ // if we need to:
+ new Grouped(pipe.map('value -> 'value)(fn)(singleConverter[T], SingleSetter),
+ ordering, None, None, reducers)
+ }
+ else {
+ // There is a sorting, which invalidates map-side optimizations,
+ // so we might as well use mapValueStream
+ mapValueStream { iter => iter.map { fn } }
+ }
}
-
- def scanLeft[B](z : B)(fn : (B,T) => B) : TypedPipe[(K,B)] = {
- operate[B] { _.scanLeft[B,T]('value -> 'value)(z)(fn)(SingleSetter, singleConverter[T]) }
+ // If there is no ordering, this operation is pushed map-side
+ override def reduce(fn : (T,T) => T) : TypedPipe[(K,T)] = {
+ if(valueSort.isEmpty && streamMapFn.isEmpty) {
+ // We can optimize mapside:
+ operate[T] { _.reduce[T]('value -> 'value)(fn)(SingleSetter, singleConverter[T]) }
+ }
+ else {
+ // Just fall back to the mapValueStream based implementation:
+ reduceLeft(fn)
+ }
+ }
+ override def mapValueStream[V](nmf : Iterable[T] => Iterable[V]) : Grouped[K,V] = {
+ val tconv = singleConverter[T]
+ val newStreamMapFn = streamMapFn.map { _.andThen(nmf) }.orElse {
+ // Set up the initial stream mapping:
+ Some({(tei : Iterable[TupleEntry]) => nmf(tei.map { te => tconv(te) })})
+ }
+ new Grouped[K,V](pipe, ordering, newStreamMapFn, valueSort, reducers)
}
// SMALLER PIPE ALWAYS ON THE RIGHT!!!!!!!
def join[W](smaller : Grouped[K,W]) = new InnerCoGrouped2[K,T,W](this, smaller)
@@ -271,7 +344,8 @@ class Grouped[K,T](val pipe : Pipe, ordering : Ordering[K], sortfn : Option[Orde
*/
class CoGrouped2[K,V,W,Result]
(bigger : Grouped[K,V], bigMode : JoinMode, smaller : Grouped[K,W], smallMode : JoinMode,
- conv : ((V,W)) => Result, reducers : Int = -1)
+ conv : ((V,W)) => Result, streamMapFn : Option[(Iterable[(V,W)]) => Iterable[Result]],
+ reducers : Int = -1)
extends KeyedList[K,Result] with Serializable {
import Dsl._
@@ -293,60 +367,69 @@ class CoGrouped2[K,V,W,Result]
val rsmaller = smaller.pipe.rename(('key, 'value) -> ('key2, 'value2))
val newPipe = bigger.pipe.coGroupBy('key, bigMode) { gb =>
op(gb.coGroup('key2, rsmaller, smallMode)).reducers(reducers)
- }
+ }.project(resultFields)
new TypedPipe[(K,B)](newPipe, resultFields, { te =>
val tup = te.getTuple
Some(nonNullKey(tup), finish(tup))
})
}
// If you don't reduce, this should be an implicit CoGrouped => TypedPipe
- def toTypedPipe : TypedPipe[(K,Result)] = {
- operate({ gb => gb },
- ('key, 'key2, 'value, 'value2),
- {tup : Tuple => conv((tup.getObject(2).asInstanceOf[V], tup.getObject(3).asInstanceOf[W]))})
- }
- def mapValues[B]( f : (Result) => B) : KeyedList[K,B] = {
- new CoGrouped2[K,V,W,B](bigger, bigMode, smaller,
- smallMode, conv.andThen(f), reducers)
- }
- override def reduce(f : (Result,Result) => Result) : TypedPipe[(K,Result)] = {
- operate({ gb =>
- gb.mapReduceMap(('value, 'value2) -> ('result))(conv)(f)(identity
- _)(implicitly[TupleConverter[(V,W)]], SingleSetter, singleConverter[Result],SingleSetter)
- },
- ('key, 'key2, 'result),
- {(tup : Tuple) => tup.getObject(2).asInstanceOf[Result]})
- }
- def foldLeft[B](z : B)(f : (B,Result) => B) : TypedPipe[(K,B)] = {
- def newFoldFn(old : B, data : (V,W)) : B = f(old, conv(data))
- operate({gb => gb.foldLeft(('value,'value2)->('valueb))(z)(newFoldFn _)},
- ('key, 'key2, 'valueb),
- {tup : Tuple => tup.getObject(2).asInstanceOf[B]})
- }
- def scanLeft[B](z : B)(f : (B,Result) => B) : TypedPipe[(K,B)] = {
- def newFoldFn(old : B, data : (V,W)) : B = f(old, conv(data))
- operate({gb => gb.scanLeft(('value,'value2)->('valueb))(z)(newFoldFn _)},
- ('key, 'key2, 'valueb),
- {tup : Tuple => tup.getObject(2).asInstanceOf[B]})
+ override lazy val toTypedPipe : TypedPipe[(K,Result)] = {
+ if (streamMapFn.isEmpty) {
+ // This is the case of no reduce or mapValueStream after the join (common case)
+ operate({ gb => gb },
+ ('key, 'key2, 'value, 'value2),
+ {tup : Tuple => conv((tup.getObject(2).asInstanceOf[V], tup.getObject(3).asInstanceOf[W]))})
+ }
+ else {
+ // We do the mapStream. NOTE conv must be null if we get here
+ assert(conv == null, "Cannot have non-null conv and streamMapFn")
+ operate({gb => gb.mapStream(('value,'value2)->('valuer))(streamMapFn.get)},
+ ('key, 'key2, 'valuer),
+ {tup : Tuple => tup.getObject(2).asInstanceOf[Result]})
+ }
+ }
+ override def mapValues[B]( f : (Result) => B) : KeyedList[K,B] = {
+ if (streamMapFn.isEmpty) {
+ // We can just chain the conv function from (V,W) => Result => B
+ new CoGrouped2[K,V,W,B](bigger, bigMode, smaller, smallMode,
+ conv.andThen(f), None, reducers)
+ }
+ else {
+ // we might as well use mapValueStream
+ mapValueStream { iter => iter.map { f } }
+ }
+ }
+ override def mapValueStream[U](f : Iterable[Result] => Iterable[U]) : KeyedList[K,U] = {
+ val newStreamMapFn = streamMapFn.map {
+ // If it is defined, just chain in f after what we already have
+ _.andThen { f }
+ }
+ .orElse {
+ // This the first mapValueStream call, need to setup initial streamMapFn:
+ Some( (vwit : Iterable[(V,W)]) => f(vwit.map { conv }) )
+ }
+ new CoGrouped2[K,V,W,U](bigger, bigMode, smaller, smallMode,
+ null, newStreamMapFn, reducers)
}
def withReducers(red : Int) : CoGrouped2[K,V,W,Result] = {
- new CoGrouped2(bigger, bigMode, smaller, smallMode, conv, red)
+ new CoGrouped2(bigger, bigMode, smaller, smallMode, conv, streamMapFn, red)
}
}
class InnerCoGrouped2[K,V,W](bigger : Grouped[K,V], smaller : Grouped[K,W])
extends CoGrouped2[K,V,W,(V,W)](bigger, InnerJoinMode, smaller, InnerJoinMode,
- { in : (V,W) => in })
+ { in : (V,W) => in }, None)
class LeftCoGrouped2[K,V,W](bigger : Grouped[K,V], smaller : Grouped[K,W])
extends CoGrouped2[K,V,W,(V,Option[W])](bigger, InnerJoinMode, smaller, OuterJoinMode,
- { in : (V,W) => (in._1, Option(in._2))})
+ { in : (V,W) => (in._1, Option(in._2))}, None)
class RightCoGrouped2[K,V,W](bigger : Grouped[K,V], smaller : Grouped[K,W])
extends CoGrouped2[K,V,W,(Option[V],W)](bigger, OuterJoinMode, smaller, InnerJoinMode,
- { in : (V,W) => (Option(in._1), in._2)})
+ { in : (V,W) => (Option(in._1), in._2)}, None)
class OuterCoGrouped2[K,V,W](bigger : Grouped[K,V], smaller : Grouped[K,W])
extends CoGrouped2[K,V,W,(Option[V],Option[W])](bigger, OuterJoinMode, smaller, OuterJoinMode,
- { in : (V,W) => (Option(in._1), Option(in._2))})
+ { in : (V,W) => (Option(in._1), Option(in._2))}, None)
Oops, something went wrong.

0 comments on commit 3e20d92

Please sign in to comment.