Snapshot a pipe in the REPL #918

Merged
merged 35 commits into from
Jul 1, 2014
Changes from 17 commits
Commits
3d737f8
playing around with things
Jun 14, 2014
19ec513
add 'snapshot' method to repl; saves a TypedPipe to a SequenceFile
Jun 23, 2014
d837f7e
remove commented-out lines
Jun 23, 2014
5e347d1
snapshot successfully creates new flowDef
Jun 24, 2014
b6b5ebf
much simpler code to find sources and add them to newFlow
Jun 24, 2014
4924b3b
factor out into 'localizedFlow()', make repl jobs take an optional Fl…
Jun 24, 2014
66f87ea
implement writeAndRun
Jun 25, 2014
f13ff17
get rid of added code in TypedPipe and ReplImplicits
Jun 25, 2014
af8b3ff
remove unnecessary write() override
Jun 25, 2014
bd9c0f4
refactor writeFrom to not break compatibility
Jun 25, 2014
ea3b9a3
explain localizedFlow method
Jun 25, 2014
e262328
refactor snapshot enrichment into ShellTypedPipe for better type infe…
Jun 26, 2014
6ac5051
add ReplTest (currently just checks to see if things compile)
Jun 26, 2014
c844f41
comments
Jun 26, 2014
acbcb2b
explain tests a bit
Jun 26, 2014
2733a88
rename reachable -> upstream, move pipe methods into ShellTypedPipe
bholt Jun 26, 2014
3b85bb2
revert whitespace changes in TypedPipe
bholt Jun 26, 2014
3f6920e
easy fixes ({}, new FlowDef, toSet)
Jun 26, 2014
9102d4c
get rid of 'writeFromAndGetTail' and instead just get the last pipe i…
Jun 26, 2014
932ce34
fix upstream heads filter, remove unnecessary import
Jun 26, 2014
36e55d9
move 'upstreamPipes' and 'localizedFlow' to RichPipe
Jun 26, 2014
e9e7a1c
make `.localizedFlow` handle non-tail pipes, use this in `save` and `…
Jun 27, 2014
1f015ed
decidedly terrible (and possibly overkill) FlowDef sanitation on `run…
Jun 27, 2014
f266f89
add RichFlowDef enrichment
Jun 27, 2014
68049e8
move 'getJob','run' to ReplImplicits (don't need to be an enrichment)…
Jun 27, 2014
63ba01d
add return types
Jun 27, 2014
c01a188
get rid of ShellObj, move toList as-is into ShellTypedPipe (with TODO…
Jun 27, 2014
759bd86
fixes after getting rid of ShellObj
Jun 27, 2014
a01ef97
move `localizedFlow` method into RichFlowDef as `onlyUpstreamFrom(pipe)`
Jun 27, 2014
48693dc
accidentally checked in change to project/Build
Jun 27, 2014
fa2bf25
make merge return Unit, explain mergeMisc in comment
Jun 27, 2014
70a46c7
rename merge -> mergeFrom, refactor out 'heads'
Jun 28, 2014
4b53fd3
delete broken `toList`
bholt Jun 30, 2014
875af09
add TODO's for `toList` and `dump`
bholt Jun 30, 2014
70fa0a1
add TODO for handling checkpoints
bholt Jun 30, 2014
17 changes: 13 additions & 4 deletions scalding-core/src/main/scala/com/twitter/scalding/Source.scala
@@ -108,10 +108,9 @@ abstract class Source extends java.io.Serializable {
}

/**
- * write the pipe and return the input so it can be chained into
- * the next operation
+ * Write the given pipe and return the new pipe which was added as the tail
*/
- def writeFrom(pipe: Pipe)(implicit flowDef: FlowDef, mode: Mode) = {
+ def writeFromAndGetTail(pipe: Pipe)(implicit flowDef: FlowDef, mode: Mode) = {
Collaborator

add a return type to public methods (this old code didn't follow that rule).

Collaborator

This makes me pretty nervous, as this breaks old code. Anyone that overrides writeFrom will not have this behavior, right? Is there a way to be compatible?

Collaborator

could we just look through the tails in the flowDef for the sinkName to get the pipe back?

Collaborator

This will break code using writeFromAndGetTail directly when writeFrom is overridden. So far that just entails the REPL. Though we did talk yesterday about maybe looking through the sinks after the call for the new one and working from that. It would be a bit jankier, but it would have far less impact on other code and be robust to overrides too.

Contributor Author

Good points. I'll see if I can come up with the least-brittle way to get the right tail pipe.
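
One way to read the suggestion in this thread (recovering the tail from the flow definition after `writeFrom` returns, instead of adding a new method) is sketched below. `PipeStub` and `FlowDefStub` are hypothetical stand-ins invented for this illustration, not Cascading's `Pipe` and `FlowDef`, and `tailFor` is an assumed helper name; the real lookup would depend on how Cascading exposes its tails.

```scala
// Hypothetical stand-ins for Cascading's Pipe and FlowDef; not the real API.
final case class PipeStub(name: String)

final class FlowDefStub {
  private var tailList: List[PipeStub] = Nil
  def addTail(p: PipeStub): Unit = { tailList = tailList :+ p }
  def tails: List[PipeStub] = tailList
}

object TailLookup {
  // Stands in for Source.writeFrom: registers a tail named after the sink
  // and returns the *input* pipe, as the existing method does.
  def writeFrom(pipe: PipeStub, sinkName: String, flowDef: FlowDefStub): PipeStub = {
    flowDef.addTail(PipeStub(sinkName))
    pipe
  }

  // Recover the tail afterwards by sink name (most recently added first),
  // so callers do not need a second write method.
  def tailFor(sinkName: String, flowDef: FlowDefStub): Option[PipeStub] =
    flowDef.tails.reverse.find(_.name == sinkName)
}
```

Because `writeFrom` keeps its signature and return value in this sketch, a subclass that overrides it would still be picked up by the lookup.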

checkFlowDefNotNull

//insane workaround for scala compiler bug
@@ -124,7 +123,17 @@ abstract class Source extends java.io.Serializable {
case (test: TestMode, false) => pipe
case _ => transformForWrite(pipe)
}
- flowDef.addTail(new Pipe(sinkName, newPipe))
+ val finalPipe = new Pipe(sinkName, newPipe)
+ flowDef.addTail(finalPipe)
+ finalPipe
+ }

+ /**
+ * write the pipe but return the input so it can be chained into
+ * the next operation
+ */
+ def writeFrom(pipe: Pipe)(implicit flowDef: FlowDef, mode: Mode) = {
Collaborator

add a return type please.

+ writeFromAndGetTail(pipe)
pipe
}

@@ -17,6 +17,7 @@ package com.twitter.scalding

import cascading.flow.FlowDef
import cascading.pipe.Pipe
+ import typed.KeyedListLike

/**
* Object containing various implicit conversions required to create Scalding flows in the REPL.
@@ -132,8 +133,18 @@ object ReplImplicits extends FieldConversions {
* @return a ShellPipe wrapping the specified Pipe.
*/
implicit def pipeToShellPipe(pipe: Pipe): ShellObj[Pipe] = new ShellObj(pipe)
- implicit def typedPipeToShellPipe[T](pipe: TypedPipe[T]): ShellObj[TypedPipe[T]] =
- new ShellObj(pipe)
- implicit def keyedListToShellPipe[K, V](pipe: KeyedList[K, V]): ShellObj[KeyedList[K, V]] =
- new ShellObj(pipe)

+ /**
+ * Convert KeyedListLike to enriched ShellTypedPipe
+ * (e.g. allows .snapshot to be called on Grouped, CoGrouped, etc)
+ */
+ implicit def keyedListLikeToShellTypedPipe[K, V, T[K, +V] <: KeyedListLike[K, V, T]](kll: KeyedListLike[K, V, T]) = new ShellTypedPipe(kll.toTypedPipe)

+ /**
+ * Enrich TypedPipe for the shell
+ * (e.g. allows .snapshot to be called on it)
+ */
+ implicit def typedPipeToShellTypedPipe[T](pipe: TypedPipe[T]): ShellTypedPipe[T] =
+ new ShellTypedPipe[T](pipe)

}
85 changes: 78 additions & 7 deletions scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala
@@ -16,7 +16,10 @@
package com.twitter.scalding

import cascading.flow.Flow
+ import cascading.flow.FlowDef
import cascading.pipe.Pipe
+ import java.util.UUID
+ import com.twitter.scalding.ReplImplicits._

/**
* Adds ability to run a pipe in the REPL.
@@ -30,12 +33,12 @@ class ShellObj[T](obj: T) {
* @param args that should be used to construct the job.
* @return a job that can be used to run the data pipeline.
*/
- private[scalding] def getJob(args: Args, inmode: Mode): Job = new Job(args) {
+ private[scalding] def getJob(args: Args, inmode: Mode, inFlowDef: FlowDef): Job = new Job(args) {
/**
* The flow definition used by this job, which should be the same as that used by the user
* when creating their pipe.
*/
- override val flowDef = ReplImplicits.flowDef
+ override val flowDef = inFlowDef

override def mode = inmode

@@ -89,24 +92,92 @@ class ShellObj[T](obj: T) {
*/
override def buildFlow: Flow[_] = {
Collaborator

this override is just calling super. Let's drop that.

val flow = super.buildFlow
- ReplImplicits.resetFlowDef()
flow
}
}

/**
* Runs this pipe as a Scalding job.
*/
- def run() {
- val args = new Args(Map())
- getJob(args, ReplImplicits.mode).run
- }
+ def run(inFlowDef: FlowDef = ReplImplicits.flowDef) =
+ getJob(new Args(Map()), ReplImplicits.mode, inFlowDef).run

def toList[R](implicit ev: T <:< TypedPipe[R], manifest: Manifest[R]): List[R] = {
Collaborator

this needs to be totally redone.

  1. we should use the SequenceFile storage.
  2. we should use your .save code, not the run above.

Collaborator

in fact, it should be on ShellTypedPipe

Contributor Author

Actually I was thinking the opposite: this ensures that only the sources that are upstream from sinks are included. The snapshot and save methods are special cases where we drop all the other sinks and just do the new one. But I don't yet know how to make this clean.

Contributor

shouldn't this go in ShellTypedPipe?

Contributor Author

That's from before, and should probably just be left for now. Another PR should fix it with some real support for .toIterator, .dump, etc.

Contributor Author

I could move it for now, though.

import ReplImplicits._
ev(obj).toPipe("el").write(Tsv("item"))
run()
TypedTsv[R]("item").toIterator.toList
}

}

+ /**
+ * Enrichment on TypedPipes allowing them to be run locally, independent of the overall flow.
+ * @param pipe to wrap
+ */
+ class ShellTypedPipe[T](pipe: TypedPipe[T]) extends ShellObj[TypedPipe[T]](pipe) {
Collaborator

these functions are way too useful to be put here.

Let's move them to object RichPipe or somewhere else in scalding-core. I can think of a few uses of these methods.

Contributor Author

You're referring to the two operations on pipes (localizedFlow and upstreamPipes)?

Collaborator

Yes, I am.


+ /**
+ * Iterator for all pipes reachable from this pipe (recursively using 'Pipe.getPrevious')
+ */
+ def upstreamPipes(inpipe: Pipe): Iterator[Pipe] =
Collaborator

Iterators can be dangerous types (due to mutability) to return, and I generally only like them for performance critical code. Can we make this a List or a Set? By putting toSet at the last line, Set[Pipe] should be returned, and I think that a Set is what we want, right? In case there is a diamond in the dag?

Collaborator

I guess not, this is the transitive closure of this and the previous.

Contributor Author

Yep, I'll note that in the comments.
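
The diamond case raised in this thread can be made concrete with a small model. `Node` is a hypothetical stand-in for `cascading.pipe.Pipe` (its `previous` field plays the role of `getPrevious`), and `levelWalk` mirrors the shape of the iterator in this diff rather than reproducing it exactly; `upstreamSet` is an assumed name for a Set-returning variant.

```scala
import scala.annotation.tailrec

// Hypothetical stand-in for cascading.pipe.Pipe; `previous` plays the role of getPrevious.
final case class Node(name: String, previous: List[Node] = Nil)

object UpstreamSketch {
  // Level-by-level walk in the style of the diff: a node reachable along
  // two paths is yielded once per path.
  def levelWalk(start: Node): Iterator[Node] =
    Iterator
      .iterate(Seq(start))(nodes => for (n <- nodes; prev <- n.previous) yield prev)
      .takeWhile(_.nonEmpty)
      .flatten

  // Same reachability, collected into a Set so each node appears once.
  def upstreamSet(start: Node): Set[Node] = {
    @tailrec
    def loop(frontier: List[Node], seen: Set[Node]): Set[Node] =
      frontier match {
        case Nil => seen
        case n :: rest =>
          if (seen(n)) loop(rest, seen)
          else loop(rest ++ n.previous, seen + n)
      }
    loop(List(start), Set.empty)
  }
}
```

On a diamond (two branches sharing one source), `levelWalk` yields the shared source twice, while `upstreamSet` contains it once.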

+ Iterator
+ .iterate(Seq(inpipe))(pipes => for (pipe <- pipes; prev <- pipe.getPrevious) yield prev)
+ .takeWhile(_.length > 0)
+ .flatten

+ /**
+ * Construct a new FlowDef for only the flow that ends with the given pipe.
+ * That is, it copies over only the sources and sinks that contribute to the
+ * flow, allowing repl users to build up flows incrementally.
+ */
+ def localizedFlow(tailPipe: Pipe): FlowDef = {
+ val newFlow = getEmptyFlowDef
Collaborator

is this the same as new FlowDef, can you comment to say why not?

Contributor Author

In fact, I don't want the name-change that ReplImplicits.getEmptyFlowDef does. Changed to new FlowDef


+ val sourceTaps = flowDef.getSources
+ val newSrcs = newFlow.getSources

+ upstreamPipes(tailPipe)
+ .filter(_.getParent == null)
+ .flatMap(_.getHeads)
Collaborator

https://github.com/Cascading/cascading/blob/2.5/cascading-core/src/main/java/cascading/pipe/Pipe.java#L345

This seems redundant. If getParent == null, getHeads == Seq(this) right?

Contributor Author

Good catch, you're right.

Contributor Author

Actually, it turns out that all of these have "getParent == null", so really the non-redundant criteria should be (_.getPrevious.length == 0).
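
The head criterion settled on in this thread (a pipe with no previous pipes) and the source-copying step of `localizedFlow` can be illustrated on a toy graph. `Step`, `heads`, and `sourcesFor` are hypothetical names for this sketch, and a plain `Map` keyed by head name stands in for the FlowDef's source table.

```scala
// Hypothetical stand-in for a pipe; `previous` plays the role of getPrevious.
final case class Step(name: String, previous: List[Step] = Nil)

object LocalizedSources {
  // Every step reachable from `start`, including `start` itself.
  def upstream(start: Step): Set[Step] =
    start.previous.foldLeft(Set(start))((acc, p) => acc ++ upstream(p))

  // Heads are the upstream steps that have nothing before them.
  def heads(tail: Step): Set[Step] =
    upstream(tail).filter(_.previous.isEmpty)

  // Keep only the sources whose names match a head upstream of `tail`.
  def sourcesFor[T](tail: Step, allSources: Map[String, T]): Map[String, T] = {
    val headNames = heads(tail).map(_.name)
    allSources.filter { case (name, _) => headNames(name) }
  }
}
```

A source that feeds an unrelated branch is left out of the result, which is the behaviour the doc comment on `localizedFlow` describes.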

+ .foreach(head =>
Collaborator

prefer { } for a closure this large.

+ if (!newSrcs.containsKey(head.getName))
+ newFlow.addSource(head, sourceTaps.get(head.getName)))

+ newFlow.addTailSink(tailPipe, flowDef.getSinks.get(tailPipe.getName))

+ newFlow
+ }

+ /**
+ * Shorthand for .write(dest).run
+ */
+ def save(dest: TypedSink[T] with Mappable[T]): TypedPipe[T] = {

+ val d = dest
+ val thisPipe = pipe.toPipe(d.sinkFields)(d.setter)
+ val outPipe = d.writeFromAndGetTail(thisPipe)

+ run(localizedFlow(outPipe))

+ TypedPipe.from(d)
+ }

+ /**
+ * Save snapshot of a typed pipe to a temporary sequence file.
+ * @return A TypedPipe to a new Source, reading from the sequence file.
+ */
+ def snapshot: TypedPipe[T] = {
+ import ReplImplicits._
Collaborator

can you comment what this is getting?


+ // come up with unique temporary filename
+ // TODO: refactor into TemporarySequenceFile class
Collaborator

+1
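
As a rough idea of what the TODO's refactoring could start from, the path construction can be pulled into a small helper. `TempSnapshotPath` and its `next` method are hypothetical names for this sketch, and the default directory is simply the one used in this diff; this is not the `TemporarySequenceFile` class the TODO refers to.

```scala
import java.util.UUID

// Hypothetical helper; the name and default directory are assumptions for illustration.
object TempSnapshotPath {
  // Builds a fresh path on each call by embedding a random UUID.
  def next(dir: String = "/tmp/scalding-repl"): String =
    dir + "/snapshot-" + UUID.randomUUID().toString + ".seq"
}
```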

+ val tmpSeq = "/tmp/scalding-repl/snapshot-" + UUID.randomUUID() + ".seq"
+ val outPipe = SequenceFile(tmpSeq, 'record).writeFromAndGetTail(pipe.toPipe('record))

+ run(localizedFlow(outPipe))

+ TypedPipe.fromSingleField[T](SequenceFile(tmpSeq))
+ }

+ }
44 changes: 44 additions & 0 deletions scalding-repl/src/test/scala/com/twitter/scalding/ReplTest.scala
@@ -0,0 +1,44 @@
/*
Copyright 2012 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.scalding

object ReplTest {
import ReplImplicits._

def test() {
val hello = TypedPipe.from(TextLine("tutorial/data/hello.txt"))

val wordScores =
TypedPipe.from(OffsetTextLine("tutorial/data/words.txt"))
.map{ case (offset, word) => (word, offset) }
Contributor

is this just here to be explicit?

Contributor Author

Reordering so it will group by word. Is that not needed?

Contributor

oh whoops totally misread that haha. carry on...

Contributor

you could do _.swap but this is fine
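
For reference, the two spellings discussed in this thread are equivalent on plain Scala pairs. The sketch below uses an ordinary `List` rather than a `TypedPipe`, and the sample values are made up for illustration.

```scala
object SwapExample {
  // (offset, word) pairs, shaped like the tuples in the test above.
  val pairs: List[(Long, String)] = List((0L, "hello"), (6L, "world"))

  // Explicit pattern match, as written in the test.
  val viaPattern: List[(String, Long)] =
    pairs.map { case (offset, word) => (word, offset) }

  // Tuple2.swap does the same reordering.
  val viaSwap: List[(String, Long)] = pairs.map(_.swap)
}
```

The pattern-match form names both fields, which can make the intent (grouping by word) easier to see.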

.group

// snapshot intermediate results without wiring everything up
val s1 = hello.snapshot

val s2 = hello.save(TypedTsv("dump.tsv"))

// use snapshot in further flows
val linesByWord = s1.flatMap(_.split("\\s+")).groupBy(_.toLowerCase)
val counts = linesByWord.size

// ensure snapshot enrichment works on KeyedListLike (CoGrouped, UnsortedGrouped), too
val s3 = counts.snapshot
val s4 = linesByWord.join(wordScores).snapshot
}

}