Skip to content
This repository

Contents

Getting help

Documentation

Matrix API

Third Party Modules

Videos

How-tos

Tutorials

Articles

Other

A collection of MapReduce tasks translated (from Pig, Hive, Hadoop Streaming, etc.) into Scalding. For fully runnable code, see the repository here.

Word Count

Hadoop Streaming (Ruby)

# Word-count map step.
#
# Reads text from standard input and writes one tab-separated
# "word<TAB>1" record for every whitespace-delimited token.
def mapper
  STDIN.each_line do |row|
    row.split.each { |token| puts "#{token}\t1" }
  end
end

# Aggregate all (word, count) pairs for a particular word.
#
# In Hadoop Streaming (unlike standard Hadoop), the reducer receives
# rows from the mapper *one at a time*, though the rows are guaranteed
# to be sorted by key (and every row associated to a particular key
# will be sent to the same reducer).
#
# Reads "word<TAB>count" rows from standard input and writes one
# "word<TAB>total" row for each run of consecutive rows sharing a word.
def reducer
  curr_word = nil
  curr_count = 0
  STDIN.each_line do |line|
    word, count = line.strip.split("\t")
    # A blank row carries no key; skipping it keeps a run from being split.
    next if word.nil?

    if word != curr_word
      # Key changed: flush the finished key. The initial nil sentinel is
      # skipped so no bogus "\t0" row is written before the first real key.
      puts [curr_word, curr_count].join("\t") unless curr_word.nil?
      curr_word = word
      curr_count = 0
    end
    curr_count += count.to_i
  end

  # Flush the last key, if any input was seen at all.
  puts [curr_word, curr_count].join("\t") unless curr_word.nil?
end

Hive

# tokenizer.py
#
# Streaming tokenizer: reads rows from standard input and writes every
# whitespace-delimited token on its own line of standard output.
import sys

for line in sys.stdin:
  for word in line.split():
    # Parenthesized single-argument form prints identically on Python 2
    # and is required on Python 3, where `print word` is a syntax error.
    print(word)
-- Stage the raw tweets: a single STRING column, one tweet per row.
CREATE TABLE tweets (text STRING);
LOAD DATA LOCAL INPATH 'tweets.tsv' OVERWRITE INTO TABLE tweets;

-- Pipe each tweet through tokenizer.py (one output row per word),
-- then count how many rows each word produced.
SELECT word, COUNT(*) AS count
FROM (
  SELECT TRANSFORM(text) USING 'python tokenizer.py' AS word
  FROM tweets
) tokenized
GROUP BY word;

Pig

-- Word count: split every tweet into tokens, group identical tokens,
-- and store (word, count) pairs.
raw_tweets = LOAD 'tweets.tsv' AS (text:chararray);
tokens = FOREACH raw_tweets GENERATE FLATTEN(TOKENIZE(text)) AS word;
by_word = GROUP tokens BY word;
totals = FOREACH by_word GENERATE group AS word, COUNT(tokens) AS count;

STORE totals INTO 'word_counts.tsv';

Cascalog 2.0

;; Word count: read tweets line by line, split each line on runs of
;; whitespace, and write (word, count) pairs to word_counts.tsv.
;; NOTE(review): the `c/` alias used below is presumably set up by
;; bootstrap -- confirm against the Cascalog version in use.
(cascalog.repl/bootstrap)

(?<- (hfs-textline "word_counts.tsv") [?word ?count]
     ((hfs-textline "tweets.tsv") ?text)
     ((mapcatfn [text] (.split text "\\s+")) ?text :> ?word)
     (c/count ?count))

Scalding

import com.twitter.scalding._

/**
 * Word count job: reads tweets from tweets.tsv, splits each tweet on
 * runs of whitespace, and writes the number of occurrences of every
 * word to word_counts.tsv.
 */
