Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an Executor to run flows without a Job #915

Merged
merged 8 commits into from
Jun 27, 2014
Merged

Conversation

johnynek
Copy link
Collaborator

This is based on #914 The goal here is to make it easy to take a function that produces a TypedPipe, and to run that.

Next, we should have a methods that write TypedPipe[T] or ValuePipe[T] to the temp directory, then reads that in the Future to get a Iterable[T] or T respectively.

With the above, you can easily write jobs that loop that does some ML algorithm, for instance.

@jcoveney
Copy link
Contributor

Merge master now that the config patch is in

@johnynek
Copy link
Collaborator Author

closes #889

* secondPipe <- job2
* } yield firstPipe.group.join(secondPipe.join)
*/
def buildFlow[T](mode: Mode, conf: Config)(op: Reader[ExecutionContext, T]): (T, Flow[_]) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also want to document to them how to contrust the proper mode and conf object, if this is meant to be instructive?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.

try {
val resultT = op(ec)

// The newFlowDef is ready now, and mutates newFlowDef as a side effect. :(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason is that wildcards are not interchangeable. The _ Flow[] in the return type is different from the Flow[] here. One solution is to make them both Flow[Any]. Another solution is to add a type parameter that does nothing. Another solution is to curse cascading's dumb use of wildcards.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But Flow[Any] is not really right either, because Flow[C] is invariant in C even though it should be covariant (C only appears in return values). It really has some type, we just don't tell you what it is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if you do a type parameter C and pass it to the return type, it will just get filled in with Nothing and then erased. This should work fine, no?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, yes, I know the _ are different somethings but, why does manually doing the try work, but using Try does not. That seems like a compiler bug to me.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But that's also a lie, since we don't really know or ever care what C is.
In fact, C seems like an error in cascading's design, since he
backpedaled and uses Map<Object, Object> in most places now for the
config.

On Thu, Jun 26, 2014 at 11:20 AM, Jonathan Coveney <notifications@github.com

wrote:

In scalding-core/src/main/scala/com/twitter/scalding/Execution.scala:

  • * Note that the only config considered is in conf.
  • * The caller is responsible for setting up the Config
  • * completely
  • */
  • def buildFlow[T](mode: Mode, conf: Config)(op: Reader[ExecutionContext, T]): (T, Try[Flow[_]]) = {
  • val newFlowDef = new FlowDef
  • conf.getCascadingAppName.foreach(newFlowDef.setName)
  • // Set up the uniqueID, which is used to access to counters
  • val uniqueId = UniqueID(UUID.randomUUID.toString)
  • val finalConf = conf.setUniqueId(uniqueId)
  • val ec = ExecutionContext.newContext(newFlowDef, mode, uniqueId)
  • try {
  •  val resultT = op(ec)
    
  •  // The newFlowDef is ready now, and mutates newFlowDef as a side effect. :(
    

I think if you do a type parameter C and pass it to the return type, it
will just get filled in with Nothing and then erased. This should work
fine, no?


Reply to this email directly or view it on GitHub
https://github.com/twitter/scalding/pull/915/files#r14267542.

Oscar Boykin :: @posco :: http://twitter.com/posco

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh it's definitely an error in cascading's design.

Thinking on why it's an error... that's a good point. Hm.

@jcoveney
Copy link
Contributor

Really digging these Scalding patches, Oscar. Hoping we can get to the point where Summingbird is just an example of using these API's, instead of being a weird Job-less workaround. Will be great for end users.

if(!result.isCompleted()) {
// we use the above rather than trySuccess to avoid calling fn twice
try {
T toPut = fn.apply(f);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this FN? would it not suffice to be a Future[Flow[Config]] ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need it, but I didn't want to pass on scala.concurrent.ExecutionContext just to access the stats, which clearly is not an expensive operation.

So, this is a map tacked on the end, but logically, you are right. Since this is an adapter class, I'd say it is okay, but there is some code smell here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok cool fair enough, not used the scala futures really enough to know when those are needed/not needed.

@@ -163,6 +163,8 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable {
*
* Tip: override this method, call super, and ++ your additional
* map to add or overwrite more options
*
* TODO: Should we bite the bullet and return Config here?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is where using github tags would be nice...it'd be good to accrue some of the bigger breaking changes we'd like before 1.0. Like this.

@jcoveney
Copy link
Contributor

minor nits, looks good otherwise

jcoveney added a commit that referenced this pull request Jun 27, 2014
Add an Executor to run flows without a Job
@jcoveney jcoveney merged commit 26af923 into develop Jun 27, 2014
@jcoveney jcoveney deleted the oscar-executor branch June 27, 2014 01:16
@avibryant
Copy link
Contributor

So good to see this get done. Thanks @johnynek.

@bholt bholt mentioned this pull request Jul 1, 2014
2 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants