Add some Execution support in scalding (#674)
* Add some Execution support in scalding

* Add more debugging

* Make it work with 0.16.1-RC3

* don't capture an unused variable
johnynek committed Jul 20, 2016
1 parent 424f2b6 commit 39e3850
Showing 5 changed files with 123 additions and 18 deletions.
build.sbt: 2 changes (1 addition, 1 deletion)
@@ -28,7 +28,7 @@ val log4jVersion = "1.2.16"
 val novocodeJunitVersion = "0.10"
 val scalaCheckVersion = "1.12.2"
 val scalatestVersion = "2.2.4"
-val scaldingVersion = "0.16.0-RC3"
+val scaldingVersion = "0.16.1-RC3"
 val slf4jVersion = "1.6.6"
 val storehausVersion = "0.15.0-RC1"
 val stormDep = "storm" % "storm" % "0.9.0-wip15" //This project also compiles with the latest storm, which is in fact required to run the example
@@ -16,14 +16,14 @@


 package com.twitter.summingbird.scalding


-import com.twitter.scalding.{ Source => ScaldingSource, Test => TestMode, _ }
-
-import cascading.tuple.{ Tuple, Fields, TupleEntry }
-import cascading.tap.Tap
 import cascading.scheme.NullScheme
+import cascading.tap.Tap
+import cascading.tuple.{ Tuple, Fields, TupleEntry }
+import com.twitter.scalding.{ Source => ScaldingSource, Test => TestMode, _ }
+import java.io.{ InputStream, OutputStream }
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.OutputCollector
+import org.apache.hadoop.mapred.RecordReader


 class MockMappable[T](val id: String)(implicit tconv: TupleConverter[T])
   extends ScaldingSource with Mappable[T] {
@@ -36,5 +36,22 @@ class MockMappable[T](val id: String)(implicit tconv: TupleConverter[T])
   override def hashCode = id.hashCode


   override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] =
-    TestTapFactory(this, new NullScheme[JobConf, RecordReader[_, _], OutputCollector[_, _], T, T](Fields.ALL, Fields.ALL)).createTap(readOrWrite)
-}
+    (readOrWrite, mode) match {
+      case (Write, TestMode(buffers)) =>
+        /*
+         * We copy this code from scalding because scalding ALWAYS erases the target
+         * whether or not a job is run. This is not what HDFS would do, and it violates
+         * the assumption that planning is a pure function (running of course is not)
+         */
+        require(
+          buffers(this).isDefined,
+          TestTapFactory.sourceNotFoundError.format(this))
+
+        new MemoryTap[InputStream, OutputStream](
+          new NullScheme(Fields.ALL, Fields.ALL),
+          buffers(this).get)
+
+      case _ =>
+        TestTapFactory(this, new NullScheme[JobConf, RecordReader[_, _], OutputCollector[_, _], T, T](Fields.ALL, Fields.ALL)).createTap(readOrWrite)
+    }
+}
@@ -41,6 +41,12 @@ object TestUtil {
println("producer extra keys: " + (produced.keySet -- inMemory.keySet)) println("producer extra keys: " + (produced.keySet -- inMemory.keySet))
println("producer missing keys: " + (inMemory.keySet -- produced.keySet)) println("producer missing keys: " + (inMemory.keySet -- produced.keySet))
println("Difference: " + diffMap) println("Difference: " + diffMap)
if (produced.isEmpty) println("produced isEmpty")
val correct = (inMemory.keySet ++ produced.keySet).foldLeft(produced) { (m, k) =>
if (inMemory.get(k) == produced.get(k)) m - k
else m
}
println("correct keys: " + correct)
} }
!wrong !wrong
} }
@@ -59,6 +65,12 @@ object TestUtil {
println("written batches: " + testStore.writtenBatches) println("written batches: " + testStore.writtenBatches)
println("earliest unwritten time: " + testStore.batcher.earliestTimeOf(testStore.writtenBatches.max.next)) println("earliest unwritten time: " + testStore.batcher.earliestTimeOf(testStore.writtenBatches.max.next))
println("Difference: " + diffMap) println("Difference: " + diffMap)
if (produced.isEmpty) println("produced isEmpty")
val correct = (inMemory.keySet ++ produced.keySet).foldLeft(produced) { (m, k) =>
if (inMemory.get(k) == produced.get(k)) m - k
else m
}
println("correct keys: " + correct)
} }
!wrong !wrong
} }
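
A note on the debugging fold added in both branches above: despite the name, correct ends up holding the produced entries that disagree with inMemory, because keys whose values match are removed. A toy run with made-up maps:

    val inMemory = Map(1 -> 10, 2 -> 20, 3 -> 30)
    val produced = Map(1 -> 10, 2 -> 99)

    val correct = (inMemory.keySet ++ produced.keySet).foldLeft(produced) { (m, k) =>
      if (inMemory.get(k) == produced.get(k)) m - k
      else m
    }
    // correct == Map(2 -> 99): key 1 agrees and is removed, key 2 disagrees
    // (20 vs 99) and survives, key 3 is absent from produced (Some(30) != None).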
@@ -97,6 +97,39 @@ class ScaldingLaws extends WordSpec {


       assert(TestUtil.compareMaps(original, Monoid.plus(initStore, inMemory), testStore) == true)
     }
"match scala for single step jobs using Execution" in {
val original = sample[List[Int]]
val fn = sample[(Int) => List[(Int, Int)]]

// Add a time:
val inWithTime = original.zipWithIndex.map { case (item, time) => (time.toLong, item) }

// get time interval for the input
val intr = TestUtil.toTimeInterval(0L, original.size.toLong)

val batcher = TestUtil.randomBatcher(inWithTime)
val batchCoveredInput = TestUtil.pruneToBatchCovered(inWithTime, intr, batcher)

val inMemory = TestGraphs.singleStepInScala(batchCoveredInput)(fn)

val initStore = sample[Map[Int, Int]]
val testStore = TestStore[Int, Int]("test", batcher, initStore, inWithTime.size)
val (buffer, source) = TestSource(inWithTime)

val summer = TestGraphs.singleStepJob[Scalding, (Long, Int), Int, Int](source, testStore)(t =>
fn(t._2))

val ex = Scalding.toExecutionExact(
Scalding.dateRangeInjection.invert(intr).get,
summer).unit

val mode: Mode = TestMode(t => (testStore.sourceToBuffer ++ buffer).get(t))
ex.waitFor(Config.default, mode).get

// Now check that the inMemory ==

assert(TestUtil.compareMaps(original, Monoid.plus(initStore, inMemory), testStore, "execution") == true)
}


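For readers new to scalding's Execution API: waitFor blocks until the Execution completes and returns a scala.util.Try, so the .get above rethrows any planning or runtime failure and fails the test. A hedged restatement of that step (assuming waitFor(Config, Mode): Try[T]):

    import com.twitter.scalding.{ Config, Execution, Mode }
    import scala.util.{ Failure, Success }

    def runToCompletion(ex: Execution[Unit], mode: Mode): Unit =
      ex.waitFor(Config.default, mode) match {
        case Success(_) => () // all planned writes completed
        case Failure(e) => throw e // same effect as .get
      }
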
"match scala single step pruned jobs" in { "match scala single step pruned jobs" in {
val original = sample[List[Int]] val original = sample[List[Int]]
@@ -58,6 +58,8 @@ import scala.util.control.NonFatal
 object Scalding {
   @transient private val logger = LoggerFactory.getLogger(classOf[Scalding])


+  case class PlanningException(errorMessages: List[String]) extends Exception(errorMessages.toString)

   def apply(jobName: String, options: Map[String, Options] = Map.empty) = {
     new Scalding(jobName, options, identity, List())
   }
@@ -285,7 +287,7 @@ object Scalding {
         default
       case Some((id, opt)) =>
         logger.info(
-          s"Producer (${producer.getClass.getName}) Using $opt found via NamedProducer ${'"'}$id${'"'}")
+          s"""Producer (${producer.getClass.getName}) Using $opt found via NamedProducer "$id" """)
         opt
     }


@@ -563,6 +565,44 @@ object Scalding {
     planProducer(options, prod)
   }


+  /**
+   * like toPipeExact, but using Execution
+   */
+  def toExecutionExact[T](
+    dr: DateRange,
+    prod: Producer[Scalding, T],
+    options: Map[String, Options] = Map.empty): Execution[TypedPipe[(Timestamp, T)]] =
+
+    Execution.getMode.flatMap { mode =>
+      val fd = new FlowDef
+      toPipeExact(dr, prod, options)(fd, mode) match {
+        case Right(_) =>
+          /**
+           * We plan twice. The first time we plan to see what new
+           * writes we do, the second time we only read, we do not
+           * do any new writes.
+           */
+          Execution.fromFn { (_, _) => fd }
+            .flatMap { _ =>
+              // Now plan again and use summingbird's built in support
+              // to minimize work by reading existing on-disk data:
+              val reads = new FlowDef
+              toPipeExact(dr, prod, options)(reads, mode) match {
+                case Right(pipe) =>
+                  /*
+                   * Note that we discard "reads", any writes there
+                   * will not be performed
+                   */
+                  Execution.from(pipe)
+                case Left(msgs) =>
+                  // Left should never happen, since we planned before.
+                  Execution.failed(PlanningException(msgs))
+              }
+            }
+        case Left(msgs) => Execution.failed(PlanningException(msgs))
+      }
+    }

   /**
    * Use this method to interop with existing scalding code
    * Note this may return a smaller DateRange than you ask for
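
The plan-twice trick above generalizes: run one plan for its side effects (the writes recorded in the first FlowDef), then re-plan purely to obtain a value, discarding the second FlowDef so none of its writes run. This is sound only because the MockMappable fix keeps planning pure. A condensed sketch of the pattern (hypothetical helper, assuming only the APIs used in this diff):

    import cascading.flow.FlowDef
    import com.twitter.scalding.Execution

    def planTwice[T](plan: FlowDef => Either[List[String], T]): Execution[T] = {
      val fd = new FlowDef
      plan(fd) match {
        case Right(_) =>
          // Run fd for its writes, then re-plan; the new FlowDef is discarded.
          Execution.fromFn { (_, _) => fd }.flatMap { _ =>
            plan(new FlowDef) match {
              case Right(t)   => Execution.from(t)
              case Left(msgs) => Execution.failed(Scalding.PlanningException(msgs))
            }
          }
        case Left(msgs) => Execution.failed(Scalding.PlanningException(msgs))
      }
    }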
@@ -659,10 +699,12 @@ class Scalding(
   def withConfigUpdater(fn: Config => Config) =
     new Scalding(jobName, options, transformConfig.andThen(fn), passedRegistrars)


-  def configProvider(hConf: Configuration): Config = {
+  def configProvider(hConf: Configuration): Config =
+    withKryo(Config.hadoopWithDefaults(hConf))
+
+  private def withKryo(conf: Config): Config = {
     import com.twitter.scalding._
     import com.twitter.chill.config.ScalaMapConfig
-    val conf = Config.hadoopWithDefaults(hConf)


     if (passedRegistrars.isEmpty) {
       conf.setSerialization(Right(classOf[serialization.KryoHadoop]))
@@ -681,17 +723,18 @@
     }


   final def buildConfig(hConf: Configuration): Config = {
-    val config = transformConfig(configProvider(hConf))
-
-    // Store the options used:
-    val postConfig = config.+("summingbird.options" -> options.toString)
-      .+("summingbird.jobname" -> jobName)
-      .+("summingbird.submitted.timestamp" -> System.currentTimeMillis.toString)
+    val postConfig = finalTransform(Config.hadoopWithDefaults(hConf))
     postConfig.toMap.foreach { case (k, v) => hConf.set(k, v) }
     postConfig
   }


+  private def finalTransform(c: Config): Config =
+    transformConfig(withKryo(c))
+      // Store the options used:
+      .+("summingbird.options" -> options.toString)
+      .+("summingbird.jobname" -> jobName)
+      .+("summingbird.submitted.timestamp" -> System.currentTimeMillis.toString)

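After this refactor, buildConfig applies withKryo first and user transforms second, so updates registered through withConfigUpdater can override the serialization defaults, and the summingbird bookkeeping keys are appended last. A hedged usage sketch (hypothetical keys):

    // withConfigUpdater composes onto transformConfig with andThen, so
    // updates run in registration order, after withKryo's defaults.
    val platform = Scalding("myJob")
      .withConfigUpdater(_ + ("my.team.owner" -> "ads-infra"))
      .withConfigUpdater(_ + ("my.team.oncall" -> "pagerduty"))
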
   // This is a side-effect-free computation that is called by run
   def toFlow(config: Config, timeSpan: Interval[Timestamp], mode: Mode, pf: PipeFactory[_]): Try[(Interval[Timestamp], Option[Flow[_]])] = {
     val flowDef = new FlowDef
