Skip to content


Subversion checkout URL

You can clone with
Download ZIP

API Reference

echen edited this page · 23 revisions


Getting help


Matrix API

Third Party Modules






Clone this wiki locally

Scalding functions can be divided into three types:

Map-like functions

Map-like functions operate over individual rows in a pipe, usually transforming them in some way. They are defined in RichPipe.scala.

map, flatMap

# -> additionalFields){function}

Adds new fields that are transformations of existing ones.

// In addition to the existing `speed` field, the new `fasterBirds`
// pipe will contain a new `doubledSpeed` field (plus any other 
// fields that `birds` already contained).
val fasterBirds ='speed -> 'doubledSpeed) { speed : Int => speed * 2 }

You can also map from and to multiple fields at once.

val britishBirds ='weightInLbs, 'heightInFt) -> ('weightInKg, 'heightInMeters)) {
    x : (Float, Float) =>
    val (weightInLbs, heightInFt) = x
    (0.454 * weightInLbs, 0.305 * heightInFt)

# pipe.flatMap(originalFields -> newFields){function}

Maps each element to a list (or an Option), and then flattens that list.

val words =
    .flatMap('text -> 'word) {
      text : String =>
      text.split("\\s+").map { word : String => word }

project, discard

# pipe.project(fields)

Remove all unspecified fields.

// The new pipe contains only two fields: `jobTitle` and `salary`.
val onlyKeepWorkInfo = people.project('jobTitle, 'salary)

# pipe.discard(fields)

Removes specified fields. discard is the opposite of project.

val forgetBirth = people.discard('birthplace, 'birthday)

mapTo, flatMapTo

# pipe.mapTo(existingFields -> newFields){function}

MapTo is equivalent to mapping and then projecting to the new fields (but it is more efficient). Thus, the following two lines produce the same result:

pipe.mapTo(existingFields -> newFields){ ... } -> newFields){ ... }.project(newFields)

Here is another example:

val savings =
  items.mapTo(('price, 'discountedPrice) -> 'savings) {
    x : (Float, Float) =>
    val (price, discountedPrice) = x
    price - discountedPrice
val savingsSame =
    .map(('price, 'discountedPrice) -> 'savings) {
      x : (Float, Float) =>
      val (price, discountedPrice) = x
      price - discountedPrice

# pipe.flatMapTo(originalFields -> newFields){function}

The flatMap analogue of mapTo.

val words =
  books.flatMapTo('text -> 'word) { 
    text : String => 
    text.split("\\s+").map { word : String => word } 


# pipe.filter(fields){function}

Filters out rows.

val birds = animals.filter('type) { type : String => type == "bird" }

You can also filter over multiple fields at once.

val fastAndTallBirds =
  birds.filter('speed, 'height) { 
    fields : (Float, Float) =>
    val (speed, height) = fields
    (speed > 100) && (height > 100)


# pipe.limit(number)

Allows only a fixed number of items to pass in a pipe.

val oneHundredPeople = people.limit(100)


# pipe.unique(fields)

Keeps only unique rows based on a specified set of fields.

This looks like a mapping function, but it actually requires a map-reduce pair, so doing this during one of your groupBy operations (if you can structure your algorithm to simultaneously do so) will save work.

// Keep only the unique (firstName, lastName) pairs. All other fields are discarded.
people.unique('firstName, 'lastName)

pack, unpack

# pipe.pack(Type)(fields -> object)

You can pack multiple fields into a single object, by using Java reflection. For now this only works for objects that have a default constructor that takes no arguments. The Java reflection only happens once for each field, so the performance should be very good.

For example suppose that you have a class called Person, with fields age and height, and setters setAge and setHeight. Then you can do the following to populate those fields:

val people = data.pack[Person](('age, 'height) -> 'person)

# pipe.unpack(Type)(object -> fields)

Conversely, you can unpack the contents of an object into multiple fields

val data = people.unpack[Person]('person -> ('age, 'height))

The default reflection-based unpacker works for case classes as well as standard Thrift- and Protobuf-generated classes.

If you want to use tuple packing and unpacking for objects that do not depend on Java reflection, then you need to implement the TuplePacker and TupleUnpacker abstract classes and define implicit conversions in the context of your Job class. See TuplePacker.scala for more.

Grouping/reducing functions

Grouping/reducing functions operate over groups of rows in a pipe, and usually involve a reduce phase. They are defined in GroupBuilder.scala.

Most of these functions were inspired by the scala.collection.Iterable API.

groupBy, groupAll

# pipe.groupBy(fields){ group => ... }

Groups your pipe by the values in the specified set of fields, and then applies a set of operations to the group to create a new set of fields.

// Create a new pipe with (word, count) fields.
val wordCounts = words.groupBy('word) { group => group.size }

Group operations chain together.

// Create a new pipe containing 
// (country, sex, # of people in country of sex, mean age sliced by country and sex).
val demographics = people.groupBy('country, 'sex) { _.size.average('age) }

# pipe.groupAll{ group => ... }

Creates a single group consisting of the entire pipe.

Think three times before using this function on Hadoop. This removes the ability to do any parallelism in the reducers. That said, accumulating a global variable may require it. Tip: if you need to bring this value to another pipe, try crossWithTiny (another function you should use with great care).

// vocabSize is now a pipe with a single entry, containing the total number of words in the vocabulary.
val vocabSize = wordCounts.groupAll { _.size }

groupAll is also useful if you want to sort a pipe immediately before outputting it.

val sortedPeople = people.groupAll { _.sortBy('lastName, 'firstName) }

grouping operations

# group.size(name)

Counts the number of rows in this group. By default, the name of the new field is size, but you can pass in a new name as well.

// The new `wordCounts` pipe contains "word" and "size" fields.
val wordCounts = words.groupBy('word) { _.size }

// Same, but calls the new field "count" instead of the default "size".
val wordCounts = words.groupBy('word) { _.size('count) }

# group.average(field)

Computes the mean over a field. By default, the new field has the same name as the original field, but you can pass in a new name as well.

// Find the mean age of boys vs. girls. The new pipe contains "sex" and "age" fields.
val demographics = people.groupBy('sex) { _.average('age) }

// Same, but call the new field "meanAge".
val demographics = people.groupBy('sex) { _.average('age -> 'meanAge) }

# group.mkString(field, joiner)

Turns a column in the group into a string. Again, the new field has the same name as the original field by default, but you can also pass in a new name.

// Take all the words with a given count and join them with a comma.
wordCounts.groupBy('count) { _.mkString('word, ",") }

// Same, but call the new column "words".
wordCounts.groupBy('count) { _.mkString('word -> 'words, ",") }

# group.toList(Type)(field)

Turns a column in the group into a list. An idiosyncracy about this is that null items in the list are removed. It is equivalent to first filtering null items. Be careful about depending on this behavior as it may be changed before scalding 1.0.

// Take all the words with this count and join them into a list.
wordCounts.groupBy('count) { _.toList[String]('word -> 'words) }

// Same, but call the new column "words".
wordCounts.groupBy('count) { _.toList[String]('word -> 'words) }

# group.sum(field)

Sums over a column in the group.

expenses.groupBy('shoppingLocation) { _.sum('cost) }

// Same, but call the summed column 'totalCost'.
expenses.groupBy('shoppingLocation) { _.sum('cost -> 'totalCost) }

# group.count(field)

Counts the number of rows in a group that satisfy some predicate.

val usersWithImpressions =
    .groupBy('user) { _.count('numImpressions) { x : Long => x > 0 } }

reduce, foldLeft

# group.reduce(field){function}

Applies a reduce function over grouped columns. The reduce function is required to be associative, so that the work can be done on the map side and not solely on the reduce side (like a combiner).

// This example is equivalent to using `sum`, but you can also supply other reduce functions.
expenses.groupBy('shoppingLocation) {
    _.reduce('cost -> 'totalCost) {
      (costSoFar : Double, cost : Double) => costSoFar + cost

# group.foldLeft(field){function}

Like reduce, but all the work happens on the reduce side (so the fold function is not required to be associative, and can in fact output a type different from what it takes in). Fold is the fundamental reduce function.

pivot, unpivot

Pivot and unpivot are similar to SQL and Excel functions that change data from a row-based representation to a column-based one (in the case of pivot) or vice-versa (in the case of unpivot).

# group.pivot

Converts data from a row-based representation to a column-based one.

pipe.groupBy('key) { _.pivot(('col, 'val) -> ('x, 'y,' z)) }

In the first example, you need to have rows like:

3, "x", 1.2
3, "y", 3.4
4, "z", 4

and after the pivot you will have:

3, 1.2, 3.4, null
4, null, null, 4

When pivoting, you can provide an explicit default value instead of replacing missing rows with null:

pipe.groupBy('key) { _.pivot(('col, 'val) -> ('x, 'y, 'z), 0.0) }

This will result in:

3, 1.2, 3.4, 0.0
4, 0.0, 0.0, 4

# group.unpivot

Converts data from a column-based representation to a row-based one. (Strictly speaking, unpivot is a map-like function which appears in RichPipe.scala and does not require a reduce-phase.)

pipe.unpivot(('x, 'y, 'z) -> ('col,'val))

Join operations

Join operations merge two pipes on a specified set of keys, similar to SQL joins. They are defined in JoinAlgorithms.scala.

All the expected joining modes are present: inner, outer, left, right. Cascading implements these as CoGroup operations which are implemented in a single map-reduce job. It is important to hint the relative sizes of your data. There are three main joins: joinWithSmaller, joinWithLarger and joinWithTiny. joinWithTiny is a special join that does not move the left-hand side from from mappers to reducers. Instead, all the right hand side is replicated to the nodes holding the left side. joinWithTiny is appropriate when you know that Small < N * Big, where N is the number of nodes in the job.

When in doubt, choose joinWithSmaller and optimize if that step seems to be taking a very long time.

joinWithSmaller, joinWithLarger

// people is a large pipe with a birth_city_id. We join it with the smaller cities pipe on id.
val peopleWithBirthplaces = people.joinWithSmaller('birth_city_id -> 'id, cities)

// Equivalent to...
val peopleWithBirthplaces = cities.joinWithLarger('id -> 'birth_city_id, people)

// Note that the two pipes can only have a field name in common is they are doing an inner join
// on that field. For example, this is allowed:
people.joinWithSmaller('ssn -> 'ssn, teachers)

// But this would fail, ssn field of one of the pipes:
people.joinWithSmaller('ssn -> 'ssn, teachers, joiner=new OuterJoin)
Something went wrong with that request. Please try again.