Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull cascading flowDef thread into cascading_backend #1681

Merged
merged 3 commits into from May 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
215 changes: 37 additions & 178 deletions scalding-core/src/main/scala/com/twitter/scalding/Execution.scala
Expand Up @@ -15,22 +15,16 @@ limitations under the License.
*/
package com.twitter.scalding

import com.twitter.algebird.monad.{ Reader, Trampoline }
import cascading.flow.{ FlowDef, Flow }
import com.twitter.algebird.monad.Trampoline
import com.twitter.algebird.{ Monoid, Monad, Semigroup }
import com.twitter.scalding.cascading_interop.FlowListenerPromise
import com.twitter.scalding.filecache.{CachedFile, DistributedCacheFile}
import com.twitter.scalding.Dsl.flowDefToRichFlowDef
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{ Await, Future, ExecutionContext => ConcurrentExecutionContext, Promise }
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal
import cascading.flow.{ FlowDef, Flow }
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.slf4j.LoggerFactory

import com.twitter.scalding.typed.cascading_backend.AsyncFlowDefRunner
import scala.collection.mutable
import scala.concurrent.{ Await, Future, ExecutionContext => ConcurrentExecutionContext, Promise }
import scala.runtime.ScalaRunTime
import scala.util.{ Failure, Success, Try }

/**
* Execution[T] represents a computation that can be run and
Expand Down Expand Up @@ -148,14 +142,15 @@ sealed trait Execution[+T] extends java.io.Serializable { self: Product =>
* Seriously: pro-style is for this to be called only once in a program.
*/
final def run(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext): Future[T] = {
val ec = new EvalCache
val runner = new AsyncFlowDefRunner
val ec = new EvalCache(runner)
val confWithId = conf.setScaldingExecutionId(java.util.UUID.randomUUID.toString)
// get on Trampoline
val result = runStats(confWithId, mode, ec)(cec).get.map(_._1)
// When the final future in complete we stop the submit thread
result.onComplete { _ => ec.finished(mode) }
result.onComplete { _ => runner.finished(mode) }
// wait till the end to start the thread in case the above throws
ec.start()
runner.start()
result
}

Expand Down Expand Up @@ -309,138 +304,20 @@ object Execution {
a.zip(b).map { case (ta, tb) => Monoid.plus(ta, tb) }
}

/**
* This is a Thread used as a shutdown hook to clean up temporary files created by some Execution
*
* If the job is aborted the shutdown hook may not run and the temporary files will not get cleaned up
*/
private[scalding] case class TempFileCleanup(filesToCleanup: Iterable[String], mode: Mode) extends Thread {
  val LOG = LoggerFactory.getLogger(this.getClass)

  override def run(): Unit = {
    // Pick the filesystem matching the mode: the local filesystem for
    // local/test (CascadingLocal) modes, the Hadoop-configured filesystem
    // for cluster (HadoopMode) modes.
    val fs = mode match {
      case localMode: CascadingLocal => FileSystem.getLocal(new Configuration)
      case hdfsMode: HadoopMode => FileSystem.get(hdfsMode.jobConf)
    }

    // Best-effort cleanup: delete each temp path that still exists.
    filesToCleanup.foreach { file: String =>
      try {
        val path = new Path(file)
        if (fs.exists(path)) {
          // The "true" parameter here indicates that we should recursively delete everything under the given path
          fs.delete(path, true)
        }
      } catch {
        // If we fail in deleting a temp file, log the error but don't fail the run
        // (this runs inside a JVM shutdown hook, so throwing would be pointless).
        case e: Throwable => LOG.warn(s"Unable to delete temp file $file", e)
      }
    }
  }
}

/**
* This is a mutable state that is kept internal to an execution
* as it is evaluating.
*/
private[scalding] object EvalCache {
  /**
   * We send messages from other threads into the submit thread here
   */
  private[EvalCache] sealed trait FlowDefAction
  // Ask the submit thread to build and run a Flow for `fd` with the given
  // config/mode; the thread completes `result` with (run id, JobStats) on
  // success, or fails it with the build/run error.
  private[EvalCache] case class RunFlowDef(conf: Config,
    mode: Mode,
    fd: FlowDef,
    result: Promise[(Long, JobStats)]) extends FlowDefAction
  // Sentinel message telling the submit thread's loop to exit.
  private[EvalCache] case object Stop extends FlowDefAction
}

