Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Add a method to prepare for optimizing FlatMapKeys #305

Merged
merged 6 commits into from Oct 24, 2013
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -51,5 +51,16 @@ case class Dependants[P <: Platform[P]](tail: Producer[P, Any]) {
* Does not include itself
*/
def transitiveDependantsOf(p: Producer[P, Any]): List[Producer[P, Any]] =
depthFirstOf(p.asInstanceOf[Producer[P, Any]])(graph).toList
depthFirstOf(p)(graph).toList

/** Return all the dependants of this node, but don't go past any output
* nodes, such as Summer or WrittenProducer. This is for searching downstream
* until write/sum to see if certain optimizations can be enabled
*/
def transitiveDependantsTillOutput(p: Producer[P, Any]): List[Producer[P, Any]] = {
val neighborFn = { (p: Producer[P, Any]) =>
if(Producer.isOutput(p)) Iterable.empty else graph(p)
}
depthFirstOf(p)(neighborFn).toList
}
}
Expand Up @@ -35,9 +35,6 @@ object Producer {
p :: above
}

def retrieveSummer[P <: Platform[P]](paths: List[Producer[P, _]]): Option[Summer[P, _, _]] =
paths.collectFirst { case s: Summer[P, _, _] => s }

/**
* Begin from some base representation. An iterator for in-memory,
* for example.
Expand Down Expand Up @@ -74,6 +71,20 @@ object Producer {
}
}

/** Returns true if this node does not directly change the data (does not apply any transformation)
*/
def isNoOp[P <: Platform[P]](p: Producer[P, Any]): Boolean = p match {
case IdentityKeyedProducer(_) => true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style points for later: bunch up cases with case A(_) | B(_) => true

case NamedProducer(_, _) => true
case MergedProducer(_, _) => true
case AlsoProducer(_, _) => true
case _ => false
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove wildcard

}

def isOutput[P <: Platform[P]](p: Producer[P, Any]): Boolean = p match {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TailProducer?

case Summer(_, _, _) | WrittenProducer(_, _) => true
case _ => false
}
/**
* Return all dependencies of a given node in depth first, left first order.
*/
Expand Down
Expand Up @@ -149,4 +149,43 @@ object DependantsTest extends Properties("Dependants") {
allNodes.toSet == sAndDown
}

property("transitiveDependantsTillOutput finds outputs as a subset of dependants") =
forAll { (prod: Producer[Memory, Any]) =>
val dependants = Dependants(prod)
dependants.nodes.forall { n =>
val output = dependants.transitiveDependantsTillOutput(n).collect {
case s@Summer(_,_,_) => s
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TailProducer

case w@WrittenProducer(_,_) => w
}.toSet[Producer[Memory, Any]]

(dependants.transitiveDependantsOf(n).toSet intersect output) == output
}
}

property("transitiveDependantsTillOutput is a subset of writers dependencies") =
forAll { (prod: Producer[Memory, Any]) =>
val dependants = Dependants(prod)
dependants.nodes.forall { n =>
val depTillWrite = dependants.transitiveDependantsTillOutput(n)
val writerDependencies = depTillWrite.collect {
case s@Summer(_,_,_) => s
case w@WrittenProducer(_,_) => w
}.flatMap { n => n :: Producer.transitiveDependenciesOf(n) }.toSet

writerDependencies.isEmpty || ((depTillWrite.toSet intersect writerDependencies) == depTillWrite.toSet)
}
}

