Skip to content

Commit

Permalink
Merge 4ea3239 into cdaf2be
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam Poswolsky committed May 21, 2015
2 parents cdaf2be + 4ea3239 commit d72c482
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 30 deletions.
Expand Up @@ -328,6 +328,9 @@ object Config {
/** Whether the number of reducers has been set explicitly using a `withReducers` */
val WithReducersSetExplicitly = "scalding.with.reducers.set.explicitly"

/** Step config key holding the manual descriptions set using a `withDescription`, for use in .dot and MR step names. */
val WithDescriptionSetExplicitly = "scalding.with.description.set.explicitly"

val empty: Config = Config(Map.empty)

/*
Expand Down
Expand Up @@ -15,9 +15,14 @@ limitations under the License.
*/
package com.twitter.scalding

import cascading.flow.hadoop.HadoopFlow
import cascading.flow.planner.BaseFlowStep
import cascading.flow.{ FlowDef, Flow }
import cascading.pipe.Pipe
import com.twitter.scalding.reducer_estimation.ReducerEstimatorStepStrategy
import com.twitter.scalding.serialization.CascadingBinaryComparator
import org.apache.hadoop.mapred.JobConf
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }

Expand All @@ -31,6 +36,38 @@ trait ExecutionContext {
def flowDef: FlowDef
def mode: Mode

/**
 * Collapses a list of descriptions into a single identifier string.
 *
 * Duplicates are dropped (first occurrence wins) and the remaining entries are
 * joined with ", ". Returns None when there are no descriptions at all.
 */
private def getIdentifierOpt(descriptions: Seq[String]): Option[String] =
  descriptions match {
    case Seq() => None
    case present => Some(present.distinct.mkString(", "))
  }

/**
 * Appends the given descriptions to the job name stored in the step's config.
 *
 * Nothing is changed when `descriptions` is empty.
 *
 * @param step the flow step whose JobConf job name is updated in place
 * @param descriptions descriptions to append; duplicates are collapsed by getIdentifierOpt
 */
private def updateJobName(step: BaseFlowStep[JobConf], descriptions: Seq[String]): Unit = {
  val conf = step.getConfig
  // Use the descriptions handed in by the caller instead of walking the step graph a second time.
  getIdentifierOpt(descriptions).foreach { id =>
    val newJobName = "%s %s".format(conf.getJobName, id)
    println("Added descriptions to job name: %s".format(newJobName))
    conf.setJobName(newJobName)
  }
}

/**
 * Renames the flow step so that its name carries the given descriptions.
 *
 * The new name is the "(x/y)" counter found in the current step name, followed by the
 * joined descriptions. When the current name has no such counter the new name is the
 * joined descriptions alone (no leading blank). Nothing is changed when `descriptions`
 * is empty.
 *
 * @param step the flow step to rename in place
 * @param descriptions descriptions to put in the step name
 * @return the same `step` instance that was passed in
 */
private def updateFlowStepName(step: BaseFlowStep[JobConf], descriptions: Seq[String]): BaseFlowStep[JobConf] = {
  getIdentifierOpt(descriptions).foreach { newIdentifier =>
    val stepXofYData = """\(\d+/\d+\)""".r.findFirstIn(step.getName)
    // Only prefix the counter when one was found; otherwise the name would start with a stray space.
    val newName = stepXofYData
      .map(counter => "%s %s".format(counter, newIdentifier))
      .getOrElse(newIdentifier)
    // Reflection is only temporary. Latest cascading has setName public: https://github.com/cwensel/cascading/commit/487a6e9ef#diff-0feab84bc8832b2a39312dbd208e3e69L175
    // https://github.com/twitter/scalding/issues/1294
    val setName = classOf[BaseFlowStep[JobConf]].getDeclaredMethod("setName", classOf[String])
    setName.setAccessible(true)
    setName.invoke(step, newName)
  }
  step
}

/**
 * Collects the descriptions of every Pipe vertex in the step's graph.
 *
 * Vertices that are not Pipes contribute nothing.
 */
private def getDesc(baseFlowStep: BaseFlowStep[JobConf]): Seq[String] = {
  val vertices = baseFlowStep.getGraph.vertexSet.asScala.toSeq
  vertices.flatMap {
    case pipe: Pipe => RichPipe.getPipeDescriptions(pipe)
    case _ => Nil // no descriptions
  }
}

final def buildFlow: Try[Flow[_]] =
// For some horrible reason, using Try( ) instead of the below gets me stuck:
// [error]
Expand All @@ -52,6 +89,18 @@ trait ExecutionContext {
}
val flow = mode.newFlowConnector(withId).connect(flowDef)

flow match {
case hadoopFlow: HadoopFlow =>
val flowSteps = hadoopFlow.getFlowSteps.asScala
flowSteps.foreach(step => {
val baseFlowStep: BaseFlowStep[JobConf] = step.asInstanceOf[BaseFlowStep[JobConf]]
val descriptions = getDesc(baseFlowStep)
updateFlowStepName(baseFlowStep, descriptions)
updateJobName(baseFlowStep, descriptions)
})
case _ => // descriptions not yet supported in other modes
}

// if any reducer estimators have been set, register the step strategy
// which instantiates and runs them
mode match {
Expand Down
Expand Up @@ -70,6 +70,11 @@ class GroupBuilder(val groupFields: Fields) extends FoldOperations[GroupBuilder]
*/
private var numReducers: Option[Int] = None

/**
* Holds the user-specified descriptions (none by default) to be used in .dot and MR step names.
*/
private var descriptions: Seq[String] = Nil

/**
* Limit of number of keys held in SpillableTupleMap on an AggregateBy
*/
Expand All @@ -90,6 +95,14 @@ class GroupBuilder(val groupFields: Fields) extends FoldOperations[GroupBuilder]
this
}

/**
 * Replace the descriptions to be used in .dot and MR step names.
 * Any previously stored descriptions are discarded; returns this builder.
 */
def setDescriptions(newDescriptions: Seq[String]) = {
  this.descriptions = newDescriptions
  this
}

/**
* Override the spill threshold on AggregateBy
*/
Expand All @@ -111,6 +124,10 @@ class GroupBuilder(val groupFields: Fields) extends FoldOperations[GroupBuilder]
numReducers.map { r => RichPipe.setReducers(p, r) }.getOrElse(p)
}

