Skip to content

Commit

Permalink
Merge branch 'release/0.11.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
Brandon Holt committed Aug 7, 2014
2 parents c4aef4f + 3132309 commit 2b1220f
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 31 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Scalding #

### Version 0.11.2 ###
* hadoop.tmp.dir for snapshot in config

### Version 0.11.1 ###
* Fixes bad release portion where code wasn't updated for new scalding version number.
* use cascading-jdbc 2.5.3 for table exists fix and cascading 2.5.5: https://github.com/twitter/scalding/pull/951
Expand Down
2 changes: 1 addition & 1 deletion scalding-core/src/main/scala/com/twitter/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ package object scalding {
/**
* Make sure this is in sync with version.sbt
*/
val scaldingVersion: String = "0.11.1"
val scaldingVersion: String = "0.11.2"

object RichPathFilter {
implicit def toRichPathFilter(f: PathFilter) = new RichPathFilter(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,45 +49,47 @@ object ReplImplicits extends FieldConversions {
fd
}

def replConfig = {
val conf = Config.default ++ {
mode match {
case h: HadoopMode => Config.fromHadoop(h.jobConf)
case _ => Config.empty
}
}
// Create a jar to hold compiled code for this REPL session in addition to
// "tempjars" which can be passed in from the command line, allowing code
// in the repl to be distributed for the Hadoop job to run.
val replCodeJar = ScaldingShell.createReplCodeJar()
val tmpJarsConfig: Map[String, String] =
replCodeJar match {
case Some(jar) =>
Map("tmpjars" -> {
// Use tmpjars already in the configuration.
conf.get("tmpjars").map(_ + ",").getOrElse("")
// And a jar of code compiled by the REPL.
.concat("file://" + jar.getAbsolutePath)
})
case None =>
// No need to add the tmpjars to the configuration
Map()
}

conf ++ tmpJarsConfig
}

/**
* Runs this pipe as a Scalding job.
*
* Automatically cleans up the flowDef to include only sources upstream from tails.
*/
def run(implicit fd: FlowDef, md: Mode): Option[JobStats] = {

def config = {
val conf = Config.default

// Create a jar to hold compiled code for this REPL session in addition to
// "tempjars" which can be passed in from the command line, allowing code
// in the repl to be distributed for the Hadoop job to run.
val replCodeJar = ScaldingShell.createReplCodeJar()
val tmpJarsConfig: Map[String, String] =
replCodeJar match {
case Some(jar) =>
Map("tmpjars" -> {
// Use tmpjars already in the configuration.
conf.get("tmpjars").map(_ + ",").getOrElse("")
// And a jar of code compiled by the REPL.
.concat("file://" + jar.getAbsolutePath)
})
case None =>
// No need to add the tmpjars to the configuration
Map()
}

conf ++ tmpJarsConfig
}

ExecutionContext.newContext(config)(fd, md).waitFor match {
def run(implicit fd: FlowDef, md: Mode): Option[JobStats] =
ExecutionContext.newContext(replConfig)(fd, md).waitFor match {
case Success(stats) => Some(stats)
case Failure(e) =>
println("Flow execution failed!")
e.printStackTrace()
None
}
}

/**
* Converts a Cascading Pipe to a Scalding RichPipe. This method permits implicit conversions from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ class ShellTypedPipe[T](pipe: TypedPipe[T]) {
case _: HadoopMode =>
// come up with unique temporary filename
// TODO: refactor into TemporarySequenceFile class
val tmpSeq = "/tmp/scalding-repl/snapshot-" + UUID.randomUUID + ".seq"
val conf = replConfig
val tmpDir = conf.get("hadoop.tmp.dir")
.orElse(conf.get("cascading.tmp.dir"))
.getOrElse("/tmp")

val tmpSeq = tmpDir + "/scalding-repl/snapshot-" + java.util.UUID.randomUUID + ".seq"
val dest = TypedSequenceFile[T](tmpSeq)
dest.writeFrom(p)(localFlow, md)
run(localFlow, md)
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@

version in ThisBuild := "0.11.1"
version in ThisBuild := "0.11.2"

0 comments on commit 2b1220f

Please sign in to comment.