Merge branch 'hotfix/0.3.4' into dev
johnynek committed Mar 1, 2012
2 parents 51742bd + 64da82e commit 092fd9e
Showing 19 changed files with 704 additions and 263 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
@@ -0,0 +1,4 @@
language: scala
scala:
- 2.8.1
- 2.9.1
25 changes: 16 additions & 9 deletions README.md
@@ -14,28 +14,35 @@ You should follow the scalding project on twitter: <http://twitter.com/scalding>
Hadoop is a distributed system for counting words. Here is how it's done in scalding. You can find this in examples:

```scala
package com.twitter.scalding.examples

import com.twitter.scalding._

class WordCountJob(args : Args) extends Job(args) {
  TextLine( args("input") ).read.
    flatMap('line -> 'word) { line : String => line.split("\\s+") }.
    groupBy('word) { _.size }.
    write( Tsv( args("output") ) )
}
```

##Tutorial
See tutorial/ for examples of how to use the DSL. See tutorial/CodeSnippets.md for some
-example scalding snippets.
+example scalding snippets. Edwin Chen wrote an excellent tutorial on using scalding for
+recommendations:
+<http://blog.echen.me/2012/02/09/movie-recommendations-and-more-via-mapreduce-and-scalding/>

## Building
0. Install sbt 0.11
1. ```sbt update``` (takes 2 minutes or more)
2. ```sbt test```
3. ```sbt assembly``` (needed to make the jar used by the scald.rb script)

+We use Travis-ci.org to verify the build:
+[![Build Status](https://secure.travis-ci.org/twitter/scalding.png)](http://travis-ci.org/twitter/scalding)
+
+The current version is 0.3.4 and available from maven central: org="com.twitter", artifact="scalding_2.8.1".
+
## Comparison to Scrunch/Scoobi
Scalding comes with an executable tutorial set that does not require a Hadoop
cluster. If you're curious about scalding, why not invest a bit of time and run the tutorial
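
As a quick aside, a minimal sketch of consuming this release from another sbt project, using the maven coordinates quoted in the README above (the snippet itself is illustrative, not part of this commit):

```scala
// build.sbt fragment (illustrative): depend on the 0.3.4 release from maven central
libraryDependencies += "com.twitter" % "scalding_2.8.1" % "0.3.4"
```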
12 changes: 6 additions & 6 deletions build.sbt
@@ -2,23 +2,23 @@ import AssemblyKeys._

name := "scalding"

-version := "0.3.0"
+version := "0.3.4"

organization := "com.twitter"

scalaVersion := "2.8.1"

resolvers += "Concurrent Maven Repo" at "http://conjars.org/repo"

-libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-215"
+libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-238"

-libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-215"
+libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-238"

-libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-215"
+libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-238"

-libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.2.0"
+libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.2.1"

-libraryDependencies += "com.twitter" % "meat-locker" % "0.1.4"
+libraryDependencies += "com.twitter" % "meat-locker" % "0.1.6"

libraryDependencies += "commons-lang" % "commons-lang" % "2.4"
2 changes: 1 addition & 1 deletion scripts/scald.rb
@@ -2,7 +2,7 @@
require 'fileutils'
require 'thread'

-SCALDING_VERSION="0.3.0"
+SCALDING_VERSION="0.3.4"

#Usage : scald.rb [--hdfs|--local|--print] job <job args>
# --hdfs: if job ends in ".scala" or ".java" and the file exists, link it against JARFILE (below) and then run it on HOST.
2 changes: 2 additions & 0 deletions src/main/scala/com/twitter/scalding/DateRange.scala
@@ -380,6 +380,7 @@ case class DateRange(val start : RichDate, val end : RichDate) {
 * current range. These children must be ordered from largest
 * to smallest in size.
 */
+@serializable
class BaseGlobifier(dur : Duration, val sym: String, pattern : String, tz : TimeZone, child : Option[BaseGlobifier]) {
  import DateOps._
  // result <= rd
@@ -494,5 +495,6 @@ case class MonthGlob(pat : String)(implicit tz: TimeZone)
/*
 * This is the outermost globifier and should generally be used to globify
 */
+@serializable
case class Globifier(pat : String)(implicit tz: TimeZone)
  extends BaseGlobifier(Years(1)(tz), "%1$tY", pat, tz, Some(MonthGlob(pat)))
96 changes: 83 additions & 13 deletions src/main/scala/com/twitter/scalding/GroupBuilder.scala
@@ -32,7 +32,8 @@ import scala.math.Ordering
// for instance, a scanLeft/foldLeft generally requires a sorting
// but such sorts are (at least for now) incompatible with doing a combine
// which includes some map-side reductions.
-class GroupBuilder(val groupFields : Fields) extends FieldConversions with TupleConversions {
+class GroupBuilder(val groupFields : Fields) extends FieldConversions
+  with TupleConversions with java.io.Serializable {

/**
 * Holds the "reducers/combiners", the things that we can do partially map-side.
@@ -70,8 +71,57 @@ class GroupBuilder(val groupFields : Fields) extends FieldConversions with Tuple
    return !reds.isEmpty
  }


+  /**
+   * Holds the number of reducers to use in the reduce stage of the groupBy/aggregateBy.
+   * By default uses whatever value is set in the jobConf.
+   */
+  private var numReducers : Option[Int] = None
+  private val REDUCER_KEY = "mapred.reduce.tasks"
+  /**
+   * Override the number of reducers used in the groupBy.
+   */
+  def reducers(r : Int) = {
+    if(r > 0) {
+      numReducers = Some(r)
+    }
+    this
+  }
+
+  private def overrideReducers(p : Pipe) : Pipe = {
+    numReducers.map{ r =>
+      if(r <= 0)
+        throw new IllegalArgumentException("Number of reducers must be non-negative")
+      p.getProcessConfigDef()
+        .setProperty(REDUCER_KEY, r.toString)
+    }
+    p
+  }
+
+  // When combining averages, if the counts sizes are too close we should use a different
+  // algorithm. This constant defines how close the ratio of the smaller to the total count
+  // can be:
+  private val STABILITY_CONSTANT = 0.1
+  /**
+   * uses a more stable online algorithm which should
+   * be suitable for large numbers of records
+   * similar to:
+   * http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
+   */
  def average(f : (Fields, Fields)) : GroupBuilder = {
-    mapReduceMap(f)((x:Double) => (1L, x))((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))(res => res._2/res._1)
+    mapReduceMap(f){(x:Double) =>
+      (1L, x)
+    } {(cntAve1, cntAve2) =>
+      val (big, small) = if (cntAve1._1 >= cntAve2._1) (cntAve1, cntAve2) else (cntAve2, cntAve1)
+      val n = big._1
+      val k = small._1
+      val an = big._2
+      val ak = small._2
+      val newCnt = n+k
+      val scaling = k.toDouble/newCnt
+      // a_n + (a_k - a_n)*(k/(n+k)) is only stable if n is not approximately k
+      val newAve = if (scaling < STABILITY_CONSTANT) (an + (ak - an)*scaling) else (n*an + k*ak)/newCnt
+      (newCnt, newAve)
+    } { res => res._2 }
  }
  def average(f : Symbol) : GroupBuilder = average(f->f)
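
For context, a hedged sketch of how the new reducers(...) knob might be used from a job; the pipeline and field names are illustrative, not from this commit:

```scala
// force 10 reduce tasks for this groupBy (illustrative usage)
TextLine( args("input") ).read
  .flatMap('line -> 'word) { line : String => line.split("\\s+") }
  .groupBy('word) { _.reducers(10).size }
  .write( Tsv( args("output") ) )
```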


@@ -112,6 +162,7 @@ class GroupBuilder(val groupFields : Fields) extends FieldConversions with Tuple
  /**
   * Compute the count, ave and standard deviation in one pass
   * example: g.cntAveStdev('x -> ('cntx, 'avex, 'stdevx))
+  * uses: http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
   */
  def sizeAveStdev(fieldDef : (Fields,Fields)) = {
    val (fromFields, toFields) = fieldDef
@@ -123,18 +174,27 @@ class GroupBuilder(val groupFields : Fields) extends FieldConversions with Tuple
    // sum_i (x_i - ave)^2 = sum_i x_i^2 - 2x_i ave +ave^2
    // = (sum_i x_i^2) - 2N ave^2 + N ave^2
    // = (sum_i x_i^2) - N ave^2
+    // These lines are long, wrapping them impossible, so partial function
+    // application:
    mapReduceMap[Double, (Long,Double,Double), (Long,Double,Double)](fieldDef) { //Map
-      x => (1L,x,x*x)
-    } { //Reduce, mourn the absence of a clearly monadic operation on tuples:
-      (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
+      (x : Double) => (1L,x,0.0)
+    } {(cntAve1, cntAve2) =>
+      val (big, small) = if (cntAve1._1 >= cntAve2._1) (cntAve1, cntAve2) else (cntAve2, cntAve1)
+      val n = big._1
+      val k = small._1
+      val an = big._2
+      val ak = small._2
+      val delta = (ak - an)
+      val mn = big._3
+      val mk = small._3
+      val newCnt = n+k
+      val scaling = k.toDouble/newCnt
+      // a_n + (a_k - a_n)*(k/(n+k)) is only stable if n is not approximately k
+      val newAve = if (scaling < STABILITY_CONSTANT) (an + delta*scaling) else (n*an + k*ak)/newCnt
+      val newStdMom = mn + mk + delta*delta*(n*scaling)
+      (newCnt, newAve, newStdMom)
    } { //Map
      moms =>
-        val cnt = moms._1;
-        val ave = moms._2/moms._1;
-        val vari = scala.math.sqrt((moms._3 - cnt * ave * ave)/(cnt-1));
-        (cnt, ave, vari)
+        val cnt = moms._1
+        (cnt, moms._2, scala.math.sqrt(moms._3/(cnt - 1)))
    }
  }


@@ -319,14 +379,24 @@ class GroupBuilder(val groupFields : Fields) extends FieldConversions with Tuple
        case None => new GroupBy(name, mpipes, groupFields)
        case Some(sf) => new GroupBy(name, mpipes, groupFields, sf, isReversed)
      }
+     overrideReducers(startPipe)
+
      // Time to schedule the addEverys:
      evs.foldRight(startPipe)( (op : Pipe => Every, p) => op(p) )
    }
    //This is the case where the group function is identity: { g => g }
-   case Some(Nil) => new GroupBy(name, mpipes, groupFields)
+   case Some(Nil) => {
+     val gb = new GroupBy(name, mpipes, groupFields)
+     overrideReducers(gb)
+     gb
+   }
    //There is some non-empty AggregateBy to do:
    case Some(redlist) => {
-     new AggregateBy(name, mpipes, groupFields, redlist.reverse.toArray : _*)
+     val THRESHOLD = 100000 //tune this, default is 10k
+     val ag = new AggregateBy(name, mpipes, groupFields,
+       THRESHOLD, redlist.reverse.toArray : _*)
+     overrideReducers(ag.getGroupBy())
+     ag
    }
  }
}
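
The stable-average merge used by average and sizeAveStdev above combines (count, mean) pairs without losing precision when one partial count dominates. A self-contained sketch of the same formula (the object name and demo values are illustrative, not scalding API):

```scala
// Standalone sketch of the (count, mean) merge used in the reduce steps above.
object StableAverage {
  val StabilityConstant = 0.1
  def merge(a : (Long, Double), b : (Long, Double)) : (Long, Double) = {
    val (big, small) = if (a._1 >= b._1) (a, b) else (b, a)
    val (n, an) = big
    val (k, ak) = small
    val newCnt = n + k
    val scaling = k.toDouble / newCnt
    // the shifted form an + (ak - an)*scaling is only stable when one side dominates;
    // otherwise fall back to the weighted sum
    val newAve =
      if (scaling < StabilityConstant) an + (ak - an) * scaling
      else (n * an + k * ak) / newCnt
    (newCnt, newAve)
  }
  def main(args : Array[String]) {
    // merging two partial averages should agree with the global average:
    println(merge((1000000L, 2.0), (10L, 100.0))) // ~ (1000010, 2.00098)
  }
}
```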
71 changes: 71 additions & 0 deletions src/main/scala/com/twitter/scalding/IntegralComparator.scala
@@ -0,0 +1,71 @@
/*
Copyright 2012 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.scalding;

import cascading.tuple.Hasher;

import java.io.Serializable;
import java.util.Comparator;

/*
* Handles numerical hashing properly
*/
class IntegralComparator extends Comparator[AnyRef] with Hasher[AnyRef] with Serializable {

val integralTypes : Set[Class[_]] = Set(classOf[java.lang.Long],
classOf[java.lang.Integer],
classOf[java.lang.Short],
classOf[java.lang.Byte])

def isIntegral(boxed : AnyRef) = integralTypes(boxed.getClass)

override def compare(a1: AnyRef, a2: AnyRef) : Int = {
val a1IsNull = if (null == a1) 1 else 0
val a2IsNull = if (null == a2) 1 else 0
if (a1IsNull + a2IsNull > 0) {
//if a2IsNull, but a1IsNot, a2 is less:
a2IsNull - a1IsNull
}
else if (isIntegral(a1) && isIntegral(a2)) {
val long1 = a1.asInstanceOf[Number].longValue
val long2 = a2.asInstanceOf[Number].longValue
if (long1 < long2)
-1
else if (long1 > long2)
1
else
0
}
else
a1.asInstanceOf[Comparable[AnyRef]].compareTo(a2)
}

override def hashCode(obj : AnyRef) : Int = {
if (null == obj) {
0
}
else if (isIntegral(obj)) {
obj.asInstanceOf[Number]
.longValue
.hashCode
}
else {
//Use the default:
obj.hashCode
}
}
}
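
A small, hypothetical check of what this comparator buys you (not part of the commit): boxed integral types holding the same numeric value compare equal and hash identically, which plain Object semantics would not guarantee across Integer/Long:

```scala
// illustrative REPL-style check of IntegralComparator semantics
val cmp = new com.twitter.scalding.IntegralComparator
val i : AnyRef = java.lang.Integer.valueOf(42)
val l : AnyRef = java.lang.Long.valueOf(42L)
assert(cmp.compare(i, l) == 0)              // 42 as Int and 42 as Long compare equal
assert(cmp.hashCode(i) == cmp.hashCode(l))  // and hash to the same value
```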
54 changes: 40 additions & 14 deletions src/main/scala/com/twitter/scalding/Job.scala
@@ -63,33 +63,59 @@ class Job(val args : Args) extends TupleConversions with FieldConversions {

  // Only very different styles of Jobs should override this.
  def buildFlow(implicit mode : Mode) = {
-    // first verify all the source inputs are present
-    flowDef.getSources()
-      .asInstanceOf[JMap[String,AnyRef]]
-      // this is a map of (name, Tap)
-      .foreach { nameTap =>
-        // Each named source must be present:
-        mode.getSourceNamed(nameTap._1)
-          .get
-          // This can throw a InvalidSourceException
-          .validateTaps(mode)
-      }
-
-    mode.newFlowConnector(ioSerializations ++ List("com.twitter.scalding.KryoHadoopSerialization"))
-      .connect(flowDef)
+    validateSources(mode)
+    // Sources are good, now connect the flow:
+    mode.newFlowConnector(config).connect(flowDef)
+  }
+
+  /**
+   * By default we only set two keys:
+   * io.serializations
+   * cascading.tuple.element.comparator.default
+   * Override this class, call base and ++ your additional
+   * map to set more options
+   */
+  def config : Map[AnyRef,AnyRef] = {
+    val ioserVals = (ioSerializations ++
+      List("com.twitter.scalding.KryoHadoopSerialization")).mkString(",")
+    Map("io.serializations" -> ioserVals) ++
+      (defaultComparator match {
+        case Some(defcomp) => Map("cascading.tuple.element.comparator.default" -> defcomp)
+        case None => Map[String,String]()
+      }) ++
+      Map("cascading.spill.threshold" -> "100000", //Tune these for better performance
+          "cascading.spillmap.threshold" -> "100000")
  }

  //Override this if you need to do some extra processing other than complete the flow
  def run(implicit mode : Mode) = {
    val flow = buildFlow(mode)
    flow.complete
    flow.getFlowStats.isSuccessful
  }
-  //Add any serializations you need to deal with here:
+  // Add any serializations you need to deal with here:
  def ioSerializations = List[String]()
+  // Override this if you want to customize comparisons/hashing for your job
+  def defaultComparator : Option[String] = {
+    Some("com.twitter.scalding.IntegralComparator")
+  }

  //Largely for the benefit of Java jobs
  def read(src : Source) = src.read
  def write(pipe : Pipe, src : Source) {src.write(pipe)}

+  def validateSources(mode : Mode) {
+    flowDef.getSources()
+      .asInstanceOf[JMap[String,AnyRef]]
+      // this is a map of (name, Tap)
+      .foreach { nameTap =>
+        // Each named source must be present:
+        mode.getSourceNamed(nameTap._1)
+          .get
+          // This can throw a InvalidSourceException
+          .validateTaps(mode)
+      }
+  }
}


/**
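
A sketch of extending the new config hook from a user job, per the "call base and ++ your additional map" comment above (the overridden property and its value are illustrative, not from this commit):

```scala
import com.twitter.scalding._

class MyJob(args : Args) extends Job(args) {
  // start from the base config and append extra Hadoop/cascading properties
  override def config : Map[AnyRef,AnyRef] =
    super.config ++ Map("cascading.spill.threshold" -> "50000")
}
```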
