Scalding REPL

Brandon Holt edited this page Aug 14, 2014 · 9 revisions

In addition to production batch jobs, Scalding can be run interactively to help develop ad-hoc queries or give beginners a chance to learn about the platform step-by-step.

The Tutorial will walk through using the REPL in depth. Users already familiar with Scalding may wish to skip to the summary of REPL functionality below.

Interactive Scalding

This section summarizes the REPL-specific features for users already familiar with Scalding. New users should start with the Tutorial below.

Quick Start

Starting up the REPL in local mode is as easy as:

> ./sbt scalding-repl/console

To run in full HDFS mode, first build the assembly jar, then launch the REPL with the scald.rb script:

> ./sbt assembly
> ./scripts/scald.rb --repl --hdfs

Enrichments available on TypedPipe/Grouped/CoGrouped objects:

  • .save(dest: TypedSink[T]): TypedPipe[T]: Runs just the part of the flow needed to produce this pipe, saves the results to the given sink, and returns a new pipe which reads from what was just written.
  • .snapshot: TypedPipe[T]: Runs just the part of the flow needed to produce the output for this pipe, saves the results to a SequenceFile (if in Hadoop mode), or to memory (in Local mode), and returns a new TypedPipe which reads from the new source.
  • .toIterator: Returns an iterator. If there is any work to be done to produce the values, it will automatically call snapshot first and return an iterator on the snapshot instead.
  • .dump: Print the contents to stdout (uses .toIterator)
  • .toList: Load the contents into an in-memory List (uses .toIterator)

Additionally, ValuePipe gets the enrichment .toOption which is like toIterator but for a single value.
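As a rough sketch of how these enrichments fit together, the lines below can be pasted into a running REPL session. They assume the imports the REPL performs at startup and the tutorial/data/hello.txt file from the Scalding checkout; the value names are made up for illustration.

```scala
// Assumes a Scalding REPL session (com.twitter.scalding._ and ReplImplicits._ already imported).
val hello = TypedPipe.from(TextLine("tutorial/data/hello.txt"))
val words = hello.flatMap(_.split("\\s+"))

// snapshot runs the flow once and hands back a pipe over the stored result
val cachedWords = words.snapshot

// toList pulls the contents into local memory (via toIterator)
val wordList = cachedWords.toList

// sum produces a ValuePipe; toOption reads its single value, if there is one
val totalLength = cachedWords.map(_.length).sum.toOption
```

Because cachedWords is a snapshot, the last two lines read the stored result instead of re-reading the text file.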

Repl globals

Everything in a REPL session shares a common FlowDef to allow users to build up a large complicated flow step-by-step and then run it in its entirety. When building a job this way, use write (rather than save) to add sinks to the global flow.

Importing ReplImplicits._ makes the following commands available:

  • run: Runs the overall flow. Trims anything not feeding into a persistent sink (created by write). Note: any sinks created but not connected up will cause this to fail. Extra sources, however, are trimmed.
  • resetFlowDef: Clear the global flowDef. Will require that all pipes/sources/sinks be re-created.
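A minimal sketch of this workflow, assuming a running REPL session and the tutorial/data/hello.txt file from the Scalding checkout. The output file name line_lengths.tsv is made up for illustration.

```scala
// Assumes a Scalding REPL session (com.twitter.scalding._ and ReplImplicits._ already imported).
val hello = TypedPipe.from(TextLine("tutorial/data/hello.txt"))

// write only registers the sinks on the shared FlowDef; nothing runs yet
hello.flatMap(_.split("\\s+")).write(TypedTsv[String]("words.tsv"))
hello.map(_.length).write(TypedTsv[Int]("line_lengths.tsv"))

// run executes everything that feeds into the sinks registered above
run

// start over with an empty FlowDef; pipes, sources and sinks must be re-created afterwards
resetFlowDef
```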

ReplImplicits also includes the execution context, which can be tweaked manually:

  • mode: Local or Hdfs mode (determined initially by how the repl was launched but can be changed later)
  • config: Various configuration parameters, including Hadoop configuration parameters specified on the command line, by the Hadoop cluster, or by Scalding.

Customizing the REPL

  • On startup, the REPL searches for files named .scalding_repl from the current directory up to the filesystem root and automatically loads them into the current session in order of specificity. Common definitions, values, etc. can be put in these files. For instance, you can specify global defaults in your home directory (~/.scalding_repl), and project-specific settings in the project directory (~/workspace/scalding/.scalding_repl).

  • ScaldingShell.prompt: () => String: A function to produce the prompt displayed by Scalding. This can also be customized. For instance, to print the current directory at the prompt:

    ScaldingShell.prompt = { () =>
      "scalding(" + System.getProperty("user.dir").replace(System.getProperty("user.home"),"~") + ")>"
    }
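To make the .scalding_repl lookup concrete, here is a small standard-library sketch of a search from a directory up to the filesystem root. It is not Scalding's own code: findReplFiles is a hypothetical helper, and the ordering (files nearer the root first, so that more specific files load later) is an assumption about what "in order of specificity" means.

```scala
import java.io.File

// Hypothetical helper for illustration only; this is not Scalding's code.
// Returns every file called `name` found between `start` and the filesystem
// root, ordered from least specific (nearest the root) to most specific.
def findReplFiles(start: File, name: String = ".scalding_repl"): List[File] = {
  // start, its parent, its grandparent, ... until getParentFile returns null
  val dirsUpToRoot = Iterator
    .iterate(start.getAbsoluteFile)(_.getParentFile)
    .takeWhile(_ != null)
    .toList

  dirsUpToRoot.reverse // root first, `start` last
    .map(dir => new File(dir, name))
    .filter(_.isFile)
}

// List the files that such a search would find from the current directory.
findReplFiles(new File(System.getProperty("user.dir"))).foreach(println)
```

Under that ordering assumption, a definition in a project-level file would shadow one with the same name from ~/.scalding_repl.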

Tutorial

Assuming you've checked out the Scalding code, the fastest way to start running Scalding is to launch the REPL in Local mode. Simply run:

> ./sbt scalding-repl/console

It will spend some time downloading and compiling, but should eventually show:

[info] Set current project to scalding (in build file:/Users/bholt/hub/scalding/)
import com.twitter.scalding._
import com.twitter.scalding.ReplImplicits._
import com.twitter.scalding.ReplImplicitContext._
...
scala> 

As you can see, we've imported a bunch of Scalding code, including implicits that make it easier to run jobs interactively. Several of these enhancements are "enrichments" on Scalding's TypedPipe. The tutorial will go into them in more detail.

Let's take a look at some.

Viewing pipe contents

First, dump allows you to print what's in a TypedPipe:

// load a plain text file into a TypedPipe
scala> val hello = TypedPipe.from(TextLine("tutorial/data/hello.txt"))
hello: TypedPipe[String] = TypedPipeInst(/*...tutorial/data/hello.txt...*/)

scala> hello.dump
Hello world
Goodbye world

We can also load the contents into memory as a list:

scala> val lst = hello.toList
lst: List[String] = List(Hello world, Goodbye world)

In fact, both dump and toList use another TypedPipe enrichment, toIterator, which can be used directly if one wishes to stream over the data and perform some operation.

Remember that each of these operations runs locally, so if you are actually working with large datasets, ensure that you do most of your work using TypedPipe operations, which will run through Cascading/Hadoop, and only call toIterator (or dump, or toList) once you've filtered the results sufficiently. Some helpful TypedPipe operations to downsize datasets are limit and sample.
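The sketch below shows one way to combine these operations, assuming a running REPL session and the tutorial/data/hello.txt file from the Scalding checkout. The value names are illustrative, and sample is random, so its result varies between runs.

```scala
// Assumes a Scalding REPL session (com.twitter.scalding._ and ReplImplicits._ already imported).
val hello = TypedPipe.from(TextLine("tutorial/data/hello.txt"))

// Shrink the data with TypedPipe operations first...
val firstLine = hello.limit(1)    // at most one element
val someLines = hello.sample(0.5) // roughly half of the elements, chosen at random

// ...then bring the (now small) results into the local process.
firstLine.dump
someLines.toList

// toIterator streams the values, so the result never has to be held in a List
val totalChars = hello.toIterator.map(_.length).sum
```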

Save

Usually in Scalding, results are written to a Sink using .write(). However, the write only happens when the flow is actually run. That way you can build up a complicated flow with multiple sources and sinks and intermediate calculations and run it as one big job. When running interactively, however, we want a way to immediately run part of a job to see if it's correct before moving on. When running in the REPL, save can be used in place of write to immediately run that pipe and write out the results.

scala> val hello = TypedPipe.from(TextLine("tutorial/data/hello.txt"))
hello: TypedPipe[String] = TypedPipeInst(/*...tutorial/data/hello.txt...*/)

scala> val words = hello.flatMap(_.split("\\s+")).save(TypedTsv("words.tsv"))
INFO flow.Flow: [ScaldingShell] starting
INFO flow.Flow: [ScaldingShell]  source: FileTap["tutorial/data/hello.txt"]
INFO flow.Flow: [ScaldingShell]  sink: FileTap["words.tsv"]
INFO flow.Flow: [ScaldingShell]  parallel execution is enabled: true
INFO flow.Flow: [ScaldingShell]  starting jobs: 1
INFO flow.Flow: [ScaldingShell]  allocating threads: 1
INFO flow.FlowStep: [ScaldingShell] starting step: local
words: TypedPipe[String] = /* FixedPathTypedDelimited(List(words.tsv)) */

Conveniently, this returns a new pipe reading from the file, which can be used in subsequent computations.

Snapshots

When working interactively with large datasets, it is often useful to be able to save and re-use data that took significant work to generate. The Scalding REPL provides an easy way to explicitly save intermediate results and use these results in subsequent interactive work.

The snapshot enrichment on TypedPipe runs everything necessary to generate the output for the given pipe and saves it to temporary storage. In Local mode, it actually just saves it in memory; in Hadoop, it writes it out to a temporary SequenceFile. Scalding then returns a handle to this new Source as a new TypedPipe.

Snapshots should only be used within a single REPL session. In local mode, you have no choice because they only exist in memory. But even for Hadoop mode, it is not a good idea to try to re-use generated snapshots, as Kryo serialization is unstable and the temporary files may be cleaned up when not in use.

// (using 'hello' TypedPipe declared above)
// split into words, returns TypedPipe all set to do the work (but not run yet)
scala> val words = hello.flatMap(_.split("\\s+")).map(_.toLowerCase)
words: TypedPipe[String] = TypedPipeInst(/*...tutorial/data/hello.txt...*/)

// save a snapshot
scala> val s = words.snapshot
INFO flow.Flow: [ScaldingShell] starting
INFO flow.Flow: [ScaldingShell]  source: FileTap["tutorial/data/hello.txt"]
INFO flow.Flow: [ScaldingShell]  sink: MemoryTap
INFO flow.Flow: [ScaldingShell]  parallel execution is enabled: true
INFO flow.Flow: [ScaldingShell]  starting jobs: 1
INFO flow.Flow: [ScaldingShell]  allocating threads: 1
INFO flow.FlowStep: [ScaldingShell] starting step: local
s: TypedPipe[String] = IterablePipe(/*...*/)

// now take a look at what's in the snapshot
scala> s.dump
hello
world
goodbye
world

You can see that, to create the snapshot, Scalding ran a small Cascading job and returned a TypedPipe with the result. Now, rather than spending that enormous amount of time re-generating "words", we can use our snapshot, s, to (for a change) count some words:

scala> val wordCount = s.map((_, 1)).sumByKey
wordCount: UnsortedGrouped[String,Int] = /*...*/

scala> wordCount.snapshot.dump
INFO flow.Flow: [ScaldingShell] starting
INFO flow.Flow: [ScaldingShell]  source: MemoryTap
INFO flow.Flow: [ScaldingShell]  sink: MemoryTap
INFO flow.Flow: [ScaldingShell]  parallel execution is enabled: true
INFO flow.Flow: [ScaldingShell]  starting jobs: 1
INFO flow.Flow: [ScaldingShell]  allocating threads: 1
INFO flow.FlowStep: [ScaldingShell] starting step: local
(goodbye,1)
(hello,1)
(world,2)

Notice above how running wordCount with snapshot used the MemoryTap (snapshot) created before as its source. If, instead, we wanted to run the entire job end-to-end, we simply need to bypass the snapshots and use the original pipes:

// remember, 'words' was a TypedPipe which reads from the original text file
scala> words
res0: TypedPipe[String] = /*... TextLineWrappedArray(tutorial/data/hello.txt) */
scala> words.map((_, 1)).sumByKey.dump
INFO flow.Flow: [ScaldingShell] starting
INFO flow.Flow: [ScaldingShell]  source: FileTap["tutorial/data/hello.txt"]
INFO flow.Flow: [ScaldingShell]  sink: MemoryTap
INFO flow.Flow: [ScaldingShell]  parallel execution is enabled: true
INFO flow.Flow: [ScaldingShell]  starting jobs: 1
INFO flow.Flow: [ScaldingShell]  allocating threads: 1
INFO flow.FlowStep: [ScaldingShell] starting step: local
(goodbye,1)
(hello,1)
(world,2)

See, that time the "source" was the original text file.

Implicit snapshots

The previous example relied on a subtle additional trick. Calling dump on the result of sumByKey ran the flow and printed the results. Because you may be dealing with large datasets, these pipes should run through Cascading in Hadoop rather than being iterated over locally. So when there is work to be done, toIterator (which dump calls) first calls snapshot, which runs the flow in Cascading, and then iterates over the snapshot. However, each call to dump as above re-runs the flow, because no handle to the snapshot is kept around. If a flow takes a while to run, save the results to a snapshot first, then use that snapshot to explore the data further.
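A short sketch of the difference, assuming a running REPL session and the tutorial/data/hello.txt file from the Scalding checkout; the value names are illustrative.

```scala
// Assumes a Scalding REPL session (com.twitter.scalding._ and ReplImplicits._ already imported).
val hello = TypedPipe.from(TextLine("tutorial/data/hello.txt"))
val counts = hello.flatMap(_.split("\\s+")).map(_.toLowerCase).map((_, 1)).sumByKey

// Each dump takes its own implicit snapshot, so the flow runs twice here
counts.dump
counts.dump

// Keeping a handle to an explicit snapshot means the word count runs only once
val countsSnapshot = counts.snapshot
countsSnapshot.dump
val repeated = countsSnapshot.filter { case (_, n) => n > 1 }.toList
```

Further queries, such as the filter on the last line, then start from the stored counts instead of the text file.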

Running on Hadoop

So far, we've only been running in Cascading's Local mode, which emulates what Hadoop will do but cannot handle truly large datasets. The easiest way to launch the REPL in Hadoop mode is to use the scald.rb script.

# must first create the massive assembly jar with all the code
scalding$ ./sbt assembly
# then launch the REPL
scalding$ ./scripts/scald.rb --repl --hdfs
