Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshot a pipe in the REPL #918

Merged
merged 35 commits into from
Jul 1, 2014
Merged

Snapshot a pipe in the REPL #918

merged 35 commits into from
Jul 1, 2014

Conversation

bholt
Copy link
Contributor

@bholt bholt commented Jun 26, 2014

Fixes #879, could be in the territory of #889.

This pull request attempts to address the issue with not being able to run sub-flows in the repl. Rather than resetting the implicit flowDef (which disallows re-running pipes -- #879), this adds the ability to run a "sub-flow" for a given TypedPipe that includes only the upstream pipes and sources needed to compute that tail pipe.

This is accessible in the REPL via a couple new enrichments to TypedPipe:

  • .snapshot: immediately runs and writes the pipe to a temporary SequenceFile, returning a TypedPipe with the SequenceFile as a source.
  • .save(dest): same as snapshot but with a user-provided Sink (kinda like shorthand for .write(dest).run), also returning a TypedPipe with the saved file as a new Source.

In my current way of thinking, snapshots allow one to inspect intermediate steps of a job as they build it:

> val input = TypedPipe.from(TextLine("tutorial/data/hello.txt"))
> val wordsPipe: TypedPipe[String] = input.flatMap(_.split("\\s+"))
> val s1: TypedPipe[String] = wordsPipe.snapshot

One then has a choice, when continuing to work on a job, to use the snapshot pipe, if they wish to "memoize" the intermediate work, or use the original pipe, which will cause the flow to be re-evaluated.

Before continuing to work on this, I wanted to elicit some feedback on the intended direction. Overall, I'm hoping to come up with behavior that is least surprising to users of Scalding, but also cater to users coming from other ad-hoc analytics backgrounds (i.e. Pig, Spark)

  1. Part of using snapshots should be good ways to actually view the contents of the snapshot. I would like to allow a toList kind of behavior that would read the contents into memory. This could either be implicit (.toList calls .snapshot internally), or .snapshot could return a new kind of TypedPipe that allowed .toList to be called on it. (Add a better temp Source/Sink to the repl #709 may be related -- I believe this "sub-flow" ability may fix one of the outstanding issues there)
    • There could also be a Pig-like dump call added to snapshots that would just print the contents to stdout (though .toList pretty much subsumes this, as you can print a list to stdout)
  2. What should the semantics of run be in the context of this change? Currently it is broken because all the snapshots add to the implicit flowDef, causing run on the whole flow to balk at all the unconnected sources and sinks. Should run be fixed by just discarding the extraneous sources and sinks created in the REPL? Or should run just go away, in favor of always calling something like save to commit the contents of a pipe to a single Sink?
  3. Should snapshot ultimately be completely hidden from the user? This iteration allows an explicit choice to be made between building off of a snapshot of intermediate results or re-running the whole flow. An alternative would be to only create snapshots when people request to see the contents (via toList), and then swap out the memoized pipe for the original so the user gets consistent results. This leads to questions about whether memoized pipes should ever be invalidated when the original contents have changed (which sounds really daunting).

Thoughts?

@jcoveney
Copy link
Contributor

Nice! First responding to your overall thoughts, first thoughts are:

  • fix run to run the whole flow as the user has specified, ignoring extraneous sources etc
  • add dump
  • add toList (difference between dump is that you do not have to materialize all of the data into memory, toList is really just reading the result of dump into a List instead of the console)
  • exposing snapshots is ok, will allow uers to work more fluently with larger data sets

Great work. Was just going to bug you to put it on github :) Will comment on code as well.

*/
def writeFrom(pipe: Pipe)(implicit flowDef: FlowDef, mode: Mode) = {
def writeFromAndGetTail(pipe: Pipe)(implicit flowDef: FlowDef, mode: Mode) = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a return type to public methods (this old code didn't follow that rule).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me pretty nervous, as this breaks old code. Anyone that overrides writeFrom will not have this behavior, right? Is there a way to be compatible?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we just look through the tals in the flowDef for the sinkName to get the pipe back?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will break code using writeFromAndGetTail directly when writeFrom is overridden. So far that just entails the REPL. Though we did talk yesterday about maybe looking through the sinks after the call for the new one and working from that. It would seem to be a bit more maybe janky, but far less of an impact on other code and robust to overrides too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good points. I'll see if I can come up with the least-brittle way to get the right tail pipe.

@johnynek
Copy link
Collaborator

Also note that #915 is related here. I think if we get that merged, the REPL code can call that and not make a Job.

This looks really great.

@bholt
Copy link
Contributor Author

bholt commented Jun 26, 2014

Also note that #915 is related here. I think if we get that merged, the REPL code can call that and not make a Job.

Agreed. I'll give it a try once that gets merged.


// come up with unique temporary filename
// TODO: refactor into TemporarySequenceFile class
val tmpSeq = "/tmp/scalding-repl/snapshot-" + UUID.randomUUID() + ".seq"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/**
* New flow def with only sources upstream from tails.
*/
def withoutUnusedSources = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public API's should always have explicit return types

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gets me every time.


override def mode = inmode
def toList[R](implicit ev: T <:< TypedPipe[R], manifest: Manifest[R]): List[R] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this go in ShellTypedPipe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's from before, and should probably just be left for now. Another PR should fix it with some real support for .toIterator, .dump, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could move it for now, though.

@jcoveney
Copy link
Contributor

This is some good stuff @bholt

fd.addSources(o.getSources)
fd.addSinks(o.getSinks)
fd.addTails(o.getTails)
fd.mergeMisc(o)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't like this. Just add all of it here. No need for two methods.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, you call it below. Nevermind... Can you add a comment avoid to explain the strange choice?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should return Unit to make it clear that there is mutation here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, .mergeFrom might make it clearer too.

}

def toList[R](implicit ev: T <:< TypedPipe[R], manifest: Manifest[R]): List[R] = {
def toList(implicit manifest: Manifest[T]): List[T] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just delete this for now. It is pretty broken now. It should be save, followed by .toIterator on a typed-version of SequenceFile.

We can get this working in the next PR.

@johnynek
Copy link
Collaborator

This is getting close to merge. I just had a few comments around renaming, TODOs, and deleting some code.

@sriramkrishnan
Copy link
Collaborator

Looks like pretty good stuff. How about additions to the README or Wiki page?

@bholt
Copy link
Contributor Author

bholt commented Jun 30, 2014

@johnynek: I've started trying to figure out how to use the new Execution code for the REPL, but I'll do that as a separate PR.

@sriramkrishnan: I think it'll make more sense to update the documentation when we've figured out how we're gonna handle toList and dump, as those would be part of demonstrating how the REPL is supposed to work.

* Mutate current flow def to add all sources/sinks/etc from given FlowDef
*/
def mergeFrom(o: FlowDef): Unit = {
fd.addSources(o.getSources)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like the add* methods FlowDef in cascading are not idempotent:

https://github.com/cwensel/cascading/blob/wip-2.6/cascading-core/src/main/java/cascading/flow/FlowDef.java#L153

Is that going to be a problem?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, addSources does look idempotent: it's just adding what is already in one map into another.

As for addSource(Pipe), it shouldn't be any different -- my understanding is that head pipes have the same name as their source, so addSource(Pipe,...) should be equivalent to addSource(name: String), which is simply adding to a map.

In any case, all we're doing is re-adding references to the same pipes, so the IllegalArgumentException can't fire (unless something else broke the flow) because we know we already have a valid pipe with 1 head.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, bad link:

https://github.com/cwensel/cascading/blob/wip-2.6/cascading-core/src/main/java/cascading/flow/FlowDef.java#L128

addSource(String, Tap) is not adding to a map, it is making sure that the key is not already present. Am I missing something?

I am not sure your code ever does this. Does it? But, ultimately, we want Equiv.equiv(x.mergeFrom(x), x) as a law, where Equiv[FlowDef] is set/map equality of all the structure of the flow.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually:

  val y = x.copy
  x.mergeFrom(x)
  Equiv.equiv(y, x) // damn mutable variables

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as you're copying over all sources, as in mergeFrom, you should get map equality for getSources and the rest. The only place where you might have a problem is if you build up your own set of sources, in which case you have to ensure this property yourself -- as we do here in onlyUpstreamFlow.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we can merge with this now, but we need to add an issue to fix it.

mergeFrom should be idempotent, or it will be very difficult to use in interesting cases, specifically for the case I have in mind for making TypedPipe referentially transparent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. So you're proposing making a deterministic choice when keys collide?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the keys can collide with the same value, in that case there is no choice to make. You can still error if there are keys that collide, but the values are not equal.

@bholt
Copy link
Contributor Author

bholt commented Jun 30, 2014

@johnynek, I can't think of anything to do to make RichFlowDef safer. let me know if you think of anything I should fix/change/investigate.


joined.write(TypedTsv("final_out.tsv"))
// run the overall flow (with the 'final_out' sink), uses snapshot 's1'
run
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this test run as part of the travis build? It looks more like an example than a unit test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, it currently won't be run. My plan is to test using the new Execution stuff in the next PR. I'd be happy to track these plans in Issues.

johnynek added a commit that referenced this pull request Jul 1, 2014
Snapshot a pipe in the REPL
@johnynek johnynek merged commit fe566cd into twitter:develop Jul 1, 2014
@johnynek
Copy link
Collaborator

johnynek commented Jul 1, 2014

Okay. Merged. Can you make issues for the next items you have?

I'd like to have a unit test running to make sure we don't break things.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Enable the Scalding REPL to run a pipe more than once
5 participants