Sync with Twitter internal, bump version

commit 51742bd739fd1d9b7b1be0be2aaf22051908adee 1 parent 6bf133c
@johnynek johnynek authored
14 build.sbt
@@ -2,7 +2,7 @@ import AssemblyKeys._
name := "scalding"
-version := "0.2.0"
+version := "0.3.0"
organization := "com.twitter"
@@ -10,15 +10,17 @@ scalaVersion := "2.8.1"
resolvers += "Concurrent Maven Repo" at "http://conjars.org/repo"
-libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-165"
+libraryDependencies += "cascading" % "cascading-core" % "2.0.0-wip-215"
-libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-165"
+libraryDependencies += "cascading" % "cascading-local" % "2.0.0-wip-215"
-libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-165"
+libraryDependencies += "cascading" % "cascading-hadoop" % "2.0.0-wip-215"
-libraryDependencies += "commons-lang" % "commons-lang" % "2.4"
+libraryDependencies += "cascading.kryo" % "cascading.kryo" % "0.2.0"
+
+libraryDependencies += "com.twitter" % "meat-locker" % "0.1.4"
-libraryDependencies += "de.javakaffee" % "kryo-serializers" % "0.9"
+libraryDependencies += "commons-lang" % "commons-lang" % "2.4"
libraryDependencies += "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test"
4 scripts/scald.rb
@@ -2,6 +2,8 @@
require 'fileutils'
require 'thread'
+SCALDING_VERSION="0.3.0"
+
#Usage : scald.rb [--hdfs|--local|--print] job <job args>
# --hdfs: if job ends in ".scala" or ".java" and the file exists, link it against JARFILE (below) and then run it on HOST.
# else, it is assumed to be a full classname to an item in the JARFILE, which is run on HOST
@@ -15,7 +17,7 @@
#Get the absolute path of the original (non-symlink) file.
ORIGINAL_FILE=File.symlink?(__FILE__) ? File.readlink(__FILE__) : __FILE__
SCIENCE_ROOT=File.expand_path(File.dirname(ORIGINAL_FILE)+"/../")
-JARFILE=SCIENCE_ROOT + "/target/scalding-assembly-0.2.0.jar" #what jar has all the dependencies for this job
+JARFILE=SCIENCE_ROOT + "/target/scalding-assembly-#{SCALDING_VERSION}.jar" #what jar has all the dependencies for this job
puts JARFILE
HOST="my.remote.host" #where the job is rsynced to and run
TMPDIR="/tmp"
384 src/main/scala/com/twitter/scalding/DateRange.scala
@@ -23,6 +23,7 @@ import java.util.Calendar
import java.util.Date
import java.util.TimeZone
import java.util.NoSuchElementException
+import java.util.regex.Pattern
import org.apache.commons.lang.time.DateUtils
@@ -87,44 +88,156 @@ object DateOps {
/**
* Represents millisecond based duration (non-calendar based): seconds, minutes, hours
+* calField should be a java.util.Calendar field
*/
-abstract class Duration {
- def toMillisecs : Long
- def toSeconds = toMillisecs / 1000.0
- def +(that : Duration) = Millisecs(toMillisecs + that.toMillisecs)
- def -(that : Duration) = Millisecs(toMillisecs - that.toMillisecs)
- def *(that : Double) = Millisecs((toMillisecs * that).toLong)
- def /(that : Double) = Millisecs((toMillisecs / that).toLong)
-}
-case class Millisecs(value : Long) extends Duration {
- def toMillisecs = value
+@serializable
+object Duration {
+ val SEC_IN_MS = 1000
+ val MIN_IN_MS = 60 * SEC_IN_MS
+ val HOUR_IN_MS = 60 * MIN_IN_MS
+ val UTC_UNITS = List((Hours,HOUR_IN_MS),(Minutes,MIN_IN_MS),(Seconds,SEC_IN_MS),(Millisecs,1))
+
+ // Creates the largest unit corresponding to this number of milliseconds (or possibly a duration list)
+ def fromMillisecs(diffInMs : Long) : AbsoluteDuration = {
+ // Try to see if we have an even number of any "calendar-free" units
+ // Note, no unit is truly calendar-free due to leap years, leap seconds, etc.,
+ // so this is approximate
+ // We can't fail the last one, x % 1 == 0
+ UTC_UNITS.find { u_ms : (Function1[Int,AbsoluteDuration],Int) =>
+ //This always returns something, maybe the last item:
+ (diffInMs % u_ms._2) == 0
+ }.map { u_ms : (Function1[Int,AbsoluteDuration],Int) =>
+ val count = diffInMs / u_ms._2
+ if (count <= Int.MaxValue)
+ u_ms._1(count.toInt)
+ else {
+ if (diffInMs / HOUR_IN_MS > Int.MaxValue) throw new RuntimeException("difference not expressible in 2^{31} hours")
+
+ AbsoluteDurationList(UTC_UNITS.foldLeft((diffInMs, Nil : List[AbsoluteDuration])) { (state, u_ms) =>
+ val (rest, durList) = state
+ val (constructor, timeInterval) = u_ms
+ val thisCnt = (rest / timeInterval).toInt
+ val next = rest - (thisCnt) * timeInterval
+ (next, constructor(thisCnt) :: durList)
+ }._2)
+ }
+ //This get should never throw because the list always finds something
+ }.get
+ }
}
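As a rough illustration of the logic above (an editor's sketch, not part of this commit), fromMillisecs picks the coarsest unit in UTC_UNITS that divides the input evenly, and only falls back to an AbsoluteDurationList when the count would overflow an Int:

    Duration.fromMillisecs(3L * 60 * 60 * 1000)  // divisible by HOUR_IN_MS  => Hours(3)
    Duration.fromMillisecs(90L * 1000)           // only SEC_IN_MS divides it => Seconds(90)
    Duration.fromMillisecs(1234L)                // nothing larger divides it => Millisecs(1234)
    Hours(2) + Minutes(30)                       // 9,000,000 ms              => Minutes(150)
    // Values whose count overflows Int for every unit come back as an
    // AbsoluteDurationList of hour/minute/second/millisecond parts.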
-case class Seconds(value : Long) extends Duration {
- def toMillisecs = value * 1000L
+
+@serializable
+abstract class Duration(val calField : Int, val count : Int, val tz : TimeZone) {
+ protected def calAdd(that : RichDate, steps : Int) = {
+ val cal = that.toCalendar(tz)
+ cal.setLenient(true)
+ cal.add(calField, steps)
+ new RichDate(cal.getTime)
+ }
+
+ def addTo(that : RichDate) = calAdd(that, count)
+
+ def subtractFrom(that : RichDate) = calAdd(that, -count)
+
+ // Return the latest RichDate at boundary of this time unit, ignoring
+ // the count of the units. Like a truncation.
+ // Only makes sense for non-mixed durations.
+ def floorOf(that : RichDate) : RichDate = {
+ val cal = that.toCalendar(tz)
+ RichDate(DateUtils.truncate(cal, calField).getTime)
+ }
}
-case class Minutes(value : Long) extends Duration {
- def toMillisecs = value * 60 * 1000L
+
+/*
+ * These are reasonably independent of calendars (or we will pretend)
+ */
+@serializable
+object AbsoluteDuration {
+ def max(a : AbsoluteDuration, b : AbsoluteDuration) = if(a > b) a else b
}
-case class Hours(value : Long) extends Duration {
- def toMillisecs = value * 60 * 60 * 1000L
+trait AbsoluteDuration extends Duration with Ordered[AbsoluteDuration] {
+ def toSeconds : Double = {
+ calField match {
+ case Calendar.MILLISECOND => count / 1000.0
+ case Calendar.SECOND => count.toDouble
+ case Calendar.MINUTE => count * 60.0
+ case Calendar.HOUR => count * 60.0 * 60.0
+ }
+ }
+ def toMillisecs : Long = {
+ calField match {
+ case Calendar.MILLISECOND => count.toLong
+ case Calendar.SECOND => count.toLong * 1000L
+ case Calendar.MINUTE => count.toLong * 1000L * 60L
+ case Calendar.HOUR => count.toLong * 1000L * 60L * 60L
+ }
+ }
+ def compare(that : AbsoluteDuration) : Int = {
+ this.toMillisecs.compareTo(that.toMillisecs)
+ }
+ def +(that : AbsoluteDuration) = {
+ Duration.fromMillisecs(this.toMillisecs + that.toMillisecs)
+ }
}
-/**
-* this is only relative to a calendar
-*/
-abstract class CalendarDuration {
- def toDays : Int
- def +(that : CalendarDuration) = Days(toDays + that.toDays)
- def -(that : CalendarDuration) = Days(toDays - that.toDays)
+case class Millisecs(cnt : Int) extends Duration(Calendar.MILLISECOND, cnt, DateOps.UTC)
+ with AbsoluteDuration
+
+case class Seconds(cnt : Int) extends Duration(Calendar.SECOND, cnt, DateOps.UTC)
+ with AbsoluteDuration
+
+case class Minutes(cnt : Int) extends Duration(Calendar.MINUTE, cnt, DateOps.UTC)
+ with AbsoluteDuration
+
+case class Hours(cnt : Int) extends Duration(Calendar.HOUR, cnt, DateOps.UTC)
+ with AbsoluteDuration
+
+case class Days(cnt : Int)(implicit tz : TimeZone)
+ extends Duration(Calendar.DAY_OF_MONTH, cnt, tz)
+
+case class Weeks(cnt : Int)(implicit tz : TimeZone)
+ extends Duration(Calendar.WEEK_OF_YEAR, cnt, tz) {
+
+ // The library we are using can't handle week truncation...
+ override def floorOf(that : RichDate) = {
+ val step = Days(1)
+ @tailrec def recentMonday(rd : RichDate) : RichDate = {
+ rd.toCalendar(tz).get(Calendar.DAY_OF_WEEK) match {
+ case Calendar.MONDAY => rd
+ case _ => recentMonday(step.subtractFrom(rd))
+ }
+ }
+ //Set it to the earliest point in the day:
+ step.floorOf(recentMonday(that))
+ }
}
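The floorOf semantics above show up in the updated DateTest at the bottom of this commit; restated here as a sketch (the implicit TimeZone and the String-to-RichDate conversion used in that test are assumed in scope):

    // 2011-10-25 is a Tuesday; a week floor walks back to Monday, then truncates to midnight
    Weeks(1).floorOf("2011-10-25")        // => 2011-10-24 00:00
    Days(1).floorOf("2011-10-25 10:01")   // => 2011-10-25 00:00
    Hours(1).floorOf("2011-10-25 10:01")  // => 2011-10-25 10:00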
-case class Days(value : Int) extends CalendarDuration {
- def toDays = value
+case class Months(cnt : Int)(implicit tz : TimeZone)
+ extends Duration(Calendar.MONTH, cnt, tz)
+
+case class Years(cnt : Int)(implicit tz : TimeZone)
+ extends Duration(Calendar.YEAR, cnt, tz)
+
+abstract class AbstractDurationList[T <: Duration](parts : List[T]) extends Duration(-1,-1, null) {
+ override def addTo(that : RichDate) = {
+ parts.foldLeft(that) { (curdate, next) => next.addTo(curdate) }
+ }
+ override def subtractFrom(that : RichDate) = {
+ parts.foldLeft(that) { (curdate, next) => next.subtractFrom(curdate) }
+ }
+ //This does not make sense for a DurationList interval, pass through
+ override def floorOf(that : RichDate) = that
}
-case class Weeks(value : Int) extends CalendarDuration {
- def toDays = value * 7
+
+case class DurationList(parts : List[Duration]) extends AbstractDurationList[Duration](parts)
+
+case class AbsoluteDurationList(parts : List[AbsoluteDuration])
+ extends AbstractDurationList[AbsoluteDuration](parts) with AbsoluteDuration {
+ override def toSeconds = parts.map{ _.toSeconds }.sum
+ override def toMillisecs : Long = parts.map{ _.toMillisecs }.sum
}
+
/**
* RichDate adds some nice convenience functions to the Java date/calendar classes
* We commonly do Date/Time work in analysis jobs, so having these operations convenient
@@ -150,32 +263,12 @@ object RichDate {
}
case class RichDate(val value : Date) extends Ordered[RichDate] {
- def +(interval : Duration) = new RichDate(new Date(value.getTime + interval.toMillisecs))
- def -(interval : Duration) = new RichDate(new Date(value.getTime - interval.toMillisecs))
-
- def +(interval : CalendarDuration)(implicit tz : TimeZone) = {
- val cal = toCalendar(tz)
- cal.setLenient(true)
- cal.add(Calendar.DAY_OF_YEAR, interval.toDays)
- new RichDate(cal.getTime)
- }
- def -(interval : CalendarDuration)(implicit tz : TimeZone) = {
- val cal = toCalendar(tz)
- cal.setLenient(true)
- cal.add(Calendar.DAY_OF_YEAR, -(interval.toDays))
- new RichDate(cal.getTime)
- }
+ def +(interval : Duration) = interval.addTo(this)
+ def -(interval : Duration) = interval.subtractFrom(this)
//Inverse of the above, d2 + (d1 - d2) == d1
- def -(that : RichDate) : Duration = {
- val diff = value.getTime - that.value.getTime
- val units = List(Hours,Minutes,Seconds,Millisecs)
- //We can't fail the last one, x % 1 == 0
- val d_unit = units.find { u : Function1[Long,Duration] =>
- (diff % u(1).toMillisecs) == 0
- }.head
- d_unit( diff / d_unit(1).toMillisecs )
- }
+ def -(that : RichDate) = Duration.fromMillisecs(value.getTime - that.value.getTime)
+
override def compare(that : RichDate) : Int = {
if (value.before(that.value)) {
-1
@@ -187,42 +280,6 @@ case class RichDate(val value : Date) extends Ordered[RichDate] {
}
}
- private def earliestIn(calField : Int, tz : TimeZone) : RichDate = {
- val cal = toCalendar(tz)
- new RichDate(DateUtils.truncate(cal, calField).getTime)
- }
- /**
- * Truncate to the earliest millisecond in the same hour as this time, in the given TZ.
- */
- def earliestInHour(implicit tz : TimeZone) = earliestIn(Calendar.HOUR, tz)
- /**
- * Truncate to the earliest millisecond in the same day as this time, in the given TZ.
- */
- def earliestInDay(implicit tz : TimeZone) = earliestIn(Calendar.DAY_OF_MONTH, tz)
- /**
- * Truncate to the earliest millisecond in the most recent Monday as this time, in the given TZ.
- */
- def earliestInWeek(implicit tz : TimeZone) = {
- @tailrec def recentMonday(cal : Calendar) : Calendar = {
- cal.get(Calendar.DAY_OF_WEEK) match {
- case Calendar.MONDAY => cal
- case _ => {
- //The sorrows of the highly mutable Java standard library
- val newc = cal.clone().asInstanceOf[Calendar];
- //Make it clear we want to interpret a previous day at the beginning
- //of the year/week as the previous week
- newc.setLenient(true)
- newc.add(Calendar.DAY_OF_MONTH, -1)
- recentMonday(newc)
- }
- }
- }
- val mon = recentMonday(toCalendar(tz))
- //Set it to the earliest point in the day:
- DateUtils.truncate(mon, Calendar.DATE)
- new RichDate(mon.getTime)
- }
-
//True of the other is a RichDate with equal value, or a Date equal to value
override def equals(that : Any) = {
//Due to type erasure (scala 2.9 complains), we need to use a manifest:
@@ -291,17 +348,14 @@ case class DateRange(val start : RichDate, val end : RichDate) {
/**
* produce a contiguous non-overlapping set of DateRanges
* whose union is equivalent to this.
- * If it is passed an hour, day, or week interval, the break points
- * are set by the start timezone, otherwise we break and start + k * span.
+ * If it is passed an integral unit of time (not a DurationList), it stops at boundaries
+ * which are set by the start timezone, else break at start + k * span.
*/
- def each(span : Duration)(implicit tz: TimeZone) : Iterable[DateRange] = {
+ def each(span : Duration) : Iterable[DateRange] = {
//tail recursive method which produces output (as a stack, so it is
//reversed). acc is the accumulated list so far:
@tailrec def eachRec(acc : List[DateRange], nextDr : DateRange) : List[DateRange] = {
- val next_start = span match {
- case Hours(_) => nextDr.start.earliestInHour(tz) + span
- case _ => nextDr.start + span
- }
+ val next_start = span.floorOf(nextDr.start) + span
//the smallest grain of time we count is 1 millisecond
val this_end = next_start - Millisecs(1)
if( nextDr.end <= this_end ) {
@@ -317,32 +371,128 @@ case class DateRange(val start : RichDate, val end : RichDate) {
//have to reverse because eachDayRec produces backwards
eachRec(Nil, this).reverse
}
- /**
- * produce a contiguous non-overlapping set of DateRanges
- * whose union is equivalent to this.
- * Operate on CalendarDurations
- */
- def each(span : CalendarDuration)(implicit tz: TimeZone) : Iterable[DateRange] = {
- //tail recursive method which produces output (as a stack, so it is
- //reversed). acc is the accumulated list so far:
- @tailrec def eachRec(acc : List[DateRange], nextDr : DateRange) : List[DateRange] = {
- val next_start = span match {
- case Weeks(_) => nextDr.start.earliestInWeek(tz) + span
- case Days(_) => nextDr.start.earliestInDay(tz) + span
- }
- //the smallest grain of time we count is 1 millisecond
- val this_end = next_start - Millisecs(1)
- if( nextDr.end <= this_end ) {
- //This is the last block, output and end:
- nextDr :: acc
+}
+
+/*
+ * All the Globification logic is encoded in this one
+ * class. It has a list of child ranges that share boundaries
+ * with the current range and are completely contained within the
+ * current range. These children must be ordered from largest
+ * to smallest in size.
+ */
+class BaseGlobifier(dur : Duration, val sym: String, pattern : String, tz : TimeZone, child : Option[BaseGlobifier]) {
+ import DateOps._
+ // result <= rd
+ private def greatestLowerBound(rd : RichDate) = dur.floorOf(rd)
+ // rd <= result
+ private def leastUpperBound(rd : RichDate) : RichDate = {
+ val lb = greatestLowerBound(rd)
+ if (lb == rd)
+ rd
+ else
+ lb + dur
+ }
+
+ def format(rd : RichDate) : String = String.format(pattern, rd.toCalendar(tz))
+
+ // Generate a lazy list of all children
+ final def children : Stream[BaseGlobifier] = child match {
+ case Some(c) => Stream.cons(c, c.children)
+ case None => Stream.empty
+ }
+
+ final def asteriskChildren(rd : RichDate) : String = {
+ val childStarPattern = children.foldLeft(pattern) { (this_pat, child) =>
+ this_pat.replaceAll(Pattern.quote(child.sym), "*")
+ }
+ String.format(childStarPattern, rd.toCalendar(tz))
+ }
+
+ // Handles the case of zero interior boundaries
+ // with potential boundaries only at the end points.
+ private def simpleCase(dr : DateRange) : List[String] = {
+ val sstr = format(dr.start)
+ val estr = format(dr.end)
+ if (dr.end < dr.start) {
+ Nil
+ }
+ else if (child.isEmpty) {
+ //There is only one block:
+ assert(sstr == estr, "Malformed hierarchy: " + sstr + " != " + estr)
+ List(sstr)
+ }
+ else {
+ /*
+ * Two cases: we should asterisk our children, or we need
+ * to recurse. If we fill this entire range, just asterisk,
+ */
+ val bottom = children.last
+ val fillsright = format(leastUpperBound(dr.end)) ==
+ format(bottom.leastUpperBound(dr.end))
+ val fillsleft = format(greatestLowerBound(dr.start)) ==
+ format(bottom.greatestLowerBound(dr.start))
+ if (fillsright && fillsleft) {
+ List(asteriskChildren(dr.start))
}
else {
- //Put today's portion, and then start on tomorrow:
- val today = DateRange(nextDr.start, this_end)
- eachRec(today :: acc, DateRange(next_start, nextDr.end))
+ child.get.globify(dr)
}
}
- //have to reverse because eachDayRec produces backwards
- eachRec(Nil, this).reverse
+ }
+
+ def globify(dr : DateRange) : List[String] = {
+ /* We know:
+ * start <= end : by assumption
+ * mid1 - start < delta : mid1 is least upper bound
+ * end - mid2 < delta : mid2 is greatest lower bound
+ * mid1 = mid2 + n*delta : both on the boundary.
+ * if mid1 <= mid2, then we contain a boundary point,
+ * else we do not.
+ */
+ val mid1 = leastUpperBound(dr.start)
+ val mid2 = greatestLowerBound(dr.end)
+ //Imprecise patterns may not need to drill down, let's see if we can stop early:
+ val sstr = format(dr.start)
+ val estr = format(dr.end)
+ if (sstr == estr) {
+ List(sstr)
+ }
+ else if (dr.end < dr.start) {
+ //This is nonsense:
+ Nil
+ }
+ else if (mid2 < mid1) {
+ //We do not contain a boundary point:
+ simpleCase(dr)
+ }
+ // otherwise we contain one or more than one boundary points
+ else if (mid1 == mid2) {
+ //we contain exactly one boundary point:
+ simpleCase(DateRange(dr.start, mid1 - Millisecs(1))) ++
+ simpleCase(DateRange(mid1, dr.end))
+ }
+ else {
+ //We contain 2 or more boundary points:
+ // [start <= mid1 < mid2 <= end]
+ // First check to see if we even need to check our children:
+ simpleCase(DateRange(dr.start, mid1 - Millisecs(1))) ++
+ (asteriskChildren(mid1) ::
+ globify(DateRange(mid1 + dur, dr.end)))
+ }
}
}
+
+case class HourGlob(pat : String)(implicit tz : TimeZone)
+ extends BaseGlobifier(Hours(1),"%1$tH", pat, tz, None)
+
+case class DayGlob(pat : String)(implicit tz: TimeZone)
+ extends BaseGlobifier(Days(1)(tz), "%1$td", pat, tz, Some(HourGlob(pat)))
+
+case class MonthGlob(pat : String)(implicit tz: TimeZone)
+ extends BaseGlobifier(Months(1)(tz), "%1$tm", pat, tz, Some(DayGlob(pat)))
+
+/*
+ * This is the outermost globifier and should generally be used to globify
+ */
+case class Globifier(pat : String)(implicit tz: TimeZone)
+ extends BaseGlobifier(Years(1)(tz), "%1$tY", pat, tz, Some(MonthGlob(pat)))
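For orientation, a sketch of how the globifier chain above might be driven directly; the /logs layout is hypothetical, and the implicit String-to-RichDate conversion used in DateTest is assumed to be in scope:

    implicit val tz : java.util.TimeZone = DateOps.UTC
    val range = DateRange("2011-10-24", "2011-10-26")
    val globs = Globifier("/logs/%1$tY/%1$tm/%1$td/*").globify(range)
    // globs is a List[String] of year/month/day glob patterns covering the range;
    // wherever a whole child unit (e.g. an entire day) is covered, asteriskChildren
    // collapses that position to "*" instead of enumerating every path.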
2  src/main/scala/com/twitter/scalding/GroupBuilder.scala
@@ -101,7 +101,7 @@ class GroupBuilder(val groupFields : Fields) extends FieldConversions with Tuple
val out_arity = toFields.size
assert(out_arity == 1, "toList: can only add a single element to the GroupBuilder")
mapReduceMap[T, List[T], List[T]](fieldDef) { //Map
- x => List(x)
+ x => Option(x).map{ List(_) }.getOrElse(List())
} { //Reduce
(t1, t2) => t2 ++ t1
} { //Map
51 src/main/scala/com/twitter/scalding/Job.scala
@@ -21,6 +21,16 @@ import cascading.pipe.Pipe
//For java -> scala implicits on collections
import scala.collection.JavaConversions._
+import java.util.{Map => JMap}
+
+object Job {
+ // Uses reflection to create a job by name
+ def apply(jobName : String, args : Args) : Job =
+ Class.forName(jobName).
+ getConstructor(classOf[Args]).
+ newInstance(args).
+ asInstanceOf[Job]
+}
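This is the same reflection path Tool.scala switches to further down; a sketch (the class name and arguments are hypothetical):

    // remainingArgs(0) names the job class, the rest become its Args
    val remainingArgs = Array("com.example.MyJob", "--input", "in.tsv", "--output", "out.tsv")
    val firstJob = Job(remainingArgs(0), Args(remainingArgs.tail))
    val ok = firstJob.run(Mode.mode)   // true when the job completes successfully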
@serializable
class Job(val args : Args) extends TupleConversions with FieldConversions {
@@ -51,8 +61,20 @@ class Job(val args : Args) extends TupleConversions with FieldConversions {
*/
def next : Option[Job] = None
- //Only very different styles of Jobs should override this.
+ // Only very different styles of Jobs should override this.
def buildFlow(implicit mode : Mode) = {
+ // first verify all the source inputs are present
+ flowDef.getSources()
+ .asInstanceOf[JMap[String,AnyRef]]
+ // this is a map of (name, Tap)
+ .foreach { nameTap =>
+ // Each named source must be present:
+ mode.getSourceNamed(nameTap._1)
+ .get
+ // This can throw a InvalidSourceException
+ .validateTaps(mode)
+ }
+
mode.newFlowConnector(ioSerializations ++ List("com.twitter.scalding.KryoHadoopSerialization"))
.connect(flowDef)
}
@@ -95,3 +117,30 @@ trait DefaultDateRangeJob extends Job {
assert(start <= end, "end of date range must occur after the start")
implicit val dateRange = DateRange(start, end)
}
+
+/*
+ * Run a list of shell commands through bash in the given order. Return success
+ * when all commands succeed. Execution stops after the first failure. The
+ * failing command is printed to stdout.
+ */
+class ScriptJob(cmds: Iterable[String]) extends Job(Args("")) {
+ override def run(implicit mode : Mode) = {
+ try {
+ cmds.dropWhile {
+ cmd: String => {
+ new java.lang.ProcessBuilder("bash", "-c", cmd).start().waitFor() match {
+ case x if x != 0 =>
+ println(cmd + " failed, exitStatus: " + x)
+ false
+ case 0 => true
+ }
+ }
+ }.isEmpty
+ } catch {
+ case e : Exception => {
+ e.printStackTrace
+ false
+ }
+ }
+ }
+}
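A minimal sketch of chaining a ScriptJob after another job via next (the class name and shell commands here are hypothetical):

    class PublishJob(args : Args) extends Job(args) {
      // ... main pipeline elided ...
      // After this job succeeds, Tool picks up `next` and runs the shell steps in order
      override def next : Option[Job] =
        Some(new ScriptJob(List("hadoop fs -touchz /tmp/_DONE", "echo all done")))
    }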
315 src/main/scala/com/twitter/scalding/KryoHadoopSerialization.scala
@@ -22,132 +22,36 @@ import java.nio.ByteBuffer
import org.apache.hadoop.io.serializer.{Serialization, Deserializer, Serializer, WritableSerialization}
-//This version of Kryo has support for objects without default constructors, which
-//are not uncommon in scala
-import de.javakaffee.kryoserializers.{KryoReflectionFactorySupport=>Kryo}
import com.esotericsoftware.kryo.{Serializer => KSerializer}
+import com.esotericsoftware.kryo.serialize.DateSerializer
+import cascading.kryo.KryoSerialization;
import cascading.tuple.hadoop.BufferedInputStream
import cascading.tuple.hadoop.TupleSerialization
+import cascading.kryo.Kryo
import scala.annotation.tailrec
-/**
-* Use Kryo to serialize any object. Thrift and Protobuf
-* are strictly more efficient, but this allows any object to
-* be used. Recommended for objects sent between mappers and reducers
-* not for final output serialization.
-*
-* Modeled on:
-* https://github.com/mesos/spark/blob/master/core/src/main/scala/spark/KryoSerializer.scala
-*
-* @author Oscar Boykin
-*/
-
-@serializable
-object KryoHadoopSerialization {
- def readSize(instream : InputStream) = {
- @tailrec
- def sizeR(bytes : Int) : Int = {
- // 1 -> 256 - 1, 2 -> 256**2 - 1, 4-> 256**4 - 1
- val maxValue = (1L << (8*(bytes))) - 1
- //This is just a big-endian read:
- val thisValue = (1 to bytes).foldLeft(0) {
- (acc, _) => (acc << 8) + instream.read
- }
- if (thisValue < maxValue) {
- thisValue
- }
- else {
- //Try again:
- sizeR(bytes * 2)
- }
- }
- sizeR(1)
- }
-
- /** Returns the total number of bytes written into the
- * the stream
- */
- def writeSize(os : OutputStream, sz : Int) = {
- @tailrec
- def writeR(written : Int, bytes : Int) : Int = {
- val maxValue = (1L << (8*(bytes))) - 1
- if( sz < maxValue ) {
- //Do a big endian write:
- (0 until bytes).reverse.foreach { shift =>
- os.write(sz >> (8*shift))
- }
- written + bytes
- }
- else {
- //Signal it was too big:
- (0 until bytes).foreach { (_) => os.write(0xff) }
- //Try with twice as many bytes:
- writeR(written + bytes, 2*bytes)
- }
- }
- writeR(0,1)
- }
-}
-
-// Singletons are easy, you just return the singleton and don't read:
-// It's important you actually do this, or Kryo will generate Nil != Nil, or None != None
-class SingletonSerializer(obj: AnyRef) extends KSerializer {
- override def writeObjectData(buf: ByteBuffer, obj: AnyRef) {}
- override def readObjectData[T](buf: ByteBuffer, cls: Class[T]): T = obj.asInstanceOf[T]
-}
-
-// Lists cause stack overflows for Kryo because they are cons cells.
-class ListSerializer(kser : Kryo) extends KSerializer {
- override def writeObjectData(buf: ByteBuffer, obj: AnyRef) {
- //Write the size:
- val list = obj.asInstanceOf[List[AnyRef]]
- kser.writeObjectData(buf, new java.lang.Integer(list.size))
- /*
- * An excellent question arises at this point:
- * How do we deal with List[List[T]]?
- * Since by the time this method is called, the ListSerializer has
- * already been registered, this iterative method will be used on
- * each element, and we should be safe.
- * The only risk is List[List[List[List[.....
- * But anyone who tries that gets what they deserve
- */
- list.foreach { t => kser.writeClassAndObject(buf, t) }
- }
+class KryoHadoopSerialization extends KryoSerialization {
+
+ /** TODO!!!
+ * Deal with this issue. The problem is grouping by Kryo serialized
+ * objects silently breaks the results. If Kryo gets in front of TupleSerialization
+ * (and possibly Writable, unclear at this time), grouping is broken.
+ * There are two issues here:
+ * 1) Kryo objects not being compared properly.
+ * 2) Kryo being used instead of cascading.
+ *
+ * We must identify each and fix these bugs.
+ */
+ val highPrioritySerializations = List(new WritableSerialization, new TupleSerialization)
- override def readObjectData[T](buf: ByteBuffer, cls: Class[T]) : T = {
- val size = kser.readObjectData(buf, classOf[java.lang.Integer]).intValue
- //Produce the reversed list:
- if (size == 0) {
- /*
- * this is only here at compile time. The type T is erased, but the
- * compiler verifies that we are intending to return a type T here.
- */
- Nil.asInstanceOf[T]
- }
- else {
- (0 until size).foldLeft(List[AnyRef]()) { (l, i) =>
- val iT = kser.readClassAndObject(buf)
- iT :: l
- }.reverse.asInstanceOf[T]
- }
+ override def accept(klass : Class[_]) = {
+ highPrioritySerializations.forall { !_.accept(klass) }
}
-}
-
-class KryoHadoopSerialization[T] extends Serialization[T] with Serializable {
- val INIT_SIZE = 4 * 1024 //4kB to start, double if needed
-
- //TODO a more robust way to handle being low priority should go here. For now, it looks like
- //the only serializer added after we add user io.serializations is TupleSerialization, so we are
- //handling it below. A better scheme is to look at the JobConf, and instantiate all the named
- //serializers. For now, we are waiting until this actually poses itself as a problem
- val highPrioritySerializations = List(new WritableSerialization, new TupleSerialization)
+ override def decorateKryo(newK : Kryo) : Kryo = {
- def newKryo(klass : Class[_]) = {
- val newK = new Kryo
- newK.setRegistrationOptional(true)
/* Here is some jank for your reading pleasure:
* Kryo looks up the serializer in a hashtable of java.lang.Class objects.
* What if X is a subclass of Y and my serialization is co-variant?
@@ -159,24 +63,11 @@ class KryoHadoopSerialization[T] extends Serialization[T] with Serializable {
newK.register(List(1).getClass, listSer)
//Make sure to register the Nil singleton, different from List(1).getClass
newK.register(Nil.getClass, listSer)
- //Deal with the case that we directly have a list:
- Option(klass).foreach { cls =>
- if (classOf[List[_]].isAssignableFrom(cls)) {
- newK.register(cls, listSer)
- } else {
- try {
- newK.register(cls)
- } catch {
- case iae : java.lang.IllegalArgumentException => {
- // This is because we tried to register a type twice.
- // That is okay, nothing is really wrong, this can
- // happen if you are lazy and register types like java.lang.Integer
- // which Hadoop is not going to use Kryo for anyway.
- }
- }
- }
- }
- //Add commonly used types:
+ newK.register(classOf[java.util.Date], new DateSerializer)
+ newK.register(classOf[RichDate], new RichDateSerializer(newK))
+ newK.register(classOf[DateRange], new DateRangeSerializer(newK))
+
+ //Add commonly used types with Fields serializer:
registeredTypes.foreach { cls => newK.register(cls) }
// Register some commonly used Scala singleton objects. Because these
// are singletons, we must return the exact same local object when we
@@ -203,111 +94,87 @@ class KryoHadoopSerialization[T] extends Serialization[T] with Serializable {
// Arrays
Array(1), Array(1.0), Array(1.0f), Array(1L), Array(""), Array(("", "")),
Array(new java.lang.Object), Array(1.toByte), Array(true), Array('c'),
- // Specialized Tuple2s
+ // Specialized Tuple2s: (Int,Long,Double) are defined for 1,2
+ Tuple1(""), Tuple1(1), Tuple1(1L), Tuple1(1.0),
("", ""), (1, 1), (1.0, 1.0), (1L, 1L),
(1, 1.0), (1.0, 1), (1L, 1.0), (1.0, 1L), (1, 1L), (1L, 1),
// Options and Either
Some(1), Left(1), Right(1),
- // Higher-dimensional tuples
+ // Higher-dimensional tuples, not specialized
(1, 1, 1), (1, 1, 1, 1), (1, 1, 1, 1, 1)
).map { _.getClass }
}
+}
- //We should handle everything the higher priority doesn't
- override def accept(klass : Class[_]) = highPrioritySerializations.forall { !_.accept(klass) }
- override def getDeserializer(klass : Class[T]) = new KryoDeserializer[T](klass)
- override def getSerializer(klass : Class[T]) = new KryoSerializer[T](klass)
-
- class KryoSerializer[T](klass : Class[T]) extends Serializer[T] with Serializable {
- var out : Option[OutputStream] = None
- var kryo : Option[Kryo] = None
- var buf = ByteBuffer.wrap(new Array[Byte](INIT_SIZE))
- val SIZE_MULT = 1.5 //From Java default does 3*size/2 + 1
- /**
- * this keeps trying to write, increasing the buf each time until the
- * allocation fails.
- */
- @tailrec
- private def allocatingWrite(obj : Any) : Unit = {
- val k = kryo.get
- buf.rewind
- val start = buf.position
- val finished = try {
- k.writeObjectData(buf, obj)
- val end = buf.position
- //write the size:
- val size = end - start
- //Now put into the OutputStream:
- val os = out.get
- KryoHadoopSerialization.writeSize(os, size)
- os.write(buf.array, start, size)
- true
- }
- catch {
- case ex: com.esotericsoftware.kryo.SerializationException => {
- //Take a step up if we need to realloc:
- buf = ByteBuffer.wrap(new Array[Byte]((SIZE_MULT * buf.capacity).toInt + 1))
- false
- }
- }
- if (!finished) {
- allocatingWrite(obj)
- }
- }
-
- override def open(output : OutputStream) {
- out = Some(output)
- //We have to reset the kryo serializer each time because
- //the order in which we see classes defines the codes given to them.
- val nk = newKryo(klass)
- kryo = Some(nk)
- }
-
- override def serialize(obj : T) {
- allocatingWrite(obj)
- }
-
- override def close {
- out.map { _.close }
- out = None
- kryo = None
- }
+// Singletons are easy, you just return the singleton and don't read:
+// It's important you actually do this, or Kryo will generate Nil != Nil, or None != None
+class SingletonSerializer(obj: AnyRef) extends KSerializer {
+ override def writeObjectData(buf: ByteBuffer, obj: AnyRef) {}
+ override def readObjectData[T](buf: ByteBuffer, cls: Class[T]): T = obj.asInstanceOf[T]
+}
+// Lists cause stack overflows for Kryo because they are cons cells.
+class ListSerializer(kser : Kryo) extends KSerializer {
+ override def writeObjectData(buf: ByteBuffer, obj: AnyRef) {
+ //Write the size:
+ val list = obj.asInstanceOf[List[AnyRef]]
+ kser.writeObjectData(buf, new java.lang.Integer(list.size))
+ /*
+ * An excellent question arises at this point:
+ * How do we deal with List[List[T]]?
+ * Since by the time this method is called, the ListSerializer has
+ * already been registered, this iterative method will be used on
+ * each element, and we should be safe.
+ * The only risk is List[List[List[List[.....
+ * But anyone who tries that gets what they deserve
+ */
+ list.foreach { t => kser.writeClassAndObject(buf, t) }
}
- class KryoDeserializer[T](klass : Class[T]) extends Deserializer[T] with Serializable {
- var in : Option[InputStream] = None
- var kryo : Option[Kryo] = None
- var buf = ByteBuffer.wrap(new Array[Byte](INIT_SIZE))
-
- private def ensureAlloc(sz : Int) {
- if (buf.capacity < sz) {
- buf = ByteBuffer.wrap(new Array[Byte](sz))
- }
+ override def readObjectData[T](buf: ByteBuffer, cls: Class[T]) : T = {
+ val size = kser.readObjectData(buf, classOf[java.lang.Integer]).intValue
+ //Produce the reversed list:
+ if (size == 0) {
+ /*
+ * this is only here at compile time. The type T is erased, but the
+ * compiler verifies that we are intending to return a type T here.
+ */
+ Nil.asInstanceOf[T]
}
-
- override def open(input : InputStream) {
- in = Some(input)
- //We have to reset the kryo serializer each time because
- //the order in which we see classes defines the codes given to them.
- val nk = newKryo(klass)
- kryo = Some(nk)
+ else {
+ (0 until size).foldLeft(List[AnyRef]()) { (l, i) =>
+ val iT = kser.readClassAndObject(buf)
+ iT :: l
+ }.reverse.asInstanceOf[T]
}
+ }
+}
- override def deserialize(ignored : T) : T = {
- val k = kryo.get
- //read into our ByteBuffer:
- val size = KryoHadoopSerialization.readSize(in.get)
- ensureAlloc(size)
- buf.rewind
- in.get.read(buf.array, buf.position, size)
- k.readObjectData[T](buf, klass)
- }
+/***
+ * Below are some serializers for objects in the scalding project.
+ */
+class RichDateSerializer(kser : Kryo) extends KSerializer {
+ override def writeObjectData(buf: ByteBuffer, obj: AnyRef) {
+ kser.writeObjectData(buf,
+ new java.lang.Long(obj.asInstanceOf[RichDate].value.getTime))
+ }
+ override def readObjectData[T](buf: ByteBuffer, cls: Class[T]): T = {
+ RichDate(kser.readObjectData[java.lang.Long](buf,classOf[java.lang.Long]).longValue)
+ .asInstanceOf[T]
+ }
+}
- override def close {
- in.map { _.close }
- in = None
- kryo = None
- }
+class DateRangeSerializer(kser : Kryo) extends KSerializer {
+ override def writeObjectData(buf: ByteBuffer, obj: AnyRef) {
+ kser.writeObjectData(buf,
+ new java.lang.Long(obj.asInstanceOf[DateRange].start.value.getTime))
+ kser.writeObjectData(buf,
+ new java.lang.Long(obj.asInstanceOf[DateRange].end.value.getTime))
+ }
+ override def readObjectData[T](buf: ByteBuffer, cls: Class[T]): T = {
+ val start = kser.readObjectData[java.lang.Long](buf,classOf[java.lang.Long])
+ val end = kser.readObjectData[java.lang.Long](buf,classOf[java.lang.Long])
+ DateRange(RichDate(start.longValue), RichDate(end.longValue))
+ .asInstanceOf[T]
}
}
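If a job needs extra Kryo registrations, the decorateKryo hook above looks like the natural extension point; a sketch under the assumption that chaining the override works as the signature suggests (MyRecord is a hypothetical user type):

    case class MyRecord(id : Long, name : String)   // hypothetical

    class MyKryoSerialization extends KryoHadoopSerialization {
      override def decorateKryo(newK : Kryo) : Kryo = {
        val k = super.decorateKryo(newK)   // keep scalding's registrations above
        k.register(classOf[MyRecord])      // then add our own
        k
      }
    }
    // Such a class would then be listed in the job's ioSerializations in place of
    // the default KryoHadoopSerialization added in Job.buildFlow.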
16 src/main/scala/com/twitter/scalding/Mode.scala
@@ -31,12 +31,13 @@ object Mode {
/**
* This mode is used by default by sources in read and write
*/
- implicit var mode : Mode = Local()
+ implicit var mode : Mode = Local(false)
}
/**
* There are three ways to run jobs
+* sourceStrictness: when true, all of a job's input paths must exist before the flow is built
*/
-abstract class Mode {
+abstract class Mode(val sourceStrictness : Boolean) {
//We can't name two different pipes with the same name.
protected val sourceMap = MMap[Source, Pipe]()
@@ -50,9 +51,13 @@ abstract class Mode {
def getReadPipe(s : Source, p: => Pipe) : Pipe = {
sourceMap.getOrElseUpdate(s, p)
}
+
+ def getSourceNamed(name : String) : Option[Source] = {
+ sourceMap.find { _._1.toString == name }.map { _._1 }
+ }
}
-case class Hdfs(val config : Configuration) extends Mode {
+case class Hdfs(strict : Boolean, val config : Configuration) extends Mode(strict) {
def newFlowConnector(iosersIn : List[String]) = {
val props = config.foldLeft(Map[AnyRef, AnyRef]()) {
(acc, kv) => acc + ((kv.getKey, kv.getValue))
@@ -63,14 +68,15 @@ case class Hdfs(val config : Configuration) extends Mode {
}
}
-case class Local() extends Mode {
+case class Local(strict : Boolean) extends Mode(strict) {
//No serialization is actually done in local mode, it's all memory
def newFlowConnector(iosers : List[String]) = new LocalFlowConnector
}
/**
* Memory only testing for unit tests
*/
-case class Test(val buffers : Map[Source,Buffer[Tuple]]) extends Mode {
+case class Test(val buffers : Map[Source,Buffer[Tuple]]) extends Mode(false) {
//No serialization is actually done in Test mode, it's all memory
def newFlowConnector(iosers : List[String]) = new LocalFlowConnector
+
}
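Tool.scala below is what normally sets Mode.mode from the command line; choosing it by hand (for example in a REPL) might look like this sketch:

    import org.apache.hadoop.conf.Configuration

    Mode.mode = Local(false)                     // lenient local runs, the new default
    // or, on a cluster, fail fast when any input path is missing:
    Mode.mode = Hdfs(true, new Configuration)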
120 src/main/scala/com/twitter/scalding/RichPipe.scala
@@ -19,7 +19,7 @@ import cascading.tap._
import cascading.scheme._
import cascading.pipe._
import cascading.pipe.assembly._
-import cascading.pipe.cogroup._
+import cascading.pipe.joiner._
import cascading.flow._
import cascading.operation._
import cascading.operation.aggregator._
@@ -38,6 +38,8 @@ object RichPipe extends FieldConversions with TupleConversions {
nextPipe = nextPipe + 1
"_pipe_" + nextPipe.toString
}
+
+ def assignName(p : Pipe) = new Pipe(getNextName, p)
}
@serializable
@@ -61,18 +63,25 @@ class RichPipe(val pipe : Pipe) {
* A common use-case comes from a groupAll and reduction to one row,
* then you want to send the results back out to every element in a pipe
*
+ * This uses joinWithTiny, so tiny pipe is replicated to all Mappers. If it
+ * is large, this will blow up. Get it: be foolish here and LOSE IT ALL!
+ *
* Use at your own risk.
*/
def crossWithTiny(tiny : Pipe) = {
- // This should be larger than the number of
- // of reducers but there is no need for it
- // to be much larger.
- val PARALLELISM = 1000
- val tinyJoin = tiny.flatMap(() -> 'joinTiny) { (u:Unit) => (0 until PARALLELISM) }
+ /*
+ // This uses optimized joins, but these are a new feature
+ val tinyJoin = tiny.map(() -> '__joinTiny__) { (u:Unit) => 1 }
+ map(() -> '__joinBig__) { (u:Unit) => 1 }
+ .joinWithTiny('__joinBig__ -> '__joinTiny__, tinyJoin)
+ .discard('__joinBig__, '__joinTiny__)
+ */
+ val COPIES = 1000
+ val tinyJoin = tiny.flatMap(() -> '__joinTiny__) { (u:Unit) => (0 until COPIES) }
//Now attach a random item:
- map(() -> 'joinBig) { (u:Unit) => (new java.util.Random).nextInt(PARALLELISM) }
- .joinWithSmaller('joinBig -> 'joinTiny, tinyJoin)
- .discard('joinBig, 'joinTiny)
+ map(() -> '__joinBig__) { (u:Unit) => (new java.util.Random()).nextInt(COPIES) }
+ .joinWithSmaller('__joinBig__ -> '__joinTiny__, tinyJoin)
+ .discard('__joinBig__, '__joinTiny__)
}
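As the comment above notes, the usual pattern is a groupAll reduction followed by crossWithTiny; a sketch inside a Job, assuming groupAll takes the same GroupBuilder block as groupBy (field names are hypothetical):

    val events    = Tsv(args("input")).read
    val total     = events.groupAll { _.size('n) }   // one row holding the total count
    val withTotal = events.crossWithTiny(total)      // every event row now carries 'n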
//Discard the given fields, and keep the rest
@@ -101,15 +110,9 @@ class RichPipe(val pipe : Pipe) {
def unique(f : Fields) : Pipe = groupBy(f) { _.size('__uniquecount__) }.project(f)
/**
- * Merge or Concatenate several pipes together with this one, but do so while grouping
- * over some set of fields. Within this group, you can do the usual
- * every/buffer operations.
- *
- * Eventually you must groupBy before writing or joining
- * TODO: In principle, both issues are probably fixable.
+ * Merge or Concatenate several pipes together with this one:
*/
- def ++(that : MergedRichPipe) = new MergedRichPipe(this :: that.pipes)
- def ++(that : RichPipe) = new MergedRichPipe(List(this,that))
+ def ++(that : Pipe) = new Merge(assignName(this.pipe), assignName(that))
// Group all tuples down to one reducer.
// (due to cascading limitation).
@@ -221,7 +224,7 @@ class RichPipe(val pipe : Pipe) {
*/
def joinWithSmaller(fs :(Fields,Fields), that : Pipe, joiner : Joiner = new InnerJoin) = {
//Rename these pipes to avoid cascading name conflicts
- new CoGroup(new Pipe(getNextName, pipe), fs._1, new Pipe(getNextName, that), fs._2, joiner)
+ new CoGroup(assignName(pipe), fs._1, assignName(that), fs._2, joiner)
}
def joinWithLarger(fs : (Fields, Fields), that : Pipe) = {
@@ -243,6 +246,26 @@ class RichPipe(val pipe : Pipe) {
@deprecated("Equivalent to joinWithSmaller. Be explicit.")
def outerJoin(field_def :(Fields,Fields), that : Pipe) = join(field_def, that, new OuterJoin)
+ /**
+ * This does an asymmetric join, using cascading's "Join". This only runs through
+ * this pipe once, and keeps the right hand side pipe in memory (but is spillable).
+ * WARNING: this does not work with outer joins, or right joins (according to
+ * cascading documentation), only inner and left join versions are given.
+ */
+ def joinWithTiny(fs :(Fields,Fields), that : Pipe) = {
+ //Rename these pipes to avoid cascading name conflicts
+ //new Join(assignName(pipe), fs._1, assignName(that), fs._2, new InnerJoin)
+ // TODO: Join seems broken still, keep testing the above and remove below soon:
+ joinWithSmaller(fs, that)
+ }
+
+ def leftJoinWithTiny(fs :(Fields,Fields), that : Pipe) = {
+ //Rename these pipes to avoid cascading name conflicts
+ // new Join(assignName(pipe), fs._1, assignName(that), fs._2, new LeftJoin)
+ // TODO: Join seems broken still, keep testing the above and remove below soon:
+ leftJoinWithSmaller(fs, that)
+ }
+
def write(outsource : Source)(implicit flowDef : FlowDef, mode : Mode) = {
outsource.write(pipe)(flowDef, mode)
pipe
@@ -256,64 +279,3 @@ class RichPipe(val pipe : Pipe) {
}
}
}
-
-/**
-* Represents more than one pipe that have been concatenated, or merged,
-* You can only call flatMap/map/filter/group/write on this set.
-* This is exactly a Monad situation, but we haven't abstracted so it is full
-* of boilerplate.
-*/
-class MergedRichPipe(val pipes : List[RichPipe]) {
- //Get the implicit conversions:
- import RichPipe._
-
- def ++(that : MergedRichPipe) = new MergedRichPipe(that.pipes ++ pipes)
- def ++(that : RichPipe) = new MergedRichPipe(that :: pipes)
-
- def rename(fields : (Fields,Fields)) = {
- new MergedRichPipe(pipes.map { (rp : RichPipe) => rp.rename(fields) }.map { RichPipe(_) })
- }
-
- def project(f : Any*) = {
- new MergedRichPipe(pipes.map { (rp : RichPipe) => rp.project(f) }.map { RichPipe(_) })
- }
-
- def discard(f : Any*) = {
- new MergedRichPipe(pipes.map { (rp : RichPipe) => rp.discard(f) }.map { RichPipe(_) })
- }
-
- //Insert a function into the pipeline:
- def then[T,U](pfn : (T) => U)(implicit in : (RichPipe)=>T, out : (U)=>Pipe) : MergedRichPipe = {
- new MergedRichPipe(pipes.map { rp => out(pfn(in(rp))) }.map { RichPipe(_) } )
- }
-
- def filter[A:TupleConverter](f : Any*)(fn : (A) => Boolean) : MergedRichPipe = {
- new MergedRichPipe(pipes.map { (rp : RichPipe) => rp.filter(f)(fn) }
- .map { RichPipe(_) })
- }
-
- def map[A,T](fs : (Fields,Fields))(fn : A => T)
- (implicit conv : TupleConverter[A], setter : TupleSetter[T]) = {
- new MergedRichPipe(pipes.map { _.map[A,T](fs)(fn)(conv,setter) }.map { RichPipe(_) })
- }
- def mapTo[A,T](fs : (Fields,Fields))(fn : A => T)
- (implicit conv : TupleConverter[A], setter : TupleSetter[T]) = {
- new MergedRichPipe(pipes.map { _.mapTo[A,T](fs)(fn)(conv,setter) }.map { RichPipe(_) })
- }
- def flatMap[A,T](fs : (Fields,Fields))(fn : A => Iterable[T])
- (implicit conv : TupleConverter[A], setter : TupleSetter[T]) = {
- new MergedRichPipe(pipes.map { _.flatMap[A,T](fs)(fn)(conv,setter) }.map { RichPipe(_) })
- }
- def flatMapTo[A,T](fs : (Fields,Fields))(fn : A => Iterable[T])
- (implicit conv : TupleConverter[A], setter : TupleSetter[T]) = {
- new MergedRichPipe(pipes.map { _.flatMapTo[A,T](fs)(fn)(conv,setter) }.map { RichPipe(_) })
- }
-
- def groupBy(f : Any*)(gs : GroupBuilder => GroupBuilder) : Pipe = {
- val mpipes = pipes.map { rp : RichPipe => new Pipe(getNextName, rp.pipe) }
- gs(new GroupBuilder(f)).schedule(pipes.head.pipe.getName, mpipes : _*)
- }
-
- def unique(f : Any*) : Pipe = groupBy(f : _*){g => g}
- // TODO: I think we can handle CoGroup here, but we need to look
-}
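With MergedRichPipe gone, ++ now returns a plain cascading Merge, so the result can be grouped or written like any other pipe; a short sketch inside a Job where the usual pipe conversions are in scope (clicksA and clicksB are hypothetical pipes sharing the same fields):

    val merged  = RichPipe(clicksA) ++ clicksB
    val perUser = merged.groupBy('user) { _.size('n) }   // counts rows per user across both inputs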
159 src/main/scala/com/twitter/scalding/Source.scala
@@ -43,6 +43,11 @@ import cascading.tuple.{Tuple, TupleEntryIterator, Fields}
import collection.mutable.{Buffer, MutableList}
/**
+ * thrown when validateTaps fails
+ */
+class InvalidSourceException(message : String) extends RuntimeException(message)
+
+/**
* Every source must have a correct toString method. If you use
* case classes for instances of sources, you will get this for free.
* This is one of several reasons we recommend using case classes
@@ -106,46 +111,90 @@ abstract class Source extends java.io.Serializable {
}
}
- private def pathIsGood(p : String, conf : Configuration) = {
+ protected def pathIsGood(p : String, conf : Configuration) = {
val path = new Path(p)
Option(path.getFileSystem(conf).globStatus(path)).
map(_.length > 0).
getOrElse(false)
}
- protected def readWriteMode(rwmode : AccessMode) : SinkMode = {
- rwmode match {
- case Read() => SinkMode.KEEP
- case Write() => SinkMode.REPLACE
- }
- }
-
def hdfsPaths : Iterable[String]
+ // By default, we write to the LAST path returned by hdfsPaths
+ def hdfsWritePath = hdfsPaths.last
def localPath : String
def localScheme : LocalScheme
def hdfsScheme : Scheme[_ <: FlowProcess[_],_,_,_,_,_]
- protected def createTap(m : AccessMode, mode : Mode) : RawTap = {
+ protected def createTap(readOrWrite : AccessMode, mode : Mode) : RawTap = {
mode match {
- case Local() => new FileTap(localScheme, localPath, readWriteMode(m))
- case Test(buffers) => new MemoryTap(localScheme, testBuffer(buffers, m))
- case Hdfs(conf) => createHdfsTap(m, conf)
+ // TODO support strict in Local
+ case Local(_) => {
+ val sinkmode = readOrWrite match {
+ case Read() => SinkMode.KEEP
+ case Write() => SinkMode.REPLACE
+ }
+ new FileTap(localScheme, localPath, sinkmode)
+ }
+ case Test(buffers) => new MemoryTap(localScheme, testBuffer(buffers, readOrWrite))
+ case hdfsMode @ Hdfs(_, _) => readOrWrite match {
+ case Read() => createHdfsReadTap(hdfsMode)
+ case Write() => createHdfsWriteTap(hdfsMode)
+ }
}
}
- protected def createHdfsTap(m : AccessMode, conf : Configuration) : RawTap = {
- val taps = hdfsPaths.
- filter{ (m == Write()) || pathIsGood(_, conf) }.
- map(new Hfs(hdfsScheme, _, readWriteMode(m)).asInstanceOf[RawTap])
+ // This is only called when Mode.sourceStrictness is true
+ protected def hdfsReadPathsAreGood(conf : Configuration) = {
+ hdfsPaths.forall { pathIsGood(_, conf) }
+ }
- m match {
- case Read() => taps.size match {
- case 0 => error("No existing paths found in " + hdfsPaths)
- case 1 => taps.head
- case _ => new MultiSourceTap(taps.toSeq : _*)
+ /*
+ * This throws InvalidSourceException if:
+ * 1) we are in sourceStrictness mode and not all source paths are present.
+ * 2) we are not in the above, but some source has no input whatsoever
+ * TODO this only does something for HDFS now. Maybe we should do the same for LocalMode
+ */
+ def validateTaps(mode : Mode) : Unit = {
+ mode match {
+ case Hdfs(strict, conf) => {
+ if (strict && (!hdfsReadPathsAreGood(conf))) {
+ throw new InvalidSourceException("[" + this.toString + "] No good paths in: " + hdfsPaths.toString)
}
- case Write() => taps.head
+ else if (!hdfsPaths.exists { pathIsGood(_, conf) }) {
+ //Check that there is at least one good path:
+ throw new InvalidSourceException("[" + this.toString + "] No good paths in: " + hdfsPaths.toString)
+ }
+ }
+ case _ => ()
+ }
+ }
+
+ protected def createHdfsReadTap(hdfsMode : Hdfs) :
+ Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = {
+ val goodPaths = if (hdfsMode.sourceStrictness) {
+ //we check later that all the paths are good
+ hdfsPaths
}
+ else {
+ // If there are no matching paths, this is still an error, we need at least something:
+ hdfsPaths.filter{ pathIsGood(_, hdfsMode.config) }
+ }
+ val taps = goodPaths.map(new Hfs(hdfsScheme, _, SinkMode.KEEP))
+ taps.size match {
+ case 0 => {
+ // This case is going to result in an error, but we don't want to throw until
+ // validateTaps, so we just put a dummy path to return something so the
+ // Job constructor does not fail.
+ new Hfs(hdfsScheme, hdfsPaths.head, SinkMode.KEEP)
+ }
+ case 1 => taps.head
+ case _ => new MultiSourceTap[HadoopFlowProcess,
+ JobConf, RecordReader[_,_], OutputCollector[_,_]](taps.toSeq : _*)
+ }
+ }
+ protected def createHdfsWriteTap(hdfsMode : Hdfs) :
+ Tap[HadoopFlowProcess, JobConf, RecordReader[_,_], OutputCollector[_,_]] = {
+ new Hfs(hdfsScheme, hdfsWritePath, SinkMode.REPLACE)
}
/**
@@ -154,7 +203,8 @@ abstract class Source extends java.io.Serializable {
*/
def readAtSubmitter[T](implicit mode : Mode, conv : TupleConverter[T]) : Stream[T] = {
val tupleEntryIterator = mode match {
- case Local() => {
+ case Local(_) => {
+ // TODO support strict here
val fp = new LocalFlowProcess()
val tap = new FileTap(localScheme, localPath, SinkMode.KEEP)
tap.openForRead(fp)
@@ -164,13 +214,9 @@ abstract class Source extends java.io.Serializable {
val tap = new MemoryTap(localScheme, testBuffer(buffers, Read()))
tap.openForRead(fp)
}
- case Hdfs(conf) => {
+ case hdfsMode @ Hdfs(_, conf) => {
val fp = new HadoopFlowProcess(new JobConf(conf))
- val taps = hdfsPaths.
- filter{ pathIsGood(_, conf) }.
- map(new Hfs(hdfsScheme, _, SinkMode.KEEP).asInstanceOf[RawTap])
- val tap = new MultiSourceTap[HadoopFlowProcess,
- JobConf, RecordReader[_,_], OutputCollector[_,_]](taps.toSeq : _*)
+ val tap = createHdfsReadTap(hdfsMode)
tap.openForRead(fp)
}
}
@@ -276,27 +322,50 @@ case class Osv(p : String, f : Fields = Fields.ALL) extends FixedPathSource(p)
object TimePathedSource {
val YEAR_MONTH_DAY = "/%1$tY/%1$tm/%1$td"
- val YEAR_MONTH_DAY_HOUR = YEAR_MONTH_DAY + "/%1$tH"
-}
-abstract class TimePathedSource(pattern : String, dateRange : DateRange, duration : Duration, tz : TimeZone) extends Source {
- def hdfsPaths = {
- dateRange.each(duration)(tz)
- .map { (dr : DateRange) => String.format(pattern, dr.start.toCalendar(tz)) }
- }
-
- def localPath = pattern
+ val YEAR_MONTH_DAY_HOUR = "/%1$tY/%1$tm/%1$td/%1$tH"
}
-abstract class CalendarPathedSource(pattern : String, dateRange : DateRange, duration : CalendarDuration, tz : TimeZone) extends Source {
- def hdfsPaths = {
- dateRange.each(duration)(tz)
- .map { (dr : DateRange) => String.format(pattern, dr.start.toCalendar(tz)) }
+/**
+ * This will automatically produce a globbed version of the given path.
+ * THIS MEANS YOU MUST END WITH A / followed by * to match a file
+ * For writing, we write to the directory specified by the END time.
+ */
+abstract class TimePathedSource(pattern : String, dateRange : DateRange, tz : TimeZone) extends Source {
+ val glober = Globifier(pattern)(tz)
+ override def hdfsPaths = glober.globify(dateRange)
+ //Write to the path defined by the end time:
+ override def hdfsWritePath = {
+ val lastSlashPos = pattern.lastIndexOf('/')
+ assert(lastSlashPos >= 0, "/ not found in: " + pattern)
+ val stripped = pattern.slice(0,lastSlashPos)
+ String.format(stripped, dateRange.end.toCalendar(tz))
+ }
+ override def localPath = pattern
+
+ // Override because we want to check that each of the UNGLOBIFIED paths is present.
+ override def hdfsReadPathsAreGood(conf : Configuration) : Boolean = {
+ List("%1$tH" -> Hours(1), "%1$td" -> Days(1)(tz),
+ "%1$tm" -> Months(1)(tz), "%1$tY" -> Years(1)(tz))
+ .find { unitDur : (String,Duration) => pattern.contains(unitDur._1) }
+ .map { unitDur =>
+ // This method is exhaustive, but too expensive for Cascading's JobConf writing.
+ dateRange.each(unitDur._2)
+ .map { dr : DateRange =>
+ val path = String.format(pattern, dr.start.toCalendar(tz))
+ val good = pathIsGood(path, conf)
+ if (!good) {
+ System.err.println("[ERROR] Path: " + path + " is missing in: " + toString)
+ }
+ //return
+ good
+ }
+ //All should be true
+ .forall { x => x }
+ }
+ .getOrElse(false)
}
-
- def localPath = pattern
}
-
case class TextLine(p : String) extends FixedPathSource(p) with TextLineScheme
case class SequenceFile(p : String, f : Fields = Fields.ALL) extends FixedPathSource(p) with SequenceFileScheme
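A sketch of a concrete source built on the new TimePathedSource, following the constraints its comment spells out (the trailing /* and writing to the directory of the end time); the class name and /logs path are hypothetical, and TextLineScheme is assumed to supply the schemes just as it does for TextLine above:

    case class DailyLogs(dateRange : DateRange)(implicit tz : TimeZone)
      extends TimePathedSource("/logs/daily" + TimePathedSource.YEAR_MONTH_DAY + "/*",
                               dateRange, tz) with TextLineScheme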
31 src/main/scala/com/twitter/scalding/Tool.scala
@@ -31,31 +31,32 @@ class Tool extends hadoop.conf.Configured with hadoop.util.Tool {
}
val mode = remainingArgs(1)
+ val jobName = remainingArgs(0)
+ val firstargs = Args(remainingArgs.tail.tail)
+ //
+ val strictSources = firstargs.boolean("tool.partialok") == false
+ if (!strictSources) {
+ println("[Scalding:INFO] using --sources.partialok. Missing log data won't cause errors.")
+ }
Mode.mode = mode match {
- case "--local" => Local()
- case "--hdfs" => Hdfs(config)
+ case "--local" => Local(strictSources)
+ case "--hdfs" => Hdfs(strictSources, config)
case _ => {
- System.err.println("Mode must be one of --local or --hdfs")
+ System.err.println("[ERROR] Mode must be one of --local or --hdfs")
return 1
}
}
- val jobName = remainingArgs(0)
- val firstargs = Args(remainingArgs.tail.tail)
- val firstjob = Class.forName(jobName).
- getConstructor(classOf[Args]).
- newInstance(firstargs).
- asInstanceOf[Job]
+ val onlyPrintGraph = firstargs.boolean("tool.graph")
+ if (onlyPrintGraph) {
+ println("Only printing the job graph, NOT executing. Run without --tool.graph to execute the job")
+ }
/*
* This is a tail recursive loop that runs all the
* jobs spawned from this one
*/
- val onlyPrintGraph = firstargs.boolean("graph")
- if (onlyPrintGraph) {
- println("Only printing the job graph, NOT executing. Run without --graph to execute the job")
- }
@tailrec
def start(j : Job, cnt : Int) {
val successful = if (onlyPrintGraph) {
@@ -81,10 +82,12 @@ class Tool extends hadoop.conf.Configured with hadoop.util.Tool {
case Some(nextj) => start(nextj, cnt + 1)
case None => Unit
}
+ } else {
+ throw new RuntimeException("Job failed to run: " + jobName)
}
}
//start a counter to see how deep we recurse:
- start(firstjob, 0)
+ start(Job(jobName, firstargs), 0)
return 0
}
}
5 src/main/scala/com/twitter/scalding/TupleBase.scala
@@ -19,8 +19,7 @@ import cascading.tuple.Fields
import cascading.tuple.Tuple
import cascading.tuple.TupleEntry
-// TupleGetter[String] <: TupleGetter[Any], so this is covariant
-abstract class TupleGetter[+T] extends java.io.Serializable {
+abstract class TupleGetter[T] extends java.io.Serializable {
def get(tup : Tuple, i : Int) : T
}
@@ -56,7 +55,7 @@ abstract class TupleSetter[-T] extends java.io.Serializable with TupleArity {
def apply(arg : T) : Tuple
}
-abstract class TupleConverter[+T] extends java.io.Serializable with TupleArity {
+abstract class TupleConverter[T] extends java.io.Serializable with TupleArity {
def get(te : TupleEntry) : T = apply(te.getTuple)
def apply(tup : Tuple) : T
}
4 src/main/scala/com/twitter/scalding/TupleConversions.scala
@@ -81,10 +81,6 @@ trait TupleConversions extends GeneratedConversions {
override def get(tup : CTuple, i : Int) = tup.getString(i)
}
- implicit object AnyRefGetter extends TupleGetter[AnyRef] {
- override def get(tup : CTuple, i : Int) = tup.getObject(i)
- }
-
//This is here for handling functions that return cascading tuples:
implicit object CascadingTupleSetter extends TupleSetter[CTuple] {
override def apply(arg : CTuple) = arg
88 src/test/scala/com/twitter/scalding/CoreTest.scala
@@ -90,6 +90,43 @@ class JoinTest extends Specification with TupleConversions {
}
}
+class TinyJoinJob(args: Args) extends Job(args) {
+ val p1 = Tsv(args("input1"))
+ .read
+ .mapTo((0, 1) -> ('k1, 'v1)) { v : (String, Int) => v }
+ val p2 = Tsv(args("input2"))
+ .read
+ .mapTo((0, 1) -> ('k2, 'v2)) { v : (String, Int) => v }
+ p1.joinWithTiny('k1 -> 'k2, p2)
+ .project('k1, 'v1, 'v2)
+ .write( Tsv(args("output")) )
+}
+
+class TinyJoinTest extends Specification with TupleConversions {
+ "A JoinJob" should {
+ val input1 = List("a" -> 1, "b" -> 2, "c" -> 3)
+ val input2 = List("b" -> -1, "c" -> 5, "d" -> 4)
+ val correctOutput = Map("b" -> (2, -1), "c" -> (3, 5))
+
+ JobTest("com.twitter.scalding.JoinJob")
+ .arg("input1", "fakeInput1")
+ .arg("input2", "fakeInput2")
+ .arg("output", "fakeOutput")
+ .source(Tsv("fakeInput1"), input1)
+ .source(Tsv("fakeInput2"), input2)
+ .sink[(String,Int,Int)](Tsv("fakeOutput")) { outBuf =>
+ val actualOutput = outBuf.map {
+ case (k : String, v1 : Int, v2 : Int) =>
+ (k,(v1, v2))
+ }.toMap
+ "join tuples with the same key" in {
+ correctOutput must be_==(actualOutput)
+ }
+ }
+ .run
+ }
+}
+
class MergeTestJob(args : Args) extends Job(args) {
val in = TextLine(args("in")).read.mapTo(1->('x,'y)) { line : String =>
val p = line.split(" ").map { _.toDouble }
@@ -313,6 +350,13 @@ class ToListJob(args : Args) extends Job(args) {
.write(Tsv(args("out")))
}
+class NullListJob(args : Args) extends Job(args) {
+ TextLine(args("in")).read
+ .groupBy('num){ _.toList[String]('line -> 'lineList) }
+ .map('lineList -> 'lineList) { ll : List[String] => ll.mkString(" ") }
+ .write(Tsv(args("out")))
+}
+
class ToListTest extends Specification with TupleConversions {
"A ToListJob" should {
JobTest("com.twitter.scalding.ToListJob")
@@ -330,6 +374,22 @@ class ToListTest extends Specification with TupleConversions {
}
}.run
}
+
+ "A NullListJob" should {
+ JobTest("com.twitter.scalding.NullListJob")
+ .arg("in","fakeIn")
+ .arg("out","fakeOut")
+ .source(TextLine("fakeIn"), List("0" -> null, "0" -> "a", "0" -> null, "0" -> "b"))
+ .sink[(Int,String)](Tsv("fakeOut")) { outBuf =>
+ "must have the right number of lines" in {
+ outBuf.size must_== 1
+ }
+ "must return an empty list for null key" in {
+ val sSet = outBuf(0)._2.split(" ").toSet
+ sSet must_== Set("a", "b")
+ }
+ }.run
+ }
}
class CrossJob(args : Args) extends Job(args) {
@@ -383,3 +443,31 @@ class TopKTest extends Specification with TupleConversions {
.run
}
}
+
+class TakeJob(args : Args) extends Job(args) {
+ val input = Tsv("in").read
+ .mapTo((0,1,2) -> ('x,'y,'z)) { tup : (Int,Int,Int) => tup }
+
+ input.groupBy('x) { _.take(2) }.write(Tsv("out2"))
+ input.groupAll.write(Tsv("outall"))
+}
+
+class TakeTest extends Specification with TupleConversions {
+ "A TakeJob" should {
+ JobTest("com.twitter.scalding.TakeJob")
+ .source(Tsv("in"), List((3,0,1),(3,1,10),(3,5,100)) )
+ .sink[(Int,Int,Int)](Tsv("outall")) { outBuf => ()
+ "groupAll must see everything in same order" in {
+ outBuf.size must_==3
+ outBuf.toList must be_== (List((3,0,1),(3,1,10),(3,5,100)))
+ }
+ }
+ .sink[(Int,Int,Int)](Tsv("out2")) { outBuf =>
+ "take(2) must only get 2" in {
+ outBuf.size must_==2
+ outBuf.toList must be_== (List((3,0,1),(3,1,10)))
+ }
+ }
+ .run
+ }
+}
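Taken together, the new tests above exercise joinWithTiny (for joining against a small pipe) and GroupBuilder.take. A minimal sketch (not part of this commit) combining the two; the job name, field names, and argument keys are illustrative.

// Illustrative sketch only: join against a small side with joinWithTiny,
// then keep at most two rows per key with take(2), as the tests above do.
class TinyJoinThenTakeJob(args : Args) extends Job(args) {
  val small = Tsv(args("small"))
    .read
    .mapTo((0, 1) -> ('k2, 'v2)) { v : (String, Int) => v }

  Tsv(args("big"))
    .read
    .mapTo((0, 1) -> ('k1, 'v1)) { v : (String, Int) => v }
    .joinWithTiny('k1 -> 'k2, small)
    .groupBy('k1) { _.take(2) }
    .project('k1, 'v1, 'v2)
    .write(Tsv(args("output")))
}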
View
135 src/test/scala/com/twitter/scalding/DateTest.scala
@@ -84,11 +84,11 @@ class DateTest extends Specification {
}
"know the most recent time units" in {
//10-25 is a Tuesday, earliest in week is a monday
- "2011-10-25".earliestInWeek must_==(stringToRichDate("2011-10-24"))
- "2011-10-25 10:01".earliestInDay must_==(stringToRichDate("2011-10-25 00:00"))
+ Weeks(1).floorOf("2011-10-25") must_==(stringToRichDate("2011-10-24"))
+ Days(1).floorOf("2011-10-25 10:01") must_==(stringToRichDate("2011-10-25 00:00"))
//Leaving off the time should give the same result:
- "2011-10-25 10:01".earliestInDay must_==(stringToRichDate("2011-10-25"))
- "2011-10-25 10:01".earliestInHour must_==(stringToRichDate("2011-10-25 10:00"))
+ Days(1).floorOf("2011-10-25 10:01") must_==(stringToRichDate("2011-10-25"))
+ Hours(1).floorOf("2011-10-25 10:01") must_==(stringToRichDate("2011-10-25 10:00"))
}
"correctly do arithmetic" in {
val d1 : RichDate = "2011-10-24"
@@ -108,14 +108,11 @@ class DateTest extends Specification {
def rangeContainTest(d1 : DateRange, dur : Duration) = {
d1.each(dur).forall( (d1r : DateRange) => d1.contains(d1r) ) must beTrue
}
- def rangeContainTestCal(d1 : DateRange, dur : CalendarDuration) = {
- d1.each(dur).forall( (d1r : DateRange) => d1.contains(d1r) ) must beTrue
- }
- rangeContainTestCal(DateRange("2010-10-01", "2010-10-13"), Weeks(1))
- rangeContainTestCal(DateRange("2010-10-01", "2010-10-13"), Weeks(2))
- rangeContainTestCal(DateRange("2010-10-01", "2010-10-13"), Days(1))
+ rangeContainTest(DateRange("2010-10-01", "2010-10-13"), Weeks(1))
+ rangeContainTest(DateRange("2010-10-01", "2010-10-13"), Weeks(2))
+ rangeContainTest(DateRange("2010-10-01", "2010-10-13"), Days(1))
//Prime non one:
- rangeContainTestCal(DateRange("2010-10-01", "2010-10-13"), Days(5))
+ rangeContainTest(DateRange("2010-10-01", "2010-10-13"), Days(5))
//Prime number of Minutes
rangeContainTest(DateRange("2010-10-01", "2010-10-13"), Minutes(13))
rangeContainTest(DateRange("2010-10-01", "2010-10-13"), Hours(13))
@@ -135,25 +132,113 @@ class DateTest extends Specification {
da.isBefore(db.start) && db.isAfter(da.end) && ((da.end + Millisecs(1)) == db.start)
} must beTrue
}
- def eachIsDisjointCal(d : DateRange, dur : CalendarDuration) {
- val dl = d.each(dur)
- dl.zip(dl.tail).forall { case (da, db) =>
- da.isBefore(db.start) && db.isAfter(da.end) && (da.end + Millisecs(1)).equals(db.start)
- } must beTrue
- }
- eachIsDisjointCal(DateRange("2010-10-01", "2010-10-03"), Days(1))
- eachIsDisjointCal(DateRange("2010-10-01", "2010-10-03"), Weeks(1))
+ eachIsDisjoint(DateRange("2010-10-01", "2010-10-03"), Days(1))
+ eachIsDisjoint(DateRange("2010-10-01", "2010-10-03"), Weeks(1))
+ eachIsDisjoint(DateRange("2010-10-01", "2011-10-03"), Weeks(1))
+ eachIsDisjoint(DateRange("2010-10-01", "2010-10-03"), Months(1))
+ eachIsDisjoint(DateRange("2010-10-01", "2011-10-03"), Months(1))
eachIsDisjoint(DateRange("2010-10-01", "2010-10-03"), Hours(1))
eachIsDisjoint(DateRange("2010-10-01", "2010-10-03"), Hours(2))
eachIsDisjoint(DateRange("2010-10-01", "2010-10-03"), Minutes(1))
}
}
"Time units" should {
- def isSame(d1 : Duration, d2 : Duration) = (d1.toMillisecs == d2.toMillisecs)
- def isSameC(d1 : CalendarDuration, d2 : CalendarDuration) = (d1.toDays == d2.toDays)
- "1000 milliseconds in a sec" in { isSame(Millisecs(1000), Seconds(1)) must beTrue }
- "60 seconds in a minute" in { isSame(Seconds(60), Minutes(1)) must beTrue }
- "60 minutes in a hour" in { isSame(Minutes(60),Hours(1)) must beTrue }
- "7 days in a week" in { isSameC(Days(7), Weeks(1)) must beTrue }
+ def isSame(d1 : Duration, d2 : Duration) = {
+ (RichDate("2011-12-01") + d1) == (RichDate("2011-12-01") + d2)
+ }
+ "have 1000 milliseconds in a sec" in {
+ isSame(Millisecs(1000), Seconds(1)) must beTrue
+ Seconds(1).toMillisecs must_== 1000L
+ Millisecs(1000).toSeconds must_== 1.0
+ Seconds(2).toMillisecs must_== 2000L
+ Millisecs(2000).toSeconds must_== 2.0
+ }
+ "have 60 seconds in a minute" in {
+ isSame(Seconds(60), Minutes(1)) must beTrue
+ Minutes(1).toSeconds must_== 60.0
+ Minutes(1).toMillisecs must_== 60 * 1000L
+ Minutes(2).toSeconds must_== 120.0
+ Minutes(2).toMillisecs must_== 120 * 1000L
+ }
+ "have 60 minutes in a hour" in {
+ isSame(Minutes(60),Hours(1)) must beTrue
+ Hours(1).toSeconds must_== 60.0 * 60.0
+ Hours(1).toMillisecs must_== 60 * 60 * 1000L
+ Hours(2).toSeconds must_== 2 * 60.0 * 60.0
+ Hours(2).toMillisecs must_== 2 * 60 * 60 * 1000L
+ }
+ "have 7 days in a week" in { isSame(Days(7), Weeks(1)) must beTrue }
+ }
+ "AbsoluteDurations" should {
+ "behave as comparable" in {
+ (Hours(5) >= Hours(2)) must beTrue
+ (Minutes(60) >= Minutes(60)) must beTrue
+ (Hours(1) < Millisecs(3600001)) must beTrue
+ }
+ "add properly" in {
+ (Hours(2) + Hours(1)).compare(Hours(3)) must_== 0
+ }
+ "have a well behaved max function" in {
+ AbsoluteDuration.max(Hours(1), Hours(2)).compare(Hours(2)) must_== 0
+ }
+ }
+ "Globifiers" should {
+ "handle specific hand crafted examples" in {
+ val t1 = Globifier("/%1$tY/%1$tm/%1$td/%1$tH")
+ val t2 = Globifier("/%1$tY/%1$tm/%1$td/")
+
+ val testcases =
+ (t1.globify(DateRange("2011-12-01T14", "2011-12-04")),
+ List("/2011/12/01/14","/2011/12/01/15","/2011/12/01/16","/2011/12/01/17","/2011/12/01/18",
+ "/2011/12/01/19","/2011/12/01/20", "/2011/12/01/21","/2011/12/01/22","/2011/12/01/23",
+ "/2011/12/02/*","/2011/12/03/*","/2011/12/04/00")) ::
+ (t1.globify(DateRange("2011-12-01", "2011-12-01T23:59")),
+ List("/2011/12/01/*")) ::
+ (t1.globify(DateRange("2011-12-01T12", "2011-12-01T12:59")),
+ List("/2011/12/01/12")) ::
+ (t1.globify(DateRange("2011-12-01T12", "2011-12-01T14")),
+ List("/2011/12/01/12","/2011/12/01/13","/2011/12/01/14")) ::
+ (t2.globify(DateRange("2011-12-01T14", "2011-12-04")),
+ List("/2011/12/01/","/2011/12/02/","/2011/12/03/","/2011/12/04/")) ::
+ (t2.globify(DateRange("2011-12-01", "2011-12-01T23:59")),
+ List("/2011/12/01/")) ::
+ (t2.globify(DateRange("2011-12-01T12", "2011-12-01T12:59")),
+ List("/2011/12/01/")) ::
+ (t2.globify(DateRange("2011-12-01T12", "2012-01-02T14")),
+ List("/2011/12/*/","/2012/01/01/","/2012/01/02/")) ::
+ (t2.globify(DateRange("2011-11-01T12", "2011-12-02T14")),
+ List("/2011/11/*/","/2011/12/01/","/2011/12/02/")) ::
+ Nil
+
+ testcases.foreach { tup =>
+ tup._1 must_== tup._2
+ }
+ }
+ def eachElementDistinct(dates : List[String]) = dates.size == dates.toSet.size
+ def globMatchesDate(glob : String)(date : String) = {
+ java.util.regex.Pattern.matches(glob.replaceAll("\\*","[0-9]*"), date)
+ }
+ def bruteForce(pattern : String, dr : DateRange, dur : Duration)(implicit tz : java.util.TimeZone) = {
+ dr.each(dur)
+ .map { (dr : DateRange) => String.format(pattern, dr.start.toCalendar(tz)) }
+ }
+
+ "handle random test cases" in {
+ val pattern = "/%1$tY/%1$tm/%1$td/%1$tH"
+ val t1 = Globifier(pattern)
+
+ val r = new java.util.Random()
+ (0 until 100) foreach { step =>
+ val start = RichDate("2011-08-03").value.getTime + r.nextInt(Int.MaxValue)
+ val dr = DateRange(start, start + r.nextInt(Int.MaxValue))
+ val splits = bruteForce(pattern, dr, Hours(1))
+ val globed = t1.globify(dr)
+
+ eachElementDistinct(globed) must beTrue
+ //See that each path is matched by exactly one glob:
+ splits.map { path => globed.filter { globMatchesDate(_)(path) }.size }
+ .forall { _ == 1 } must beTrue
+ }
+ }
}
}
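For readers of the test above, here is a minimal usage sketch (not part of this commit) of the floorOf, DateRange.each, and Globifier calls it exercises. It assumes an implicit TimeZone is in scope for the string-to-date conversions, as the Kryo test below arranges with import DateOps._ and implicit val tz = PACIFIC.

// Illustrative sketch only, mirroring the calls exercised by DateTest above.
import DateOps._
implicit val tz = PACIFIC

// Round a timestamp down to the start of its hour / day:
val hourStart = Hours(1).floorOf("2011-10-25 10:01")   // 2011-10-25 10:00
val dayStart  = Days(1).floorOf("2011-10-25 10:01")    // 2011-10-25 00:00

// Split a range into hour-sized pieces, and turn the same range into path globs:
val range = DateRange("2011-12-01T14", "2011-12-04")
val hourRanges = range.each(Hours(1))
val globs = Globifier("/%1$tY/%1$tm/%1$td/%1$tH").globify(range)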
View
19 src/test/scala/com/twitter/scalding/JobTest.scala
@@ -7,13 +7,9 @@ import cascading.tuple.Tuple
import cascading.tuple.TupleEntry
object JobTest {
- def apply(jobName : String) = new JobTest(jobName)
+ def apply(jobName : String) = new JobTest(jobName)
}
-/**
-* mixes in TupleConversions only to get productToTuple,
-* none of the implicits there are resolved here
-*/
class JobTest(jobName : String) extends TupleConversions {
private var argsMap = Map[String, List[String]]()
private val callbacks = Buffer[() => Unit]()
@@ -44,11 +40,7 @@ class JobTest(jobName : String) extends TupleConversions {
def run {
Mode.mode = Test(sourceMap)
-
- runAll(Class.forName(jobName).
- getConstructor(classOf[Args]).
- newInstance(new Args(argsMap)).
- asInstanceOf[Job])
+ runAll(Job(jobName, new Args(argsMap)))
}
@tailrec
@@ -62,4 +54,11 @@ class JobTest(jobName : String) extends TupleConversions {
}
}
}
+
+ def runWithoutNext {
+ Mode.mode = Test(sourceMap)
+ Job(jobName, new Args(argsMap)).buildFlow.complete
+ callbacks.foreach { cb => cb() }
+ }
+
}
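For reference, the JobTest builder refactored above is driven as in CoreTest.scala; a minimal sketch (not part of this commit), with the job name, field names, and file names as placeholders only.

// Illustrative sketch only: the JobTest builder pattern used throughout
// CoreTest.scala above. "com.example.MyJob" and the file names are placeholders.
JobTest("com.example.MyJob")
  .arg("in", "fakeIn")
  .arg("out", "fakeOut")
  .source(Tsv("fakeIn"), List(("a", 1), ("b", 2)))
  .sink[(String, Int)](Tsv("fakeOut")) { outBuf =>
    // specs assertions on outBuf go here
  }
  .run
// The new runWithoutNext builds and completes only this job's flow and then
// fires the sink callbacks, instead of looping through follow-on jobs via runAll.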
View
57 src/test/scala/com/twitter/scalding/KryoTest.scala
@@ -16,47 +16,9 @@ class KryoTest extends Specification {
noDetailedDiffs() //Fixes issue for scala 2.9
- def roundTrip(in : List[Int]) = {
- val outs = new BOS
- in.foreach { i => KryoHadoopSerialization.writeSize(outs, i) }
- val ins = new BIS(outs.toByteArray)
- //Read each back in and return it:
- in.map { (_) => KryoHadoopSerialization.readSize(ins) }
- }
- def rtEnds(upper : Int) {
- //Test the end points:
- val ends = List(0,1,2,upper - 2,upper - 1)
- roundTrip(ends) must be_==(ends)
- }
- def rtRands(upper : Int) {
- val r = new java.util.Random
- val rands = (1 to 20).map { (_) => r.nextInt(upper) }.toList
- roundTrip(rands) must be_==(rands)
- }
-
- "KryoHadoopSerialization" should {
- "read and write sizes near end points" in {
- rtEnds(0xff)
- rtEnds(0xffff)
- rtEnds(Int.MaxValue)
- }
- "read and write sizes of random values" in {
- rtRands(0xff)
- rtRands(0xffff)
- rtRands(Int.MaxValue)
- }
- /*
- Something about this test triggers a compiler bug
- "return serializers/deserializers/comparators" in {
- val khs = new KryoHadoopSerialization
- khs.getSerializer(null) must notBeNull
- khs.getDeserializer(null) must notBeNull
- khs.getComparator(null) must notBeNull
- }*/
- }
-
def serObj[T <: AnyRef](in : T) = {
- val khs = new KryoHadoopSerialization[T]
+ val khs = new KryoHadoopSerialization
+ khs.accept(in.getClass)
val ks = khs.getSerializer(in.getClass.asInstanceOf[Class[T]])
val out = new BOS
ks.open(out)
@@ -66,14 +28,15 @@ class KryoTest extends Specification {
}
def deserObj[T <: AnyRef](cls : Class[_], input : Array[Byte]) : T = {
- val khs = new KryoHadoopSerialization[T]
+ val khs = new KryoHadoopSerialization
+ khs.accept(cls)
val ks = khs.getDeserializer(cls.asInstanceOf[Class[T]])
val in = new BIS(input)
ks.open(in)
val fakeInputHadoopNeeds = null
val res = ks.deserialize(fakeInputHadoopNeeds.asInstanceOf[T])
ks.close
- res
+ res.asInstanceOf[T]
}
def singleRT[T <: AnyRef](in : T) : T = {
deserObj[T](in.getClass, serObj(in))
@@ -118,6 +81,16 @@ class KryoTest extends Specification {
//Together in a list:
singleRT(test) must be_==(test)
}
+ "handle Date, RichDate and DateRange" in {
+ import DateOps._
+ implicit val tz = PACIFIC
+ val myDate : RichDate = "1999-12-30T14"
+ val simpleDate : java.util.Date = myDate.value
+ val myDateRange = DateRange("2012-01-02", "2012-06-09")
+ singleRT(myDate) must be_==(myDate)
+ singleRT(simpleDate) must be_==(simpleDate)
+ singleRT(myDateRange) must be_==(myDateRange)
+ }
"Serialize a giant list" in {
val bigList = (1 to 100000).toList
val list2 = deserObj[List[Int]](bigList.getClass, serObj(bigList))
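As a usage note (not part of this commit): the serObj/deserObj/singleRT helpers above make it straightforward to add round-trip checks for application types; the Track case class below is purely illustrative.

// Illustrative sketch only: round-tripping a hypothetical case class through
// the singleRT helper defined earlier in this file.
case class Track(id : Long, title : String)

"handle an application case class" in {
  val t = Track(42L, "scalding")
  singleRT(t) must be_==(t)
}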