private[scalding] class EvalCache {
import EvalCache._
private[scalding] class EvalCache(val runner: AsyncFlowDefRunner) {

type Counters = Map[Long, ExecutionCounters]
private[this] val cache = new FutureCache[(Config, Execution[Any]), (Any, Counters)]
private[this] val toWriteCache = new FutureCache[(Config, ToWrite), Counters]
private[this] val filesToCleanup = mutable.Set[String]()

// This method will return a 'clean' cache that shares
// the underlying thread and message queue of the parent evalCache
def cleanCache: EvalCache = {
val self = this
new EvalCache {
override protected[EvalCache] val messageQueue: LinkedBlockingQueue[EvalCache.FlowDefAction] = self.messageQueue
override def addFilesToCleanup(files: TraversableOnce[String]): Unit = self.addFilesToCleanup(files)
override def start(): Unit = sys.error("Invalid to start child EvalCache")
override def finished(mode: Mode): Unit = sys.error("Invalid to finish child EvalCache")
}
}

protected[EvalCache] val messageQueue = new LinkedBlockingQueue[EvalCache.FlowDefAction]()
/**
* Hadoop and/or cascading has some issues, it seems, with starting jobs
* from multiple threads. This thread does all the Flow starting.
*/
protected lazy val thread = new Thread(new Runnable {
def run(): Unit = {
@annotation.tailrec
def go(id: Long): Unit = messageQueue.take match {
case Stop => ()
case RunFlowDef(conf, mode, fd, promise) =>
try {
val ctx = ExecutionContext.newContext(conf)(fd, mode)
ctx.buildFlow match {
case Success(flow) =>
promise.completeWith(Execution.run(id, flow))
case Failure(err) =>
promise.failure(err)
}
} catch {
case t: Throwable =>
// something bad happened, but this thread is a daemon
// that should only stop if all others have stopped or
// we have received the stop message.
// Stopping this thread prematurely can deadlock
// futures from the promise we have.
// In a sense, this thread does not exist logically and
// must forward all exceptions to threads that requested
// this work be started.
promise.tryFailure(t)
}
// Loop
go(id + 1)
}

// Now we actually run the recursive loop
go(0)
}
})

def runFlowDef(conf: Config, mode: Mode, fd: FlowDef): Future[(Long, JobStats)] =
try {
val promise = Promise[(Long, JobStats)]()
val fut = promise.future
messageQueue.put(RunFlowDef(conf, mode, fd, promise))
// Don't do any work after the .put call, we want no chance for exception
// after the put
fut
} catch {
case NonFatal(e) =>
Future.failed(e)
}

def start(): Unit = {
// Make sure this thread can't keep us running if all others are gone
thread.setDaemon(true)
thread.start()
}
/*
* This is called after we are done submitting all jobs
*/
def finished(mode: Mode): Unit = {
messageQueue.put(Stop)
if (filesToCleanup.nonEmpty) {
Runtime.getRuntime.addShutdownHook(TempFileCleanup(filesToCleanup, mode))
}
}
def cleanCache: EvalCache =
new EvalCache(runner)

def getOrLock(cfg: Config, write: ToWrite): Either[Promise[Counters], Future[Counters]] =
toWriteCache.getOrPromise((cfg, write))
Expand All @@ -454,10 +331,6 @@ object Execution {
def getOrElseInsert[T](cfg: Config, ex: Execution[T],
res: => Future[(T, Counters)]): Future[(T, Counters)] =
getOrElseInsertWithFeedback(cfg, ex, res)._2

def addFilesToCleanup(files: TraversableOnce[String]): Unit = filesToCleanup.synchronized {
filesToCleanup ++= files
}
}

private case class FutureConst[T](get: ConcurrentExecutionContext => Future[T]) extends Execution[T] {
Expand Down Expand Up @@ -658,13 +531,10 @@ object Execution {
*/
private case class FlowDefExecution(result: (Config, Mode) => FlowDef) extends Execution[Unit] {
protected def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = {
lazy val future =
for {
flowDef <- toFuture(Try(result(conf, mode)))
_ = FlowStateMap.validateSources(flowDef, mode)
(id, jobStats) <- cache.runFlowDef(conf, mode, flowDef)
_ = FlowStateMap.clear(flowDef)
} yield ((), Map(id -> ExecutionCounters.fromJobStats(jobStats)))
lazy val future = cache
.runner
.validateAndRun(conf, mode)(result)
.map { m => ((), m) }

Trampoline(cache.getOrElseInsert(conf, this, future))
}
Expand All @@ -675,20 +545,23 @@ object Execution {
* but with proof that pipe matches sink
*/

private trait ToWrite {
def write(config: Config, flowDef: FlowDef, mode: Mode): Unit
}
private case class SimpleWrite[T](pipe: TypedPipe[T], sink: TypedSink[T]) extends ToWrite {
def write(config: Config, flowDef: FlowDef, mode: Mode): Unit = {
// This has the side effect of mutating flowDef
pipe.write(sink)(flowDef, mode)
()
}
sealed trait ToWrite
object ToWrite {
case class SimpleWrite[T](pipe: TypedPipe[T], sink: TypedSink[T]) extends ToWrite
case class PreparedWrite[T](fn: (Config, Mode) => SimpleWrite[T]) extends ToWrite
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to change the visibility of these?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the idea is that we want to be able to write new backends. To do that, you will need to be able to see these types (so, in this case AsyncFlowDefRunner needs to be able to pattern match on them).

}

private case class PreparedWrite[T](fn: (Config, Mode) => SimpleWrite[T]) extends ToWrite {
def write(config: Config, flowDef: FlowDef, mode: Mode): Unit =
fn(config, mode).write(config, flowDef, mode)
/**
* Something that can handle a batch of writes that may be optimized
* before running. Returns a map keyed by a unique Long id for each run,
* with the Counters gathered from that run.
*/
trait Writer {
  /**
   * Execute the batch of writes `head :: rest` under the given config and
   * mode. Taking a non-empty list as (head, rest) guarantees at least one
   * write at the type level. The resulting Future holds the counters for
   * each run, keyed by that run's unique Long id.
   */
  def execute(
    conf: Config,
    mode: Mode,
    head: ToWrite,
    rest: List[ToWrite])(implicit cec: ConcurrentExecutionContext): Future[Map[Long, ExecutionCounters]]

}

/**
Expand All @@ -708,20 +581,6 @@ object Execution {
override def map[U](mapFn: T => U): Execution[U] =
WriteExecution(head, tail, { (conf: Config, mode: Mode) => mapFn(fn(conf, mode)) })

/* Run a list of ToWrite elements */
// Builds one combined FlowDef from every write (so they run as a single
// flow), validates its sources, submits it on the cache's dedicated
// flow-running thread, and clears the FlowStateMap entry afterwards.
private[this] def scheduleToWrites(conf: Config,
  mode: Mode,
  cache: EvalCache,
  head: ToWrite,
  tail: List[ToWrite])(implicit cec: ConcurrentExecutionContext): Future[Map[Long, ExecutionCounters]] = {
  for {
    // Each ToWrite.write mutates `fd` as a side effect; wrap in Try/toFuture
    // so a failure during planning becomes a failed Future, not a throw.
    flowDef <- toFuture(Try { val fd = new FlowDef; (head :: tail).foreach(_.write(conf, fd, mode)); fd })
    _ = FlowStateMap.validateSources(flowDef, mode)
    (id, jobStats) <- cache.runFlowDef(conf, mode, flowDef)
    _ = FlowStateMap.clear(flowDef)
  } yield Map(id -> ExecutionCounters.fromJobStats(jobStats))
}

def unwrapListEither[A, B, C](it: List[(A, Either[B, C])]): (List[(A, B)], List[(A, C)]) = it match {
case (a, Left(b)) :: tail =>
val (l, r) = unwrapListEither(tail)
Expand All @@ -737,7 +596,7 @@ object Execution {
// Anything not already ran we run as part of a single flow def, using their combined counters for the others
protected def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = {
Trampoline(cache.getOrElseInsert(conf, this, {
cache.addFilesToCleanup(tempFilesToCleanup(conf, mode))
cache.runner.addFilesToCleanup(tempFilesToCleanup(conf, mode))
val cacheLookup: List[(ToWrite, Either[Promise[Map[Long, ExecutionCounters]], Future[Map[Long, ExecutionCounters]]])] =
(head :: tail).map{ tw => (tw, cache.getOrLock(conf, tw)) }
val (weDoOperation, someoneElseDoesOperation) = unwrapListEither(cacheLookup)
Expand All @@ -750,7 +609,7 @@ object Execution {
weDoOperation match {
case all @ (h :: tail) =>
val futCounters: Future[Map[Long, ExecutionCounters]] =
scheduleToWrites(conf, mode, cache, h._1, tail.map(_._1))
cache.runner.execute(conf, mode, h._1, tail.map(_._1))
// Complete all of the promises we put into the cache
// with this future counters set
all.foreach {
Expand Down Expand Up @@ -854,7 +713,7 @@ object Execution {
* type U for the resultant execution.
*/
private[scalding] def write[T, U](pipe: TypedPipe[T], sink: TypedSink[T], generatorFn: (Config, Mode) => U): Execution[U] =
WriteExecution(SimpleWrite(pipe, sink), Nil, generatorFn)
WriteExecution(ToWrite.SimpleWrite(pipe, sink), Nil, generatorFn)

/**
* The simplest form, just sink the typed pipe into the sink and get a unit execution back
Expand All @@ -863,17 +722,17 @@ object Execution {
write(pipe, sink, ())

private[scalding] def write[T, U](pipe: TypedPipe[T], sink: TypedSink[T], presentType: => U): Execution[U] =
WriteExecution(SimpleWrite(pipe, sink), Nil, { (_: Config, _: Mode) => presentType })
WriteExecution(ToWrite.SimpleWrite(pipe, sink), Nil, { (_: Config, _: Mode) => presentType })

/**
* Here we allow both the targets to write and the sources to be generated from the config and mode.
* This allows us to merge things looking for the config and mode without using flatmap.
*/
private[scalding] def write[T, U](fn: (Config, Mode) => (TypedPipe[T], TypedSink[T]), generatorFn: (Config, Mode) => U,
tempFilesToCleanup: (Config, Mode) => Set[String] = (_, _) => Set()): Execution[U] =
WriteExecution(PreparedWrite({ (cfg: Config, m: Mode) =>
WriteExecution(ToWrite.PreparedWrite({ (cfg: Config, m: Mode) =>
val r = fn(cfg, m)
SimpleWrite(r._1, r._2)
ToWrite.SimpleWrite(r._1, r._2)
}), Nil, generatorFn, tempFilesToCleanup)

/**
Expand Down