Big cleanup of the examples/ directory

commit 947156b6903851f51711b0aa2fe67f6b8f886fee 1 parent bfaccc9
Philip (flip) Kromer authored
Showing with 204 additions and 299 deletions.
  1. +32 −0 CHANGELOG.textile
  2. +58 −12 README.textile
  3. +17 −11 Rakefile
  4. +0 −8 TODO.textile
  5. +0 −56 examples/count_keys.rb
  6. +0 −57 examples/count_keys_at_mapper.rb
  7. +14 −21 examples/network_graph/breadth_first_search.rb
  8. +22 −13 examples/network_graph/gen_multi_edge.rb
  9. +1 −1  examples/pagerank/pagerank.rb
  10. +6 −10 examples/pagerank/pagerank_initialize.rb
  11. +6 −16 examples/sample_records.rb
  12. +0 −4 examples/server_logs/apache_log_parser.rb
  13. +3 −2 examples/size.rb
  14. +9 −11 examples/{ → stats}/binning_percentile_estimator.rb
  15. +2 −2 examples/{ → stats}/rank_and_bin.rb
  16. +0 −18 examples/store/chunked_store_example.rb
  17. +11 −14 examples/stupidly_simple_filter.rb
  18. +16 −36 examples/word_count.rb
  19. +2 −2 lib/wukong/script.rb
  20. +2 −2 lib/wukong/streamer/filter.rb
  21. +3 −3 lib/wukong/streamer/list_reducer.rb
  22. 0  {examples → old}/cassandra_streaming/avromapper.rb
  23. 0  {examples → old}/cassandra_streaming/berlitz_for_cassandra.textile
  24. 0  {examples → old}/cassandra_streaming/cassandra.avpr
  25. 0  {examples → old}/cassandra_streaming/cassandra_random_partitioner.rb
  26. 0  {examples → old}/cassandra_streaming/catter.sh
  27. 0  {examples → old}/cassandra_streaming/client_interface_notes.textile
  28. 0  {examples → old}/cassandra_streaming/client_schema.avpr
  29. 0  {examples → old}/cassandra_streaming/client_schema.textile
  30. 0  {examples → old}/cassandra_streaming/foofile.avr
  31. 0  {examples → old}/cassandra_streaming/pymap.sh
  32. 0  {examples → old}/cassandra_streaming/pyreduce.sh
  33. 0  {examples → old}/cassandra_streaming/smutation.avpr
  34. 0  {examples → old}/cassandra_streaming/streamer.sh
  35. 0  {examples → old}/cassandra_streaming/struct_loader.rb
  36. 0  {examples → old}/cassandra_streaming/tuning.textile
  37. 0  {examples → old}/keystore/cassandra_batch_test.rb
  38. 0  {examples → old}/keystore/conditional_outputter_example.rb
32 CHANGELOG.textile
@@ -1,3 +1,35 @@
+h2. Wukong v2.0.0
+
+h4. Important changes
+
+* Passing options to streamers is now deprecated. Use @Settings@ instead.
+
+* Streamer by default has a periodic monitor that logs (to STDERR by default) every 10_000 lines or 30 seconds
+
+* Examples cleaned up, should all run
+
+h4. Simplified syntax
+
+* you can now pass Script.new an *instance* of Streamer to use as mapper or reducer
+* Adding an experimental sugar:
+
+ <pre>
+ #!/usr/bin/env ruby
+ require 'wukong/script'
+
+ LineStreamer.map do |line|
+ emit line.reverse
+ end.run
+ </pre>
+
+ Note that you can now tweet a wukong script.
+
+* It's now recommended that at the top of a wukong script you say
+ <pre>
+ require 'wukong/script'
+ </pre>
+ Among other benefits, this lets you refer to wukong streamers without prefix.
+
h2. Wukong v1.5.4
* EMR support now works very well
70 README.textile
@@ -19,18 +19,6 @@ The **main documentation** lives on the "Wukong Pages.":http://mrflip.github.com
* Wukong is licensed under the "Apache License":http://mrflip.github.com/wukong/LICENSE.html (same as Hadoop)
* "More info":http://mrflip.github.com/wukong/moreinfo.html
-h2. Imminent Changes
-
-I'm pushing to release "Wukong 3.0 the actual 1.0 release".
-
-* For reducing/uniqing, a notion of mutable_fields and immutable_fields and extrinsic_fields: two objects compare the same/differently if their mutable fields compare the same/differently
-* Methods on TypedStruct to
-
- * Make to_flat(false) the default, with the sort_fields / partition_fields defaulting to 2 each and very prominently documented
- * Standardize the notion that wukong classes have a "key"; by default, it will be to_a.first for Structs/TypedStructs. This shouldn't break anything.
- * May make some things that are derived classes into mixin'ed modules
- * Will probably change the name of AccumulatingReducer into just Accumulator, and have all Accumulator-derived classes include Accumulator; I'll make sure the old names continue to work though.
-
h2. Help!
@@ -193,6 +181,64 @@ You'd end up with
@newman @elaine @jerry @kramer
</code></pre>
+h2. Gotchas
+
+h4. RecordStreamer dies on blank lines with "wrong number of arguments"
+
+If your lines don't always have a full complement of fields, and you define #process() to take fixed named arguments, then ruby will complain when some of them don't show up:
+
+<pre>
+ class MyUnhappyMapper < Wukong::Streamer::RecordStreamer
+ # this will fail if the line has more or fewer than 3 fields:
+ def process x, y, z
+ p [x, y, z]
+ end
+ end
+</pre>
+
+The cleanest way I know to fix this is with recordize, which you should recall always returns an array of fields:
+
+<pre>
+ class MyHappyMapper < Wukong::Streamer::RecordStreamer
+ # extracts three fields always; any missing fields are nil, any extra fields discarded
+ # @example
+ # recordize("a") # ["a", nil, nil]
+ # recordize("a\tb\tc") # ["a", "b", "c"]
+ # recordize("a\tb\tc\td") # ["a", "b", "c"]
+ def recordize raw_record
+ x, y, z = super(raw_record)
+ [x, y, z]
+ end
+
+ # Now all lines produce exactly three args
+ def process x, y, z
+ p [x, y, z]
+ end
+ end
+</pre>
+
+If you want to preserve any extra fields, use the extra argument to #split():
+
+<pre>
+ class MyMoreThanHappyMapper < Wukong::Streamer::RecordStreamer
+ # extracts three fields always; any missing fields are nil, the final field will contain a tab-separated string of all trailing fields
+ # @example
+ # recordize("a") # ["a", nil, nil]
+ # recordize("a\tb\tc") # ["a", "b", "c"]
+ # recordize("a\tb\tc\td") # ["a", "b", "c\td"]
+ def recordize raw_record
+ x, y, z = raw_record.split("\t", 3)
+ [x, y, z]
+ end
+
+ # Now all lines produce exactly three args
+ def process x, y, z
+ p [x, y, z]
+ end
+ end
+</pre>
+
+
h2. Why is it called Wukong?
Hadoop, as you may know, is "named after a stuffed elephant.":http://en.wikipedia.org/wiki/Hadoop Since Wukong was started by the "infochimps":http://infochimps.org team, we needed a simian analog. A Monkey King who journeyed to the land of the Elephant seems to fit the bill:
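Note on the README "Gotchas" hunk above: the recordize fix rests on two plain-Ruby behaviors. Multiple assignment pads missing values with nil and drops extras, and String#split accepts a limit that leaves the remainder in the last field. A minimal sketch that runs without Wukong; the helper names three_fields and three_fields_keep_rest are hypothetical, and the chomp is an assumption about trailing newlines:

```ruby
# Plain-Ruby sketch; these helpers are hypothetical and not part of Wukong.

# Always returns exactly three fields: missing ones are nil, extras are dropped.
def three_fields(raw_record)
  x, y, z = raw_record.chomp.split("\t")
  [x, y, z]
end

# Always returns exactly three fields: the third keeps any trailing fields,
# still tab-separated, because split stops after producing 3 pieces.
def three_fields_keep_rest(raw_record)
  x, y, z = raw_record.chomp.split("\t", 3)
  [x, y, z]
end

p three_fields("a")
p three_fields("a\tb\tc\td")
p three_fields_keep_rest("a\tb\tc\td")
```

A blank line splits into an empty array, so all three fields come back nil and a fixed-arity process method still receives three arguments.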
28 Rakefile
@@ -32,18 +32,24 @@ rescue LoadError
puts "Jeweler (or a dependency) not available. Install it with: gem install jeweler"
end
-require 'spec/rake/spectask'
-Spec::Rake::SpecTask.new(:spec) do |spec|
- spec.libs << 'lib' << 'spec'
- spec.spec_files = FileList['spec/**/*_spec.rb']
-end
-Spec::Rake::SpecTask.new(:rcov) do |spec|
- spec.libs << 'lib' << 'spec'
- spec.pattern = 'spec/**/*_spec.rb'
- spec.rcov = true
+begin
+ require 'spec/rake/spectask'
+ Spec::Rake::SpecTask.new(:spec) do |spec|
+ spec.libs << 'lib' << 'spec'
+ spec.spec_files = FileList['spec/**/*_spec.rb']
+ end
+ Spec::Rake::SpecTask.new(:rcov) do |spec|
+ spec.libs << 'lib' << 'spec'
+ spec.pattern = 'spec/**/*_spec.rb'
+ spec.rcov = true
+ end
+ task :spec => :check_dependencies
+ task :default => :spec
+rescue LoadError
+ task :spec do
+ abort "rspec is not available. In order to run rspec, you must: sudo gem install rspec"
+ end
end
-task :spec => :check_dependencies
-task :default => :spec
begin
require 'reek/rake_task'
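Note on the Rakefile hunk above: it wraps the rspec tasks in begin/rescue LoadError so that a missing gem yields a helpful task instead of a crash at load time. The same guard can be sketched as a small helper; the name optional_require is hypothetical and does not appear in the Rakefile:

```ruby
# Hypothetical helper illustrating the begin/rescue LoadError guard.
# Returns true if the library could be loaded, false if it is not installed.
def optional_require(name)
  require name
  true
rescue LoadError
  false
end

if optional_require('set')
  puts "set is available"
end

unless optional_require('no_such_library_for_this_sketch')
  puts "optional library missing; define a fallback task here"
end
```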
8 TODO.textile
@@ -1,13 +1,5 @@
-
-
-
* add GEM_PATH to hadoop_recycle_env
-* Hadoop_command function received an array for the input_path parameter
-
** We should be able to specify comma *or* space separated paths; the last
space-separated path in Settings.rest becomes the output file, the others are
used as the input_file list.
-
-* Make configliere Settings and streamer_instance.options() be the same
- thing. (instead of almost-but-confusingly-not-always the same thing).
56 examples/count_keys.rb
@@ -1,56 +0,0 @@
-#!/usr/bin/env ruby
-$: << File.dirname(__FILE__)+'/../lib'
-require 'wukong'
-require 'wukong/streamer/count_keys'
-require 'wukong/streamer/count_lines'
-
-#
-#
-class CountKeysReducer < Wukong::Streamer::CountLines
- #
- # Taken from the actionpack Rails component ('action_view/helpers/number_helper')
- #
- # Formats a +number+ with grouped thousands using +delimiter+. You
- # can customize the format using optional <em>delimiter</em> and <em>separator</em> parameters.
- # * <tt>delimiter</tt> - Sets the thousands delimiter, defaults to ","
- # * <tt>separator</tt> - Sets the separator between the units, defaults to "."
- #
- # number_with_delimiter(12345678) => 12,345,678
- # number_with_delimiter(12345678.05) => 12,345,678.05
- # number_with_delimiter(12345678, ".") => 12.345.678
- def number_with_delimiter(number, delimiter=",", separator=".")
- begin
- parts = number.to_s.split('.')
- parts[0].gsub!(/(\d)(?=(\d\d\d)+(?!\d))/, "\\1#{delimiter}")
- parts.join separator
- rescue
- number
- end
- end
-
- # Override to look nice
- def formatted_count item, key_count
- key_count_str = number_with_delimiter(key_count.to_i)
- "%-25s\t%12s" % [item, key_count_str]
- end
-end
-
-#
-class CountKeysScript < Wukong::Script
- def map_command
- # Use `cut` to extract the first field
- %Q{ cut -d"\t" -f1 }
- end
-
- #
- # There's just the one field
- #
- def default_options
- super.merge :sort_fields => 1
- end
-end
-
-# Executes the script when run from command line
-if __FILE__ == $0
- CountKeysScript.new(nil, CountKeysReducer).run
-end
57 examples/count_keys_at_mapper.rb
@@ -1,57 +0,0 @@
-#!/usr/bin/env ruby
-$: << File.dirname(__FILE__)+'/../lib'
-require 'wukong'
-
-#
-#
-module CountKeys
- #
- class Mapper < Wukong::Streamer::Base
- attr_accessor :keys_count
- def initialize *args
- self.keys_count = {}
- end
- def process key, *args
- key.gsub!(/-.*/, '') # kill off the slug
- self.keys_count[key] ||= 0
- self.keys_count[key] += 1
- end
- def stream *args
- super *args
- self.keys_count.each do |key, count|
- emit [key, count].to_flat
- end
- end
- end
- # Identity Mapper
- class Reducer < Wukong::Streamer::AccumulatingReducer
- attr_accessor :key_count
- require 'active_support'
- require 'action_view/helpers/number_helper'; include ActionView::Helpers::NumberHelper
-
- # Override to look nice
- def formatted_count item, key_count
- key_count_str = number_with_delimiter(key_count.to_i, :delimiter => ',')
- "%-25s\t%12s" % [item, key_count_str]
- end
- def start! *args
- self.key_count = 0
- end
- def accumulate key, count
- self.key_count += count.to_i
- end
- def finalize
- yield formatted_count(key, key_count)
- end
- end
-
- #
- class Script < Wukong::Script
- # There's just the one field
- def default_options
- super.merge :sort_fields => 1, :reduce_tasks => 1
- end
- end
-end
-
-CountKeys::Script.new(CountKeys::Mapper, CountKeys::Reducer).run
35 examples/network_graph/breadth_first_search.rb
@@ -1,6 +1,6 @@
#!/usr/bin/env ruby
-$: << ENV['WUKONG_PATH']
-require 'wukong'
+$: << File.dirname(__FILE__)+'/../../lib'
+require 'wukong/script'
#
# Use this script to do a Breadth-First Search (BFS) of a graph.
@@ -9,19 +9,18 @@
# ./make_paths --head=[path_in_key] --tail=[path_out_key] --out_rsrc=[combined_path_key]
#
# For example, given an edge list in the file '1path.tsv' that looks like
-# 1path n1 n2
-# 1path n1 n3
+# 1path n1 n2
+# 1path n1 n3
# ... and so forth ...
# you can run
# for t in 1 2 3 4 5 6 7 8 9 ; do next=$((t+1)) ; time cat 1path.tsv "${t}path.tsv" | ./make_paths.rb --map --head="1path" --tail="${t}path" | sort -u | ./make_paths.rb --reduce --out_rsrc="${next}path" | sort -u > "${next}path.tsv" ; done
# to do a 9-deep breadth-first search.
#
module Gen1HoodEdges
- class Mapper < Wukong::Streamer::Base
- attr_accessor :head, :tail
- def initialize options
- self.head = options[:head]
- self.tail = options[:tail]
+ class Mapper < Wukong::Streamer::RecordStreamer
+ def initialize
+ @head = Settings[:head]
+ @tail = Settings[:tail]
end
def process rsrc, *nodes
yield [ nodes.last, 'i', nodes[0..-2] ] if (rsrc == self.head)
@@ -37,8 +36,8 @@ def process rsrc, *nodes
#
class Reducer < Wukong::Streamer::AccumulatingReducer
attr_accessor :paths_in, :out_rsrc
- def initialize options
- self.out_rsrc = options[:out_rsrc]
+ def initialize
+ self.out_rsrc = Settings[:out_rsrc]
end
# clear the list of incoming paths
def start! *args
@@ -63,17 +62,11 @@ def get_key mid, *_
mid
end
end
-
- class Script < Wukong::Script
- def default_options
- super.merge :sort_fields => 2, :partition_fields => 1
- end
- end
-
end
# Execute the script
-Gen1HoodEdges::Script.new(
+Wukong.run(
Gen1HoodEdges::Mapper,
- Gen1HoodEdges::Reducer
- ).run
+ Gen1HoodEdges::Reducer,
+ :sort_fields => 2, :partition_fields => 1
+ )
35 examples/network_graph/gen_multi_edge.rb
@@ -2,7 +2,6 @@
require 'rubygems'
$: << File.dirname(__FILE__)+'/../../lib'
require 'wukong'
-require 'wukong/models/graph'; include Wukong::Models
#
# Takes any number of flavors of directed edge with the form
@@ -88,17 +87,27 @@ def finalize
yield self.multi_edge
end
end
+end
- #
- # Sort on the first two keys: each @[src, dest]@ pair winds up at the same
- # reducer.
- #
- class Script < Wukong::Script
- def default_options
- super.merge :sort_fields => 2
- end
- end
+Edge = TypedStruct.new(
+ [:src, Integer],
+ [:dest, Integer]
+ )
- # Execute the script
- Script.new(Mapper, Reducer).run
-end
+MultiEdge = TypedStruct.new(
+ [:src, Integer],
+ [:dest, Integer],
+ [:a_follows_b, Integer],
+ [:b_follows_a, Integer],
+ [:a_replies_b, Integer],
+ [:b_replies_a, Integer],
+ [:a_atsigns_b, Integer],
+ [:b_atsigns_a, Integer],
+ [:a_retweets_b, Integer],
+ [:b_retweets_a, Integer],
+ [:a_favorites_b, Integer],
+ [:b_favorites_a, Integer]
+ )
+
+# Execute the script
+Script.new(Mapper, Reducer, :sort_fields => 2).run
2  examples/pagerank/pagerank.rb
@@ -1,6 +1,6 @@
#!/usr/bin/env ruby
$: << File.dirname(__FILE__)+'/../../lib'
-require 'wukong'
+require 'wukong/script'
#
#
16 examples/pagerank/pagerank_initialize.rb
@@ -1,7 +1,7 @@
#!/usr/bin/env ruby
$: << File.dirname(__FILE__)+'/../../lib'
-require 'wukong'
-require 'wukong/streamer/set_reducer'
+require 'wukong/script'
+require 'wukong/streamer/list_reducer'
module PageRank
class Script < Wukong::Script
@@ -15,10 +15,6 @@ class Script < Wukong::Script
def map_command
%Q{/usr/bin/cut -d"\t" -f2,3}
end
-
- def default_options
- super.merge :extra_args => ' -jobconf io.sort.record.percent=0.25 '
- end
end
#
@@ -28,18 +24,18 @@ def default_options
#
class Reducer < Wukong::Streamer::ListReducer
def accumulate src, dest
- self.values << dest
+ @values << dest
end
# Emit src, initial pagerank, and flattened dests list
def finalize
- self.values = ['dummy'] if self.values.blank?
- yield [key, 1.0, self.values.to_a.join(",")]
+ @values = ['dummy'] if @values.blank?
+ yield [key, 1.0, @values.to_a.join(",")]
end
end
# Execute the script
- Script.new(nil, PageRank::Reducer).run
+ Script.new(nil, PageRank::Reducer, :io_sort_record_percent => 0.25).run
end
22 examples/sample_records.rb
@@ -1,7 +1,8 @@
#!/usr/bin/env ruby
$: << File.dirname(__FILE__)+'/../lib'
-require 'rubygems'
-require 'wukong'
+require 'wukong/script'
+
+Settings.define :sampling_fraction, :type => Float, :required => true, :description => "floating-point number between 0 and 1 giving the fraction of lines to emit: at sampling_fraction=1 all records are emitted, at 0 none are."
#
# Probabilistically emit some fraction of record/lines
@@ -15,29 +16,18 @@ class Mapper < Wukong::Streamer::LineStreamer
include Wukong::Streamer::Filter
#
- # floating-point number between 0 and 1 giving the fraction of lines to emit:
- # at sampling_fraction=1 all records are emitted, at 0 none are.
- #
- # Takes its value from a mandatory command-line option
- #
- def sampling_fraction
- @sampling_fraction ||= ( options[:sampling_fraction] && options[:sampling_fraction].to_f ) or
- raise "Please supply a --sampling_fraction= argument, a decimal number between 0 and 1"
- end
-
- #
# randomly decide to emit +sampling_fraction+ fraction of lines
#
def emit? line
- rand < self.sampling_fraction
+ rand < Settings.sampling_fraction
end
end
#
# Executes the script
#
-Wukong::Script.new( Mapper,
+Wukong.run( Mapper,
nil,
:reduce_tasks => 0,
:reuse_jvms => true
- ).run
+ )
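Note on the sample_records.rb hunk above: emit? keeps a line when a uniform random draw falls below sampling_fraction, so each line is kept independently with that probability. A standalone sketch of the same test; the sample helper and the injectable rng argument are assumptions added so the behavior can be checked deterministically:

```ruby
# Hypothetical helper; mirrors the `rand < sampling_fraction` test without Wukong.
# rng is injectable so a seeded Random gives repeatable results.
def sample(lines, sampling_fraction, rng = Random.new)
  lines.select { rng.rand < sampling_fraction }
end

lines = %w[alpha beta gamma delta]
p sample(lines, 1.0)                   # rand is always < 1.0, so every line is kept
p sample(lines, 0.0)                   # rand is never < 0.0, so no line is kept
p sample(lines, 0.5, Random.new(42))   # repeatable for a fixed seed
```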
4 examples/server_logs/apache_log_parser.rb
@@ -1,7 +1,3 @@
-#!/usr/bin/env ruby
-$: << File.dirname(__FILE__)+'/../lib'
-require 'rubygems'
-require 'wukong'
MONTHS = {
'Jan' => '01',
5 examples/size.rb
@@ -1,6 +1,6 @@
#!/usr/bin/env ruby
$: << File.dirname(__FILE__)+'/../lib'
-require 'wukong'
+require 'wukong/script'
module Size
#
@@ -56,5 +56,6 @@ def stream *args
# Execute the script
Size::Script.new(
nil,
- Size::Reducer
+ Size::Reducer,
+ :reduce_tasks => 1
).run
20 examples/binning_percentile_estimator.rb → examples/stats/binning_percentile_estimator.rb
@@ -1,10 +1,8 @@
#!/usr/bin/env ruby
-
-require 'rubygems'
-require 'wukong'
+$: << File.dirname(__FILE__)+'/../../lib'
+require 'wukong/script'
require 'wukong/streamer/count_keys'
-
#
# Ch3ck out dis moist azz code bitches!!
#
@@ -70,14 +68,14 @@ def after_stream
table << "TRSTRANK_TABLE = " << count_bin.inspect
table.close
end
-
+
#
- # Return percentile of a given trstrank for a given follower bracket
+ # Return percentile of a given trstrank for a given follower bracket
#
def percentile bin, rank
- ((count_less_than(bin,rank) + 0.5*frequency_of(bin,rank))/ total_num(bin) )*100.0
+ ((count_less_than(bin,rank) + 0.5*frequency_of(bin,rank))/ total_num(bin) )*100.0
end
-
+
#
# Return the count of values less than rank
#
@@ -119,7 +117,7 @@ def generate_all_pairs bin
big_list.uniq.sort{|x,y| x.first <=> y.first}
end
-
+
#
# Nothing to see here, move along
#
@@ -132,11 +130,11 @@ def interpolate pair1, pair2, dx
num.times do |i|
x = pair1.first + (i+1).to_f*dx
y = m*x + b
- points << [x,y]
+ points << [x,y]
end
points # return an array of pairs
end
-
+
end
Wukong::Script.new(Mapper,Reducer).run
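Note on the percentile method in the hunks above: it computes a mid-rank percentile, that is, the count of values below the rank plus half the count of ties, divided by the total and scaled to 100. A standalone sketch over a plain array; the name midrank_percentile and the array input are assumptions, and the example's own count_less_than, frequency_of, and total_num helpers (which work per bin) are not reproduced here:

```ruby
# Hypothetical plain-array version of the mid-rank percentile formula:
#   ((count_less_than + 0.5 * frequency_of) / total_num) * 100.0
def midrank_percentile(values, rank)
  less = values.count { |v| v < rank }   # values strictly below rank
  same = values.count { |v| v == rank }  # ties with rank
  ((less + 0.5 * same) / values.size) * 100.0
end

p midrank_percentile([1, 2, 2, 3], 2)
```

Counting only half of the ties places a value in the middle of its own tie group, so the lowest value in a list does not score 0 and the highest does not score 100.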
4 examples/rank_and_bin.rb → examples/stats/rank_and_bin.rb
@@ -1,6 +1,6 @@
#!/usr/bin/env ruby
-$: << File.dirname(__FILE__)+'/../lib'
-require 'wukong'
+$: << File.dirname(__FILE__)+'/../../lib'
+require 'wukong/script'
require 'wukong/streamer/rank_and_bin_reducer'
#
18 examples/store/chunked_store_example.rb
@@ -1,18 +0,0 @@
-#!/usr/bin/env ruby
-require 'rubygems'
-require 'wukong'
-# require 'wukong/store'
-
-require 'configliere'
-Configliere.use :commandline, :define, :config_file
-Settings.read('foo.yaml')
-
-# store = ChunkedFlatFileStore.new(Settings)
-
-100.times do |iter|
- # store.save [iter, Time.now.to_flat].join("\t")
- $stdout.puts [iter, Time.now.to_flat].join("\t")
- sleep 2
-end
-
-
25 examples/stupidly_simple_filter.rb
@@ -1,6 +1,6 @@
#!/usr/bin/env ruby
-require 'rubygems'
-require 'wukong'
+$: << File.dirname(__FILE__)+'/../lib'
+require 'wukong/script'
# Run as (local mode)
#
@@ -15,15 +15,15 @@
# cat input.tsv | ./examples/stupidly_simple_filter.rb --map input.tsv | more
#
-#
-# A very simple mapper -- looks for a regex match in one field,
-# and emits the whole record if the field matches
-#
-class GrepMapper < Wukong::Streamer::RecordStreamer
-
+class Mapper < LineStreamer
+ include Filter
MATCHER = %r{(ford|mercury|saab|mazda|isuzu)}
#
+ # A very simple mapper -- looks for a regex match in one field,
+ # and emits the whole record if the field matches
+ #
+ #
# Given a series of records like:
#
# tweet 123456789 20100102030405 @frank: I'm having a bacon sandwich
@@ -31,13 +31,10 @@ class GrepMapper < Wukong::Streamer::RecordStreamer
#
# emits only the lines matching that regex
#
- def process rsrc, id, timestamp, text, *rest
- yield [rsrc, id, timestamp, text, *rest] if line =~ MATCHER
+ def emit? line
+ MATCHER.match line
end
end
# Execute the script
-Wukong::Script.new(
- GrepMapper,
- nil
- ).run
+Wukong.run(Mapper)
52 examples/word_count.rb
@@ -1,6 +1,6 @@
#!/usr/bin/env ruby
require 'rubygems'
-require 'wukong'
+require 'wukong/script'
module WordCount
class Mapper < Wukong::Streamer::LineStreamer
@@ -10,22 +10,22 @@ class Mapper < Wukong::Streamer::LineStreamer
# This is pretty simpleminded:
# * downcase the word
# * Split at any non-alphanumeric boundary, including '_'
- # * However, preserve the special cases of 's or 't at the end of a
+ # * However, preserve the special cases of 's, 'd or 't at the end of a
# word.
#
- # tokenize("Jim's dawg won't hunt: dawg_hunt error #3007a4")
- # # => ["jim's", "dawd", "won't", "hunt", "dawg", "hunt", "error", "3007a4"]
+ # tokenize("Ability is a poor man's wealth #johnwoodenquote")
+ # # => ["ability", "is", "a", "poor", "man's", "wealth", "johnwoodenquote"]
#
def tokenize str
- return [] unless str
+ return [] if str.blank?
str = str.downcase;
# kill off all punctuation except [stuff]'s or [stuff]'t
# this includes hyphens (words are split)
str = str.
gsub(/[^a-zA-Z0-9\']+/, ' ').
- gsub(/(\w)\'([st])\b/, '\1!\2').gsub(/\'/, ' ').gsub(/!/, "'")
+ gsub(/(\w)\'([std])\b/, '\1!\2').gsub(/\'/, ' ').gsub(/!/, "'")
# Busticate at whitespace
- words = str.strip.split(/\s+/)
+ words = str.split(/\s+/)
words.reject!{|w| w.blank? }
words
end
@@ -39,31 +39,13 @@ def process line
end
#
- # Accumulate the sum record-by-record:
+ # You can stack up all the values in a list then sum them at once.
#
- class Reducer0 < Wukong::Streamer::Base
- attr_accessor :key_count
- def process word, count
- @last_word ||= word
- if (@last_word == word)
- self.key_count += 1
- else
- yield [ @last_word, key_count ]
- @last_word = word
- end
- end
- def stream
- emit @last_word, key_count
- end
- end
-
- #
- # You can stack up all the values in a list then sum them at once:
+ # This isn't good style, as it means the whole list is held in memory
#
- require 'active_support/core_ext/enumerable'
class Reducer1 < Wukong::Streamer::ListReducer
def finalize
- yield [ key, values.map(&:last).map(&:to_i).sum ]
+ yield [ key, values.map(&:last).map(&:to_i).inject(0){|x,tot| x+tot } ]
end
end
@@ -71,11 +53,10 @@ def finalize
# A bit kinder to your memory manager: accumulate the sum record-by-record:
#
class Reducer2 < Wukong::Streamer::AccumulatingReducer
- attr_accessor :key_count
- def start!(*args) self.key_count = 0 end
- def accumulate(*args) self.key_count += 1 end
+ def start!(*args) @key_count = 0 end
+ def accumulate(*args) @key_count += 1 end
def finalize
- yield [ key, key_count ]
+ yield [ key, @key_count ]
end
end
@@ -85,11 +66,10 @@ def finalize
require 'wukong/streamer/count_keys'
class Reducer3 < Wukong::Streamer::CountKeys
end
-
end
# Execute the script
-Wukong::Script.new(
+Wukong.run(
WordCount::Mapper,
- WordCount::Reducer1
- ).run
+ WordCount::Reducer
+ )
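Note on the tokenize method in the word_count.rb hunks above: it protects 's, 'd and 't endings by swapping the apostrophe for a "!" placeholder, strips every other apostrophe, then restores the placeholder. The placeholder is safe because any literal "!" has already been replaced by a space in the first substitution. A standalone sketch that runs without Wukong; the nil/empty check stands in for blank?, which plain Ruby does not define:

```ruby
# Standalone sketch of the tokenizer; blank? is replaced by plain-Ruby checks.
def tokenize(str)
  return [] if str.nil? || str.strip.empty?
  str.downcase.
    gsub(/[^a-z0-9']+/, ' ').          # punctuation (including '_' and '-') becomes a space
    gsub(/(\w)'([std])\b/, '\1!\2').   # protect 's, 't, 'd with a placeholder
    gsub(/'/, ' ').                    # drop every other apostrophe
    gsub(/!/, "'").                    # restore the protected apostrophes
    split(/\s+/).
    reject { |w| w.empty? }            # a leading space leaves one empty token
end

p tokenize("Ability is a poor man's wealth #johnwoodenquote")
# => ["ability", "is", "a", "poor", "man's", "wealth", "johnwoodenquote"]
```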
4 lib/wukong/script.rb
@@ -128,8 +128,8 @@ def initialize mapper, reducer=nil, extra_options={}
Settings.resolve!
@options = Settings
options.merge extra_options
- @mapper = (case mapper when Class then mapper.new(options) when nil then nil else mapper ; end)
- @reducer = (case reducer when Class then reducer.new(options) when nil then nil else reducer ; end)
+ @mapper = (case mapper when Class then mapper.new when nil then nil else mapper ; end)
+ @reducer = (case reducer when Class then reducer.new when nil then nil else reducer ; end)
@output_path = options.rest.pop
@input_paths = options.rest.reject(&:blank?)
if (input_paths.blank? || output_path.blank?) && (not options[:dry_run]) && (not ['map', 'reduce'].include?(run_mode))
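Note on the script.rb hunk above: the mapper and reducer arguments may each be a class (instantiated with no arguments now that options come from Settings), nil, or a ready-made instance. The dispatch can be sketched on its own; the helper name streamer_instance is hypothetical and the stand-in class below is not a Wukong streamer:

```ruby
# Hypothetical helper mirroring the case expression in Script#initialize.
def streamer_instance(streamer)
  case streamer
  when Class then streamer.new   # a class: build it with no arguments
  when nil   then nil            # no mapper or reducer for this phase
  else            streamer       # already an instance: use it as given
  end
end

class ReverseMapper; end          # stand-in class for illustration only

ready_made = ReverseMapper.new
p streamer_instance(ReverseMapper).class
p streamer_instance(nil)
p streamer_instance(ready_made).equal?(ready_made)
```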
4 lib/wukong/streamer/filter.rb
@@ -12,8 +12,8 @@ module Filter
#
# Subclass and re-define the emit? method
#
- def process *record, &block
- yield record if emit?(record)
+ def process *record
+ yield record if emit?(*record)
end
end
end
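Note on the filter.rb hunk above: process now splats the record into emit?, so emit? receives the fields as separate arguments (a single line, for a line-oriented streamer) while the block still receives the whole record. A self-contained sketch of the mixin pattern; TinyFilter and CarFilter are hypothetical stand-ins, not the Wukong classes:

```ruby
# Hypothetical stand-in for the Filter mixin; not Wukong's module.
module TinyFilter
  # Yields the record (as an array of fields) only if emit? approves it.
  def process(*record)
    yield record if emit?(*record)
  end
end

class CarFilter
  include TinyFilter
  MATCHER = %r{(ford|mercury|saab|mazda|isuzu)}

  # Receives the line itself, not a one-element array, thanks to the splat.
  def emit?(line)
    MATCHER.match(line)
  end
end

kept = []
filter = CarFilter.new
filter.process("I just bought a saab")   { |rec| kept << rec }
filter.process("I'm having a sandwich")  { |rec| kept << rec }
p kept
```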
6 lib/wukong/streamer/list_reducer.rb
@@ -8,13 +8,13 @@ class ListReducer < Wukong::Streamer::AccumulatingReducer
# start with an empty list
def start! *args
- self.values = []
+ @values = []
end
# aggregate all records.
# note that this accumulates the full *record* -- key, value, everything.
def accumulate *record
- self.values << record
+ @values << record
end
# emit the key and all records, tab-separated
@@ -24,7 +24,7 @@ def accumulate *record
# values)
#
def finalize
- yield [key, values.to_flat.join(";")].flatten
+ yield [key, @values.to_flat.join(";")].flatten
end
end
end
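Note on the reducers touched above (ListReducer here, Reducer2 in word_count.rb): both follow a start!, accumulate, finalize cycle over records that arrive sorted by key. The sketch below shows that cycle in plain Ruby. TinyAccumulator and its run driver are hypothetical; how Wukong's AccumulatingReducer actually detects key changes is not shown in this diff and is assumed here:

```ruby
# Hypothetical stand-in for an accumulating reducer; not Wukong's class.
class TinyAccumulator
  def start!
    @key_count = 0
  end

  def accumulate(*_record)
    @key_count += 1
  end

  def finalize(key)
    [key, @key_count]
  end

  # Walks key-sorted records and emits one result per run of equal keys.
  # Only a single counter is held in memory, never the whole group's values.
  def run(sorted_records)
    results = []
    sorted_records.chunk_while { |a, b| a.first == b.first }.each do |group|
      start!
      group.each { |record| accumulate(*record) }
      results << finalize(group.first.first)
    end
    results
  end
end

p TinyAccumulator.new.run([["apple", 1], ["apple", 1], ["pear", 1]])
```

This is the trade-off the word_count.rb comments describe: stacking values in a list is simpler to write, while accumulating record by record keeps memory use flat.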
0  examples/cassandra_streaming/avromapper.rb → old/cassandra_streaming/avromapper.rb
File renamed without changes
0  ...cassandra_streaming/berlitz_for_cassandra.textile → ...cassandra_streaming/berlitz_for_cassandra.textile
File renamed without changes
0  examples/cassandra_streaming/cassandra.avpr → old/cassandra_streaming/cassandra.avpr
File renamed without changes
0  ...ssandra_streaming/cassandra_random_partitioner.rb → ...ssandra_streaming/cassandra_random_partitioner.rb
File renamed without changes
0  examples/cassandra_streaming/catter.sh → old/cassandra_streaming/catter.sh
File renamed without changes
0  ...assandra_streaming/client_interface_notes.textile → ...assandra_streaming/client_interface_notes.textile
File renamed without changes
0  examples/cassandra_streaming/client_schema.avpr → old/cassandra_streaming/client_schema.avpr
File renamed without changes
0  examples/cassandra_streaming/client_schema.textile → old/cassandra_streaming/client_schema.textile
File renamed without changes
0  examples/cassandra_streaming/foofile.avr → old/cassandra_streaming/foofile.avr
File renamed without changes
0  examples/cassandra_streaming/pymap.sh → old/cassandra_streaming/pymap.sh
File renamed without changes
0  examples/cassandra_streaming/pyreduce.sh → old/cassandra_streaming/pyreduce.sh
File renamed without changes
0  examples/cassandra_streaming/smutation.avpr → old/cassandra_streaming/smutation.avpr
File renamed without changes
0  examples/cassandra_streaming/streamer.sh → old/cassandra_streaming/streamer.sh
File renamed without changes
0  examples/cassandra_streaming/struct_loader.rb → old/cassandra_streaming/struct_loader.rb
File renamed without changes
0  examples/cassandra_streaming/tuning.textile → old/cassandra_streaming/tuning.textile
File renamed without changes
0  examples/keystore/cassandra_batch_test.rb → old/keystore/cassandra_batch_test.rb
File renamed without changes
0  examples/keystore/conditional_outputter_example.rb → old/keystore/conditional_outputter_example.rb
File renamed without changes