API Reference


Scalding functions can be divided into three types:

  • Map-like functions
  • Grouping/reducing functions
  • Join operations

Map-like functions

Map-like functions operate over individual rows in a pipe, usually transforming them in some way. They are defined in RichPipe.scala.

map, flatMap

# pipe.map(existingFields -> additionalFields){function}

Adds new fields that are transformations of existing ones.

// In addition to the existing `speed` field, the new `fasterBirds`
// pipe will contain a new `doubledSpeed` field (plus any other 
// fields that `birds` already contained).
val fasterBirds = birds.map('speed -> 'doubledSpeed) { speed : Int => speed * 2 }

You can also map from and to multiple fields at once.

val britishBirds =
  birds.map(('weightInLbs, 'heightInFt) -> ('weightInKg, 'heightInMeters)) {
    x : (Float, Float) =>
    val (weightInLbs, heightInFt) = x
    (0.454 * weightInLbs, 0.305 * heightInFt)
  }

# pipe.flatMap(originalFields -> newFields){function}

Maps each element to a list (or an Option), and then flattens that list.

val words =
  books
    .flatMap('text -> 'word) {
      text : String =>
      text.split("\\s+")
    }
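
Since the function may also return an Option, flatMap doubles as a combined map-and-filter. Here is a sketch, using a hypothetical `ageString` field:

// Keep only the rows whose `ageString` field parses as an Int,
// emitting the parsed value as a new `age` field.
val withAges = people.flatMap('ageString -> 'age) { s : String =>
  try { Some(s.toInt) } catch { case _ : NumberFormatException => None }
}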

project, discard

# pipe.project(fields)

Removes all unspecified fields.

// The new pipe contains only two fields: `jobTitle` and `salary`.
val onlyKeepWorkInfo = people.project('jobTitle, 'salary)

# pipe.discard(fields)

Removes specified fields. discard is the opposite of project.

val forgetBirth = people.discard('birthplace, 'birthday)

mapTo, flatMapTo

# pipe.mapTo(existingFields -> newFields){function}

MapTo is equivalent to mapping and then projecting to the new fields (but it is more efficient). Thus, the following two lines produce the same result:

pipe.mapTo(existingFields -> newFields){ ... }
pipe.map(existingFields -> newFields){ ... }.project(newFields)

Here is another example:

val savings =
  items.mapTo(('price, 'discountedPrice) -> 'savings) {
    x : (Float, Float) =>
    val (price, discountedPrice) = x
    price - discountedPrice
  }
val savingsSame =
  items
    .map(('price, 'discountedPrice) -> 'savings) {
      x : (Float, Float) =>
      val (price, discountedPrice) = x
      price - discountedPrice
    }
    .project('savings)

# pipe.flatMapTo(originalFields -> newFields){function}

The flatMap analogue of mapTo.

val words =
  books.flatMapTo('text -> 'word) { 
    text : String => 
    text.split("\\s+")
  }

filter

# pipe.filter(fields){function}

Keeps only the rows that satisfy the given predicate.

val birds = animals.filter('type) { animalType : String => animalType == "bird" }

You can also filter over multiple fields at once.

val fastAndTallBirds =
  birds.filter('speed, 'height) { 
    fields : (Float, Float) =>
    val (speed, height) = fields
    (speed > 100) && (height > 100)
  }

limit

# pipe.limit(number)

Allows only a fixed number of items to pass through the pipe.

val oneHundredPeople = people.limit(100)

unique

# pipe.unique(fields)

Keeps only unique rows based on a specified set of fields.

This looks like a map-like function, but it actually requires a map-reduce pair. If you can restructure your algorithm to deduplicate within one of your existing groupBy operations instead, you will save work.

// Keep only the unique (firstName, lastName) pairs. All other fields are discarded.
people.unique('firstName, 'lastName)
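
As the note above suggests, if you are already grouping by the same fields, deduplication comes for free:

// Also one row per unique (firstName, lastName) pair, but with a
// duplicate count computed in the same map-reduce step.
people.groupBy('firstName, 'lastName) { _.size }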

pack, unpack

# pipe.pack(Type)(fields -> object)

You can pack multiple fields into a single object, using Java reflection. For now, this works only for objects with a constructor that takes no arguments. The reflection happens only once per field, so performance should be very good.

For example, suppose that you have a class called Person, with fields age and height and setters setAge and setHeight. Then you can do the following to populate those fields:

val people = data.pack[Person](('age, 'height) -> 'person)
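
Here is a minimal sketch (hypothetical, not from the original page) of a class that meets these requirements:

// A bean with a no-argument constructor and one setter per packed
// field, which is all the reflection-based packer needs.
class Person {
  private var age : Int = 0
  private var height : Double = 0.0
  def setAge(a : Int) : Unit = { age = a }
  def setHeight(h : Double) : Unit = { height = h }
}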

# pipe.unpack(Type)(object -> fields)

Conversely, you can unpack the contents of an object into multiple fields.

val data = people.unpack[Person]('person -> ('age, 'height))

The default reflection-based unpacker works for case classes as well as standard Thrift- and Protobuf-generated classes.

If you want to use tuple packing and unpacking for objects that do not depend on Java reflection, then you need to implement the TuplePacker and TupleUnpacker abstract classes and define implicit conversions in the context of your Job class. See TuplePacker.scala for more.

Grouping/reducing functions

Grouping/reducing functions operate over groups of rows in a pipe, often aggregating them in some way. They usually involve a reduce phase. These functions are defined in GroupBuilder.scala.

Most of these functions are inspired by the scala.collection.Iterable API.

groupBy, groupAll

# pipe.groupBy(fields){ group => ... }

Groups your pipe by the values in the specified set of fields, and then applies a set of operations to the group to create a new set of fields.

// Create a new pipe with (word, count) fields.
val wordCounts = words.groupBy('word) { group => group.size }

Group operations chain together.

// Create a new pipe containing 
// (country, sex, # of people in country of sex, mean age sliced by country and sex).
val demographics = people.groupBy('country, 'sex) { _.size.average('age) }

# pipe.groupAll{ group => ... }

Creates a single group consisting of the entire pipe.

Think three times before using this function on Hadoop. This removes the ability to do any parallelism in the reducers. That said, accumulating a global variable may require it. Tip: if you need to bring this value to another pipe, try crossWithTiny (another function you should use with great care).

// vocabSize is now a pipe with a single entry, containing the total number of words in the vocabulary.
val vocabSize = wordCounts.groupAll { _.size }

groupAll is also useful if you want to sort a pipe immediately before outputting it.

val sortedPeople = people.groupAll { _.sortBy('lastName, 'firstName) }

Group operations

# group.size(name)

Counts the number of rows in this group. By default, the name of the new field is size, but you can pass in a new name as well.

// The new `wordCounts` pipe contains "word" and "size" fields.
val wordCounts = words.groupBy('word) { _.size }

// Same, but calls the new field "count" instead of the default "size".
val wordCounts = words.groupBy('word) { _.size('count) }

# group.average(field)

Computes the mean over a field. By default, the new field has the same name as the original field, but you can pass in a new name as well.

// Find the mean age of boys vs. girls. The new pipe contains "sex" and "age" fields.
val demographics = people.groupBy('sex) { _.average('age) }

// Same, but call the new field "meanAge".
val demographics = people.groupBy('sex) { _.average('age -> 'meanAge) }

# group.mkString(field, joiner)

Turns a column in the group into a string. Again, the new field has the same name as the original field by default, but you can also pass in a new name.

// Take all the words with a given count and join them with a comma.
wordCounts.groupBy('count) { _.mkString('word, ",") }

// Same, but call the new column "words".
wordCounts.groupBy('count) { _.mkString('word -> 'words, ",") }

# group.toList(Type)(field)

Turns a column in the group into a list. One idiosyncrasy is that null items are removed from the list; this is equivalent to filtering out null items first. Be careful about depending on this behavior, as it may change before scalding 1.0.

// Take all the words with a given count and join them into a list.
wordCounts.groupBy('count) { _.toList[String]('word) }

// Same, but call the new column "words".
wordCounts.groupBy('count) { _.toList[String]('word -> 'words) }

# group.sum(field)

Sums over a column in the group.

expenses.groupBy('shoppingLocation) { _.sum('cost) }

// Same, but call the summed column 'totalCost'.
expenses.groupBy('shoppingLocation) { _.sum('cost -> 'totalCost) }

# group.count(field){function}

Counts the number of rows in a group that satisfy some predicate.

val usersWithImpressions =
  users
    .groupBy('user) { _.count('numImpressions) { x : Long => x > 0 } }

reduce, foldLeft

# group.reduce(field){function}

Applies a reduce function over grouped columns. The reduce function is required to be associative, so that the work can be done on the map side and not solely on the reduce side (like a combiner).

// This example is equivalent to using `sum`, but you can also supply other reduce functions.
expenses.groupBy('shoppingLocation) {
    _.reduce('cost -> 'totalCost) {
      (costSoFar : Double, cost : Double) => costSoFar + cost
    }
  }

# group.foldLeft(field){function}

Like reduce, but all the work happens on the reduce side (so the fold function is not required to be associative, and can in fact output a type different from what it takes in). Fold is the fundamental reduce function.
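
As a sketch, the totalCost example above can be rewritten as a fold with an explicit initial value (assuming the GroupBuilder signature foldLeft(fields)(init)(fn)):

// Equivalent to the `sum` and `reduce` examples above, but with an
// explicit starting value; the accumulator type may differ from the
// input type.
expenses.groupBy('shoppingLocation) {
  _.foldLeft('cost -> 'totalCost)(0.0) {
    (costSoFar : Double, cost : Double) => costSoFar + cost
  }
}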

take, sortWithTake

# group.take(number)
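
Keeps the first number rows of each group and discards the rest. Note that, without a preceding sort, which rows arrive first is not deterministic. A sketch, assuming a hypothetical logs pipe:

// Keep at most 10 rows per user.
logs.groupBy('user) { _.take(10) }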

# group.sortWithTake
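
Sorts each group with a supplied comparison function and keeps the top k rows, collecting them into a single list-valued field. A sketch, assuming a hypothetical tweets pipe; the comparison function decides which of two items ranks higher:

// For each user, keep the 5 (score, tweetId) pairs with the highest
// scores, gathered into a single `topTweets` list field.
tweets.groupBy('user) {
  _.sortWithTake(('score, 'tweetId) -> 'topTweets, 5) {
    (a : (Double, Long), b : (Double, Long)) => a._1 > b._1
  }
}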

reducers

# group.reducers(number)
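
Overrides the number of reducers used in the reduce phase of this grouping, which is useful when the default parallelism is too low. A sketch:

// Use 100 reducers to count words.
words.groupBy('word) { _.size.reducers(100) }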

pivot, unpivot

Pivot and unpivot are similar to SQL and Excel functions that change data from a row-based representation to a column-based one (in the case of pivot) or vice-versa (in the case of unpivot).

# group.pivot

Converts data from a row-based representation to a column-based one.

pipe.groupBy('key) { _.pivot(('col, 'val) -> ('x, 'y, 'z)) }

For the first example, your input rows need to look like:

3, "x", 1.2
3, "y", 3.4
4, "z", 4

and after the pivot you will have:

3, 1.2, 3.4, null
4, null, null, 4

When pivoting, you can provide an explicit default value instead of replacing missing rows with null:

pipe.groupBy('key) { _.pivot(('col, 'val) -> ('x, 'y, 'z), 0.0) }

This will result in:

3, 1.2, 3.4, 0.0
4, 0.0, 0.0, 4

# group.unpivot

Converts data from a column-based representation to a row-based one. (Strictly speaking, unpivot is a map-like function which appears in RichPipe.scala and does not require a reduce-phase.)

pipe.unpivot(('x, 'y, 'z) -> ('col, 'val))
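
For example, unpivoting a row like:

3, 1.2, 3.4, 5.6

produces:

3, "x", 1.2
3, "y", 3.4
3, "z", 5.6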

Join operations

Join operations merge two pipes on a specified set of keys, similar to SQL joins. They are defined in JoinAlgorithms.scala.

All the expected joining modes are present: inner, outer, left, and right. Cascading implements these as CoGroup operations, which run in a single map-reduce job.

joins

Since it is important to hint at the relative sizes of your data, Scalding provides three main types of joins:

  • joinWithSmaller
  • joinWithLarger
  • joinWithTiny: this is a special map-side join that does not move the left-hand side from the mappers to the reducers. Instead, the entire right-hand side is replicated to the nodes holding the left side.

When in doubt, choose joinWithSmaller and optimize if that step seems to be taking a very long time.

# pipe1.joinWithSmaller(fields, pipe2)

Joins two pipes on a specified set of fields. Use this when pipe2 has fewer rows than pipe1.

// `people` is a large pipe with a "birthCityId" field. 
// Join it against the smaller `cities` pipe, which contains an "id" field.
val peopleWithBirthplaces = people.joinWithSmaller('birthCityId -> 'id, cities)

# pipe1.joinWithLarger(fields, pipe2)

Joins two pipes on a specified set of fields. Use this when pipe2 has more rows than pipe1.

// `cities` is a small pipe with an "id" field. 
// Join it against the larger `people` pipe, which contains a "birthCityId" field.
val peopleWithBirthplaces = cities.joinWithLarger('id -> 'birthCityId, people)

# pipe1.joinWithTiny(fields, pipe2)

Joins two pipes on a specified set of fields. As explained above, this is a special map-side join that does not move the left-hand side from the mappers to the reducers; instead, the entire right-hand side is replicated to the nodes holding the left side. joinWithTiny is appropriate when you know that # of rows in the bigger pipe < Nodes * # of rows in the smaller pipe, where Nodes is the number of nodes in the job.

// Assume this is a small pipe containing at most a couple thousand rows.
val celebrities = ...

val celebrityBirthplaces = cities.joinWithTiny('id -> 'birthCityId, celebrities)

join modes

By default, all joins are inner joins. You can also specify that you want a left join, a right join, or an outer join.

people.joinWithSmaller('birthCityId -> 'id, cities, joiner = new LeftJoin)
people.joinWithSmaller('birthCityId -> 'id, cities, joiner = new RightJoin)
people.joinWithSmaller('birthCityId -> 'id, cities, joiner = new OuterJoin)

Note that when performing an inner join, the left and right pipes are allowed to join on common field names.

// This is allowed. Only a single "ssn" field will be left in the resulting merged pipe.
people.joinWithSmaller('ssn -> 'ssn, teachers)

However, joining on common field names is not allowed for left, right, or outer joins (since it is useful to know whether a missing field value comes from the left pipe or the right pipe).

// This is not allowed.
people.joinWithSmaller('ssn -> 'ssn, teachers, joiner = new OuterJoin)

crossWithTiny

# pipe1.crossWithTiny(pipe2)

Performs the cross product of two pipes.
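
Like joinWithTiny, this replicates the entire right-hand side to every node, so pipe2 should be very small. A sketch, reusing the groupAll example above (with the count renamed to avoid a field-name collision):

// Attach the total vocabulary size to every row of `wordCounts`.
val vocabSize = wordCounts.groupAll { _.size('vocabSize) }
val wordCountsWithVocabSize = wordCounts.crossWithTiny(vocabSize)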
