Skip to content

Commit

Permalink
Merge pull request #975 from twitter/fix_typed_merge
Browse files Browse the repository at this point in the history
Generalize handling of merged TypedPipes
  • Loading branch information
jcoveney committed Jul 27, 2014
2 parents 65364be + 89af4b1 commit 4edd134
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
Expand Up @@ -696,16 +696,39 @@ final case class MergedTypedPipe[T](left: TypedPipe[T], right: TypedPipe[T]) ext
override def fork: TypedPipe[T] =
MergedTypedPipe(left.fork, right.fork)

@annotation.tailrec
private def flattenMerge(toFlatten: List[TypedPipe[T]], acc: List[TypedPipe[T]])(implicit fd: FlowDef, m: Mode): List[TypedPipe[T]] =
toFlatten match {
case MergedTypedPipe(l, r) :: rest => flattenMerge(l :: r :: rest, acc)
case TypedPipeFactory(next) :: rest => flattenMerge(next(fd, m) :: rest, acc)
case nonmerge :: rest => flattenMerge(rest, nonmerge :: acc)
case Nil => acc
}

override def toPipe[U >: T](fieldNames: Fields)(implicit flowDef: FlowDef, mode: Mode, setter: TupleSetter[U]): Pipe = {
// TODO this is easy to generalize correctly: flatten children merges to a Seq[TypedPipe[T]],
// groupBy(identity), then flatMap { t => Iterator.fill(grp.size)(t) }
if (left == right) {
//use map:
left.flatMap { t => List(t, t) }.toPipe[U](fieldNames)
/*
* Cascading can't handle duplicate pipes in merges. What we do here is see if any pipe appears
* multiple times and if it does we can do self merges using flatMap.
* Finally, if there is actually more than one distinct TypedPipe, we use the cascading
* merge primitive. When using the merge primitive we rename all pipes going into it as
* Cascading cannot handle multiple pipes with the same name.
*/
val merged = flattenMerge(List(this), Nil)
// check for repeated pipes
.groupBy(identity)
.mapValues(_.size)
.map {
case (pipe, 1) => pipe
case (pipe, cnt) => pipe.flatMap(List.fill(cnt)(_).iterator)
}
.map(_.toPipe[U](fieldNames)(flowDef, mode, setter))
.toList

if (merged.size == 1) {
// there is no actual merging here, no need to rename:
merged.head
} else {
import RichPipe.assignName
new cascading.pipe.Merge(assignName(left.toPipe[U](fieldNames)),
assignName(right.toPipe[U](fieldNames)))
new cascading.pipe.Merge(merged.map(RichPipe.assignName): _*)
}
}

Expand Down
Expand Up @@ -597,6 +597,7 @@ class TypedFlattenTest extends Specification {

class TypedMergeJob(args: Args) extends Job(args) {
val tp = TypedPipe.from(TypedTsv[String]("input"))
// This exercise a self merge
(tp ++ tp)
.write(TypedTsv[String]("output"))
(tp ++ (tp.map(_.reverse)))
Expand Down

0 comments on commit 4edd134

Please sign in to comment.