Snapshot a pipe in the REPL #918

Merged 35 commits on Jul 1, 2014

Commits (changes shown from 30 commits):
3d737f8
playing around with things
Jun 14, 2014
19ec513
add 'snapshot' method to repl; saves a TypedPipe to a SequenceFile
Jun 23, 2014
d837f7e
remove commented-out lines
Jun 23, 2014
5e347d1
snapshot successfully creates new flowDef
Jun 24, 2014
b6b5ebf
much simpler code to find sources and add them to newFlow
Jun 24, 2014
4924b3b
factor out into 'localizedFlow()', make repl jobs take an optional Fl…
Jun 24, 2014
66f87ea
implement writeAndRun
Jun 25, 2014
f13ff17
get rid of added code in TypedPipe and ReplImplicits
Jun 25, 2014
af8b3ff
remove unnecessary write() override
Jun 25, 2014
bd9c0f4
refactor writeFrom to not break compatibility
Jun 25, 2014
ea3b9a3
explain localizedFlow method
Jun 25, 2014
e262328
refactor snapshot enrichment into ShellTypedPipe for better type infe…
Jun 26, 2014
6ac5051
add ReplTest (currently just checks to see if things compile)
Jun 26, 2014
c844f41
comments
Jun 26, 2014
acbcb2b
explain tests a bit
Jun 26, 2014
2733a88
rename reachable -> upstream, move pipe methods into ShellTypedPipe
bholt Jun 26, 2014
3b85bb2
revert whitespace changes in TypedPipe
bholt Jun 26, 2014
3f6920e
easy fixes ({}, new FlowDef, toSet)
Jun 26, 2014
9102d4c
get rid of 'writeFromAndGetTail' and instead just get the last pipe i…
Jun 26, 2014
932ce34
fix upstream heads filter, remove unnecessary import
Jun 26, 2014
36e55d9
move 'upstreamPipes' and 'localizedFlow' to RichPipe
Jun 26, 2014
e9e7a1c
make `.localizedFlow` handle non-tail pipes, use this in `save` and `…
Jun 27, 2014
1f015ed
decidedly terrible (and possibly overkill) FlowDef sanitation on `run…
Jun 27, 2014
f266f89
add RichFlowDef enrichment
Jun 27, 2014
68049e8
move 'getJob','run' to ReplImplicits (don't need to be an enrichment)…
Jun 27, 2014
63ba01d
add return types
Jun 27, 2014
c01a188
get rid of ShellObj, move toList as-is into ShellTypedPipe (with TODO…
Jun 27, 2014
759bd86
fixes after getting rid of ShellObj
Jun 27, 2014
a01ef97
move `localizedFlow` method into RichFlowDef as `onlyUpstreamFrom(pipe)`
Jun 27, 2014
48693dc
accidentally checked in change to project/Build
Jun 27, 2014
fa2bf25
make merge return Unit, explain mergeMisc in comment
Jun 27, 2014
70a46c7
rename merge -> mergeFrom, refactor out 'heads'
Jun 28, 2014
4b53fd3
delete broken `toList`
bholt Jun 30, 2014
875af09
add TODO's for `toList` and `dump`
bholt Jun 30, 2014
70fa0a1
add TODO for handling checkpoints
bholt Jun 30, 2014
7 changes: 7 additions & 0 deletions scalding-core/src/main/scala/com/twitter/scalding/Dsl.scala
@@ -16,6 +16,7 @@ limitations under the License.
package com.twitter.scalding

import cascading.pipe.Pipe
import cascading.flow.FlowDef

/**
* This object has all the implicit functions and values that are used
@@ -29,4 +30,10 @@ import cascading.pipe.Pipe
*/
object Dsl extends FieldConversions with java.io.Serializable {
implicit def pipeToRichPipe(pipe: Pipe): RichPipe = new RichPipe(pipe)

/**
* Enrichment on FlowDef
*/
implicit def flowDefToRichFlowDef(fd: FlowDef): RichFlowDef = new RichFlowDef(fd)

}
101 changes: 101 additions & 0 deletions scalding-core/src/main/scala/com/twitter/scalding/RichFlowDef.scala
@@ -0,0 +1,101 @@
/*
Copyright 2014 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.scalding

import cascading.flow.FlowDef
import cascading.pipe.Pipe

/**
* This is an enrichment-pattern class for cascading.flow.FlowDef.
* The rule is to never use this class directly in input or return types, but
* only to add methods to FlowDef.
*/
class RichFlowDef(val fd: FlowDef) {
// RichPipe and RichFlowDef implicits
import Dsl._

def copy: FlowDef = {
val newFd = new FlowDef
newFd.merge(fd)
newFd
}

private[scalding] def mergeMisc(o: FlowDef) {
fd.addTags(o.getTags)
fd.addTraps(o.getTraps)
fd.addCheckpoints(o.getCheckpoints)
fd.setAssertionLevel(o.getAssertionLevel)
fd.setName(o.getName)
fd
}

/**
* Mutate current flow def to add all sources/sinks/etc from given FlowDef
*/
def merge(o: FlowDef): FlowDef = {
fd.addSources(o.getSources)
Collaborator:

it looks like the add* methods FlowDef in cascading are not idempotent:

https://github.com/cwensel/cascading/blob/wip-2.6/cascading-core/src/main/java/cascading/flow/FlowDef.java#L153

Is that going to be a problem?

Contributor Author:

In this case, addSources does look idempotent: it's just adding what is already in one map into another.

As for addSource(Pipe), it shouldn't be any different -- my understanding is that head pipes have the same name as their source, so addSource(Pipe,...) should be equivalent to addSource(name: String), which is simply adding to a map.

In any case, all we're doing is re-adding references to the same pipes, so the IllegalArgumentException can't fire (unless something else broke the flow) because we know we already have a valid pipe with 1 head.

Collaborator:

Sorry, bad link:

https://github.com/cwensel/cascading/blob/wip-2.6/cascading-core/src/main/java/cascading/flow/FlowDef.java#L128

addSource(String, Tap) is not adding to a map, it is making sure that the key is not already present. Am I missing something?

I am not sure your code ever does this. Does it? But, ultimately, we want Equiv.equiv(x.mergeFrom(x), x) as a law, where Equiv[FlowDef] is set/map equality of all the structure of the flow.

Collaborator:

Actually:

  val y = x.copy
  x.mergeFrom(x)
  Equiv.equiv(y, x) // damn mutable variables

Contributor Author:

As long as you're copying over all sources, as in mergeFrom, you should get map equality for getSources and the rest. The only place where you might have a problem is if you build up your own set of sources, in which case you have to ensure this property yourself -- as we do here in onlyUpstreamFlow.

Collaborator:

So, we can merge with this now, but we need to add an issue to fix it.

mergeFrom should be idempotent, or it will be very difficult to use in interesting cases, specifically for the case I have in mind for making TypedPipe referentially transparent.

Contributor Author:

Okay. So you're proposing making a deterministic choice when keys collide?

Collaborator:

Well, the keys can collide with the same value, in that case there is no choice to make. You can still error if there are keys that collide, but the values are not equal.
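A merge with exactly that behavior — re-adding an identical binding is a no-op (so merge is idempotent), while a key bound to a different value is an error — could be sketched over plain immutable maps. This is an illustration of the rule being discussed, not FlowDef's actual API:

```scala
// Merge `incoming` into `current`: duplicate keys with equal values are
// tolerated (no-op), keys bound to a different value raise an error.
def mergeSources[K, V](current: Map[K, V], incoming: Map[K, V]): Map[K, V] =
  incoming.foldLeft(current) {
    case (acc, (k, v)) =>
      acc.get(k) match {
        case Some(existing) if existing != v =>
          throw new IllegalArgumentException(s"key $k already bound to a different value")
        case _ => acc + (k -> v)
      }
  }
```

With this rule, `mergeSources(m, m) == m` holds, which is the idempotence law requested for `mergeFrom`.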

fd.addSinks(o.getSinks)
fd.addTails(o.getTails)
fd.mergeMisc(o)
Collaborator:

don't like this. Just add all of it here. No need for two methods.

Collaborator:

I see, you call it below. Nevermind... Can you add a comment above to explain the strange choice?

Collaborator:

This should return Unit to make it clear that there is mutation here.

Collaborator:

also, .mergeFrom might make it clearer too.

fd
}

/**
* New flow def with only sources upstream from tails.
*/
def withoutUnusedSources: FlowDef = {
import collection.JavaConverters._

// find all heads reachable from the tails (as a set of names)
val heads = fd.getTails.asScala
Collaborator:

let's break this out:

`def heads: Set[Pipe]`, then `filterKeys(heads.map { p => p.getName })`. Note that Sets are already Functions, so you don't need to make another with `contains`.
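To illustrate the reviewer's point that a Set is already a function: `Set[A]` extends `A => Boolean`, so it can be passed to `filterKeys` directly as the predicate (toy values below; `filterKeys` as in pre-2.13 Scala collections, where it is now deprecated in favor of `.view.filterKeys`):

```scala
val heads: Set[String] = Set("src1", "src2")
val sources = Map("src1" -> "tapA", "src3" -> "tapC")

// The Set itself is the predicate; no `k => heads.contains(k)` lambda needed.
val filtered = sources.filterKeys(heads).toMap
```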

.flatMap(_.getHeads).map(_.getName).toSet
// add taps associated with heads to localFlow
val filteredSources = fd.getSources.asScala.filterKeys(src => heads.contains(src)).asJava

val newFd = fd.copy
newFd.getSources.clear()
newFd.addSources(filteredSources)
Collaborator:

does this really work? What about checkpoints that point to sources? They will still be in there, right?

Contributor Author:

I don't know about Cascading's Checkpoints. Does Scalding use them at all?

Contributor Author:

Ah, I see, forceToDisk.

Collaborator:

By the way, I know this patch is a lot of work, but in fact I think it will turn out to be one of the best contributions in quite a while when done.

I think this can actually fix a design flaw in scalding: that using Pipe and TypedPipe has side effects (mutating the FlowDef) that are not easy to reason about.

Especially with TypedPipe, when this code is all done, even outside of the repl, we could use it to make TypedPipe referentially transparent, as it should have been all along.

The trick is: keep a local FlowDef when we make a TypedPipe, and at write or toPipe, merge it into the implicit one in that scope.

I think this can keep TypedPipe referentially transparent.
— Oscar Boykin (@posco)


newFd
}
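The "keep a local FlowDef, merge at write" idea from the discussion above can be sketched with toy types. All names here are illustrative stand-ins, not scalding's or cascading's real classes:

```scala
import scala.collection.mutable

// Toy stand-in for cascading's mutable FlowDef: just a set of sink names.
final class ToyFlowDef {
  val sinks: mutable.Set[String] = mutable.Set.empty
  def mergeFrom(other: ToyFlowDef): Unit = sinks ++= other.sinks
}

// A pipe that stays referentially transparent by accumulating its state in a
// private FlowDef; the enclosing scope's FlowDef is only mutated at write.
final class ToyPipe private (local: ToyFlowDef) {
  def write(sink: String)(implicit fd: ToyFlowDef): Unit = {
    fd.mergeFrom(local) // fold accumulated state into the implicit FlowDef
    fd.sinks += sink
  }
}
object ToyPipe { def empty: ToyPipe = new ToyPipe(new ToyFlowDef) }
```

Building a `ToyPipe` has no observable effect until `write` is called, which is the property the comment is after.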

/**
* FlowDef that only includes things upstream from the given Pipe
*/
def onlyUpstreamFrom(pipe: Pipe): FlowDef = {
val newFd = new FlowDef
// don't copy any sources/sinks
newFd.mergeMisc(fd)

val sourceTaps = fd.getSources
val newSrcs = newFd.getSources

pipe.upstreamPipes
.filter(_.getPrevious.length == 0) // implies _ is a head
.foreach { head =>
if (!newSrcs.containsKey(head.getName))
newFd.addSource(head, sourceTaps.get(head.getName))
Collaborator:

we need to check for Checkpoints in these pipes here, otherwise scalding's .forceToDisk will be ignored in these flows, which is not great.

At the very least, add a TODO for that.

Contributor Author:

Added a TODO for this. Not sure what's supposed to happen -- I tried adding a checkpoint, but it seems to go into the flow as just another pipe (it doesn't get added to the FlowDef via .addCheckpoint), so it seems to me like it should work out the same in the new snapshot. But I could be misunderstanding it.

}

val sinks = fd.getSinks
if (sinks.containsKey(pipe.getName)) {
newFd.addTailSink(pipe, sinks.get(pipe.getName))
}

newFd
}

}
11 changes: 11 additions & 0 deletions scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala
@@ -645,6 +645,17 @@ class RichPipe(val pipe: Pipe) extends java.io.Serializable with JoinAlgorithms
val setter = unpacker.newSetter(toFields)
pipe.mapTo(fields) { input: T => input } (conv, setter)
}

/**
* Set of pipes reachable from this pipe (transitive closure of 'Pipe.getPrevious')
*/
def upstreamPipes: Set[Pipe] =
Iterator
.iterate(Seq(pipe))(pipes => for (p <- pipes; prev <- p.getPrevious) yield prev)
.takeWhile(_.length > 0)
.flatten
.toSet

}
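The `Iterator.iterate` shape in `upstreamPipes` is a general trick for taking the transitive closure of a predecessor relation: step to the previous generation until one is empty, then flatten. It assumes an acyclic graph (as pipe graphs are) and would diverge on a cycle. The same pattern on a toy DAG (illustrative names):

```scala
// Direct predecessors of each node in a small DAG.
val prev: Map[String, Seq[String]] =
  Map("d" -> Seq("b", "c"), "b" -> Seq("a"), "c" -> Seq("a"), "a" -> Seq.empty)

// Repeatedly step to predecessors until a generation is empty, then flatten.
def upstream(start: String): Set[String] =
  Iterator
    .iterate(Seq(start))(nodes => for (n <- nodes; p <- prev(n)) yield p)
    .takeWhile(_.nonEmpty)
    .flatten
    .toSet
```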

/**
@@ -108,14 +108,14 @@ abstract class Source extends java.io.Serializable {
}

/**
* write the pipe and return the input so it can be chained into
* write the pipe but return the input so it can be chained into
* the next operation
*/
def writeFrom(pipe: Pipe)(implicit flowDef: FlowDef, mode: Mode) = {
def writeFrom(pipe: Pipe)(implicit flowDef: FlowDef, mode: Mode): Pipe = {
checkFlowDefNotNull

//insane workaround for scala compiler bug
val sinks = flowDef.getSinks().asInstanceOf[JMap[String, Any]]
val sinks = flowDef.getSinks.asInstanceOf[JMap[String, Any]]
val sinkName = this.toString
if (!sinks.containsKey(sinkName)) {
sinks.put(sinkName, createTap(Write)(mode))
@@ -124,7 +124,8 @@
case (test: TestMode, false) => pipe
case _ => transformForWrite(pipe)
}
flowDef.addTail(new Pipe(sinkName, newPipe))
val outPipe = new Pipe(sinkName, newPipe)
flowDef.addTail(outPipe)
pipe
}

@@ -17,6 +17,7 @@ package com.twitter.scalding

import cascading.flow.FlowDef
import cascading.pipe.Pipe
import typed.KeyedListLike

/**
* Object containing various implicit conversions required to create Scalding flows in the REPL.
@@ -46,6 +47,72 @@ object ReplImplicits extends FieldConversions {
fd
}

/**
* Gets a job that can be used to run the data pipeline.
*
* @param args that should be used to construct the job.
* @return a job that can be used to run the data pipeline.
*/
private[scalding] def getJob(args: Args, inmode: Mode, inFlowDef: FlowDef): Job = new Job(args) {
/**
* The flow definition used by this job, which should be the same as that used by the user
* when creating their pipe.
*/
override val flowDef = inFlowDef

override def mode = inmode

/**
* Obtains a configuration used when running the job.
*
* This overridden method uses the same configuration as a standard Scalding job,
* but adds options specific to KijiScalding, including adding a jar containing compiled REPL
* code to the distributed cache if the REPL is running.
*
* @return the configuration that should be used to run the job.
*/
override def config: Map[AnyRef, AnyRef] = {
// Use the configuration from Scalding Job as our base.
val configuration: Map[AnyRef, AnyRef] = super.config

/** Appends a comma to the end of a string. */
def appendComma(str: Any): String = str.toString + ","

// If the REPL is running, we should add tmpjars passed in from the command line,
// and a jar of REPL code, to the distributed cache of jobs run through the REPL.
val replCodeJar = ScaldingShell.createReplCodeJar()
val tmpJarsConfig: Map[String, String] =
if (replCodeJar.isDefined) {
Map("tmpjars" -> {
// Use tmpjars already in the configuration.
configuration
.get("tmpjars")
.map(appendComma)
.getOrElse("") +
// And a jar of code compiled by the REPL.
"file://" + replCodeJar.get.getAbsolutePath
})
} else {
// No need to add the tmpjars to the configuration
Map()
}

configuration ++ tmpJarsConfig
}

}
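The tmpjars logic in `config` above reduces to: comma-join any existing entries with the REPL jar's `file://` URI, or leave the configuration untouched when there is no REPL jar. As a standalone sketch (function and parameter names are illustrative, not the PR's API):

```scala
// Returns the new "tmpjars" value, or None when there is no REPL code jar
// (in which case the configuration should be left as-is).
def tmpJarsValue(existing: Option[String], replJarPath: Option[String]): Option[String] =
  replJarPath.map { path =>
    existing.map(_ + ",").getOrElse("") + "file://" + path
  }
```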

/**
* Runs this pipe as a Scalding job.
*
* Automatically cleans up the flowDef to include only sources upstream from tails.
*/
def run(implicit flowDef: FlowDef) = {
import Dsl.flowDefToRichFlowDef

getJob(new Args(Map()), ReplImplicits.mode, flowDef.withoutUnusedSources).run
}

/**
* Converts a Cascading Pipe to a Scalding RichPipe. This method permits implicit conversions from
* Pipe to RichPipe.
@@ -125,15 +192,16 @@ object ReplImplicits extends FieldConversions {
}

/**
* Converts a Cascading Pipe to a Scalding ShellPipe. This method permits implicit conversions
* from Pipe to ShellPipe.
*
* @param pipe to convert to a ShellPipe.
* @return a ShellPipe wrapping the specified Pipe.
* Convert KeyedListLike to enriched ShellTypedPipe
* (e.g. allows .snapshot to be called on Grouped, CoGrouped, etc)
*/
implicit def pipeToShellPipe(pipe: Pipe): ShellObj[Pipe] = new ShellObj(pipe)
implicit def typedPipeToShellPipe[T](pipe: TypedPipe[T]): ShellObj[TypedPipe[T]] =
new ShellObj(pipe)
implicit def keyedListToShellPipe[K, V](pipe: KeyedList[K, V]): ShellObj[KeyedList[K, V]] =
new ShellObj(pipe)
implicit def keyedListLikeToShellTypedPipe[K, V, T[K, +V] <: KeyedListLike[K, V, T]](kll: KeyedListLike[K, V, T]) = new ShellTypedPipe(kll.toTypedPipe)

/**
* Enrich TypedPipe for the shell
* (e.g. allows .snapshot to be called on it)
*/
implicit def typedPipeToShellTypedPipe[T](pipe: TypedPipe[T]): ShellTypedPipe[T] =
new ShellTypedPipe[T](pipe)

}