/**
 * Attaches this builder's descriptions to the given pipe and returns
 * whatever RichPipe.setPipeDescriptions returns for it.
 */
protected def overrideDescription(p: Pipe): Pipe =
  RichPipe.setPipeDescriptions(p, descriptions)

/**
* == Warning ==
* This may significantly reduce performance of your job.
Expand Down Expand Up @@ -268,6 +285,7 @@ class GroupBuilder(val groupFields: Fields) extends FoldOperations[GroupBuilder]
case Some(sf) => new GroupBy(name, in, groupFields, sf, isReversed)
}
overrideReducers(gb)
overrideDescription(gb)
gb
}

Expand All @@ -294,6 +312,7 @@ class GroupBuilder(val groupFields: Fields) extends FoldOperations[GroupBuilder]
redlist.reverse.toArray: _*)

overrideReducers(ag.getGroupBy())
overrideDescription(ag.getGroupBy())
ag
}
}
Expand Down
33 changes: 33 additions & 0 deletions scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala
Expand Up @@ -15,6 +15,7 @@ limitations under the License.
*/
package com.twitter.scalding

import cascading.property.ConfigDef.Getter
import cascading.pipe._
import cascading.flow._
import cascading.operation._
Expand Down Expand Up @@ -53,6 +54,38 @@ object RichPipe extends java.io.Serializable {
}
p
}

// A pipe can have more than one description when merged together, so we store them delimited with 254.toChar.
/**
 * Joins descriptions into one string, separated by 254.toChar.
 *
 * Any occurrence of the delimiter inside a description is replaced by a space first,
 * and descriptions that are empty are left out.
 */
private def encodePipeDescriptions(descriptions: Seq[String]): String = {
  val delimiter = 254.toChar
  val sanitized = for {
    raw <- descriptions
    cleaned = raw.replace(delimiter, ' ')
    if cleaned.nonEmpty
  } yield cleaned
  sanitized.mkString(delimiter.toString)
}

/**
 * Splits a string of descriptions delimited by 254.toChar back into the individual descriptions.
 *
 * Empty segments are dropped, so an empty encoding decodes to an empty sequence and
 * consecutive delimiters do not produce empty descriptions.
 *
 * @param encoding delimiter-separated descriptions
 * @return the non-empty descriptions, in their original order
 */
private def decodePipeDescriptions(encoding: String): Seq[String] = {
  // "".split(...) yields Array(""); filter so decoding stays the inverse of encoding for Nil.
  encoding.split(254.toChar).filter(_.nonEmpty).toSeq
}

/**
 * Reads the descriptions stored on the pipe's step config under
 * Config.WithDescriptionSetExplicitly.
 *
 * Returns Nil when the step config is empty, or when the stored value is null or empty.
 */
def getPipeDescriptions(p: Pipe): Seq[String] = {
  val stepConfig = p.getStepConfigDef
  if (stepConfig.isEmpty) {
    Nil
  } else {
    // Getter that supplies no existing value (get returns null) and leaves update unimplemented.
    val noValueGetter = new Getter {
      override def update(s: String, s1: String): String = ???
      override def get(s: String): String = null
    }
    val stored = stepConfig.apply(Config.WithDescriptionSetExplicitly, noValueGetter)
    Option(stored).filter(_.nonEmpty) match {
      case Some(encoded) => decodePipeDescriptions(encoded)
      case None => Nil
    }
  }
}

/**
 * Adds the given descriptions to those already stored on the pipe's step config,
 * then writes the combined list back under Config.WithDescriptionSetExplicitly.
 *
 * Returns the same pipe that was passed in.
 */
def setPipeDescriptions(p: Pipe, descriptions: Seq[String]): Pipe = {
  val combined = getPipeDescriptions(p) ++ descriptions
  val encoded = encodePipeDescriptions(combined)
  p.getStepConfigDef().setProperty(Config.WithDescriptionSetExplicitly, encoded)
  p
}

}

/**
Expand Down
Expand Up @@ -58,7 +58,7 @@ object CoGroupable {
/**
* Represents something that can be CoGrouped with another CoGroupable
*/
trait CoGroupable[K, +R] extends HasReducers with java.io.Serializable {
trait CoGroupable[K, +R] extends HasReducers with HasDescription with java.io.Serializable {
/**
* This is the list of mapped pipes, just before the (reducing) joinFunction is applied
*/
Expand Down Expand Up @@ -94,6 +94,7 @@ trait CoGroupable[K, +R] extends HasReducers with java.io.Serializable {
new CoGrouped[K, R2] {
val inputs = self.inputs ++ smaller.inputs
val reducers = (self.reducers.toIterable ++ smaller.reducers.toIterable).reduceOption(_ max _)
val descriptions: Seq[String] = self.descriptions ++ smaller.descriptions
def keyOrdering = smaller.keyOrdering

/**
Expand Down Expand Up @@ -131,7 +132,7 @@ trait CoGroupable[K, +R] extends HasReducers with java.io.Serializable {
// TODO: implement blockJoin
}

trait CoGrouped[K, +R] extends KeyedListLike[K, R, CoGrouped] with CoGroupable[K, R] with WithReducers[CoGrouped[K, R]] {
trait CoGrouped[K, +R] extends KeyedListLike[K, R, CoGrouped] with CoGroupable[K, R] with WithReducers[CoGrouped[K, R]] with WithDescription[CoGrouped[K, R]] {
override def withReducers(reds: Int) = {
val self = this // the usual self => trick leads to serialization errors
val joinF = joinFunction // can't access this on self, since it is protected
Expand All @@ -140,6 +141,19 @@ trait CoGrouped[K, +R] extends KeyedListLike[K, R, CoGrouped] with CoGroupable[K
def reducers = Some(reds)
def keyOrdering = self.keyOrdering
def joinFunction = joinF
def descriptions: Seq[String] = self.descriptions
}
}

/**
 * Returns a new CoGrouped that delegates inputs, reducers, keyOrdering and the join
 * function to this one, and whose descriptions are this one's followed by `description`.
 */
override def withDescription(description: String) = {
  // Capture plain vals instead of using a `self =>` alias, which leads to serialization errors.
  val outer = this
  // joinFunction is protected, so it cannot be reached through `outer` inside the new instance.
  val capturedJoin = joinFunction
  new CoGrouped[K, R] {
    def descriptions: Seq[String] = outer.descriptions :+ description
    def inputs = outer.inputs
    def reducers = outer.reducers
    def keyOrdering = outer.keyOrdering
    def joinFunction = capturedJoin
  }
}

Expand All @@ -159,6 +173,7 @@ trait CoGrouped[K, +R] extends KeyedListLike[K, R, CoGrouped] with CoGroupable[K
new CoGrouped[K, R] {
val inputs = self.inputs.map(_.filterKeys(fn))
def reducers = self.reducers
def descriptions: Seq[String] = self.descriptions
def keyOrdering = self.keyOrdering
def joinFunction = joinF
}
Expand All @@ -170,6 +185,7 @@ trait CoGrouped[K, +R] extends KeyedListLike[K, R, CoGrouped] with CoGroupable[K
new CoGrouped[K, R1] {
def inputs = self.inputs
def reducers = self.reducers
def descriptions: Seq[String] = self.descriptions
def keyOrdering = self.keyOrdering
def joinFunction = { (k: K, leftMost: Iterator[CTuple], joins: Seq[Iterable[CTuple]]) =>
val joined = joinF(k, leftMost, joins)
Expand Down Expand Up @@ -284,10 +300,13 @@ trait CoGrouped[K, +R] extends KeyedListLike[K, R, CoGrouped] with CoGroupable[K
* the CoGrouped only populates the first two fields, the second two
* are null. We then project out at the end of the method.
*/

val pipeWithRed = RichPipe.setReducers(newPipe, reducers.getOrElse(-1)).project('key, 'value)
val pipeWithRedAndDescriptions = {
RichPipe.setReducers(newPipe, reducers.getOrElse(-1))
RichPipe.setPipeDescriptions(newPipe, descriptions)
newPipe.project('key, 'value)
}
//Construct the new TypedPipe
TypedPipe.from[(K, R)](pipeWithRed, ('key, 'value))(flowDef, mode, tuple2Converter)
TypedPipe.from[(K, R)](pipeWithRedAndDescriptions, ('key, 'value))(flowDef, mode, tuple2Converter)
})
}
}
Expand Down

0 comments on commit d72c482

Please sign in to comment.