API Reference


Scalding functions can be divided into three types: map-like functions, grouping/reducing functions, and joining functions.

Map-like functions

Map-like functions operate over individual rows in a pipe, usually transforming them in some way. They are defined in RichPipe.scala.

# pipe.filter(fields){function}

Keeps only the rows for which the predicate function returns true; all other rows are filtered out.

val birds = animals.filter('animalType) { animalType : String => animalType == "bird" }

You can also filter over multiple fields at once.

val fastAndTallBirds =
  birds.filter('speed, 'height) { 
    fields : (Float, Float) =>
    val (speed, height) = fields
    (speed > 100) && (height > 100)
  }

# pipe.map(existingFields -> additionalFields){function}

Adds new fields that are transformations of existing ones.

// In addition to the existing `speed` field, the new `fasterBirds`
// pipe will contain a new `doubledSpeed` field (plus any other 
// fields that `birds` already contained).
val fasterBirds = birds.map('speed -> 'doubledSpeed) { speed : Int => speed * 2 }

You can also map from and to multiple fields at once.

val britishBirds =
  birds.map(('weightInLbs, 'heightInFt) -> ('weightInKg, 'heightInMeters)) {
    x : (Float, Float) =>
    val (weightInLbs, heightInFt) = x
    (0.454 * weightInLbs, 0.305 * heightInFt)
  }

# pipe.project(fields)

Remove all unspecified fields.

// The new pipe contains only two fields: `jobTitle` and `salary`.
val onlyKeepWorkInfo = people.project('jobTitle, 'salary)

# pipe.discard(fields)

Removes specified fields. discard is the opposite of project.

val forgetBirth = people.discard('birthplace, 'birthday)

# pipe.pack(Class)(fields -> object)

You can pack multiple fields into a single object using Java reflection. For now, this only works for objects with a parameterless default constructor. The reflection happens only once per field, so performance should be very good.

For example, suppose that you have a class called Person, with fields age and height and setters setAge and setHeight. Then you can do the following to populate those fields:

val people = data.pack[Person](('age, 'height) -> 'person)
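
For reference, here is a minimal sketch of what such a Person class might look like (this class is an assumption for illustration; it is not part of Scalding):

// A hypothetical Person class: a no-argument constructor plus a
// setter for each packed field, as the reflection-based packer expects.
class Person {
  private var age : Int = 0
  private var height : Float = 0.0f
  def setAge(a : Int) { age = a }
  def getAge : Int = age
  def setHeight(h : Float) { height = h }
  def getHeight : Float = height
}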

# pipe.unpack(Class)(object -> fields)

Conversely, you can unpack the contents of an object into multiple fields.

val data = people.unpack[Person]('person -> ('age, 'height))

The default reflection-based unpacker works for case classes, as well as standard Thrift- and Protobuf-generated classes.

Defining custom packers and unpackers

If you want to use tuple packing and unpacking for objects that do not depend on Java reflection, you need to implement the TuplePacker and TupleUnpacker abstract classes and define implicit conversions in the context of your Job class; see TuplePacker.scala for more details.

# pipe.mapTo(existingFields -> newFields){function}


MapTo is equivalent to mapping and then projecting to the new fields (but it is more efficient). Thus, the following two lines produce the same result:

pipe.mapTo(existingFields -> newFields){ ... }
pipe.map(existingFields -> newFields){ ... }.project(newFields)

Here is an example:

val savings =
  items.mapTo(('price, 'discountedPrice) -> 'savings) {
    x : (Float, Float) =>
    val (price, discountedPrice) = x
    price - discountedPrice
  }
val savingsSame =
  items
    .map(('price, 'discountedPrice) -> 'savings) {
      x : (Float, Float) =>
      val (price, discountedPrice) = x
      price - discountedPrice
    }
    .project('savings)

# pipe.flatMap(originalFields -> newFields){function}

flatMap can be used as a filter and a map combined. Your function maps each element to a collection, and that collection is then flattened (thus flat-map).

val words =
  books
    .flatMap('text -> 'word) {
      text : String => text.split("\\s+")
    }

# pipe.flatMapTo(originalFields -> newFields){function}

The mapTo analogue of flatMap.

val words =
  books.flatMapTo('text -> 'word) {
    text : String => text.split("\\s+")
  }

# pipe.limit(number)

Allows only a fixed number of items to pass through the pipe.

val oneHundredPeople = people.limit(100)

# pipe.unique(fields)

Keeps only unique rows based on a specified set of fields.

This looks like a map-like function, but it actually requires a map-reduce pair, so doing this as part of one of your groupBy operations (if you can structure your algorithm that way) will save work.

// Keep only the unique (firstName, lastName) pairs. All other fields are discarded.
people.unique('firstName, 'lastName)

Grouping/Reducing Functions

See GroupBuilder.scala. Most of these functions were inspired by the scala.collection.Iterable API.

groupBy

Group your pipe by the values in a specified set of columns, and then apply a grouping function to the values in each group.

val wordCounts =
  books
    .flatMap('text -> 'word) {
      text : String => text.split("\\s+")
    }
    .groupBy('word) {
      // The size function takes the name of the new column.
      // By default, if you don't pass in a new name, the new column is simply called 'size'.
      // Here we call the new column 'count'.
      _.size('count)
    }
    // We now have (word, count) columns in the pipe.

Grouping functions include...

size

Count the number of rows in this group.

wordCounts
  .groupBy('word) {
      // By default, if you don't pass in a new name, the new column is simply called 'size'.
      // Here we call the new column 'count'.
      _.size('count)
    }

average

Take the mean of a column.

// Find the mean age of boys vs. girls
people
    .groupBy('sex) {
        // The new column is called 'meanAge'.
        _.average('age -> 'meanAge)
    }

mkString

Turn a column in the group into a string.

wordCounts
  .groupBy('count) {
    // Take all the words with this count, join them with a comma, and call the new column "words".
    _.mkString('word -> 'words, ",")
  }

toList

Turn a column in the group into a list. One idiosyncrasy: null items in the list are removed, which is equivalent to filtering out the nulls first. Be careful about depending on this behavior, as it may change before Scalding 1.0.

wordCounts
  .groupBy('count) {
    // Take all the words with this count, join them into a list in a new column called "words".
    _.toList[String]('word -> 'words)
  }

sum

Sum over a column in the group.

expenses
  .groupBy('shoppingLocation) {
    // Sum over the 'cost' column, and rename the summed column to 'totalCost'.
    _.sum('cost -> 'totalCost)
  }

reduce

We can also reduce over grouped columns. This is equivalent to the sum above. The reduce function is required to be associative so that the work can begin on the map side (like a combiner) rather than happening solely on the reduce side.

expenses
  .groupBy('shoppingLocation) {
    _.reduce('cost -> 'totalCost) {
      (costSoFar : Double, cost : Double) => costSoFar + cost
    }
  }

foldLeft

Like reduce, but all the work happens on the reduce side (so the fold function is not required to be associative, and can in fact output a type different from what it takes in). Fold is the fundamental reduce function.
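
For example, here is a minimal sketch (reusing the expenses pipe and 'cost column from the sum example above) that folds each group's costs into a list, a type different from the Double values it consumes, which reduce could not do:

expenses
  .groupBy('shoppingLocation) {
    // Start from an empty list and prepend each cost. The accumulator's
    // type (List[Double]) differs from the input type (Double).
    _.foldLeft('cost -> 'costs)(List[Double]()) {
      (acc : List[Double], cost : Double) => cost :: acc
    }
  }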

count

We can count the number of rows in a group that satisfy some predicate.

val usersWithImpressions =
  users
    .groupBy('user) { _.count('numImpressions) { x : Long => x > 0 } }

pivot/unpivot

Pivot and unpivot are equivalents of the SQL/Excel functions that convert data from a row-based representation to a column-based one:

pipe.groupBy('key) { _.pivot(('col, 'val)->('x,'y,'z)) }

or, using unpivot, from a column-based representation to a row-based one (strictly speaking, unpivot is a map-like function that appears in RichPipe.scala and does not require a reduce phase):

pipe.unpivot(('x,'y,'z)->('col,'val))

In the first example, you need to have rows like:

3, "x", 1.2
3, "y", 3.4
4, "z", 4

and after the pivot you will have:

3, 1.2, 3.4, null
4, null, null, 4

When pivoting, you can provide an explicit default value instead of replacing missing rows with null:

pipe.groupBy('key) {_.pivot(('col, 'val) -> ('x,'y,'z), 0.0) }

which will result in:

3, 1.2, 3.4, 0.0
4, 0.0, 0.0, 4

groupAll

There's also a groupAll function, which is useful if you want to (say) count the total number of rows in the pipe. Think three times before using this function on Hadoop: it sends every row to a single reducer, removing all reduce-side parallelism. That said, accumulating a global value may require it. Tip: if you need to bring this value to another pipe, try crossWithTiny (another function you should use with great care).

// vocabSize is now a pipe with a single entry, containing the total number of words in the vocabulary.
val vocabSize =
  wordCounts
    .groupAll { _.size('vocabSize) }

It's also useful if, right before outputting a pipe, you want to sort by certain columns.

val sortedPeople =
    people.groupAll {
        // Sort by lastName, then by firstName.
        _.sortBy('lastName, 'firstName)
    }

Joining Functions

Joins (see JoinAlgorithms.scala)

All the expected joining modes are present: inner, outer, left, and right. Cascading implements these as CoGroup operations, which run in a single map-reduce job. It is important to hint at the relative sizes of your data. There are three main joins: joinWithSmaller, joinWithLarger, and joinWithTiny. joinWithTiny is a special join that does not move the left-hand side from mappers to reducers; instead, the entire right-hand side is replicated to the nodes holding the left side. joinWithTiny is appropriate when you know that Big > N * Small, where N is the number of nodes in the job.

When in doubt, choose joinWithSmaller and optimize if that step seems to be taking a very long time.
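
For instance, here is a minimal sketch of joinWithTiny (the genders lookup pipe and its field names are hypothetical, introduced for illustration):

// genders is a tiny lookup pipe, so it is replicated to every node
// holding people instead of being shuffled to reducers.
val peopleWithGenders = people.joinWithTiny('gender_id -> 'id, genders)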

joinWithSmaller, joinWithLarger

// people is a large pipe with a birth_city_id. We join it with the smaller cities pipe on id.
val peopleWithBirthplaces = people.joinWithSmaller('birth_city_id -> 'id, cities)

// Equivalent to...
val peopleWithBirthplaces = cities.joinWithLarger('id -> 'birth_city_id, people)

// Note that the two pipes can only have a field name in common if they are doing an inner join
// on that field. For example, this is allowed:
people.joinWithSmaller('ssn -> 'ssn, teachers)

// But this would fail; for a non-inner join, you must first rename the ssn field in one of the pipes:
people.joinWithSmaller('ssn -> 'ssn, teachers, joiner=new OuterJoin)