Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove FlowState and use a proxy pattern for that state #1001

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 3 additions & 30 deletions scalding-core/src/main/scala/com/twitter/scalding/Execution.scala
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,7 @@ object Execution {
def runStats(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext) = {
for {
(flowDef, fn) <- toFuture(Try(result(conf, mode)))
_ = FlowStateMap.validateSources(flowDef, mode)
jobStats <- ExecutionContext.newContext(conf)(flowDef, mode).run
_ = FlowStateMap.clear(flowDef)
t <- fn(jobStats)
} yield (t, ExecutionCounters.fromJobStats(jobStats))
}
Expand Down Expand Up @@ -325,14 +323,9 @@ object Execution {
@deprecated("Use Execution[T]", "2014-07-29")
def buildFlow[T](conf: Config, mode: Mode)(op: Reader[ExecutionContext, T]): (T, Try[Flow[_]]) = {
val ec = ExecutionContext.newContextEmpty(conf, mode)
try {
// This mutates the newFlowDef in ec
val resultT = op(ec)
(resultT, ec.buildFlow)
} finally {
// Make sure to clean up all state with flowDef
FlowStateMap.clear(ec.flowDef)
}
// This mutates the newFlowDef in ec
val resultT = op(ec)
(resultT, ec.buildFlow)
}

/*
Expand All @@ -357,26 +350,6 @@ object Execution {
// This is in Java because of the cascading API's raw types on FlowListener
FlowListenerPromise.start(flow, { f: Flow[C] => JobStats(f.getFlowStats) })

/*
* This is a low-level method that should be avoided if you are reading
* the docs rather than the source, and may be removed.
* You should be using Execution[T] to compose.
*
* If you want scalding to fail if the sources cannot be validated, then
* use this (highly recommended and the default for Execution[T])
*
* Alteratively, in your Reader, call Source.validateTaps(Mode) to
* control which sources individually need validation
* Suggested use:
* for {
* result <- job
* mightErr <- validateSources
* } yield mightErr.map(_ => result)
*/
@deprecated("Use Execution[T].run", "2014-07-29")
def validateSources: Reader[ExecutionContext, Try[Unit]] =
Reader { ec => Try(FlowStateMap.validateSources(ec.flowDef, ec.mode)) }

/*
* This is a low-level method that should be avoided if you are reading
* the docs rather than the source, and may be removed.
Expand Down
104 changes: 0 additions & 104 deletions scalding-core/src/main/scala/com/twitter/scalding/FlowState.scala

This file was deleted.

10 changes: 2 additions & 8 deletions scalding-core/src/main/scala/com/twitter/scalding/Job.scala
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,10 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable {
.get

// called before run
// only override if you do not use flowDef
def validate {
FlowStateMap.validateSources(flowDef, mode)
}
def validate {}

// called after successfull run
// only override if you do not use flowDef
def clear {
FlowStateMap.clear(flowDef)
}
def clear {}

protected def handleStats(statsData: CascadingStats) {
scaldingCascadingStats = Some(statsData)
Expand Down
6 changes: 6 additions & 0 deletions scalding-core/src/main/scala/com/twitter/scalding/Mode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ object Mode {
else
throw ArgsException("[ERROR] Mode must be one of --local or --hdfs, you provided neither")
}

def shouldCallValidate(m: Mode): Boolean = m match {
case Hdfs(v, _) => v
case Local(v) => v
case _ => false
}
}

trait Mode extends java.io.Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,6 @@ class RichFlowDef(val fd: FlowDef) {
appendLeft(fd.getTails, o.getTails)

fd.mergeMiscFrom(o)
// Merge the FlowState
FlowStateMap.get(o)
.foreach { oFS =>
FlowStateMap.mutate(fd) { current =>
// overwrite the items from o with current
(FlowState(oFS.sourceMap ++ current.sourceMap), ())
}
}
}

/**
Expand Down Expand Up @@ -134,18 +126,6 @@ class RichFlowDef(val fd: FlowDef) {
if (sinks.containsKey(pipe.getName)) {
newFd.addTailSink(pipe, sinks.get(pipe.getName))
}
// Update the FlowState:
FlowStateMap.get(fd)
.foreach { thisFS =>
val subFlowState = thisFS.sourceMap
.foldLeft(Map[String, (Source, Pipe)]()) {
case (newfs, kv @ (name, (source, pipe))) =>
if (upipes(pipe)) newfs + kv
else newfs
}
FlowStateMap.mutate(newFd) { _ => (FlowState(subFlowState), ()) }
}
newFd
}

}
52 changes: 38 additions & 14 deletions scalding-core/src/main/scala/com/twitter/scalding/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ package com.twitter.scalding
import java.io.{ File, InputStream, OutputStream }
import java.util.{ TimeZone, Calendar, Map => JMap, Properties }

import cascading.flow.FlowDef
import cascading.flow.FlowProcess
import cascading.flow.{ Flow, FlowDef, FlowProcess }
import cascading.flow.hadoop.HadoopFlowProcess
import cascading.flow.local.LocalFlowProcess
import cascading.scheme.{ NullScheme, Scheme }
Expand Down Expand Up @@ -67,6 +66,20 @@ object CastHfsTap {
tap.asInstanceOf[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]]
}

/**
* This lets us store the Source in the FlowDef along with the Tap
* This is so we can call the validate method at the appropriate time
*/
case class SourceTap[C, I, O](@transient source: Source,
@transient mode: Mode,
@transient pipe: Pipe,
override val proxy: Tap[C, I, O]) extends ProxyTap[C, I, O, Tap[C, I, O]] {
override def flowConfInit(f: Flow[C]) {
source.validateTaps(mode)
proxy.flowConfInit(f)
}
}

/**
* Every source must have a correct toString method. If you use
* case classes for instances of sources, you will get this for free.
Expand All @@ -87,21 +100,32 @@ abstract class Source extends java.io.Serializable {
*/
def transformInTest: Boolean = false

/**
* This is the name used to refer to Taps created by this source
* in cascading. By default we use toString. This usually works
* fine.
*/
protected def sourceIdentifier: String = toString

def read(implicit flowDef: FlowDef, mode: Mode): Pipe = {
checkFlowDefNotNull

//workaround for a type erasure problem, this is a map of String -> Tap[_,_,_]
val sources = flowDef.getSources().asInstanceOf[JMap[String, Any]]
val srcName = this.toString
if (!sources.containsKey(srcName)) {
sources.put(srcName, createTap(Read)(mode))
}
FlowStateMap.mutate(flowDef) { st =>
val newPipe = (mode, transformInTest) match {
case (test: TestMode, false) => new Pipe(srcName)
case _ => transformForRead(new Pipe(srcName))
}
st.getReadPipe(this, newPipe)
val srcName = sourceIdentifier
sources.get(srcName) match {
case null =>
val newPipe = (mode, transformInTest) match {
case (test: TestMode, false) => new Pipe(srcName)
case _ => transformForRead(new Pipe(srcName))
}
val stap = SourceTap(this, mode, newPipe, createTap(Read)(mode))
sources.put(srcName, stap)
newPipe
case SourceTap(existingSrc, m, pipe, _) if (m == mode) && (existingSrc == this) => pipe
case stap @ SourceTap(existingSrc, m, p, _) =>
sys.error("Mismatch. called mode: %s, this: %s, but existing: %s".format(mode, this, stap))
case tap => sys.error("Unexpected Source: " + tap.toString)
}
}

Expand All @@ -112,9 +136,9 @@ abstract class Source extends java.io.Serializable {
def writeFrom(pipe: Pipe)(implicit flowDef: FlowDef, mode: Mode): Pipe = {
checkFlowDefNotNull

//insane workaround for scala compiler bug
//insane workaround for raw types issue
val sinks = flowDef.getSinks.asInstanceOf[JMap[String, Any]]
val sinkName = this.toString
val sinkName = sourceIdentifier
if (!sinks.containsKey(sinkName)) {
sinks.put(sinkName, createTap(Write)(mode))
}
Expand Down
Loading