Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Incorporate review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
johnynek committed Oct 23, 2013
1 parent f1990de commit 3937cb8
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 8 deletions.
Expand Up @@ -58,8 +58,10 @@ case class Dependants[P <: Platform[P]](tail: Producer[P, Any]) {
* until write/sum to see if certain optimizations can be enabled
*/
def transitiveDependantsTillOutput(p: Producer[P, Any]): List[Producer[P, Any]] = {
val neighborFn = { (p: Producer[P, Any]) =>
if(Producer.isOutput(p)) Iterable.empty else graph(p)
val neighborFn = { (p: Producer[P, Any]) => p match {
case t: TailProducer[P, Any] => Iterable.empty // all legit writes are tails

This comment has been minimized.

Copy link
@ryanlecompte

ryanlecompte Oct 23, 2013

Contributor

Nothing to change right now, but this will generate warnings on 2.10 due to type erasure. :) It's jumping out at me because I spent all day changing these to be, for example, case t: TailProducer[_,_] => ... - we are in the process of moving from 2.9.3 to 2.10 :)

case _ => graph(p)
}
}
depthFirstOf(p)(neighborFn).toList
}
Expand Down
Expand Up @@ -78,7 +78,14 @@ object Producer {
case NamedProducer(_, _) => true
case MergedProducer(_, _) => true
case AlsoProducer(_, _) => true
case _ => false
// The rest do something
case Source(_) => false
case OptionMappedProducer(_, _) => false
case FlatMappedProducer(_, _) => false
case KeyFlatMappedProducer(_, _) => false
case WrittenProducer(_, _) => false
case LeftJoinedProducer(_, _) => false
case Summer(_, _, _) => false
}

def isOutput[P <: Platform[P]](p: Producer[P, Any]): Boolean = p match {
Expand Down
Expand Up @@ -154,8 +154,7 @@ object DependantsTest extends Properties("Dependants") {
val dependants = Dependants(prod)
dependants.nodes.forall { n =>
val output = dependants.transitiveDependantsTillOutput(n).collect {
case s@Summer(_,_,_) => s
case w@WrittenProducer(_,_) => w
case t: TailProducer[Memory,Any] => t
}.toSet[Producer[Memory, Any]]

(dependants.transitiveDependantsOf(n).toSet intersect output) == output
Expand All @@ -168,8 +167,7 @@ object DependantsTest extends Properties("Dependants") {
dependants.nodes.forall { n =>
val depTillWrite = dependants.transitiveDependantsTillOutput(n)
val writerDependencies = depTillWrite.collect {
case s@Summer(_,_,_) => s
case w@WrittenProducer(_,_) => w
case t: TailProducer[Memory,Any] => t
}.flatMap { n => n :: Producer.transitiveDependenciesOf(n) }.toSet

writerDependencies.isEmpty || ((depTillWrite.toSet intersect writerDependencies) == depTillWrite.toSet)
Expand Down
Expand Up @@ -317,7 +317,7 @@ object Scalding {
val downStream = dependants.transitiveDependantsTillOutput(kfm)
val prepare: TypedPipe[(Any,Any)] => TypedPipe[(Any,Any)] = downStream.collect(Producer.isOutput) match {
case List(s@Summer(_, _, monoid)) =>
if(downStream.iterator.filterNot(Producer.isNoOp).filterNot(_ != s).isEmpty) {
if(downstream.forall(d => Producer.isNoOp(d) && d != s)) {
//only downstream are no-ops and the summer GO!
// we know the types are right, by construction, but we have lost them here:
// TODO, add the batchID to the key, and pivot time into value
Expand Down

0 comments on commit 3937cb8

Please sign in to comment.