Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make an Execution[T] type, which is a monad, which makes composing Jobs easy. #974

Merged
merged 8 commits into from
Jul 28, 2014

Conversation

johnynek
Copy link
Collaborator

No description provided.

/**
* This is a Monad, that represents a computation and a result
*/
sealed trait Execution[+T] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still reading the code so I'm just tossing stuff out as I go along, but this seems like a Reader[(Config, Mode, ConcurrentExecutionContext), Future[T]] ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sort of, but actually it should be a ReaderT[WriterT[ImmutableFlowDef], (Config, Mode, ConcurrentExecutionContext), Future, T] or something like that. There are three monads here, Reader, Writer, Future.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a similar note, should we lift this out to have a ScaldingExecution[...], this seems like something that would be useful outside of scalding jobs themselves. -- Though i'm quite in favor of getting this down and abstracting later also.

* This is the functionally pure approach to building jobs. Note,
* that you have to call run on the result for anything to happen here.
*/
def writeExecution(dest: TypedSink[T]): Execution[Unit] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so right now, it's common to do pipe.stuff.write(sink).stuff.write(sink2) and so on...if we want to encourage functional purity, we'd force the user to compose executions, right? I wonder if it would be useful to have another

def writeThrough(dest: TypedSource[T] with TypedSink[T]): Execution[TypedPipe[T]]

where it wouldn't force the user to compose the Executions.

That said, I'm still ruminating on whether or not this is necessary...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it is, that example would all be part of one execution right? what does writeThrough mean there? we force write it out and resume from there?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add that easily. snapshotExecution is similar, you just don't control the task.

Alternatively:

val forked = somestuff.fork
val ex1 = forked.writeExecution(sink)
val ex2 = someMore(forked)
zip(ex1, ex2).unit : Execution[Unit]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edit: I wrote this before I saw oscar's, but I think it still stands

That's what I wasn't sure about. I mean once they do .write(), how can they continue computation?

If you have (tpipe:TypedPipeFactory).stuff.writeExecution(sink), now you're stuff. If you do

val tp = (tpipe:TypedPipeFactory).stuff
tp.writeExecution(sink)
tp.writeExecution(sink2)

What is going on, right? We have to call run on each I guess, and the composition is a bit weird. likely I'm misunderstanding something

Edit: this is after Oscar's

Is that really how we want people to use this? I feel like we should have a nicer way to do continued computation, or maybe we layer that on top since this is just a lower level set of primitives anyway

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is like summingbird .also. If you don't .zip them, one of them will be ignored. Watch out!

you can also flatMap them.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, the problem if we flatMap an Execution, it is going to push it into two flows. a Zip will be only one.

Let's recall, to make this pure, all the side effects (write) must wind up in the Execution type. So, they can use for (which will sequence things) or zip (which will run the branches in parallel), or they can avoid functional purity and go back the way the had it.

Any better suggestions?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about a method like:

def zipAll[T](exs: Seq[Execution[T]]): Execution[Seq[T]] =

And then you capture all your job into one.

How does that sound?

@johnynek
Copy link
Collaborator Author

Check out if these added zip methods make it clearer how to compose after writes.

@johnynek
Copy link
Collaborator Author

I implemented K-means this weekend without this patch. It is insane how much work you have to do to implement an iterative job without this pull request.

I'm really looking forward to merging this, as writing page-rank and k-means will become almost trivial (and will serve as excellent examples).

@jcoveney
Copy link
Contributor

Why not add the example to this patch, to yhe scalding test code that runs?
Would be great to have, and also good to concretize the API

@johnynek
Copy link
Collaborator Author

@jcoveney what your bar for shipping? Is implementing (and writing tests for, because I don't want a buggy example) of kmeans needed to see the utility in a composable execution monad?

@jcoveney
Copy link
Contributor

Certainly not part of the bar at all. I was just suggesting that if you
already had an example, it could be good to clean up and include as a nice
motivator for why this patch is awesome. People def ask about doing
iterative or dependent processing in scalding and this goes from awful to
amazing.

But definitely not necessary!

prev.run(conf, mode).map(fn)

// Don't bother applying the function if we are mapped
override def unit = prev.unit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to think if this could break people...it definitely will if fn has side effects. Trying to think if there are "more legitimate" cases where it would as well... ie where they need the computation to happen but don't care about the result.

Still, seems like it could lead to hard to understand bugs?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say, that is not map, that is foreach. Map is pure. We make the assumption that mapping functions are pure all the time.

By the way, what kind of side-effect would be safe to apply?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I mean I agree with you. I suppose we will just punish people who abuse the API :)

@jcoveney
Copy link
Contributor

Had the question about .unit in Mapped, otherwise lgtm

@jcoveney
Copy link
Contributor

You need to merge master, feel free to merge.

@johnynek
Copy link
Collaborator Author

@jcoveney ok. Here is kmeans. With tests.

In a later pull request we can add .foreach or .recover, or other methods from Future.

@jcoveney
Copy link
Contributor

Wow, thanks for going above and beyond. I think that's an incredibly useful example though. A very nontrivial computation which can be expressed quite succinctly with this API.

jcoveney added a commit that referenced this pull request Jul 28, 2014
Make an Execution[T] type, which is a monad, which makes composing Jobs easy.
@jcoveney jcoveney merged commit 2da0540 into develop Jul 28, 2014
@caniszczyk caniszczyk deleted the execution_monad branch May 18, 2015 19:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants