Skip to content

Commit

Permalink
Major features and fixes
Browse files Browse the repository at this point in the history
* Ability to set reducers in groupBy (see .reducers)
* Ability to set reducers in join (see named parameter e.g. reducers=12)
* Some bug fixes around serializable objects.
* Increases the cascading spill thresholds (to 100k)
* Adds numeric hashing so joining longs and ints are okay.
* Enables optimized joins ("map-side" join). See RichPipe.joinWithTiny.
  • Loading branch information
johnynek committed Feb 23, 2012
1 parent 2a13558 commit 1481992
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 69 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import AssemblyKeys._

name := "scalding"

version := "0.3.1"
version := "0.3.2"

organization := "com.twitter"

Expand Down
2 changes: 1 addition & 1 deletion scripts/scald.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
require 'fileutils'
require 'thread'

SCALDING_VERSION="0.3.1"
SCALDING_VERSION="0.3.2"

#Usage : scald.rb [--hdfs|--local|--print] job <job args>
# --hdfs: if job ends in ".scala" or ".java" and the file exists, link it against JARFILE (below) and then run it on HOST.
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/com/twitter/scalding/DateRange.scala
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ case class DateRange(val start : RichDate, val end : RichDate) {
* current range. This children must be ordered from largest
* to smallest in size.
*/
@serializable
class BaseGlobifier(dur : Duration, val sym: String, pattern : String, tz : TimeZone, child : Option[BaseGlobifier]) {
import DateOps._
// result <= rd
Expand Down Expand Up @@ -494,5 +495,6 @@ case class MonthGlob(pat : String)(implicit tz: TimeZone)
/*
* This is the outermost globifier and should generally be used to globify
*/
@serializable
case class Globifier(pat : String)(implicit tz: TimeZone)
extends BaseGlobifier(Years(1)(tz), "%1$tY", pat, tz, Some(MonthGlob(pat)))
96 changes: 83 additions & 13 deletions src/main/scala/com/twitter/scalding/GroupBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import scala.math.Ordering
// for instance, a scanLeft/foldLeft generally requires a sorting
// but such sorts are (at least for now) incompatible with doing a combine
// which includes some map-side reductions.
class GroupBuilder(val groupFields : Fields) extends FieldConversions with TupleConversions {
class GroupBuilder(val groupFields : Fields) extends FieldConversions
with TupleConversions with java.io.Serializable {

/**
* Holds the "reducers/combiners", the things that we can do paritially map-side.
Expand Down Expand Up @@ -70,8 +71,57 @@ class GroupBuilder(val groupFields : Fields) extends FieldConversions with Tuple
return !reds.isEmpty
}

/**
* Holds the number of reducers to use in the reduce stage of the groupBy/aggregateBy.
* By default uses whatever value is set in the jobConf.
*/
private var numReducers : Option[Int] = None
private val REDUCER_KEY = "mapred.reduce.tasks"
/**
* Override the number of reducers used in the groupBy.
*/
def reducers(r : Int) = {
if(r > 0) {
numReducers = Some(r)
}
this
}

private def overrideReducers(p : Pipe) : Pipe = {
numReducers.map{ r =>
if(r <= 0)
throw new IllegalArgumentException("Number of reducers must be non-negative")
p.getProcessConfigDef()
.setProperty(REDUCER_KEY, r.toString)
}
p
}

// When combining averages, if the counts sizes are too close we should use a different
// algorithm. This constant defines how close the ratio of the smaller to the total count
// can be:
private val STABILITY_CONSTANT = 0.1
/**
* uses a more stable online algorithm which should
* be suitable for large numbers of records
* similar to:
* http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
*/
def average(f : (Fields, Fields)) : GroupBuilder = {
mapReduceMap(f)((x:Double) => (1L, x))((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))(res => res._2/res._1)
mapReduceMap(f){(x:Double) =>
(1L, x)
} {(cntAve1, cntAve2) =>
val (big, small) = if (cntAve1._1 >= cntAve2._1) (cntAve1, cntAve2) else (cntAve2, cntAve1)
val n = big._1
val k = small._1
val an = big._2
val ak = small._2
val newCnt = n+k
val scaling = k.toDouble/newCnt
// a_n + (a_k - a_n)*(k/(n+k)) is only stable if n is not approximately k
val newAve = if (scaling < STABILITY_CONSTANT) (an + (ak - an)*scaling) else (n*an + k*ak)/newCnt
(newCnt, newAve)
} { res => res._2 }
}
def average(f : Symbol) : GroupBuilder = average(f->f)

Expand Down Expand Up @@ -112,6 +162,7 @@ class GroupBuilder(val groupFields : Fields) extends FieldConversions with Tuple
/**
* Compute the count, ave and stdard deviation in one pass
* example: g.cntAveStdev('x -> ('cntx, 'avex, 'stdevx))
* uses: http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
*/
def sizeAveStdev(fieldDef : (Fields,Fields)) = {
val (fromFields, toFields) = fieldDef
Expand All @@ -123,18 +174,27 @@ class GroupBuilder(val groupFields : Fields) extends FieldConversions with Tuple
// sum_i (x_i - ave)^2 = sum_i x_i^2 - 2x_i ave +ave^2
// = (sum_i x_i^2) - 2N ave^2 + N ave^2
// = (sum_i x_i^2) - N ave^2
// These lines are long, wrapping them impossible, so partial function
// application:
mapReduceMap[Double, (Long,Double,Double), (Long,Double,Double)](fieldDef) { //Map
x => (1L,x,x*x)
} { //Reduce, mourn the absence of a clearly monadic operation on tuples:
(t1, t2) => (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
(x : Double) => (1L,x,0.0)
} {(cntAve1, cntAve2) =>
val (big, small) = if (cntAve1._1 >= cntAve2._1) (cntAve1, cntAve2) else (cntAve2, cntAve1)
val n = big._1
val k = small._1
val an = big._2
val ak = small._2
val delta = (ak - an)
val mn = big._3
val mk = small._3
val newCnt = n+k
val scaling = k.toDouble/newCnt
// a_n + (a_k - a_n)*(k/(n+k)) is only stable if n is not approximately k
val newAve = if (scaling < STABILITY_CONSTANT) (an + delta*scaling) else (n*an + k*ak)/newCnt
val newStdMom = mn + mk + delta*delta*(n*scaling)
(newCnt, newAve, newStdMom)
} { //Map
moms =>
val cnt = moms._1;
val ave = moms._2/moms._1;
val vari = scala.math.sqrt((moms._3 - cnt * ave * ave)/(cnt-1));
(cnt, ave, vari)
val cnt = moms._1
(cnt, moms._2, scala.math.sqrt(moms._3/(cnt - 1)))
}
}

Expand Down Expand Up @@ -319,14 +379,24 @@ class GroupBuilder(val groupFields : Fields) extends FieldConversions with Tuple
case None => new GroupBy(name, mpipes, groupFields)
case Some(sf) => new GroupBy(name, mpipes, groupFields, sf, isReversed)
}
overrideReducers(startPipe)

// Time to schedule the addEverys:
evs.foldRight(startPipe)( (op : Pipe => Every, p) => op(p) )
}
//This is the case where the group function is identity: { g => g }
case Some(Nil) => new GroupBy(name, mpipes, groupFields)
case Some(Nil) => {
val gb = new GroupBy(name, mpipes, groupFields)
overrideReducers(gb)
gb
}
//There is some non-empty AggregateBy to do:
case Some(redlist) => {
new AggregateBy(name, mpipes, groupFields, redlist.reverse.toArray : _*)
val THRESHOLD = 100000 //tune this, default is 10k
val ag = new AggregateBy(name, mpipes, groupFields,
THRESHOLD, redlist.reverse.toArray : _*)
overrideReducers(ag.getGroupBy())
ag
}
}
}
Expand Down
54 changes: 40 additions & 14 deletions src/main/scala/com/twitter/scalding/Job.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,33 +63,59 @@ class Job(val args : Args) extends TupleConversions with FieldConversions {

// Only very different styles of Jobs should override this.
def buildFlow(implicit mode : Mode) = {
// first verify all the source inputs are present
flowDef.getSources()
.asInstanceOf[JMap[String,AnyRef]]
// this is a map of (name, Tap)
.foreach { nameTap =>
// Each named source must be present:
mode.getSourceNamed(nameTap._1)
.get
// This can throw a InvalidSourceException
.validateTaps(mode)
}
validateSources(mode)
// Sources are good, now connect the flow:
mode.newFlowConnector(config).connect(flowDef)
}

mode.newFlowConnector(ioSerializations ++ List("com.twitter.scalding.KryoHadoopSerialization"))
.connect(flowDef)
/**
* By default we only set two keys:
* io.serializations
* cascading.tuple.element.comparator.default
* Override this class, call base and ++ your additional
* map to set more options
*/
def config : Map[AnyRef,AnyRef] = {
val ioserVals = (ioSerializations ++
List("com.twitter.scalding.KryoHadoopSerialization")).mkString(",")
Map("io.serializations" -> ioserVals) ++
(defaultComparator match {
case Some(defcomp) => Map("cascading.tuple.element.comparator.default" -> defcomp)
case None => Map[String,String]()
}) ++
Map("cascading.spill.threshold" -> "100000", //Tune these for better performance
"cascading.spillmap.threshold" -> "100000")
}

//Override this if you need to do some extra processing other than complete the flow
def run(implicit mode : Mode) = {
val flow = buildFlow(mode)
flow.complete
flow.getFlowStats.isSuccessful
}
//Add any serializations you need to deal with here:
// Add any serializations you need to deal with here:
def ioSerializations = List[String]()
// Override this if you want to customize comparisons/hashing for your job
def defaultComparator : Option[String] = {
Some("com.twitter.scalding.IntegralComparator")
}

//Largely for the benefit of Java jobs
def read(src : Source) = src.read
def write(pipe : Pipe, src : Source) {src.write(pipe)}

def validateSources(mode : Mode) {
flowDef.getSources()
.asInstanceOf[JMap[String,AnyRef]]
// this is a map of (name, Tap)
.foreach { nameTap =>
// Each named source must be present:
mode.getSourceNamed(nameTap._1)
.get
// This can throw a InvalidSourceException
.validateTaps(mode)
}
}
}

/**
Expand Down
36 changes: 25 additions & 11 deletions src/main/scala/com/twitter/scalding/Mode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ abstract class Mode(val sourceStrictness : Boolean) {
//We can't name two different pipes with the same name.
protected val sourceMap = MMap[Source, Pipe]()

def newFlowConnector(iosers : List[String]) : FlowConnector
def newFlowConnector(props : Map[AnyRef,AnyRef]) : FlowConnector

/**
* Cascading can't handle multiple head pipes with the same
Expand All @@ -58,14 +58,30 @@ abstract class Mode(val sourceStrictness : Boolean) {
}

trait HadoopMode extends Mode {
def jobConf : Configuration
def newFlowConnector(iosersIn : List[String]) = {
val props = jobConf.foldLeft(Map[AnyRef, AnyRef]()) {
// config is iterable, but not a map, convert to one:
implicit def configurationToMap(config : Configuration) = {
config.foldLeft(Map[AnyRef, AnyRef]()) {
(acc, kv) => acc + ((kv.getKey, kv.getValue))
}
val io = "io.serializations"
val iosers = (props.get(io).toList ++ iosersIn).mkString(",")
new HadoopFlowConnector(props + (io -> iosers))
}

def jobConf : Configuration

/*
* for each key, do a set union of values, keeping the order from prop1 to prop2
*/
protected def unionValues(prop1 : Map[AnyRef,AnyRef], prop2 : Map[AnyRef,AnyRef]) = {
(prop1.keys ++ prop2.keys).foldLeft(Map[AnyRef,AnyRef]()) { (acc, key) =>
val values1 = prop1.get(key).map { _.toString.split(",") }.getOrElse(Array[String]())
val values2 = prop2.get(key).map { _.toString.split(",") }.getOrElse(Array[String]())
//Only keep the different ones:
val union = (values1 ++ values2.filter { !values1.contains(_) }).mkString(",")
acc + ((key, union))
}
}

def newFlowConnector(props : Map[AnyRef,AnyRef]) = {
new HadoopFlowConnector(unionValues(jobConf, props))
}
}

