Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Config to openForRead #1023

Merged
merged 3 commits into from
Aug 18, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions scalding-core/src/main/scala/com/twitter/scalding/Job.scala
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ class Job(val args: Args) extends FieldConversions with java.io.Serializable {
.toMap.toMap // the second one is to lift from String -> AnyRef
}

/**
 * Implicit Config derived from this Job's configuration map, so that
 * Mappable.toIterator (which now requires an implicit Config) can
 * resolve one inside a Job without the caller passing it explicitly.
 *
 * NOTE(review): `Config.tryFrom(config).get` will throw if the map
 * cannot be converted to a Config — presumably acceptable at job
 * setup time, but verify that failure surfaces with a useful error.
 */
implicit protected def scaldingConfig: Config = Config.tryFrom(config).get

def skipStrategy: Option[FlowSkipStrategy] = None

/**
Expand Down
23 changes: 15 additions & 8 deletions scalding-core/src/main/scala/com/twitter/scalding/Mode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ trait Mode extends java.io.Serializable {
* Using a new FlowProcess, which is only suitable for reading outside
* of a map/reduce job, open a given tap and return the TupleEntryIterator
*/
def openForRead(tap: Tap[_, _, _]): TupleEntryIterator
def openForRead(config: Config, tap: Tap[_, _, _]): TupleEntryIterator

/**
 * Source-compatibility overload kept for callers that predate the
 * Config-taking `openForRead(config, tap)`. It forwards using a
 * default Config built from this Mode, so any custom (e.g. kryo
 * serialization) settings in the job's Config are NOT applied —
 * hence the deprecation: prefer the two-argument form.
 */
@deprecated("A Config is needed, especially if any kryo serialization has been used", "0.12.0")
final def openForRead(tap: Tap[_, _, _]): TupleEntryIterator =
openForRead(Config.defaultFrom(this), tap)

// Returns true if the file exists on the current filesystem.
def fileExists(filename: String): Boolean
/** Create a new FlowConnector for this cascading planner */
Expand Down Expand Up @@ -117,11 +122,11 @@ trait HadoopMode extends Mode {
}

// TODO unlike newFlowConnector, this does not look at the Job.config
override def openForRead(tap: Tap[_, _, _]) = {
override def openForRead(config: Config, tap: Tap[_, _, _]) = {
val htap = tap.asInstanceOf[Tap[JobConf, _, _]]
val conf = new JobConf(jobConf)
// copy over Config defaults
Config.default.toMap.foreach{ case (k, v) => conf.set(k, v) }
val conf = new JobConf(false) // initialize an empty config
// copy over Config
config.toMap.foreach{ case (k, v) => conf.set(k, v) }
val fp = new HadoopFlowProcess(conf)
htap.retrieveSourceFields(fp)
htap.sourceConfInit(fp, conf)
Expand All @@ -133,9 +138,11 @@ trait CascadingLocal extends Mode {
override def newFlowConnector(conf: Config) =
new LocalFlowConnector(conf.toMap.toMap[AnyRef, AnyRef].asJava)

override def openForRead(tap: Tap[_, _, _]) = {
override def openForRead(config: Config, tap: Tap[_, _, _]) = {
val ltap = tap.asInstanceOf[Tap[Properties, _, _]]
val fp = new LocalFlowProcess
val props = new java.util.Properties
config.toMap.foreach { case (k, v) => props.setProperty(k, v) }
val fp = new LocalFlowProcess(props)
ltap.retrieveSourceFields(fp)
ltap.openForRead(fp)
}
Expand Down Expand Up @@ -193,7 +200,7 @@ case class HadoopTest(@transient conf: Configuration,
// Now fill up this buffer with the content of the file
val path = getWritePathFor(src)
// We read the write tap in order to add its contents in the test buffers
val it = openForRead(src.createTap(Write)(this))
val it = openForRead(Config.defaultFrom(this), src.createTap(Write)(this))
while (it != null && it.hasNext) {
buf += new Tuple(it.next.getTuple)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ abstract class Source extends java.io.Serializable {
@deprecated("replace with Mappable.toIterator", "0.9.0")
def readAtSubmitter[T](implicit mode: Mode, conv: TupleConverter[T]): Stream[T] = {
val tap = createTap(Read)(mode)
mode.openForRead(tap).asScala.map { conv(_) }.toStream
mode.openForRead(Config.defaultFrom(mode), tap).asScala.map { conv(_) }.toStream
}
}

Expand Down Expand Up @@ -179,10 +179,10 @@ trait Mappable[+T] extends Source with TypedSource[T] {
* Allows you to read a Tap on the submit node NOT FOR USE IN THE MAPPERS OR REDUCERS.
* Typical use might be to read in Job.next to determine if another job is needed
*/
def toIterator(implicit mode: Mode): Iterator[T] = {
def toIterator(implicit config: Config, mode: Mode): Iterator[T] = {
val tap = createTap(Read)(mode)
val conv = converter
mode.openForRead(tap).asScala.map { te => conv(te.selectEntry(sourceFields)) }
mode.openForRead(config, tap).asScala.map { te => conv(te.selectEntry(sourceFields)) }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ class TypedPipeInst[T] private[scalding] (@transient inpipe: Pipe,
// for us. So unwind until you hit the first filter, snapshot,
// then apply the unwound functions
case Some((tap, fields, Converter(conv))) =>
Execution.from(m.openForRead(tap).asScala.map(tup => conv(tup.selectEntry(fields))))
Execution.from(m.openForRead(conf, tap).asScala.map(tup => conv(tup.selectEntry(fields))))
case _ => forceToDiskExecution.flatMap(_.toIteratorExecution)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class FileSourceTest extends Specification {
}
"TextLine.toIterator" should {
"correctly read strings" in {
TextLine("../tutorial/data/hello.txt").toIterator(Local(true)).toList must be_==(
TextLine("../tutorial/data/hello.txt").toIterator(Config.default, Local(true)).toList must be_==(
List("Hello world", "Goodbye world"))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ case class HadoopPlatformJobTest(
sink(TypedTsv[T](location))(toExpect)

def sink[T](in: Mappable[T])(toExpect: Seq[T] => Unit): HadoopPlatformJobTest =
copy(sourceReaders = sourceReaders :+ { m: Mode => toExpect(in.toIterator(m).toSeq) })
copy(sourceReaders = sourceReaders :+ { m: Mode => toExpect(in.toIterator(Config.defaultFrom(m), m).toSeq) })

def inspectCompletedFlow(checker: Flow[JobConf] => Unit): HadoopPlatformJobTest =
copy(flowCheckers = flowCheckers :+ checker)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,5 @@ object ReplImplicitContext {
implicit def flowDefImpl = ReplImplicits.flowDef
/** Defaults to running in local mode if no mode is specified. */
implicit def modeImpl = ReplImplicits.mode
implicit def configImpl = ReplImplicits.config
}