Permalink
Browse files

Allow forceToReducers to be called on Grouped.

  • Loading branch information...
1 parent 00a4c9c commit d9cf1bf03027bbe1ad707027e5a8d95dbdb2eb9f @azymnis azymnis committed Nov 22, 2012
@@ -262,7 +262,7 @@ object Grouped {
// Make a new Grouped from a pipe with two fields: 'key, 'value
def fromKVPipe[K,V](pipe : Pipe, ordering : Ordering[K])
(implicit conv : TupleConverter[V]) : Grouped[K,V] = {
- new Grouped[K,V](pipe, ordering, None, None, -1)
+ new Grouped[K,V](pipe, ordering, None, None, -1, false)
}
def valueSorting[T](implicit ord : Ordering[T]) : Fields = sorting("value", ord)
@@ -279,7 +279,8 @@ class Grouped[K,T] private (private[scalding] val pipe : Pipe,
val ordering : Ordering[K],
streamMapFn : Option[(Iterator[Tuple]) => Iterator[T]],
private[scalding] val valueSort : Option[(Fields,Boolean)],
- val reducers : Int = -1)
+ val reducers : Int = -1,
+ val toReducers: Boolean = false)
extends KeyedList[K,T] with Serializable {
import Dsl._
@@ -291,15 +292,17 @@ class Grouped[K,T] private (private[scalding] val pipe : Pipe,
if (fb._2) gbSorted.reverse else gbSorted
}.getOrElse(gb)
}
+ def forceToReducers: Grouped[K,T] =
+ new Grouped(pipe, ordering, streamMapFn, valueSort, reducers, true)
def withSortOrdering(so : Ordering[T]) : Grouped[K,T] = {
// Set the value sort order; it starts out unreversed (the `false` flag below)
assert(valueSort.isEmpty, "Can only call withSortOrdering once")
assert(streamMapFn.isEmpty, "Cannot sort after a mapValueStream")
val newValueSort = Some(Grouped.valueSorting(so)).map { f => (f,false) }
- new Grouped(pipe, ordering, None, newValueSort, reducers)
+ new Grouped(pipe, ordering, None, newValueSort, reducers, toReducers)
}
def withReducers(red : Int) : Grouped[K,T] = {
- new Grouped(pipe, ordering, streamMapFn, valueSort, red)
+ new Grouped(pipe, ordering, streamMapFn, valueSort, red, toReducers)
}
def sortBy[B](fn : (T) => B)(implicit ord : Ordering[B]) : Grouped[K,T] = {
withSortOrdering(new MappedOrdering(fn, ord))
@@ -318,12 +321,13 @@ class Grouped[K,T] private (private[scalding] val pipe : Pipe,
def reverse : Grouped[K,T] = {
assert(streamMapFn.isEmpty, "Cannot reverse after mapValueStream")
val newValueSort = valueSort.map { f => (f._1, !(f._2)) }
- new Grouped(pipe, ordering, None, newValueSort, reducers)
+ new Grouped(pipe, ordering, None, newValueSort, reducers, toReducers)
}
protected def operate[T1](fn : GroupBuilder => GroupBuilder) : TypedPipe[(K,T1)] = {
val reducedPipe = pipe.groupBy(groupKey) { gb =>
- fn(sortIfNeeded(gb)).reducers(reducers)
+ val out = fn(sortIfNeeded(gb)).reducers(reducers)
+ if(toReducers) out.forceToReducers else out
}
TypedPipe.from(reducedPipe, ('key, 'value))(implicitly[TupleConverter[(K,T1)]])
}
@@ -351,7 +355,7 @@ class Grouped[K,T] private (private[scalding] val pipe : Pipe,
// No sort is defined yet, so apply fn directly on the pipe; this still lets us
// sort by the mapped values afterwards if we need to:
new Grouped(pipe.map('value -> 'value)(fn)(singleConverter[T], SingleSetter),
- ordering, None, None, reducers)
+ ordering, None, None, reducers, toReducers)
}
else {
// There is a sorting, which invalidates map-side optimizations,
@@ -378,7 +382,7 @@ class Grouped[K,T] private (private[scalding] val pipe : Pipe,
}
override def mapValueStream[V](nmf : Iterator[T] => Iterator[V]) : Grouped[K,V] = {
val newStreamMapFn = Some(streamMapping.andThen(nmf))
- new Grouped[K,V](pipe, ordering, newStreamMapFn, valueSort, reducers)
+ new Grouped[K,V](pipe, ordering, newStreamMapFn, valueSort, reducers, toReducers)
}
// SMALLER PIPE ALWAYS ON THE RIGHT!!!!!!!
def cogroup[W,R](smaller: Grouped[K,W])(joiner: (K, Iterator[T], Iterable[W]) => Iterator[R])
@@ -18,6 +18,7 @@ class TypedPipeJob(args : Args) extends Job(args) {
.forceToDisk
.group
.sum
+ .forceToReducers
.write(Tsv("outputFile"))
}
@@ -111,6 +112,7 @@ class TypedImplicitJob(args : Args) extends Job(args) {
.sum
.groupAll
.mapValues { revTup _ }
+ .forceToReducers
.max
// Throw out the Unit key and reverse the value tuple
.map { _._2 }
@@ -263,6 +265,7 @@ class TJoinWordCount(args : Args) extends Job(args) {
pipe.flatMap { _.split("\\s+").map(_.toLowerCase) }
.groupBy(identity)
.mapValueStream(input => Iterator(input.size))
+ .forceToReducers
}
val first = countWordsIn(TypedPipe.from(TextLine("in0")))

0 comments on commit d9cf1bf

Please sign in to comment.