Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an Executor to run flows without a Job #915

Merged
merged 8 commits into from Jun 27, 2014
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,60 @@
/*
Copyright 2014 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.scalding.cascading_interop;

import cascading.flow.FlowListener;
import cascading.flow.Flow;
import scala.concurrent.Promise$;
import scala.concurrent.Promise;
import scala.concurrent.Future;

/*
* The cascading API uses a raw type here which is difficult to
* deal with in scala
*/
public class FlowListenerPromise {
/*
* This starts the flow and applies a mapping function fn in
* the same thread that completion happens
*/
public static <Config, T> Future<T> start(Flow<Config> flow, final scala.Function1<Flow<Config>, T> fn) {
final Promise<T> result = Promise$.MODULE$.<T>apply();
flow.addListener(new FlowListener() {
// Start notifications are deliberately ignored: the promise is only completed
// from onCompleted (success) or onThrowable (failure).
public void onStarting(Flow f) { } // ignore
// Stop notifications are likewise ignored; they do not complete the promise.
public void onStopping(Flow f) { } // ignore
public void onCompleted(Flow f) {
// This is always called, but onThrowable is called first
if(!result.isCompleted()) {
// we use the above rather than trySuccess to avoid calling fn twice
try {
T toPut = fn.apply(f);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this FN? would it not suffice to be a Future[Flow[Config]] ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need it, but I didn't want to pass on scala.concurrent.ExecutionContext just to access the stats, which clearly is not an expensive operation.

So, this is a map tacked on the end, but logically, you are right. Since this is an adapter class, I'd say it is okay, but there is some code smell here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok cool fair enough, not used the scala futures really enough to know when those are needed/not needed.

result.success(toPut);
}
catch(Throwable t) {
result.failure(t);
}
}
}
/*
 * Records the failure of the flow in the promise.
 * Returns true to tell cascading the throwable has been handled here.
 */
public boolean onThrowable(Flow f, Throwable t) {
  // tryFailure instead of failure: Promise.failure throws IllegalStateException
  // when the promise is already completed, which would raise a second exception
  // from inside the listener if this callback fires more than once.
  // The first recorded outcome wins.
  result.tryFailure(t);
  // The exception is handled by the owner of the promise and should not be rethrown
  return true;
}
});
flow.start();
return result.future();
}
}
12 changes: 9 additions & 3 deletions scalding-core/src/main/scala/com/twitter/scalding/Config.scala
Expand Up @@ -31,8 +31,9 @@ import java.security.MessageDigest
/**
* This is a wrapper class on top of Map[String, String]
*/
case class Config(toMap: Map[String, String]) {
trait Config {
import Config._ // get the constants
def toMap: Map[String, String]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are some non-String keys allowed. Not a ton, but they're allowed. Do we just not care?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are they?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


def get(key: String): Option[String] = toMap.get(key)
def +(kv: (String, String)): Config = Config(toMap + kv)
Expand All @@ -44,6 +45,7 @@ case class Config(toMap: Map[String, String]) {
case (None, r) => (r, this - k)
}

def getCascadingAppName: Option[String] = get(CascadingAppName)
def setCascadingAppName(name: String): Config =
this + (CascadingAppName -> name)

Expand Down Expand Up @@ -133,7 +135,7 @@ case class Config(toMap: Map[String, String]) {
this + (Config.ScaldingVersion -> scaldingVersion)

/*
* This is *required* is you are using counters. You must use
* This is *required* if you are using counters. You must use
* the same UniqueID as you used when defining your jobs.
*/
def setUniqueId(u: UniqueID): Config =
Expand Down Expand Up @@ -182,7 +184,11 @@ object Config {
.setSerialization(Right(classOf[serialization.KryoHadoop]))
.setScaldingVersion

implicit def from(m: Map[String, String]): Config = Config(m)
def apply(m: Map[String, String]): Config = new Config { def toMap = m }
/*
* Implicits cannot collide in name, so making apply implicit is a bad idea
*/
implicit def from(m: Map[String, String]): Config = apply(m)

/**
* Returns all the non-string keys on the left, the string keys/values on the right
Expand Down
160 changes: 160 additions & 0 deletions scalding-core/src/main/scala/com/twitter/scalding/Execution.scala
@@ -0,0 +1,160 @@
/*
Copyright 2014 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.scalding

import com.twitter.algebird.monad.Reader
import com.twitter.scalding.cascading_interop.FlowListenerPromise

import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success, Try }
import cascading.flow.{ FlowDef, Flow, FlowListener }

import java.util.UUID

/*
* This has all the state needed to build a single flow
* This is used with the implicit-arg-as-dependency-injection
* style and with the Reader-as-dependency-injection
*/
trait ExecutionContext {
// The Mode used for this flow; Execution.validate passes it to source validation.
def mode: Mode
// The FlowDef that accumulates the flow description. Running a job against
// this context mutates it as a side effect.
def flowDef: FlowDef
// Identifier of this flow, which is used to access counters.
def uniqueId: UniqueID
}

/*
* import ExecutionContext._
* is generally needed to use the ExecutionContext as the single
* dependency injected. For instance, TypedPipe needs FlowDef and Mode
* in many cases, so if you have an implicit ExecutionContext, you need
* modeFromImplicit, etc... below.
*/
object ExecutionContext {
  /*
   * Packages an implicitly available FlowDef, Mode and UniqueID into one
   * ExecutionContext. Inside of a Job you can write
   *   implicit val ec = ExecutionContext.newContext
   * to obtain the context needed by functions that require an
   * implicit ExecutionContext.
   */
  def newContext(implicit fd: FlowDef, m: Mode, u: UniqueID): ExecutionContext =
    new ExecutionContext {
      def uniqueId: UniqueID = u
      def flowDef: FlowDef = fd
      def mode: Mode = m
    }

  /* Makes the Mode of an implicit ExecutionContext available as an implicit Mode */
  implicit def modeFromContext(implicit ec: ExecutionContext): Mode =
    ec.mode

  /* Makes the FlowDef of an implicit ExecutionContext available as an implicit FlowDef */
  implicit def flowDefFromContext(implicit ec: ExecutionContext): FlowDef =
    ec.flowDef

  /* Makes the UniqueID of an implicit ExecutionContext available as an implicit UniqueID */
  implicit def uniqueIdFromContext(implicit ec: ExecutionContext): UniqueID =
    ec.uniqueId
}

object Execution {
/*
* Here is the recommended way to run scalding as a library
* Put all your logic in calls like this:
* import ExecutionContext._
*
* Reader(implicit ec: ExecutionContext =>
* //job here
* )
* you can compose these readers in flatMaps:
* for {
* firstPipe <- job1
* secondPipe <- job2
* } yield firstPipe.group.join(secondPipe.join)
*
* Note that the only config considered is in conf.
* The caller is responsible for setting up the Config
* completely
*/
def buildFlow[T](mode: Mode, conf: Config)(op: Reader[ExecutionContext, T]): (T, Try[Flow[_]]) = {
val newFlowDef = new FlowDef
conf.getCascadingAppName.foreach(newFlowDef.setName)
// Set up the uniqueID, which is used to access counters
val uniqueId = UniqueID(UUID.randomUUID.toString)
val finalConf = conf.setUniqueId(uniqueId)
val ec = ExecutionContext.newContext(newFlowDef, mode, uniqueId)
try {
val resultT = op(ec)

// Running op(ec) mutates newFlowDef as a side effect, so newFlowDef is ready now. :(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason is that wildcards are not interchangeable. The _ Flow[] in the return type is different from the Flow[] here. One solution is to make them both Flow[Any]. Another solution is to add a type parameter that does nothing. Another solution is to curse cascading's dumb use of wildcards.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But Flow[Any] is not really right either, because Flow[C] is invariant in C even though it should be covariant (C only appears in return values). It really has some type, we just don't tell you what it is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if you do a type parameter C and pass it to the return type, it will just get filled in with Nothing and then erased. This should work fine, no?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, yes, I know the _ are different somethings but, why does manually doing the try work, but using Try does not. That seems like a compiler bug to me.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But that's also a lie, since we don't really know or ever care what C is.
In fact, C seems like an error in cascading's design, since he
backpedaled and uses Map<Object, Object> in most places now for the
config.

On Thu, Jun 26, 2014 at 11:20 AM, Jonathan Coveney <notifications@github.com

wrote:

In scalding-core/src/main/scala/com/twitter/scalding/Execution.scala:

  • * Note that the only config considered is in conf.
  • * The caller is responsible for setting up the Config
  • * completely
  • */
  • def buildFlow[T](mode: Mode, conf: Config)(op: Reader[ExecutionContext, T]): (T, Try[Flow[_]]) = {
  • val newFlowDef = new FlowDef
  • conf.getCascadingAppName.foreach(newFlowDef.setName)
  • // Set up the uniqueID, which is used to access to counters
  • val uniqueId = UniqueID(UUID.randomUUID.toString)
  • val finalConf = conf.setUniqueId(uniqueId)
  • val ec = ExecutionContext.newContext(newFlowDef, mode, uniqueId)
  • try {
  •  val resultT = op(ec)
    
  •  // The newFlowDef is ready now, and mutates newFlowDef as a side effect. :(
    

I think if you do a type parameter C and pass it to the return type, it
will just get filled in with Nothing and then erased. This should work
fine, no?


Reply to this email directly or view it on GitHub
https://github.com/twitter/scalding/pull/915/files#r14267542.

Oscar Boykin :: @posco :: http://twitter.com/posco

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh it's definitely an error in cascading's design.

Thinking on why it's an error... that's a good point. Hm.

// For some horrible reason, using Try( ) instead of the below gets me stuck:
// [error]
// /Users/oscar/workspace/scalding/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala:92:
// type mismatch;
// [error] found : cascading.flow.Flow[_]
// [error] required: cascading.flow.Flow[?0(in method buildFlow)] where type ?0(in method
// buildFlow)
// [error] Note: Any >: ?0, but Java-defined trait Flow is invariant in type Config.
// [error] You may wish to investigate a wildcard type such as `_ >: ?0`. (SLS 3.2.10)
// [error] (resultT, Try(mode.newFlowConnector(finalConf).connect(newFlowDef)))

// Hand-rolled stand-in for Try(...), which does not type-check here (see the
// compiler error quoted above). Like Try, only non-fatal errors are captured
// as a Failure; fatal ones (OutOfMemoryError, InterruptedException, ...) are
// left to propagate instead of being hidden inside the returned value.
val tryFlow = try {
  val flow = mode.newFlowConnector(finalConf).connect(newFlowDef)
  Success(flow)
} catch {
  case scala.util.control.NonFatal(err) => Failure(err)
}
(resultT, tryFlow)
} finally {
FlowStateMap.clear(newFlowDef)
}
}

/*
* If you want scalding to fail if the sources cannot be validated, then
* use this.
* Alternatively, in your Reader, call Source.validateTaps(Mode) to
* control which sources individually need validation
* Suggested use:
* for {
* result <- job
* mightErr <- validate
* } yield mightErr.map(_ => result)
*/
def validate: Reader[ExecutionContext, Try[Unit]] =
  Reader { context =>
    // A validation error is returned as a Failure value rather than thrown
    Try {
      FlowStateMap.validateSources(context.flowDef, context.mode)
    }
  }

/*
 * Builds the flow described by op and starts it.
 * Returns the value produced by op together with a Future of the JobStats.
 * If the flow could not be built, the Future is already failed with
 * the error from building it.
 */
def run[T](mode: Mode, conf: Config)(op: Reader[ExecutionContext, T]): (T, Future[JobStats]) = {
  val (opResult, builtFlow) = buildFlow(mode, conf)(op)
  val stats = builtFlow match {
    case Failure(cause) => Future.failed(cause)
    case Success(connected) => run(connected)
  }
  (opResult, stats)
}

/*
* This runs a Flow using Cascading's built in threads. The resulting JobStats
* are put into a promise when they are ready
*/
def run[C](flow: Flow[C]): Future[JobStats] = {
  // Applied by the listener when the flow completes, to extract the stats from it
  val statsOf: Flow[C] => JobStats = { finished => JobStats(finished.getFlowStats) }
  // This is in Java because of the cascading API's raw types on FlowListener
  FlowListenerPromise.start(flow, statsOf)
}

/*
 * Builds the flow described by op and waits for it to finish.
 * Returns the value produced by op together with the outcome of the run:
 * a failure to build the flow or to run it ends up in the returned Try.
 */
def waitFor[T](mode: Mode, conf: Config)(op: Reader[ExecutionContext, T]): (T, Try[JobStats]) = {
  val (opResult, builtFlow) = buildFlow(mode, conf)(op)
  val stats: Try[JobStats] = builtFlow.flatMap { connected => waitFor(connected) }
  (opResult, stats)
}
/*
* This blocks the current thread until the job completes with either success or
* failure.
*/
def waitFor[C](flow: Flow[C]): Try[JobStats] =
  // complete() returns once the flow has finished; the stats are only read
  // after that, and a non-fatal error from either step becomes a Failure
  Try(flow.complete()).map { _ => JobStats(flow.getStats) }
}
21 changes: 14 additions & 7 deletions scalding-core/src/main/scala/com/twitter/scalding/Job.scala
Expand Up @@ -143,7 +143,7 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable {
* Keep 100k tuples in memory by default before spilling
* Turn this up as high as you can without getting OOM.
*
* This is ignored if there is a value set in the incoming mode.config
* This is ignored if there is a value set in the incoming jobConf on Hadoop
*/
def defaultSpillThreshold: Int = 100 * 1000

Expand All @@ -163,6 +163,8 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable {
*
* Tip: override this method, call super, and ++ your additional
* map to add or overwrite more options
*
* TODO: Should we bite the bullet and return Config here?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is where using github tags would be nice...it'd be good to accrue some of the bigger breaking changes we'd like before 1.0. Like this.

*/
def config: Map[AnyRef, AnyRef] = {
val base = Config.empty
Expand All @@ -174,12 +176,14 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable {
System.setProperty(AppProps.APP_FRAMEWORKS,
String.format("scalding:%s", scaldingVersion))

val (nonStrings, modeConf) = Config.stringsFrom(mode.config)
// All the above are string keys, so overwrite and ++ will work
val modeConf = mode match {
case h: HadoopMode => Config.fromHadoop(h.jobConf)
case _ => Config.empty
}

val init = base ++ modeConf

Config.overwrite(nonStrings,
defaultComparator.map(init.setDefaultComparator)
defaultComparator.map(init.setDefaultComparator)
.getOrElse(init)
.setSerialization(Right(classOf[serialization.KryoHadoop]), ioSerializations)
.setScaldingVersion
Expand All @@ -188,7 +192,8 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable {
.setScaldingFlowClass(getClass)
.setArgs(args)
.setUniqueId(uniqueId)
.maybeSetSubmittedTimestamp()._2)
.maybeSetSubmittedTimestamp()._2
.toMap.toMap //second to lift to AnyRef, AnyRef
}

def skipStrategy: Option[FlowSkipStrategy] = None
Expand All @@ -199,7 +204,9 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable {
* combine the config, flowDef and the Mode to produce a flow
*/
def buildFlow: Flow[_] = {
val flow = mode.newFlowConnector(config).connect(flowDef)
val (nonStrings, conf) = Config.stringsFrom(config.mapValues(_.toString))
assert(nonStrings.isEmpty, "Non-string keys are not supported")
val flow = mode.newFlowConnector(conf).connect(flowDef)
listeners.foreach { flow.addListener(_) }
stepListeners.foreach { flow.addStepListener(_) }
skipStrategy.foreach { flow.setFlowSkipStrategy(_) }
Expand Down
25 changes: 5 additions & 20 deletions scalding-core/src/main/scala/com/twitter/scalding/Mode.scala
Expand Up @@ -79,16 +79,6 @@ object Mode {
}

trait Mode extends java.io.Serializable {
/**
* TODO: This should probably be Map[String, String]
*
* This is the input config of arguments passed in from
* Hadoop defaults, or possibly from the base config of this
* mode.
*
* this map is transformed by Job.config before running
*/
def config: Map[AnyRef, AnyRef]
/*
* Using a new FlowProcess, which is only suitable for reading outside
* of a map/reduce job, open a given tap and return the TupleEntryIterator
Expand All @@ -97,17 +87,14 @@ trait Mode extends java.io.Serializable {
// Returns true if the file exists on the current filesystem.
def fileExists(filename: String): Boolean
/** Create a new FlowConnector for this cascading planner */
def newFlowConnector(props: Map[AnyRef, AnyRef]): FlowConnector
def newFlowConnector(props: Config): FlowConnector
}

trait HadoopMode extends Mode {
def jobConf: Configuration

/* the second toMap lifts from AnyRef up to String, :( */
override def config = Config.fromHadoop(jobConf).toMap.toMap

override def newFlowConnector(props: Map[AnyRef, AnyRef]) =
new HadoopFlowConnector(props.asJava)
override def newFlowConnector(conf: Config) =
new HadoopFlowConnector(conf.toMap.toMap[AnyRef, AnyRef].asJava)

// TODO unlike newFlowConnector, this does not look at the Job.config
override def openForRead(tap: Tap[_, _, _]) = {
Expand All @@ -121,10 +108,8 @@ trait HadoopMode extends Mode {
}

trait CascadingLocal extends Mode {
override def config = Map[AnyRef, AnyRef]()

override def newFlowConnector(props: Map[AnyRef, AnyRef]) =
new LocalFlowConnector(props.asJava)
override def newFlowConnector(conf: Config) =
new LocalFlowConnector(conf.toMap.toMap[AnyRef, AnyRef].asJava)

override def openForRead(tap: Tap[_, _, _]) = {
val ltap = tap.asInstanceOf[Tap[Properties, _, _]]
Expand Down
14 changes: 13 additions & 1 deletion scalding-core/src/main/scala/com/twitter/scalding/Stats.scala
Expand Up @@ -9,6 +9,18 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.ref.WeakReference

/*
* This can be a bit tricky to use, but it is important that incBy and inc
* are called INSIDE any map or reduce functions.
* Like:
* val stat = Stat("test")
* .map { x =>
* stat.inc
* 2 * x
* }
* NOT: map( { stat.inc; { x => 2*x } } )
* which increments on the submitter before creating the function. See the difference?
*/
case class Stat(name: String, group: String = Stats.ScaldingGroup)(@transient implicit val uniqueIdCont: UniqueID) {
@transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass)
val uniqueId = uniqueIdCont.get
Expand Down Expand Up @@ -66,4 +78,4 @@ object Stats {
} yield (counter, value)
counts.toMap
}
}
}