Now using generator (yield()) semantics rather than crudely puts'ing results
Philip (flip) Kromer committed Feb 16, 2009
1 parent b323256 commit 0f51446
Showing 12 changed files with 159 additions and 89 deletions.
29 changes: 26 additions & 3 deletions README.textile
@@ -1,6 +1,26 @@

Wukong makes using Hadoop so easy a chimpanzee can use it.

h2. How to write a Wukong script

#!/usr/bin/env ruby
require 'wukong'

module WordCount
class Mapper < Wukong::Streamer::LineStreamer
# Emit each word in each line.
def process line
yield line.strip.split(/\W+/).
end
end

class Reducer < Wukong::Streamer::UniqCountLinesReducer
end
end
# Execute the script
Script.new(WordCount::Mapper, WordCount::Reducer).run
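
A minimal standalone sketch of the word-count flow described above, assuming the mapper yields one single-field record per word. It does not load Wukong: @SketchWordMapper@ and @word_count@ are hypothetical names, and the sort-then-group step only stands in for what Hadoop's shuffle and a counting reducer would do.

```ruby
# Hypothetical mapper (not the Wukong class): yields one record per word.
class SketchWordMapper
  def process(line)
    line.strip.split(/\W+/).each do |word|
      # split can produce a leading empty string, e.g. for ", hi"
      yield [word] unless word.empty?
    end
  end
end

# Map every line, sort the words (standing in for the shuffle phase),
# then count how many times each word occurs.
def word_count(lines)
  mapper = SketchWordMapper.new
  words  = []
  lines.each do |line|
    mapper.process(line) { |record| words << record.first }
  end
  words.sort.group_by { |word| word }.map { |word, group| [word, group.size] }
end
```

Calling @word_count(["the cat", "the hat"])@ returns an array of @[word, count]@ pairs in sorted word order. Yielding an array rather than a bare string keeps each record compatible with an emitter that joins fields with tabs.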


h2. How to run a Wukong script

your/script.rb --go path/to/input_files path/to/output_dir
@@ -19,14 +39,17 @@ syntax to specify more than one input file:
h2. How to test your scripts

To run mapper on its own:

cat ./local/test/input.tsv | ./examples/word_count.rb --map | more

or if your test data lies on the HDFS,

hdp-cat test/input.tsv | ./examples/word_count.rb --map | more


h2. What's up with Wukong::AndPig?

Wukong::AndPig is a small library to more easily generate code for the
@Wukong::AndPig@ is a small library to more easily generate code for the
"Pig":http://hadoop.apache.org/pig data analysis language. See its
"README":wukong/and_pig/README.textile for more.

@@ -39,14 +62,14 @@ journeys to the land of the Elephant:

Quoting "Sun Wukong's Wikipedia entry:":http://en.wikipedia.org/wiki/Wukong

bq.. Sun Wukong (traditional Chinese: 孫悟空;
bq. Sun Wukong (traditional Chinese: 孫悟空;
simplified Chinese: 孙悟空; pinyin: Sūn Wùkōng; Wade-Giles: Sun1 Wu4-k'ung1;
Japanese 孫悟空 (Son Gokū?)), known in the West as the Monkey King, is the main
character in the classical Chinese epic novel Journey to the West. In the novel,
he accompanies the monk Xuanzang on the journey to retrieve Buddhist sutras from
India.

Sun Wukong possesses incredible strength, being able to lift his 13,500 jīn
bq. Sun Wukong possesses incredible strength, being able to lift his 13,500 jīn
(8,100 kg) Ruyi Jingu Bang with ease. He also has superb speed, traveling
108,000 li (54,000 kilometers) in one somersault. Sun knows 72 transformations,
which allows him to transform into various animals and objects; he is, however,
32 changes: 15 additions & 17 deletions examples/sample_records.rb
@@ -10,31 +10,29 @@
# option: for example, to take a random 1/1000th of the lines in huge_files,
# ./examples/sample_records.rb --sampling_fraction=0.001 --go huge_files sampled_files
#
class Mapper < Wukong::Streamer::LineStreamer
include Wukong::Streamer::Filter

module SampleRecords
class Mapper < Wukong::Streamer::Filter
attr_accessor :sampling_fraction
def initialize options, *args
super options, *args
self.sampling_fraction = options[:sampling_fraction].to_f or
raise "Please supply a --sampling_fraction= argument, a decimal number between 0 and 1"
end

def emit? line
rand < self.sampling_fraction
end
# number between 0 and 1 giving the fraction of lines to emit
attr_accessor :sampling_fraction
#
# Use the command-line option to get the sampling fraction
#
def initialize options, *args
super options, *args
self.sampling_fraction = options[:sampling_fraction].to_f or
raise "Please supply a --sampling_fraction= argument, a decimal number between 0 and 1"
end

#
# randomly decide to emit +sampling_fraction+ fraction of lines
#
class Script < Wukong::Script
def emit? line
rand < self.sampling_fraction
end
end

#
# Executes the script
#
SampleRecords::Script.new(
SampleRecords::Mapper,
nil
).run
Wukong::Script.new( Mapper, nil ).run
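
A note on the option handling in this file: in Ruby, @nil.to_f@ is @0.0@ and @0.0@ is truthy, so an expression of the form @value.to_f or raise "..."@ never reaches the @raise@. The sketch below shows one way to validate the fraction explicitly. It is a standalone illustration, not the Wukong code: @parse_sampling_fraction@ and @SketchSampler@ are hypothetical names, and the injectable random number generator is an assumption added to make the behaviour testable.

```ruby
# Hypothetical helper: parse and validate a sampling fraction option.
def parse_sampling_fraction(options)
  raw = options[:sampling_fraction]
  if raw.nil?
    raise ArgumentError,
          "Please supply a --sampling_fraction= argument, a decimal number between 0 and 1"
  end
  fraction = Float(raw) # unlike #to_f, raises ArgumentError on non-numeric input
  unless fraction > 0.0 && fraction <= 1.0
    raise ArgumentError, "sampling fraction must be between 0 and 1, got #{raw.inspect}"
  end
  fraction
end

# Hypothetical sampler: decides per record whether to keep it.
class SketchSampler
  def initialize(options, rng = Random.new)
    @sampling_fraction = parse_sampling_fraction(options)
    @rng = rng
  end

  # Keep roughly +sampling_fraction+ of the records seen.
  def emit?(_record)
    @rng.rand < @sampling_fraction
  end
end
```

Passing a seeded @Random@ instance makes a run repeatable, which can help when comparing sampled outputs during testing.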
16 changes: 3 additions & 13 deletions examples/word_count.rb
@@ -41,20 +41,10 @@ def stream
end
end
end

class Reducer < Wukong::Streamer::UniqCountKeysReducer
end

#
#
class Script < Wukong::Script
end
end

#
# Executes the script
#
WordCount::Script.new(
# Execute the script
Wukong::Script.new(
WordCount::Mapper,
WordCount::Reducer
Wukong::Streamer::UniqCountKeysReducer
).run
4 changes: 3 additions & 1 deletion wukong/streamer.rb
@@ -1,9 +1,11 @@
require 'wukong/streamer/base'
require 'wukong/streamer/accumulating_reducer'
require 'wukong/streamer/line_streamer'
require 'wukong/streamer/struct_streamer'
#
require 'wukong/streamer/filter'
#
require 'wukong/streamer/accumulating_reducer'
require 'wukong/streamer/list_reducer'
require 'wukong/streamer/uniq_by_last_reducer'
require 'wukong/streamer/uniq_count_keys_reducer'
require 'wukong/streamer/uniq_count_lines_reducer'
34 changes: 16 additions & 18 deletions wukong/streamer/accumulating_reducer.rb
@@ -14,7 +14,7 @@ module Streamer
# accumulate more data than your box can hold before finalizing.
#
class AccumulatingReducer < Wukong::Streamer::Base
attr_accessor :curr_key
attr_accessor :key
def initialize options
super options
reset!
@@ -23,28 +23,26 @@ def initialize options
#
# override for multiple-field keys, etc.
#
def get_key *vals
vals.first
def get_key *record
record.first
end

#
# Accumulate all values for a given key.
# Accumulate all records for a given key.
#
# When the last value for the key is seen, finalize processing and adopt the
# When the last record for the key is seen, finalize processing and adopt the
# new key.
#
def process *vals
key = get_key(*vals)
# if we've seen nothing, adopt key
self.curr_key ||= key
# if this is a new key,
if key != self.curr_key
finalize # process what we've collected so far
def process *args, &block
this_key = get_key(*args)
self.key ||= this_key
if this_key != self.key # if this is a new key,
finalize(&block) # process what we've collected so far
reset! # then forget about that key
self.curr_key = key # and start a new one
self.key = this_key # and start a new one
end
# collect the current line
accumulate *vals
# collect the current record
accumulate *args
end

#
@@ -53,11 +51,11 @@ def process *vals
# Make sure to call +super+ if you override
#
def reset!
self.curr_key = nil
self.key = nil
end

#
# Override this to accumulate each value for the given key in turn.
# Override this to accumulate each record for the given key in turn.
#
def accumulate
raise "override the accumulate method in your subclass"
@@ -76,7 +74,7 @@ def finalize
#
def stream
super
finalize
finalize(){|record| emit record }
end
end
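
The key-change logic above can be sketched in isolation. The following is a simplified stand-in rather than the Wukong class: @SketchAccumulator@, @SketchListReducer@ and the @run@ helper are hypothetical, input is assumed to arrive already sorted by key (as it would from Hadoop's shuffle), and the final flush is skipped when no records were seen, which is a choice made for this sketch.

```ruby
# Hypothetical stand-in for an accumulating reducer; not the Wukong source.
class SketchAccumulator
  attr_accessor :key

  def initialize
    reset!
  end

  # The key is the first field; override for multi-field keys.
  def get_key(*record)
    record.first
  end

  # Called once per record. When the key changes, the finished group is
  # handed to the caller's block via #finalize before a new group starts.
  def process(*record, &block)
    this_key = get_key(*record)
    self.key ||= this_key
    if this_key != key
      finalize(&block)
      reset!
      self.key = this_key
    end
    accumulate(*record)
  end

  def reset!
    self.key = nil
  end

  # Feed key-sorted records through #process, then flush the last group.
  def run(records)
    output  = []
    collect = proc { |record| output << record }
    records.each { |record| process(*record, &collect) }
    finalize(&collect) unless key.nil?
    output
  end
end

# Collects every value seen for a key and yields them as one record.
class SketchListReducer < SketchAccumulator
  attr_accessor :values

  def reset!
    super
    self.values = []
  end

  def accumulate(_key, *rest)
    values.concat(rest)
  end

  def finalize
    yield [key, *values]
  end
end
```

Because @finalize@ yields instead of printing, the same reducer can be driven by a stream loop that writes to standard output or, as in @run@ here, by a test that collects records into an array.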

29 changes: 19 additions & 10 deletions wukong/streamer/base.rb
@@ -8,34 +8,43 @@ def initialize options={}
end

#
# itemize and process each line
# Pass each record to +#process+
#
def stream
$stdin.each do |line|
item = itemize(line) ; next if item.blank?
process(*item)
record = recordize(line.chomp)
next if record.nil?
process(*record) do |output_record|
emit output_record
end
end
end

#
# Default itemizer: process each record as an array of fields by splitting
# at field separator
# Default recordizer: returns array of fields by splitting at tabs
#
def itemize line
line.chomp.split("\t")
def recordize line
line.split("\t")
end

def emit record
puts record.join("\t")
end

#
# Implement your own [process] method
# Process each record in turn, yielding the records to emit
#
def process *args, &block
raise "override the process method in your implementation: it should process each record."
end

#
# To track processing errors inline,
# pass the line back to bad_record!
#
def bad_record! *args
warn "Bad record #{args.inspect[0..400]}"
puts ["bad_record", args].flatten.join("\t")
puts ["bad_record", *args].join("\t")
end
end
end
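
To make the generator semantics concrete, here is a small self-contained sketch of the same loop: each line is turned into a record, @process@ yields zero or more output records, and the loop emits each one. It is an illustration only. @SketchStreamer@, @SwapFields@ and @run_swap@ are hypothetical names, and the input and output streams are passed in explicitly (an assumption made so the example can run against strings instead of @$stdin@).

```ruby
require 'stringio'

# Hypothetical stand-in for a streamer base class; not the Wukong source.
class SketchStreamer
  def initialize(input, output)
    @input  = input
    @output = output
  end

  # Read each line, turn it into a record, and emit whatever #process yields.
  def stream
    @input.each_line do |line|
      record = recordize(line.chomp)
      next if record.nil?
      process(*record) { |output_record| emit(output_record) }
    end
  end

  # Default recordizer: split the line into tab-separated fields.
  def recordize(line)
    line.split("\t")
  end

  # Write one record as a tab-separated line.
  def emit(record)
    @output.puts record.join("\t")
  end

  def process(*_fields)
    raise NotImplementedError, "override #process in a subclass"
  end
end

# Example subclass: swap the first two fields, dropping short records.
class SwapFields < SketchStreamer
  def process(first = nil, second = nil, *rest)
    return if second.nil? # yielding nothing drops the record
    yield [second, first, *rest]
  end
end

# Convenience wrapper: run SwapFields over a string and return the output.
def run_swap(text)
  out = StringIO.new
  SwapFields.new(StringIO.new(text), out).stream
  out.string
end
```

Since @process@ never writes output itself, a subclass can yield once, several times, or not at all per input record, and the decision about where records go stays in @emit@.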
13 changes: 6 additions & 7 deletions wukong/streamer/filter.rb
@@ -3,18 +3,17 @@ module Streamer
#
# emit only some records, as dictated by the #emit? method
#
class Filter < Wukong::Streamer::Base

# This is a mixin: including this module in your streamer
# implements its +#process+ method.
#
module Filter
#
# Filter out a subset of record/lines
#
# Subclass and re-define the emit? method
#
def stream
$stdin.each do |line|
line.chomp!
puts line if emit?(line)
end
def process *record, &block
yield record if emit?(record)
end
end
end
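
A minimal sketch of the mixin approach, assuming a line-oriented streamer whose records are one-element arrays. @SketchFilter@, @SketchLineStreamer@, @NonBlankLines@ and @filter_non_blank@ are hypothetical names used for illustration and are not part of Wukong.

```ruby
require 'stringio'

# Hypothetical mixin: supplies #process in terms of an #emit? predicate.
module SketchFilter
  def process(*record)
    yield record if emit?(record)
  end
end

# Hypothetical line streamer: each line becomes a one-field record.
class SketchLineStreamer
  def initialize(input, output)
    @input  = input
    @output = output
  end

  def stream
    @input.each_line do |line|
      process(line.chomp) { |record| @output.puts record.join("\t") }
    end
  end
end

# Keeps only lines that contain something other than whitespace.
class NonBlankLines < SketchLineStreamer
  include SketchFilter

  # +record+ is an array holding the single line.
  def emit?(record)
    !record.first.strip.empty?
  end
end

# Convenience wrapper: filter a string and return the surviving lines.
def filter_non_blank(text)
  out = StringIO.new
  NonBlankLines.new(StringIO.new(text), out).stream
  out.string
end
```

A module included in a class sits ahead of that class's superclass in Ruby's method lookup, so a mixin like this can supply @process@ even when the base streamer defines its own placeholder version.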
12 changes: 12 additions & 0 deletions wukong/streamer/line_streamer.rb
@@ -0,0 +1,12 @@
module Wukong
module Streamer
class LineStreamer < Wukong::Streamer::Base
#
# Turns a flat line into a record for +#process+ to consume
#
def recordize line
[line]
end
end
end
end
21 changes: 21 additions & 0 deletions wukong/streamer/list_reducer.rb
@@ -0,0 +1,21 @@
module Wukong
module Streamer
#
# Accumulate the list of records seen for each key
#
class ListReducer < Wukong::Streamer::AccumulatingReducer
attr_accessor :values

# start a fresh, empty list of values
def reset!
super
self.values = []
end

# add this record to the list for the current key
def accumulate *record
self.values << record
end
end
end
end
50 changes: 34 additions & 16 deletions wukong/streamer/struct_streamer.rb
@@ -1,30 +1,48 @@
module Wukong
module Streamer
#
# Mix StructRecordizer into any streamer to make it accept a stream of
# objects -- the first field in each line is turned into a class and used to
# instantiate an object using the remaining fields on that line.
#
#
class StructStreamer < Wukong::Streamer::Base
def itemize line
StructItemizer.itemize *super(line)
end
end

#
#
#
module StructItemizer
module StructRecordizer
def self.class_from_resource klass_name
begin klass = klass_name.to_s.camelize.constantize
# kill off all but class name
klass_name = klass_name.gsub(/-.*$/, '')
begin
# convert it to class name
klass = klass_name.to_s.camelize.constantize
rescue ; warn "Bogus class name '#{klass_name}'" ; return ; end
end

def self.itemize klass_name, *vals
#
# Turn the first field into a class name, then use the remaining fields
# on that line to instantiate the object to process.
#
def self.recordize klass_name, *fields
return if klass_name =~ /^(?:bogus-|bad_record)/
klass_name.gsub!(/-.*$/, '') # kill off all but class name
klass = self.class_from_resource(klass_name) or return
[ klass.new(*vals) ]
klass = class_from_resource(klass_name) or return
# instantiate the class using the remaining fields on that line
[ klass.new(*fields) ]
end

#
#
#
def recordize line
StructRecordizer.recordize *line.split("\t")
end
end

#
# Processes file as a stream of objects -- the first field in each line is
# turned into a class and used to instantiate an object using the remaining
# fields on that line.
#
# See [StructRecordizer] for more.
#
class StructStreamer < Wukong::Streamer::Base
include StructRecordizer
end
end
end
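
The first-field-names-the-class idea can be sketched without any dependencies. In the version below, @Tweet@, @UserProfile@ and @SketchStructRecordizer@ are hypothetical, and a hand-rolled snake_case to CamelCase conversion with @Object.const_get@ stands in for the @camelize.constantize@ call used above, which appears to come from ActiveSupport.

```ruby
# Hypothetical record classes for the demonstration.
Tweet       = Struct.new(:id, :text)
UserProfile = Struct.new(:id, :name)

module SketchStructRecordizer
  # "user_profile-v2" -> UserProfile; returns nil when no such class exists.
  def self.class_from_resource(klass_name)
    base       = klass_name.to_s.sub(/-.*\z/, '')           # drop any "-suffix"
    const_name = base.split('_').map(&:capitalize).join      # snake_case -> CamelCase
    klass      = Object.const_get(const_name)
    klass.is_a?(Class) ? klass : nil
  rescue NameError
    warn "Bogus class name '#{klass_name}'"
    nil
  end

  # Split a tab-separated line, use the first field to pick a class, and
  # instantiate it with the remaining fields. Returns nil for lines that
  # are flagged as bad or that name an unknown class.
  def self.recordize(line)
    klass_name, *fields = line.chomp.split("\t")
    return if klass_name.nil? || klass_name =~ /\A(?:bogus-|bad_record)/
    klass = class_from_resource(klass_name) or return
    [klass.new(*fields)]
  end
end
```

Returning the object wrapped in a one-element array means it can be splatted into @process@ like any other record, and returning @nil@ gives the stream loop a simple signal to skip the line.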