Expand All @@ -79,13 +95,11 @@ case class HadoopTest(val config : Configuration, val buffers : Map[Source,Buffe
}

case class Local(strict : Boolean) extends Mode(strict) {
//No serialization is actually done in local mode, it's all memory
def newFlowConnector(iosers : List[String]) = new LocalFlowConnector
def newFlowConnector(props : Map[AnyRef,AnyRef]) = new LocalFlowConnector(props)
}
/**
* Memory only testing for unit tests
*/
case class Test(val buffers : Map[Source,Buffer[Tuple]]) extends Mode(false) {
//No serialization is actually done in Test mode, it's all memory
def newFlowConnector(iosers : List[String]) = new LocalFlowConnector
def newFlowConnector(props : Map[AnyRef,AnyRef]) = new LocalFlowConnector(props)
}
2 changes: 1 addition & 1 deletion src/main/scala/com/twitter/scalding/Operations.scala
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ import OperatorConversions._
class ExtremumFunctor(choose_max : Boolean, fields : Fields) extends AggregateBy.Functor {
override def getDeclaredFields = fields
def aggregate(flowProcess : FlowProcess[_], args : TupleEntry, context : Tuple) = {
val this_tup = args.getTupleCopy
val this_tup = args.getTuple
if(context == null) { this_tup }
else {
val (max, min) = if( context.compareTo(this_tup) < 0 ) {
Expand Down
Loading

0 comments on commit 1481992

Please sign in to comment.