class ScaldingTestJob(args: Args) extends Job(args) {
  Tsv("tweets.tsv", 'text)
    .flatMap('text -> 'word) { tweet: String => tweet.split("\\s+") }
    .groupBy('word) { perWord => perWord.size }
    .write(Tsv("word_counts.tsv"))
}

Distributed Grep

Hadoop Streaming (Ruby)

# Regular expression an input line must match to be emitted.
PATTERN = /.*hello.*/

# Grep map step: copies to standard output every line of standard
# input that matches PATTERN, unchanged; other lines are dropped.
def mapper
  STDIN.each_line do |row|
    next unless row =~ PATTERN

    puts row
  end
end

# Identity reduce step: every line read from standard input is written
# to standard output as-is.
def reducer
  STDIN.each_line { |row| puts row }
end

Pig

-- Distributed grep: keep only the tweets whose text matches PATTERN.
-- PATTERN is a script parameter substituted into the FILTER below.
%declare PATTERN '.*hello.*';

tweets = LOAD 'tweets.tsv' AS (text:chararray);
-- `results` is only defined here; this snippet does not STORE or DUMP it.
results = FILTER tweets BY (text MATCHES '$PATTERN');

Cascalog

;; Regular expression a line must match to be kept.
(def pattern #".*hello.*")

;; Filter operation: truthy when `text` matches `pattern` via re-matches,
;; which succeeds only if the entire string matches.
(deffilterop matches-pattern? [text pattern]
  (re-matches pattern text))

;; Builds a query producing every ?textline from `input` that passes
;; matches-pattern? for the given `pattern`.
(defn distributed-grep [input pattern]
  (<- [?textline]
      (input ?textline)
      (matches-pattern? ?textline pattern)))

;; Run the query over tweets.tsv and send the matching lines to stdout.
(?- (stdout) (distributed-grep (hfs-textline "tweets.tsv") pattern))

Scalding

// Regular expression a tweet must match; String.matches tests the
// whole string, hence the leading and trailing ".*".
val Pattern = ".*hello.*"

// Distributed grep: keep only the rows whose 'text field matches Pattern.
Tsv("tweets.tsv", 'text)
  .filter('text) { tweet: String => tweet.matches(Pattern) }

Inverted Index

Hadoop Streaming (Ruby)

# Emit (word, tweet_id) pairs.
#
# Reads "tweet_id<TAB>text" rows from standard input and writes one
# "word<TAB>tweet_id" row for every whitespace-delimited word of text.
def mapper
  STDIN.each_line do |line|
    # Limit the split to two fields so a tab inside the tweet text stays
    # part of the text instead of being silently dropped.
    tweet_id, text = line.strip.split("\t", 2)
    # Rows with no text column (or blank rows) have nothing to index;
    # skipping them avoids calling split on nil.
    next if text.nil?

    text.split.each do |word|
      puts [word, tweet_id].join("\t")
    end
  end
end

# Aggregate all (word, tweet_id) pairs for a particular word.
#
# In Hadoop Streaming (unlike standard Hadoop), the reducer receives
# rows from the mapper *one at a time*, though the rows are guaranteed
# to be sorted by key (and every row associated to a particular key
# will be sent to the same reducer).
#
# Reads "word<TAB>tweet_id" rows from standard input and writes one
# "word<TAB>id1,id2,..." row for each run of consecutive rows sharing
# a word.
def reducer
  curr_word = nil
  curr_inv_index = []
  STDIN.each_line do |line|
    word, tweet_id = line.strip.split("\t")
    # A blank row carries no key; skipping it keeps a run from being split.
    next if word.nil?

    if word != curr_word
      # New key: flush the finished one. The initial nil sentinel is
      # skipped so no empty row is written before the first real key.
      puts [curr_word, curr_inv_index.join(",")].join("\t") unless curr_word.nil?
      curr_word = word
      curr_inv_index = []
    end
    curr_inv_index << tweet_id
  end

  # Flush the last key with the same "," separator as every other key,
  # so all output rows share one format.
  unless curr_word.nil?
    puts [curr_word, curr_inv_index.join(",")].join("\t")
  end
end

Pig

-- Inverted index: map every word to the ids of the tweets containing it.
tweets = LOAD 'tweets.tsv' AS (tweet_id:int, text:chararray);

-- One (tweet_id, word) row per token of each tweet.
words = FOREACH tweets GENERATE tweet_id, FLATTEN(TOKENIZE(text)) AS word;
word_groups = GROUP words BY word;
-- For each word, project the tweet_id column out of its grouped rows.
-- `inverted_index` is only defined here; this snippet does not STORE or DUMP it.
inverted_index = FOREACH word_groups GENERATE group AS word, words.tweet_id;

Cascalog

;; Inline sample data: a vector of [id text] pairs used as the query source.
(def index [
  [0 "Hello World"]
  [101 "The quick brown fox jumps over the lazy dog"]
  [42 "Answer to the Ultimate Question of Life, the Universe, and Everything"]
])

;; Tokenizer: emits one value per piece of `text` split on runs of
;; whitespace (punctuation stays attached to its word).
(defmapcatop tokenize [text]
  (seq (.split text "\\s+")))

;; Collects the first element of every grouped tuple into a set, so each
;; id appears at most once per word. The set is wrapped in a list --
;; presumably so the buffer yields it as a single output value; confirm
;; against the defbufferop contract.
(defbufferop distinct-vals [tuples]
  (list (set (map first tuples))))

;; Run the query on the sample data and print (?word, ?ids) to stdout.
(?<- (stdout) [?word ?ids]
        (index ?id ?text)
        (tokenize ?text :> ?word)
        (distinct-vals ?id :> ?ids))

Scalding

// Source of tweets with two fields: 'id and 'text.
val tweets = Tsv("tweets.tsv", ('id, 'text))

// Expand every tweet into one ('word, 'tweetId) row per piece of its
// text, split on runs of whitespace.
val wordToTweets =
  tweets
    .flatMap(('id, 'text) -> ('word, 'tweetId)) { 
      fields : (Long, String) => 
      val (tweetId, text) = fields
      text.split("\\s+").map { word => (word, tweetId) }
    }

// Inverted index: for every word, gather its 'tweetId values into a
// list stored in the 'tweetIds field.
val invertedIndex =
  wordToTweets.groupBy('word) {  _.toList[Long]('tweetId -> 'tweetIds) }
Something went wrong with that request. Please try again.