property("transitiveDependantsTillOutput finds no children of outputs") = forAll { (prod: Producer[Memory, Any]) =>
val dependants = Dependants(prod)
dependants.nodes.forall { n =>
val tillWrite = dependants.transitiveDependantsTillOutput(n)
val outputChildren = tillWrite.collect {
case s@Summer(_,_,_) => s
case w@WrittenProducer(_,_) => w
}.flatMap { dependants.transitiveDependantsOf(_) }.toSet[Producer[Memory, Any]]
(tillWrite.toSet & outputChildren.toSet).size == 0
}
}

}
Expand Up @@ -39,24 +39,24 @@ object TopologyPlannerLaws extends Properties("Online Dag") {

implicit def testStore: Memory#Store[Int, Int] = MMap[Int, Int]()

implicit val arbSource1: Arbitrary[Producer[Memory, Int]] =
implicit val arbSource1: Arbitrary[Producer[Memory, Int]] =
Arbitrary(Gen.listOfN(100, Arbitrary.arbitrary[Int]).map{
x: List[Int] =>
Memory.toSource(x)})
implicit val arbSource2: Arbitrary[KeyedProducer[Memory, Int, Int]] =
implicit val arbSource2: Arbitrary[KeyedProducer[Memory, Int, Int]] =
Arbitrary(Gen.listOfN(100, Arbitrary.arbitrary[(Int, Int)]).map{
x: List[(Int, Int)] =>
x: List[(Int, Int)] =>
IdentityKeyedProducer(Memory.toSource(x))})


lazy val genDag : Gen[MemoryDag]= for {
tail <- summed
tail <- summed
} yield OnlinePlan(tail)

implicit def genProducer: Arbitrary[MemoryDag] = Arbitrary(genDag)



val testFn = { i: Int => List((i -> i)) }

def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get
Expand All @@ -71,16 +71,6 @@ object TopologyPlannerLaws extends Properties("Online Dag") {
dumpNumber = dumpNumber + 1
}

def isNoOpProducer(p: Producer[_, _]): Boolean = {
p match {
case IdentityKeyedProducer(_) => true
case NamedProducer(_, _) => true
case MergedProducer(_, _) => true
case AlsoProducer(_, _) => true
case _ => false
}
}

property("Dag Nodes must be unique") = forAll { (dag: MemoryDag) =>
dag.nodes.size == dag.nodes.toSet.size
}
Expand All @@ -93,7 +83,7 @@ object TopologyPlannerLaws extends Properties("Online Dag") {

property("If a MemoryNode contains a Summer, all other producers must be NOP's") = forAll { (dag: MemoryDag) =>
dag.nodes.forall{n =>
val producersWithoutNOP = n.members.filterNot(isNoOpProducer(_))
val producersWithoutNOP = n.members.filterNot(Producer.isNoOp(_))
val firstP = producersWithoutNOP.headOption
producersWithoutNOP.forall{p =>
val inError = (p.isInstanceOf[Summer[_, _, _]] && producersWithoutNOP.size != 1)
Expand Down Expand Up @@ -130,7 +120,7 @@ object TopologyPlannerLaws extends Properties("Online Dag") {
val allProducersSet = dag.nodes.foldLeft(Set[Producer[Memory, _]]()){(runningSet, n) => runningSet | n.members.toSet}
numAllProducers == allProducersSet.size
}


property("All producers are in a Node") = forAll { (dag: MemoryDag) =>
val allProducers = Producer.entireGraphOf(dag.tail).toSet + dag.tail
Expand All @@ -145,20 +135,20 @@ object TopologyPlannerLaws extends Properties("Online Dag") {
case _ => dag.dependenciesOf(n).size > 0
}
if(!success) dumpGraph(dag)
success
success
}
}


property("Sources must have no incoming dependencies, and they must have dependants") = forAll { (dag: MemoryDag) =>
dag.nodes.forall{n =>
val success = n match {
case _: SourceNode[_] =>
case _: SourceNode[_] =>
dag.dependenciesOf(n).size == 0 && dag.dependantsOf(n).size > 0
case _ => true
}
if(!success) dumpGraph(dag)
success
success
}
}

Expand All @@ -174,7 +164,7 @@ object TopologyPlannerLaws extends Properties("Online Dag") {
case _ => true
}
if(!success) dumpGraph(dag)
success
success
}
}

Expand Down
Expand Up @@ -226,8 +226,15 @@ object Scalding {
producer: Producer[Scalding, T],
id: Option[String],
fanOuts: Set[Producer[Scalding, _]],
dependants: Dependants[Scalding],
built: Map[Producer[Scalding, _], PipeFactory[_]]): (PipeFactory[T], Map[Producer[Scalding, _], PipeFactory[_]]) = {

def recurse[U](p: Producer[Scalding, U],
id: Option[String] = id,
built: Map[Producer[Scalding, _], PipeFactory[_]] = built):
(PipeFactory[U], Map[Producer[Scalding, _], PipeFactory[_]]) =
buildFlow(options, p, id, fanOuts, dependants, built)

/**
* The scalding Typed-API does not deal with TypedPipes with fanout,
* it just computes both branches twice. We call this function
Expand Down Expand Up @@ -265,18 +272,16 @@ object Scalding {
}
(srcPf, built)
}
case IdentityKeyedProducer(producer) =>
buildFlow(options, producer, id, fanOuts, built)
case NamedProducer(producer, newId) =>
buildFlow(options, producer, id = Some(newId), fanOuts, built)
case IdentityKeyedProducer(producer) => recurse(producer)
case NamedProducer(producer, newId) => recurse(producer, id = Some(newId))
case Summer(producer, store, monoid) =>
/*
* The store may already have materialized values, so we don't need the whole
* input history, but to produce NEW batches, we may need some input.
* So, we pass the full PipeFactory to to the store so it can request only
* the time ranges that it needs.
*/
val (in, m) = buildFlow(options, producer, id, fanOuts, built)
val (in, m) = recurse(producer)
val isCommutative = getOrElse(options, id, MonoidIsCommutative.default)
(store.merge(in, monoid,
isCommutative.commutativity,
Expand All @@ -287,24 +292,46 @@ object Scalding {
* join with, so we pass in the left PipeFactory so that the service
* can compute how wuch it can actually handle and only load that much
*/
val (pf, m) = buildFlow(options, left, id, fanOuts, built)
val (pf, m) = recurse(left)
(service.lookup(pf), m)
case WrittenProducer(producer, sink) =>
val (pf, m) = buildFlow(options, producer, id, fanOuts, built)
val (pf, m) = recurse(producer)
(sink.write(pf), m)
case OptionMappedProducer(producer, op) =>
// Map in two monads here, first state then reader
val (fmp, m) = buildFlow(options, producer, id, fanOuts, built)
val (fmp, m) = recurse(producer)
(fmp.map { flowP =>
flowP.map { typedPipe =>
typedPipe.flatMap { case (time, item) =>
op(item).map((time, _))
}
}
}, m)
case KeyFlatMappedProducer(producer, op) =>
case kfm@KeyFlatMappedProducer(producer, op) =>
/**
* If the following is true, it is safe to put a mapside reduce before this node:
* 1) there is only one downstream output, which is a Summer
* 2) there are only NoOp Producers between this node and the Summer
*/
/*
val downStream = dependants.transitiveDependantsTillOutput(kfm)
val prepare: TypedPipe[(Any,Any)] => TypedPipe[(Any,Any)] = downStream.collect(Producer.isOutput) match {
case List(s@Summer(_, _, monoid)) =>
if(downStream.iterator.filterNot(Producer.isNoOp).filterNot(_ != s).isEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

downstream.forall(d => Producer.isNoOp(d) && d != s)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be completely general here, this should be a isDistributive check (maybe named slightly differently -- isCommutativelyDistributive?). The idea is that it expresses that summing the inputs doesn't affect the summing of the outputs when the monoid is commutative. NoOp producers satisfy this trivially, but so do FlatMapKeys producers. And if we found a (good enough) way to implement a DistributiveFlatMapValues producer, that would work too.

As a side benefit, perhaps we could argue that Summer nodes themselves are distributive? Then we could get rid of the != s part.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jnievelt nice way of thinking about this. I'll try to refactor along these lines when I pick this back up this week.

//only downstream are no-ops and the summer GO!
// we know the types are right, by construction, but we have lost them here:
// TODO, add the batchID to the key, and pivot time into value
// check if the monoid is commutative
{ (kv: TypedPipe[(Any,Any)]) => mapsideReduce(kv)(monoid) }
}
else
indentity[TypedPipe[(Any,Any)]]
case _ =>
indentity[TypedPipe[(Any,Any)]]
}*/

// Map in two monads here, first state then reader
val (fmp, m) = buildFlow(options, producer, id, fanOuts, built)
val (fmp, m) = recurse(producer)
(fmp.map { flowP =>
flowP.map { typedPipe =>
typedPipe.flatMap { case (time, (k, v)) =>
Expand All @@ -314,7 +341,7 @@ object Scalding {
}, m)
case FlatMappedProducer(producer, op) =>
// Map in two monads here, first state then reader
val (fmp, m) = buildFlow(options, producer, id, fanOuts, built)
val (fmp, m) = recurse(producer)
(fmp.map { flowP =>
flowP.map { typedPipe =>
typedPipe.flatMap { case (time, item) =>
Expand All @@ -323,8 +350,8 @@ object Scalding {
}
}, m)
case MergedProducer(l, r) => {
val (pfl, ml) = buildFlow(options, l, id, fanOuts, built)
val (pfr, mr) = buildFlow(options, r, id, fanOuts, ml)
val (pfl, ml) = recurse(l)
val (pfr, mr) = recurse(r, built = ml)
val merged = for {
// concatenate errors (++) and find the intersection (&&) of times
leftAndRight <- pfl.join(pfr,
Expand All @@ -340,8 +367,8 @@ object Scalding {
* left could do to be consistent with the rest of this code.
*/
case AlsoProducer(l, r) => {
val (pfl, ml) = buildFlow(options, l, id, fanOuts, built)
val (pfr, mr) = buildFlow(options, r, id, fanOuts, ml)
val (pfl, ml) = recurse(l)
val (pfr, mr) = recurse(r, built = ml)
val onlyRight = for {
// concatenate errors (++) and find the intersection (&&) of times
leftAndRight <- pfl.join(pfr,
Expand All @@ -364,7 +391,7 @@ object Scalding {
val fanOutSet =
dep.nodes
.filter(dep.fanOut(_).exists(_ > 1)).toSet
buildFlow(options, prod, None, fanOutSet, Map.empty)._1
buildFlow(options, prod, None, fanOutSet, dep, Map.empty)._1
}

def plan[T](options: Map[String, Options], prod: TailProducer[Scalding, T]): PipeFactory[T] = {
Expand Down