Merge branch 'master' of github.com:mrflip/wukong

commit c18627bd4d8dbb0912e0d8d5c888fc7432b80a65 (2 parents: 78eac5a + 7dacf58)
authored by Philip (flip) Kromer (mrflip)
Showing with 351 additions and 1,335 deletions.
  1. +32 −0 CHANGELOG.textile
  2. +58 −12 README.textile
  3. +17 −11 Rakefile
  4. +0 −8 TODO.textile
  5. +1 −1  VERSION
  6. +11 −0 bin/setcat
  7. +0 −56 examples/count_keys.rb
  8. +0 −57 examples/count_keys_at_mapper.rb
  9. +14 −21 examples/network_graph/breadth_first_search.rb
  10. +22 −13 examples/network_graph/gen_multi_edge.rb
  11. +1 −1  examples/pagerank/pagerank.rb
  12. +6 −10 examples/pagerank/pagerank_initialize.rb
  13. +6 −16 examples/sample_records.rb
  14. +0 −4 examples/server_logs/apache_log_parser.rb
  15. +3 −2 examples/size.rb
  16. +9 −11 examples/{ → stats}/binning_percentile_estimator.rb
  17. +2 −2 examples/{ → stats}/rank_and_bin.rb
  18. +0 −18 examples/store/chunked_store_example.rb
  19. +11 −14 examples/stupidly_simple_filter.rb
  20. +16 −36 examples/word_count.rb
  21. +7 −3 lib/wukong.rb
  22. +2 −15 lib/wukong/and_pig.rb
  23. +0 −81 lib/wukong/dfs.rb
  24. +0 −122 lib/wukong/keystore/cassandra_conditional_outputter.rb
  25. +0 −24 lib/wukong/keystore/redis_db.rb
  26. +0 −137 lib/wukong/keystore/tyrant_db.rb
  27. +0 −145 lib/wukong/keystore/tyrant_notes.textile
  28. +7 −28 lib/wukong/logger.rb
  29. +0 −25 lib/wukong/models/graph.rb
  30. +0 −7 lib/wukong/monitor.rb
  31. +0 −23 lib/wukong/monitor/chunked_store.rb
  32. +0 −34 lib/wukong/monitor/periodic_logger.rb
  33. +0 −70 lib/wukong/monitor/periodic_monitor.rb
  34. +24 −9 lib/wukong/periodic_monitor.rb
  35. +0 −104 lib/wukong/rdf.rb
  36. +20 −16 lib/wukong/script.rb
  37. +30 −29 lib/wukong/script/hadoop_command.rb
  38. +44 −2 lib/wukong/streamer/base.rb
  39. +0 −61 lib/wukong/streamer/cassandra_streamer.rb
  40. +3 −8 lib/wukong/streamer/count_keys.rb
  41. +0 −26 lib/wukong/streamer/count_lines.rb
  42. +0 −25 lib/wukong/streamer/counting_reducer.rb
  43. +2 −2 lib/wukong/streamer/filter.rb
  44. +3 −3 lib/wukong/streamer/list_reducer.rb
  45. +0 −22 lib/wukong/streamer/preprocess_with_pipe_streamer.rb
  46. +0 −21 lib/wukong/wukong_class.rb
  47. 0  {examples → old}/cassandra_streaming/avromapper.rb
  48. 0  {examples → old}/cassandra_streaming/berlitz_for_cassandra.textile
  49. 0  {examples → old}/cassandra_streaming/cassandra.avpr
  50. 0  {examples → old}/cassandra_streaming/cassandra_random_partitioner.rb
  51. 0  {examples → old}/cassandra_streaming/catter.sh
  52. 0  {examples → old}/cassandra_streaming/client_interface_notes.textile
  53. 0  {examples → old}/cassandra_streaming/client_schema.avpr
  54. 0  {examples → old}/cassandra_streaming/client_schema.textile
  55. 0  {examples → old}/cassandra_streaming/foofile.avr
  56. 0  {examples → old}/cassandra_streaming/pymap.sh
  57. 0  {examples → old}/cassandra_streaming/pyreduce.sh
  58. 0  {examples → old}/cassandra_streaming/smutation.avpr
  59. 0  {examples → old}/cassandra_streaming/streamer.sh
  60. 0  {examples → old}/cassandra_streaming/struct_loader.rb
  61. 0  {examples → old}/cassandra_streaming/tuning.textile
  62. 0  {examples → old}/keystore/cassandra_batch_test.rb
  63. 0  {examples → old}/keystore/conditional_outputter_example.rb
32 CHANGELOG.textile
@@ -1,3 +1,35 @@
+h2. Wukong v2.0.0
+
+h4. Important changes
+
+* Passing options to streamers is now deprecated. Use @Settings@ instead (see the sketch at the end of this entry).
+
+* Streamers now carry a periodic monitor by default; it logs (to STDERR, by default) every 10_000 lines or every 30 seconds, whichever comes first
+
+* Examples have been cleaned up and should all run
+
+h4. Simplified syntax
+
+* You can now pass Script.new an *instance* of a Streamer to use as the mapper or reducer (see the sketch at the end of this entry)
+* There is new experimental sugar:
+
+ <pre>
+ #!/usr/bin/env ruby
+ require 'wukong/script'
+
+ LineStreamer.map do |line|
+ emit line.reverse
+ end.run
+ </pre>
+
+ Note that you can now tweet a wukong script.
+
+* It's now recommended that at the top of a wukong script you say
+ <pre>
+ require 'wukong/script'
+ </pre>
+ Among other benefits, this lets you refer to wukong streamers without the Wukong::Streamer prefix.
+
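The two changes called out above (reading knobs from @Settings@ rather than per-streamer options, and handing Script.new a pre-built streamer instance) look like this in practice. This is an illustrative sketch only: the class name, the @min_len@ setting and the length rule are invented; the @Settings.define@ / @Settings[]@ calls mirror the configliere usage that appears elsewhere in this commit.

 <pre>
  #!/usr/bin/env ruby
  require 'wukong/script'

  Settings.define :min_len, :type => Integer, :default => 3,
    :description => "Skip words shorter than this many characters"

  class LongWordFilter < Wukong::Streamer::LineStreamer
    def process line
      # read the knob from Settings, not from per-streamer options
      yield line if line.length >= Settings[:min_len]
    end
  end

  # an *instance* now works where a class used to be required
  Wukong::Script.new(LongWordFilter.new, nil, :reduce_tasks => 0).run
 </pre>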
h2. Wukong v1.5.4
* EMR support now works very well
70 README.textile
@@ -19,18 +19,6 @@ The **main documentation** lives on the "Wukong Pages.":http://mrflip.github.com
* Wukong is licensed under the "Apache License":http://mrflip.github.com/wukong/LICENSE.html (same as Hadoop)
* "More info":http://mrflip.github.com/wukong/moreinfo.html
-h2. Imminent Changes
-
-I'm pushing to release "Wukong 3.0 the actual 1.0 release".
-
-* For reducing/uniqing, a notion of mutable_fields and immutable_fields and extrinsic_fields: two objects compare the same/differently if their mutable fields compare the same/differently
-* Methods on TypedStruct to
-
- * Make to_flat(false) the default, with the sort_fields / partition_fields defaulting to 2 each and very prominently documented
- * Standardize the notion that wukong classes have a "key"; by default, it will be to_a.first for Structs/TypedStructs. This shouldn't break anything.
- * May make some things that are derived classes into mixin'ed modules
- * Will probably change the name of AccumulatingReducer into just Accumulator, and have all Accumulator-derived classes include Accumulator; I'll make sure the old names continue to work though.
-
h2. Help!
@@ -193,6 +181,64 @@ You'd end up with
@newman @elaine @jerry @kramer
</code></pre>
+h2. Gotchas
+
+h4. RecordStreamer dies on blank lines with "wrong number of arguments"
+
+If your lines don't always have a full complement of fields, and you define #process() to take fixed named arguments, then Ruby will raise a 'wrong number of arguments' error on any line with too few or too many fields:
+
+<pre>
+ class MyUnhappyMapper < Wukong::Streamer::RecordStreamer
+ # this will fail if the line has more or fewer than 3 fields:
+ def process x, y, z
+ p [x, y, z]
+ end
+ end
+</pre>
+
+The cleanest fix I know is to override #recordize, which (recall) always returns an array of fields:
+
+<pre>
+ class MyHappyMapper < Wukong::Streamer::RecordStreamer
+ # extracts three fields always; any missing fields are nil, any extra fields discarded
+ # @example
+ # recordize("a") # ["a", nil, nil]
+ # recordize("a\t\b\tc") # ["a", "b", "c"]
+ # recordize("a\t\b\tc\td") # ["a", "b", "c"]
+ def recordize raw_record
+ x, y, z = super(raw_record)
+ [x, y, z]
+ end
+
+ # Now all lines produce exactly three args
+ def process x, y, z
+ p [x, y, z]
+ end
+ end
+</pre>
+
+If you want to preserve any extra fields, pass a field limit as the extra argument to #split():
+
+<pre>
+ class MyMoreThanHappyMapper < Wukong::Streamer::RecordStreamer
+ # extracts three fields always; any missing fields are nil, the final field will contain a tab-separated string of all trailing fields
+ # @example
+ # recordize("a") # ["a", nil, nil]
+ # recordize("a\t\b\tc") # ["a", "b", "c"]
+ # recordize("a\t\b\tc\td") # ["a", "b", "c\td"]
+ def recordize raw_record
+ x, y, z = split(raw_record, "\t", 3)
+ [x, y, z]
+ end
+
+ # Now all lines produce exactly three args
+ def process x, y, z
+ p [x, y, z]
+ end
+ end
+</pre>
+
+
h2. Why is it called Wukong?
Hadoop, as you may know, is "named after a stuffed elephant.":http://en.wikipedia.org/wiki/Hadoop Since Wukong was started by the "infochimps":http://infochimps.org team, we needed a simian analog. A Monkey King who journeyed to the land of the Elephant seems to fit the bill:
28 Rakefile
@@ -32,18 +32,24 @@ rescue LoadError
puts "Jeweler (or a dependency) not available. Install it with: gem install jeweler"
end
-require 'spec/rake/spectask'
-Spec::Rake::SpecTask.new(:spec) do |spec|
- spec.libs << 'lib' << 'spec'
- spec.spec_files = FileList['spec/**/*_spec.rb']
-end
-Spec::Rake::SpecTask.new(:rcov) do |spec|
- spec.libs << 'lib' << 'spec'
- spec.pattern = 'spec/**/*_spec.rb'
- spec.rcov = true
+begin
+ require 'spec/rake/spectask'
+ Spec::Rake::SpecTask.new(:spec) do |spec|
+ spec.libs << 'lib' << 'spec'
+ spec.spec_files = FileList['spec/**/*_spec.rb']
+ end
+ Spec::Rake::SpecTask.new(:rcov) do |spec|
+ spec.libs << 'lib' << 'spec'
+ spec.pattern = 'spec/**/*_spec.rb'
+ spec.rcov = true
+ end
+ task :spec => :check_dependencies
+ task :default => :spec
+rescue LoadError
+ task :spec do
+ abort "rspec is not available. In order to run rspec, you must: sudo gem install rspec"
+ end
end
-task :spec => :check_dependencies
-task :default => :spec
begin
require 'reek/rake_task'
8 TODO.textile
@@ -1,13 +1,5 @@
-
-
-
* add GEM_PATH to hadoop_recycle_env
-* Hadoop_command function received an array for the input_path parameter
-
** We should be able to specify comma *or* space separated paths; the last
space-separated path in Settings.rest becomes the output file, the others are
used as the input_file list.
-
-* Make configliere Settings and streamer_instance.options() be the same
- thing. (instead of almost-but-confusingly-not-always the same thing).
2  VERSION
@@ -1 +1 @@
-1.5.4
+2.0.0
11 bin/setcat
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+
+#
+# This script is useful for debugging: it dumps your environment to STDERR
+# and otherwise runs as `cat`
+#
+
+set >&2
+
+cat
+true
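One way to put setcat to work, sketched under the assumption that overriding #map_command behaves as it does in examples/pagerank/pagerank_initialize.rb elsewhere in this commit; the class name here is invented.

 <pre>
  #!/usr/bin/env ruby
  require 'wukong/script'

  # Run setcat as the mapper: records pass through unchanged, and each map
  # task's environment lands in its STDERR log where you can inspect it.
  class EnvPeekScript < Wukong::Script
    def map_command
      File.expand_path(File.dirname(__FILE__) + '/../bin/setcat')
    end
  end

  EnvPeekScript.new(nil, nil, :reduce_tasks => 0).run if __FILE__ == $0
 </pre>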
56 examples/count_keys.rb
@@ -1,56 +0,0 @@
-#!/usr/bin/env ruby
-$: << File.dirname(__FILE__)+'/../lib'
-require 'wukong'
-require 'wukong/streamer/count_keys'
-require 'wukong/streamer/count_lines'
-
-#
-#
-class CountKeysReducer < Wukong::Streamer::CountLines
- #
- # Taken from the actionpack Rails component ('action_view/helpers/number_helper')
- #
- # Formats a +number+ with grouped thousands using +delimiter+. You
- # can customize the format using optional <em>delimiter</em> and <em>separator</em> parameters.
- # * <tt>delimiter</tt> - Sets the thousands delimiter, defaults to ","
- # * <tt>separator</tt> - Sets the separator between the units, defaults to "."
- #
- # number_with_delimiter(12345678) => 12,345,678
- # number_with_delimiter(12345678.05) => 12,345,678.05
- # number_with_delimiter(12345678, ".") => 12.345.678
- def number_with_delimiter(number, delimiter=",", separator=".")
- begin
- parts = number.to_s.split('.')
- parts[0].gsub!(/(\d)(?=(\d\d\d)+(?!\d))/, "\\1#{delimiter}")
- parts.join separator
- rescue
- number
- end
- end
-
- # Override to look nice
- def formatted_count item, key_count
- key_count_str = number_with_delimiter(key_count.to_i)
- "%-25s\t%12s" % [item, key_count_str]
- end
-end
-
-#
-class CountKeysScript < Wukong::Script
- def map_command
- # Use `cut` to extract the first field
- %Q{ cut -d"\t" -f1 }
- end
-
- #
- # There's just the one field
- #
- def default_options
- super.merge :sort_fields => 1
- end
-end
-
-# Executes the script when run from command line
-if __FILE__ == $0
- CountKeysScript.new(nil, CountKeysReducer).run
-end
57 examples/count_keys_at_mapper.rb
@@ -1,57 +0,0 @@
-#!/usr/bin/env ruby
-$: << File.dirname(__FILE__)+'/../lib'
-require 'wukong'
-
-#
-#
-module CountKeys
- #
- class Mapper < Wukong::Streamer::Base
- attr_accessor :keys_count
- def initialize *args
- self.keys_count = {}
- end
- def process key, *args
- key.gsub!(/-.*/, '') # kill off the slug
- self.keys_count[key] ||= 0
- self.keys_count[key] += 1
- end
- def stream *args
- super *args
- self.keys_count.each do |key, count|
- emit [key, count].to_flat
- end
- end
- end
- # Identity Mapper
- class Reducer < Wukong::Streamer::AccumulatingReducer
- attr_accessor :key_count
- require 'active_support'
- require 'action_view/helpers/number_helper'; include ActionView::Helpers::NumberHelper
-
- # Override to look nice
- def formatted_count item, key_count
- key_count_str = number_with_delimiter(key_count.to_i, :delimiter => ',')
- "%-25s\t%12s" % [item, key_count_str]
- end
- def start! *args
- self.key_count = 0
- end
- def accumulate key, count
- self.key_count += count.to_i
- end
- def finalize
- yield formatted_count(key, key_count)
- end
- end
-
- #
- class Script < Wukong::Script
- # There's just the one field
- def default_options
- super.merge :sort_fields => 1, :reduce_tasks => 1
- end
- end
-end
-
-CountKeys::Script.new(CountKeys::Mapper, CountKeys::Reducer).run
35 examples/network_graph/breadth_first_search.rb
@@ -1,6 +1,6 @@
#!/usr/bin/env ruby
-$: << ENV['WUKONG_PATH']
-require 'wukong'
+$: << File.dirname(__FILE__)+'/../lib'
+require 'wukong/script'
#
# Use this script to do a Breadth-First Search (BFS) of a graph.
@@ -9,19 +9,18 @@
# ./make_paths --head=[path_in_key] --tail=[path_out_key] --out_rsrc=[combined_path_key]
#
# For example, given an edge list in the file '1path.tsv' that looks like
-# 1path n1 n2
-# 1path n1 n3
+# 1path n1 n2
+# 1path n1 n3
# ... and so forth ...
# you can run
# for t in 1 2 3 4 5 6 7 8 9 ; do next=$((t+1)) ; time cat 1path.tsv "${t}path.tsv" | ./make_paths.rb --map --head="1path" --tail="${t}path" | sort -u | ./make_paths.rb --reduce --out_rsrc="${next}path" | sort -u > "${next}path.tsv" ; done
# to do a 9-deep breadth-first search.
#
module Gen1HoodEdges
- class Mapper < Wukong::Streamer::Base
- attr_accessor :head, :tail
- def initialize options
- self.head = options[:head]
- self.tail = options[:tail]
+ class Mapper < Wukong::Streamer::RecordStreamer
+ def initialize
+ @head = Settings[:head]
+ @tail = Settings[:tail]
end
def process rsrc, *nodes
yield [ nodes.last, 'i', nodes[0..-2] ] if (rsrc == self.head)
@@ -37,8 +36,8 @@ def process rsrc, *nodes
#
class Reducer < Wukong::Streamer::AccumulatingReducer
attr_accessor :paths_in, :out_rsrc
- def initialize options
- self.out_rsrc = options[:out_rsrc]
+ def initialize
+ self.out_rsrc = Settings[:out_rsrc]
end
# clear the list of incoming paths
def start! *args
@@ -63,17 +62,11 @@ def get_key mid, *_
mid
end
end
-
- class Script < Wukong::Script
- def default_options
- super.merge :sort_fields => 2, :partition_fields => 1
- end
- end
-
end
# Execute the script
-Gen1HoodEdges::Script.new(
+Wukong.run(
Gen1HoodEdges::Mapper,
- Gen1HoodEdges::Reducer
- ).run
+ Gen1HoodEdges::Reducer,
+ :sort_fields => 2, :partition_fields => 1
+ )
35 examples/network_graph/gen_multi_edge.rb
@@ -2,7 +2,6 @@
require 'rubygems'
$: << File.dirname(__FILE__)+'/../../lib'
require 'wukong'
-require 'wukong/models/graph'; include Wukong::Models
#
# Takes any number of flavors of directed edge with the form
@@ -88,17 +87,27 @@ def finalize
yield self.multi_edge
end
end
+end
- #
- # Sort on the first two keys: each @[src, dest]@ pair winds up at the same
- # reducer.
- #
- class Script < Wukong::Script
- def default_options
- super.merge :sort_fields => 2
- end
- end
+Edge = TypedStruct.new(
+ [:src, Integer],
+ [:dest, Integer]
+ )
- # Execute the script
- Script.new(Mapper, Reducer).run
-end
+MultiEdge = TypedStruct.new(
+ [:src, Integer],
+ [:dest, Integer],
+ [:a_follows_b, Integer],
+ [:b_follows_a, Integer],
+ [:a_replies_b, Integer],
+ [:b_replies_a, Integer],
+ [:a_atsigns_b, Integer],
+ [:b_atsigns_a, Integer],
+ [:a_retweets_b, Integer],
+ [:b_retweets_a, Integer],
+ [:a_favorites_b, Integer],
+ [:b_favorites_a, Integer]
+ )
+
+# Execute the script
+Script.new(Mapper, Reducer, :sort_fields => 2).run
2  examples/pagerank/pagerank.rb
@@ -1,6 +1,6 @@
#!/usr/bin/env ruby
$: << File.dirname(__FILE__)+'/../../lib'
-require 'wukong'
+require 'wukong/script'
#
#
16 examples/pagerank/pagerank_initialize.rb
@@ -1,7 +1,7 @@
#!/usr/bin/env ruby
$: << File.dirname(__FILE__)+'/../../lib'
-require 'wukong'
-require 'wukong/streamer/set_reducer'
+require 'wukong/script'
+require 'wukong/streamer/list_reducer'
module PageRank
class Script < Wukong::Script
@@ -15,10 +15,6 @@ class Script < Wukong::Script
def map_command
%Q{/usr/bin/cut -d"\t" -f2,3}
end
-
- def default_options
- super.merge :extra_args => ' -jobconf io.sort.record.percent=0.25 '
- end
end
#
@@ -28,18 +24,18 @@ def default_options
#
class Reducer < Wukong::Streamer::ListReducer
def accumulate src, dest
- self.values << dest
+ @values << dest
end
# Emit src, initial pagerank, and flattened dests list
def finalize
- self.values = ['dummy'] if self.values.blank?
- yield [key, 1.0, self.values.to_a.join(",")]
+ @values = ['dummy'] if @values.blank?
+ yield [key, 1.0, @values.to_a.join(",")]
end
end
# Execute the script
- Script.new(nil, PageRank::Reducer).run
+ Script.new(nil, PageRank::Reducer, :io_sort_record_percent => 0.25).run
end
22 examples/sample_records.rb
@@ -1,7 +1,8 @@
#!/usr/bin/env ruby
$: << File.dirname(__FILE__)+'/../lib'
-require 'rubygems'
-require 'wukong'
+require 'wukong/script'
+
+Settings.define :sampling_fraction, :type => Float, :required => true, :description => "floating-point number between 0 and 1 giving the fraction of lines to emit: at sampling_fraction=1 all records are emitted, at 0 none are."
#
# Probabilistically emit some fraction of record/lines
@@ -15,29 +16,18 @@ class Mapper < Wukong::Streamer::LineStreamer
include Wukong::Streamer::Filter
#
- # floating-point number between 0 and 1 giving the fraction of lines to emit:
- # at sampling_fraction=1 all records are emitted, at 0 none are.
- #
- # Takes its value from a mandatory command-line option
- #
- def sampling_fraction
- @sampling_fraction ||= ( options[:sampling_fraction] && options[:sampling_fraction].to_f ) or
- raise "Please supply a --sampling_fraction= argument, a decimal number between 0 and 1"
- end
-
- #
# randomly decide to emit +sampling_fraction+ fraction of lines
#
def emit? line
- rand < self.sampling_fraction
+ rand < Settings.sampling_fraction
end
end
#
# Executes the script
#
-Wukong::Script.new( Mapper,
+Wukong.run( Mapper,
nil,
:reduce_tasks => 0,
:reuse_jvms => true
- ).run
+ )
4 examples/server_logs/apache_log_parser.rb
@@ -1,7 +1,3 @@
-#!/usr/bin/env ruby
-$: << File.dirname(__FILE__)+'/../lib'
-require 'rubygems'
-require 'wukong'
MONTHS = {
'Jan' => '01',
5 examples/size.rb
@@ -1,6 +1,6 @@
#!/usr/bin/env ruby
$: << File.dirname(__FILE__)+'/../lib'
-require 'wukong'
+require 'wukong/script'
module Size
#
@@ -56,5 +56,6 @@ def stream *args
# Execute the script
Size::Script.new(
nil,
- Size::Reducer
+ Size::Reducer,
+ :reduce_tasks => 1
).run
20 examples/binning_percentile_estimator.rb → examples/stats/binning_percentile_estimator.rb
@@ -1,10 +1,8 @@
#!/usr/bin/env ruby
-
-require 'rubygems'
-require 'wukong'
+$: << File.dirname(__FILE__)+'/../lib'
+require 'wukong/script'
require 'wukong/streamer/count_keys'
-
#
# Ch3ck out dis moist azz code bitches!!
#
@@ -70,14 +68,14 @@ def after_stream
table << "TRSTRANK_TABLE = " << count_bin.inspect
table.close
end
-
+
#
- # Return percentile of a given trstrank for a given follower bracket
+ # Return percentile of a given trstrank for a given follower bracket
#
def percentile bin, rank
- ((count_less_than(bin,rank) + 0.5*frequency_of(bin,rank))/ total_num(bin) )*100.0
+ ((count_less_than(bin,rank) + 0.5*frequency_of(bin,rank))/ total_num(bin) )*100.0
end
-
+
#
# Return the count of values less than rank
#
@@ -119,7 +117,7 @@ def generate_all_pairs bin
big_list.uniq.sort{|x,y| x.first <=> y.first}
end
-
+
#
# Nothing to see here, move along
#
@@ -132,11 +130,11 @@ def interpolate pair1, pair2, dx
num.times do |i|
x = pair1.first + (i+1).to_f*dx
y = m*x + b
- points << [x,y]
+ points << [x,y]
end
points # return an array of pairs
end
-
+
end
Wukong::Script.new(Mapper,Reducer).run
4 examples/rank_and_bin.rb → examples/stats/rank_and_bin.rb
@@ -1,6 +1,6 @@
#!/usr/bin/env ruby
-$: << File.dirname(__FILE__)+'/../lib'
-require 'wukong'
+$: << File.dirname(__FILE__)+'/../../lib'
+require 'wukong/script'
require 'wukong/streamer/rank_and_bin_reducer'
#
18 examples/store/chunked_store_example.rb
@@ -1,18 +0,0 @@
-#!/usr/bin/env ruby
-require 'rubygems'
-require 'wukong'
-# require 'wukong/store'
-
-require 'configliere'
-Configliere.use :commandline, :define, :config_file
-Settings.read('foo.yaml')
-
-# store = ChunkedFlatFileStore.new(Settings)
-
-100.times do |iter|
- # store.save [iter, Time.now.to_flat].join("\t")
- $stdout.puts [iter, Time.now.to_flat].join("\t")
- sleep 2
-end
-
-
25 examples/stupidly_simple_filter.rb
@@ -1,6 +1,6 @@
#!/usr/bin/env ruby
-require 'rubygems'
-require 'wukong'
+$: << File.dirname(__FILE__)+'/../../lib'
+require 'wukong/script'
# Run as (local mode)
#
@@ -15,15 +15,15 @@
# cat input.tsv | ./examples/stupidly_simple_filter.rb --map input.tsv | more
#
-#
-# A very simple mapper -- looks for a regex match in one field,
-# and emits the whole record if the field matches
-#
-class GrepMapper < Wukong::Streamer::RecordStreamer
-
+class Mapper < LineStreamer
+ include Filter
MATCHER = %r{(ford|mercury|saab|mazda|isuzu)}
#
+ # A very simple mapper -- looks for a regex match in one field,
+ # and emits the whole record if the field matches
+ #
+ #
# Given a series of records like:
#
# tweet 123456789 20100102030405 @frank: I'm having a bacon sandwich
@@ -31,13 +31,10 @@ class GrepMapper < Wukong::Streamer::RecordStreamer
#
# emits only the lines matching that regex
#
- def process rsrc, id, timestamp, text, *rest
- yield [rsrc, id, timestamp, text, *rest] if line =~ MATCHER
+ def emit? line
+ MATCHER.match line
end
end
# Execute the script
-Wukong::Script.new(
- GrepMapper,
- nil
- ).run
+Wukong.run(Mapper)
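The Filter contract used above is tiny: define #emit? and the streamer passes through exactly the lines for which it returns something truthy. A sketch under that assumption, with an invented class name and regexp:

 <pre>
  class DomainFilter < Wukong::Streamer::LineStreamer
    include Wukong::Streamer::Filter
    # keep only lines mentioning one of these hosts
    def emit? line
      line =~ /(infochimps\.org|github\.com)/
    end
  end

  Wukong.run(DomainFilter)
 </pre>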
52 examples/word_count.rb
@@ -1,6 +1,6 @@
#!/usr/bin/env ruby
require 'rubygems'
-require 'wukong'
+require 'wukong/script'
module WordCount
class Mapper < Wukong::Streamer::LineStreamer
@@ -10,22 +10,22 @@ class Mapper < Wukong::Streamer::LineStreamer
# This is pretty simpleminded:
# * downcase the word
# * Split at any non-alphanumeric boundary, including '_'
- # * However, preserve the special cases of 's or 't at the end of a
+ # * However, preserve the special cases of 's, 'd or 't at the end of a
# word.
#
- # tokenize("Jim's dawg won't hunt: dawg_hunt error #3007a4")
- # # => ["jim's", "dawd", "won't", "hunt", "dawg", "hunt", "error", "3007a4"]
+ # tokenize("Ability is a poor man's wealth #johnwoodenquote")
+ # # => ["ability", "is", "a", "poor", "man's", "wealth", "johnwoodenquote"]
#
def tokenize str
- return [] unless str
+ return [] if str.blank?
str = str.downcase;
# kill off all punctuation except [stuff]'s or [stuff]'t
# this includes hyphens (words are split)
str = str.
gsub(/[^a-zA-Z0-9\']+/, ' ').
- gsub(/(\w)\'([st])\b/, '\1!\2').gsub(/\'/, ' ').gsub(/!/, "'")
+ gsub(/(\w)\'([std])\b/, '\1!\2').gsub(/\'/, ' ').gsub(/!/, "'")
# Busticate at whitespace
- words = str.strip.split(/\s+/)
+ words = str.split(/\s+/)
words.reject!{|w| w.blank? }
words
end
@@ -39,31 +39,13 @@ def process line
end
#
- # Accumulate the sum record-by-record:
+ # You can stack up all the values in a list then sum them at once.
#
- class Reducer0 < Wukong::Streamer::Base
- attr_accessor :key_count
- def process word, count
- @last_word ||= word
- if (@last_word == word)
- self.key_count += 1
- else
- yield [ @last_word, key_count ]
- @last_word = word
- end
- end
- def stream
- emit @last_word, key_count
- end
- end
-
- #
- # You can stack up all the values in a list then sum them at once:
+ # This isn't good style, as it means the whole list is held in memory
#
- require 'active_support/core_ext/enumerable'
class Reducer1 < Wukong::Streamer::ListReducer
def finalize
- yield [ key, values.map(&:last).map(&:to_i).sum ]
+ yield [ key, values.map(&:last).map(&:to_i).inject(0){|x,tot| x+tot } ]
end
end
@@ -71,11 +53,10 @@ def finalize
# A bit kinder to your memory manager: accumulate the sum record-by-record:
#
class Reducer2 < Wukong::Streamer::AccumulatingReducer
- attr_accessor :key_count
- def start!(*args) self.key_count = 0 end
- def accumulate(*args) self.key_count += 1 end
+ def start!(*args) @key_count = 0 end
+ def accumulate(*args) @key_count += 1 end
def finalize
- yield [ key, key_count ]
+ yield [ key, @key_count ]
end
end
@@ -85,11 +66,10 @@ def finalize
require 'wukong/streamer/count_keys'
class Reducer3 < Wukong::Streamer::CountKeys
end
-
end
# Execute the script
-Wukong::Script.new(
+Wukong.run(
WordCount::Mapper,
- WordCount::Reducer1
- ).run
+ WordCount::Reducer
+ )
10 lib/wukong.rb
@@ -1,13 +1,17 @@
+require 'configliere'; Configliere.use :define
require 'wukong/extensions'
require 'wukong/datatypes'
+require 'wukong/periodic_monitor'
require 'wukong/logger'
-require 'wukong/bad_record'
+autoload :BadRecord, 'wukong/bad_record'
autoload :TypedStruct, 'wukong/typed_struct'
-require 'configliere'; Configliere.use :define
module Wukong
- autoload :Dfs, 'wukong/dfs'
autoload :Script, 'wukong/script'
autoload :Streamer, 'wukong/streamer'
autoload :Store, 'wukong/store'
autoload :FilenamePattern, 'wukong/filename_pattern'
+
+ def self.run mapper, reducer=nil, options={}
+ Wukong::Script.new(mapper, reducer, options).run
+ end
end
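The new Wukong.run above is a thin wrapper around Wukong::Script, so these two forms should be interchangeable (MyMapper and MyReducer are placeholders):

 <pre>
  Wukong.run(MyMapper, MyReducer, :sort_fields => 2)
  # is shorthand for
  Wukong::Script.new(MyMapper, MyReducer, :sort_fields => 2).run
 </pre>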
17 lib/wukong/and_pig.rb
@@ -2,19 +2,13 @@ module Enumerable
#
# Convert an array of values to a string representing it as a pig tuple
#
- # def to_pig_tuple
- # map{|*vals| '(' + vals.join(',') + ')' }
- # end
-
- #
- # Convert an array to a pig tuple
- #
def to_pig_tuple
'(' + self.join(',') + ')'
end
+
#
# Convert an array of values to a string pig format
- # Delegates to to_pig_tuple -- see also to_pig_bag
+ # see also to_pig_bag
#
def to_pig *args
to_pig_tuple *args
@@ -23,13 +17,6 @@ def to_pig *args
#
# Convert an array of values to a string representing it as a pig bag
#
- # def to_pig_bag
- # '{' + self.join(',') + '}'
- # end
-
- #
- # Convert and array of values to a string representing it as a pig bag
- #
def to_pig_bag
'{' + self.map{|*vals| vals.to_pig_tuple}.join(",") + '}'
end
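Worked by hand from the definitions above (not executed), the two helpers should behave like this:

 <pre>
  [1, 2, 3].to_pig_tuple          # => "(1,2,3)"
  [[1, 2], [3, 4]].to_pig_bag     # => "{(1,2),(3,4)}"
 </pre>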
81 lib/wukong/dfs.rb
@@ -1,81 +0,0 @@
-require 'time' # ain't it always that way
-module Wukong
- module Dfs
- def self.list_files dfs_path
- Log.info{ "DFS: listing #{dfs_path}" }
- listing = `hadoop dfs -ls #{dfs_path}`.split("\n").reject{|ls_line| ls_line =~ /Found \d+ items/i}
- listing.map{|ls_line| HFile.new_from_ls(ls_line)}
- end
-
- #
- # FIXME -- this will fail if multiple files in a listing have the
- # same basename. Sorry.
- #
- def self.compare_listings src_files, dest_files, &block
- src_files.sort.each do |src_file|
- dest_file = dest_files.find{|df| File.basename(src_file) == df.basename }
- case
- when (! dest_file) then yield :missing, src_file, nil
- when (! dest_file.kinda_equal(src_file)) then yield :differ, src_file, dest_file
- else yield :same, src_file, dest_file
- end
- end
- end
-
- HFile = TypedStruct.new(
- [:mode_str, String],
- [:i_count, String],
- [:owner, String],
- [:group, String],
- [:size, Integer],
- [:date, Bignum],
- [:path, String]
- )
- HFile.class_eval do
- def self.new_from_ls ls_line
- mode, ic, o, g, sz, dt, tm, path = ls_line.chomp.split(/\s+/)
- date = Time.parse("#{dt} #{tm}").utc.to_flat
- new mode, ic.to_i, o, g, sz.to_i, date, path
- end
- def dirname
- @dirname ||= File.dirname(path)
- end
- def basename
- @basename ||= File.basename(path)
- end
- #
- # Two files are kinda_equal if they match in size and if
- # the hdfs version is later than the filesystem version.
- #
- def kinda_equal file
- (self.size == File.size(file)) # && (self.date >= File.mtime(file).utc.to_flat)
- end
- def to_s
- to_a.join("\t")
- end
-
- #
- # These will be very slow.
- # If some kind soul will integrate JRuby callouts the bards shall
- # celebrate your name evermore.
- #
-
- # rename the file on the HDFS
- def mv new_filename
- self.class.run_dfs_command :mv, path, new_filename
- end
-
- def self.mkdir dirname
- run_dfs_command :mkdir, dirname
- end
- def self.mkdir_p(*args) self.mkdir *args ; end # HDFS is always -p
-
- def self.run_dfs_command *args
- cmd = 'hadoop dfs -'+ args.flatten.compact.join(" ")
- Log.debug{ "DFS: Running #{cmd}" }
- Log.info{ `#{cmd} 2>&1`.gsub(/[\r\n\t]+/, " ") }
- end
-
- end
- end
-end
122 lib/wukong/keystore/cassandra_conditional_outputter.rb
@@ -1,122 +0,0 @@
-
-#
-# For a stream process that sees a significant number of duplicated heavyweight
-# objects, it may be better to deduplicate them midflight (rather than, say,
-# using a reducer to effectively `cat | sort | uniq` the data).
-#
-# This uses a cassandra key-value store to track unique IDs and prevent output
-# of any record already present in the database. (Why cassandra? Because we use
-# it in production. Might be nice to rewrite this example against redis or
-# TokyoTyrant or something less demanding.)
-#
-# Things you have to do:
-#
-# * Override the conditional_output_key method to distinguish identical records
-# * Define a constant CASSANDRA_KEYSPACE giving the Cassandra keyspace you're working in
-# * (Optionally) override conditional_output_key_column
-#
-# * In your cassandra storage-conf.xml, add a column family to your keyspace:
-#
-# <Keyspace Name="CorpusAnalysis">
-# <KeysCachedFraction>0.01</KeysCachedFraction>
-#
-# <!-- Added for CassandraConditionalOutputter -->
-# <ColumnFamily CompareWith="UTF8Type" Name="LetterPairMapperKeys" />
-#
-# <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
-# <ReplicationFactor>1</ReplicationFactor>
-# <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
-# </Keyspace>
-#
-# In this example, the CASSANDRA_KEYSPACE is 'CorpusAnalysis' and the
-# conditional_output_key_column is 'LetterPairMapperKeys'
-#
-# @example
-# Given
-# tweet 123456789 20100102030405 @frank: I'm having a bacon sandwich
-# tweet 24601 20100104136526 @jerry, I'm having your baby
-# tweet 8675309 20100102030405 I find pastrami to be the most sensual of the salted, cured meats.
-# tweet 24601 20100104136526 @jerry, I'm having your baby
-# tweet 1137 20100119234532 These pretzels are making me thirsty
-# ....
-# will emit:
-# tweet 123456789 20100102030405 @frank: I'm having a bacon sandwich
-# tweet 24601 20100104136526 @jerry, I'm having your baby
-# tweet 8675309 20100102030405 I find pastrami to be the most sensual of the salted, cured meats.
-# tweet 24601 20100104136526 @jerry, I'm having your baby
-# tweet 1137 20100119234532 These pretzels are making me thirsty
-# ....
-#
-module CassandraConditionalOutputter
-
- #
- # A unique key for the given record. If an object with
- # that key has been seen, it won't be re-emitted.
- #
- # You will almost certainly want to override this method in your subclass. Be
- # sure that the key is a string, and is encoded properly (Cassandra likes to
- # strip whitespace from keys, for instance).
- #
- def conditional_output_key record
- record.to_s
- end
-
- #
- # Checks each record against the key cache
- # Swallows records already there,
- #
- #
- def emit record, &block
- key = conditional_output_key(record)
- if should_emit?(record)
- set_key(key, {'t' => record.timestamp})
- super record
- end
- end
-
- # Default. Emit record if its key is not already contained
- # in the key-value store. Overwrite this as necessary
- def should_emit? record
- key = conditional_output_key(record)
- !has_key?(key)
- end
-
- # Check for presence of key in the cache
- def has_key? key
- not key_cache.get(conditional_output_key_column, key).blank?
- end
-
- # register key in the key_cache
- def set_key key, data={'t' => '0'}
- key_cache.insert(conditional_output_key_column, key, data)
- end
-
- # nuke key from the key_cache
- def remove_key key
- key_cache.remove(conditional_output_key_column, key)
- end
-
- #
- # Key cache implementation in Cassandra
- #
-
- # The cache
- def key_cache
- @key_cache ||= Cassandra.new(CASSANDRA_KEYSPACE)
- end
-
- # The column to use for the key cache. By default, the class name plus 'Keys',
- # but feel free to override.
- #
- # @example
- #
- # class FooMapper < Wukong::Streamer::RecordStreamer
- # include ConditionalOutputter
- # end
- # FooMapper.new.conditional_output_key_column
- # # => 'FooMapperKeys'
- #
- def conditional_output_key_column
- self.class.to_s+'Keys'
- end
-end
24 lib/wukong/keystore/redis_db.rb
@@ -1,24 +0,0 @@
-#!/usr/bin/env ruby
-require 'rubygems' ;
-require 'redis' ;
-
-RDB = Redis.new(:host => 'localhost', :port => 6379)
-
-start_time = Time.now.utc.to_f ;
-iter=0;
-
-
-$stdin.each do |line|
- _r, id, scat, sn, pr, fo, fr, st, fv, crat, sid, full = line.chomp.split("\t");
- iter+=1 ;
- break if iter > 20_000_000
-
- if (iter % 10_000 == 0)
- elapsed = (Time.now.utc.to_f - start_time)
- puts "%-20s\t%7d\t%7d\t%7.2f\t%7.2f" % [sn, fo, iter, elapsed, iter.to_f/elapsed]
- end
-
- RDB['sn:'+sn.downcase] = id unless sn.empty?
- RDB['sid:'+sid] = id unless sid.empty?
- RDB['uid:'+id] = [sn,sid,crat,scat].join(',') unless id.empty?
-end
137 lib/wukong/keystore/tyrant_db.rb
@@ -1,137 +0,0 @@
-require 'tokyo_tyrant'
-require 'tokyo_tyrant/balancer'
-
-# -- Installing
-# make sure tokyocabinet and tokyotyrant are installed (cehf recipe)
-# make sure ruby-tokyotyrant is installed
-# ldconfig
-# mkdir -p /data/db/ttyrant /var/run/tyrant /var/log/tyrant
-#
-# -- Starting
-# ttserver -port 12001 -thnum 96 -tout 3 -pid /var/run/tyrant/screen_names.pid -kl -log /var/log/tyrant/user_ids.tch '/data/db/ttyrant/user_ids.tch#bnum=100000000#opts=l#rcnum=50000#xmsiz=268435456'
-# ttserver -port 12002 -thnum 96 -tout 3 -pid /var/run/tyrant/screen_names.pid -kl -log /var/log/tyrant/screen_names.tch '/data/db/ttyrant/screen_names.tch#bnum=100000000#opts=l#rcnum=50000#xmsiz=268435456'
-# ttserver -port 12003 -thnum 96 -tout 3 -pid /var/run/tyrant/screen_names.pid -kl -log /var/log/tyrant/search_ids.tch '/data/db/ttyrant/search_ids.tch#bnum=100000000#opts=l#rcnum=50000#xmsiz=268435456'
-# ttserver -port 12004 -thnum 96 -tout 3 -pid /var/run/tyrant/screen_names.pid -kl -log /var/log/tyrant/tweets_parsed.tch '/data/db/ttyrant/tweets_parsed.tch#bnum=800000000#opts=l#rcnum=50000#xmsiz=268435456'
-# ttserver -port 12005 -thnum 96 -tout 3 -pid /var/run/tyrant/screen_names.pid -kl -log /var/log/tyrant/users_parsed.tch '/data/db/ttyrant/users_parsed.tch#bnum=100000000#opts=l#rcnum=50000#xmsiz=268435456'
-#
-# -- Monitoring
-# tcrmgr inform -port $port -st $hostname
-# active conns:
-# lsof -i | grep ttserver | wc -l
-# netstat -a -W | grep ':120' | ruby -ne 'puts $_.split(/ +/)[3 .. 4].join("\t")' | sort | cut -d: -f1-2 | uniq -c | sort -n
-# use db.rnum for most lightweight ping method
-#
-# -- Tuning
-# http://korrespondence.blogspot.com/2009/09/tokyo-tyrant-tuning-parameters.html
-# http://capttofu.livejournal.com/23381.html
-# http://groups.google.com/group/tokyocabinet-users/browse_thread/thread/5a46ee04006a791c#
-# opts "l" of large option (the size of the database can be larger than 2GB by using 64-bit bucket array.), "d" of Deflate option (each record is compressed with Deflate encoding), "b" of BZIP2 option, "t" of TCBS option
-# bnum number of elements of the bucket array. If it is not more than 0, the default value is specified. The default value is 131071 (128K). Suggested size of the bucket array is about from 0.5 to 4 times of the number of all records to be stored.
-# rcnum maximum number of records to be cached. If it is not more than 0, the record cache is disabled. It is disabled by default.
-# xmsiz size of the extra mapped memory. If it is not more than 0, the extra mapped memory is disabled. The default size is 67108864 (64MB).
-# apow size of record alignment by power of 2. If it is negative, the default value is specified. The default value is 4 standing for 2^4=16.
-# fpow maximum number of elements of the free block pool by power of 2. If it is negative, the default value is specified. The default value is 10 standing for 2^10=1024.
-# dfunit unit step number of auto defragmentation. If it is not more than 0, the auto defragmentation is disabled. It is disabled by default.
-# mode "w" of writer, "r" of reader,"c" of creating,"t" of truncating ,"e" of no locking,"f" of non-blocking lock
-#
-# -- Links
-# http://1978th.net/tokyocabinet/spex-en.html
-# http://groups.google.com/group/tokyocabinet-users/browse_thread/thread/3bd2a93322c09eec#
-
-
-class TokyoTyrant::Balancer::Base
- def initialize(hostnames = [], timeout = 20.0, should_retry = true)
- @servers = hostnames.map do |hostname|
- host, port = hostname.split(':')
- klass.new(host, port.to_i, timeout, should_retry)
- end
- # yes, for some reason it's spelled 'Constistent' here
- # DO NOT fix it because it goes deep...
- @ring = TokyoTyrant::ConstistentHash.new(servers)
- end
-
- def close
- @servers.all?{ |server| server.close rescue nil}
- end
-end
-
-module TokyoDbConnection
- class TyrantDb
- attr_reader :dataset
- DB_SERVERS = [
- '10.194.101.156',
- '10.196.73.156',
- '10.196.75.47',
- '10.242.217.140',
- ].freeze unless defined?(TokyoDbConnection::TyrantDb::DB_SERVERS)
-
- DB_PORTS = {
- :tw_screen_names => 12002,
- :tw_search_ids => 12003,
- #
- :tw_user_info => 14000,
- :tw_wordbag => 14101,
- :tw_influence => 14102,
- :tw_trstrank => 14103,
- :tw_conversation => 14104,
- #
- :tw_screen_names2 => 12004,
- :tw_search_ids2 => 12005,
- #
- :tw_user_info2 => 14200,
- :tw_wordbag2 => 14201,
- :tw_influence2 => 14202,
- :tw_trstrank2 => 14203,
- :tw_conversation2 => 14204,
- :tw_strong_links2 => 14205,
- :tw_word_stats2 => 14210,
- #
- :ip_geo_census => 14400,
- } unless defined?(TokyoDbConnection::TyrantDb::DB_PORTS)
-
- def initialize dataset
- @dataset = dataset
- end
-
- def db
- return @db if @db
- port = DB_PORTS[dataset] or raise "Don't know how to reach dataset #{dataset}"
- @db = TokyoTyrant::Balancer::DB.new(DB_SERVERS.map{|s| s+':'+port.to_s})
- end
-
- def [](*args) ; db[*args] ; end
- def size(*args) ; db.size(*args) ; end
- def vanish!(*args) ; db.vanish(*args) ; end
-
- #
- # Insert into the cassandra database with default settings
- #
- def insert key, value
- begin
- db.putnr(key, value)
- rescue StandardError => e ; handle_error("Insert #{[key, value].inspect}", e); end
- end
-
- def insert_array key, value
- insert(key, value.join(','))
- end
-
- def get *args
- begin
- db.get(*args)
- rescue StandardError => e ; handle_error("Fetch #{args.inspect}", e); end
- end
-
- def handle_error action, e
- Log.warn "#{action} failed: #{e} #{e.backtrace.join("\t")}" ;
- invalidate!
- end
-
- def invalidate!
- (@db && @db.close) or warn "Couldn't close #{@db.inspect}"
- @db = nil
- sleep 2
- end
- end
-end
-
145 lib/wukong/keystore/tyrant_notes.textile
@@ -1,145 +0,0 @@
-
-# -- Installing
-# make sure tokyocabinet and tokyotyrant are installed (cehf recipe)
-# make sure ruby-tokyotyrant is installed
-# ldconfig
-# mkdir -p /data/db/ttyrant /var/run/tyrant /var/log/tyrant
-#
-# -- Starting
-# ttserver -port 12001 -thnum 96 -tout 3 -pid /var/run/tyrant/screen_names.pid -kl -log /var/log/tyrant/user_ids.tch '/data/db/ttyrant/user_ids.tch#bnum=100000000#opts=l#rcnum=50000#xmsiz=268435456'
-# ttserver -port 12002 -thnum 96 -tout 3 -pid /var/run/tyrant/screen_names.pid -kl -log /var/log/tyrant/screen_names.tch '/data/db/ttyrant/screen_names.tch#bnum=100000000#opts=l#rcnum=50000#xmsiz=268435456'
-# ttserver -port 12003 -thnum 96 -tout 3 -pid /var/run/tyrant/screen_names.pid -kl -log /var/log/tyrant/search_ids.tch '/data/db/ttyrant/search_ids.tch#bnum=100000000#opts=l#rcnum=50000#xmsiz=268435456'
-# ttserver -port 12004 -thnum 96 -tout 3 -pid /var/run/tyrant/screen_names.pid -kl -log /var/log/tyrant/tweets_parsed.tch '/data/db/ttyrant/tweets_parsed.tch#bnum=800000000#opts=l#rcnum=50000#xmsiz=268435456'
-# ttserver -port 12005 -thnum 96 -tout 3 -pid /var/run/tyrant/screen_names.pid -kl -log /var/log/tyrant/users_parsed.tch '/data/db/ttyrant/users_parsed.tch#bnum=100000000#opts=l#rcnum=50000#xmsiz=268435456'
-#
-# -- Monitoring
-# tcrmgr inform -port $port -st $hostname
-# active conns:
-# lsof -i | grep ttserver | wc -l
-# netstat -a -W | grep ':120' | ruby -ne 'puts $_.split(/ +/)[3 .. 4].join("\t")' | sort | cut -d: -f1-2 | uniq -c | sort -n
-# use db.rnum for most lightweight ping method
-#
-# -- Tuning
-# http://korrespondence.blogspot.com/2009/09/tokyo-tyrant-tuning-parameters.html
-# http://capttofu.livejournal.com/23381.html
-# http://groups.google.com/group/tokyocabinet-users/browse_thread/thread/5a46ee04006a791c#
-# opts "l" of large option (the size of the database can be larger than 2GB by using 64-bit bucket array.), "d" of Deflate option (each record is compressed with Deflate encoding), "b" of BZIP2 option, "t" of TCBS option
-# bnum number of elements of the bucket array. If it is not more than 0, the default value is specified. The default value is 131071 (128K). Suggested size of the bucket array is about from 0.5 to 4 times of the number of all records to be stored.
-# rcnum maximum number of records to be cached. If it is not more than 0, the record cache is disabled. It is disabled by default.
-# xmsiz size of the extra mapped memory. If it is not more than 0, the extra mapped memory is disabled. The default size is 67108864 (64MB).
-# apow size of record alignment by power of 2. If it is negative, the default value is specified. The default value is 4 standing for 2^4=16.
-# fpow maximum number of elements of the free block pool by power of 2. If it is negative, the default value is specified. The default value is 10 standing for 2^10=1024.
-# dfunit unit step number of auto defragmentation. If it is not more than 0, the auto defragmentation is disabled. It is disabled by default.
-# mode "w" of writer, "r" of reader,"c" of creating,"t" of truncating ,"e" of no locking,"f" of non-blocking lock
-#
-# -- Links
-# http://1978th.net/tokyocabinet/spex-en.html
-# http://groups.google.com/group/tokyocabinet-users/browse_thread/thread/3bd2a93322c09eec#
-# Performance limits: http://groups.google.com/group/tokyocabinet-users/browse_thread/thread/3bd2a93322c09eec#
-
-
-h2. Tyrant: ttserver
-
- ttdb="test"
- ttserver -port 12009 -thnum 96 \
- -dmn -pid /var/run/tyrant-${ttdb}.pid
- -ulog /mnt/tmp/ttyrant/tyrant-$[ttdb}.ulog -ulim 268435456 -uas \
- -log /var/log/ttyrant/tyrant-${ttdb}.log \
- "/data/db/ttyrant/${ttdb}.tch#bnum=200000000#opts=l#rcnum=100000#xmsiz=536870912"
-
- can also add host, and umask out to be read-only
-
- * -host name : specify the host name or the address of the server. By default, every network address is bound.
- * -port num : specify the port number. By default, it is 1978.
- * -thnum num : specify the number of worker threads. By default, it is 8.
- * -tout num : specify the timeout of each session in seconds. By default, no timeout is specified.
- * -dmn : work as a daemon process.
- * -pid path : output the process ID into the file.
- * -kl : kill the existing process if the process ID file is detected.
- * -log path : output log messages into the file.
- * -ld : log debug messages also.
- * -le : log error messages only.
- * -ulog path : specify the update log directory.
- * -ulim num : specify the limit size of each update log file.
- * -uas : use asynchronous I/O for the update log.
- * -sid num : specify the server ID.
- * -mhost name : specify the host name of the replication master server.
- * -mport num : specify the port number of the replication master server.
- * -rts path : specify the replication time stamp file.
- * -rcc : check consistency of replication.
- * -skel name : specify the name of the skeleton database library.
- * -mul num : specify the division number of the multiple database mechanism.
- * -ext path : specify the script language extension file.
- * -extpc name period : specify the function name and the calling period of a periodic command.
- * -mask expr : specify the names of forbidden commands.
- * -unmask expr : specify the names of allowed commands.
-
-
-h2. From "Wolfgang Gassler":http://groups.google.com/group/tokyocabinet-users/browse_thread/thread/5a46ee04006a791c#
-
-On Sat, Dec 05, 2009 at 09:32:20PM +0100, Wolfgang Gassler wrote:
-> Hi,
-
-> did anybody look up some of the folowing parameters in the code or can
-> explain them in detail? I just have a guess what they really mean and
-> the short description at the docu homepage
-> http://korrespondence.blogspot.com/2009/09/tokyo-tyrant-tuning-parame...
-> explain them very roughly. Also the already posted blog post
-> http://korrespondence.blogspot.com/2009/09/tokyo-tyrant-tuning-parame...
-> couldn't help.
-
-this is what I gleaned from reading the source code for the hash database
-format ( tchdb.c and tchdb.h ).
-
-> xmsiz
-
-On a TC Hash database, from the beginning of the file, to the end of the bucket
-section, all of that space is mmap'd. Setting 'xmsiz' sets the minimum amount
-of space that is mmap'd. Since 67108864 is the default, this means, that an a
-minimum, the first 64MiB of the file will be mmap'd.
-
-If the header size, plus the bucket region is greater than 'xmsize', then xmsiz
-appers to have no affect.
-
-> apow
-
-On a TC Hash database, 'apow' determines on what byte alignment each record will
-sit. 'apow' is a power of 2. This means that when apow is 4 ( the default for
-hash databases) all records in the database are aligned on a 16 byte boundary,
-in the database file.
-
-This means that every record will take up at a minumum 16 bytes of space, and
-all records are padded to a length that is a multiple of 16.
-
-> fpow
-
-On a TC Hash database, 'fpow' determines the maximum number of free blocks that
-can exist in the free block pool. This is also a power-of-2 parameter so with
-the default in a Hash database of 10, this means that there can be a maximum
-of 2^10, or 1024 free blocks in the database.
-
-Free blocks come into existence when records are deleted from the database
-and their space in the db file is up for reuse. If you never delete an
-item from the database, you will never have any free blocks.
-
-> dfunit
-
-On a TC Hash database, 'dfunit' describes how defragmentation takes place.
-Every time a free block is created a 'dfcnt' is incremented. When 'dfcnt'
-is greater than 'dfunit' and 'dfunit' is greater than 0, defragmentation
-takes place.
-
-I don't know precisely what defragmentation does in TC. A cursory look
-at 'tchdbdefragimpl', the function implementing defagmentation for hash
-databases, it looks like it moves records around filling up free blocks
-in the hash db with real records from the end of the file and then making the
-file smaller if possible.
-
-Basically it moves records around minimizing dead space in the file.
-
-Again, defragmentation will only take place if 'dfunit' has a positive
-value and you remove records from the db creating free blocks.
-
-enjoy,
-
--jeremy
35 lib/wukong/logger.rb
@@ -13,37 +13,15 @@ module Wukong
# I, [2009-07-26T19:58:46-05:00 #12332]: Up to 2000 char message
#
def self.logger
- @logger ||= default_ruby_logger
- end
-
- #
- # Log4r logger, set up to produce tab-delimited (and thus, wukong|hadoop
- # friendly) output lines
- #
- def self.default_log4r_logger logger_handle='wukong'
- require 'log4r'
- lgr = Log4r::Logger.new logger_handle
- outputter = Log4r::Outputter.stderr
- # Define timestamp formatter method
- ::Time.class_eval do def utc_iso8601() utc.iso8601 ; end ; end
- # 2009-07-25T00:12:05Z INFO PID\t
- outputter.formatter = Log4r::PatternFormatter.new(
- :pattern => "%d %.4l #{Process.pid}\t%.2000m",
- :date_method => :utc_iso8601
- )
- lgr.outputters = outputter
- lgr
- end
-
- def self.default_ruby_logger
+ return @logger if @logger
require 'logger'
- logger = Logger.new STDERR
- logger.instance_eval do
+ @logger = Logger.new STDERR
+ @logger.instance_eval do
def dump *args
debug args.inspect
end
end
- logger
+ @logger
end
def self.logger= logger
@@ -54,6 +32,7 @@ def self.logger= logger
#
# A convenient logger.
#
-# Define NO_WUKONG_LOG (or define Log yourself) to prevent its creation
+# define Log yourself to prevent its creation
#
-Log = Wukong.logger unless (defined?(Log) || defined?(NO_WUKONG_LOG))
+Log = Wukong.logger unless defined?(Log)
+
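What remains is a stock ruby Logger writing to STDERR plus the #dump convenience defined above. A usage sketch; the output shown is the Logger default format, approximately:

 <pre>
  require 'wukong'
  Log.info "finished pass 1"        # I, [<timestamp> #<pid>]  INFO -- : finished pass 1
  Log.dump :user, 24601, 'jerry'    # a DEBUG line containing [:user, 24601, "jerry"].inspect
 </pre>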
25 lib/wukong/models/graph.rb
@@ -1,25 +0,0 @@
-
-module Wukong
- module Models
- Edge = TypedStruct.new(
- [:src, Integer],
- [:dest, Integer]
- )
-
- MultiEdge = TypedStruct.new(
- [:src, Integer],
- [:dest, Integer],
- [:a_follows_b, Integer],
- [:b_follows_a, Integer],
- [:a_replies_b, Integer],
- [:b_replies_a, Integer],
- [:a_atsigns_b, Integer],
- [:b_atsigns_a, Integer],
- [:a_retweets_b, Integer],
- [:b_retweets_a, Integer],
- [:a_favorites_b, Integer],
- [:b_favorites_a, Integer]
- )
-
- end
-end
7 lib/wukong/monitor.rb
@@ -1,7 +0,0 @@
-module Monkeyshines
- module Monitor
- autoload :PeriodicMonitor, 'monkeyshines/monitor/periodic_monitor'
- autoload :PeriodicLogger, 'monkeyshines/monitor/periodic_logger'
- end
-end
-
23 lib/wukong/monitor/chunked_store.rb
@@ -1,23 +0,0 @@
-require 'monkeyshines/monitor/periodic_monitor'
-module Monkeyshines
- module Monitor
- module ChunkedStore
- attr_accessor :file_pattern
- def initialize file_pattern
- self.file_pattern = file_pattern
- super file_pattern.make
- end
-
- def close_and_reopen
- close
- self.filename = file_pattern.make
- dump_file
- end
-
- def save *args
- chunk_monitor.periodically{ close_rename_and_open }
- super *args
- end
- end
- end
-end
34 lib/wukong/monitor/periodic_logger.rb
@@ -1,34 +0,0 @@
-module Monkeyshines
- module Monitor
-
- #
- # Emits a log line but only every +iter_interval+ calls or +time_interval+
- # lapse.
- #
- # Since the contents of the block aren't called until the criteria are met,
- # you can put relatively expensive operations in the log without killing
- # your iteration time.
- #
- class PeriodicLogger < PeriodicMonitor
- #
- # Call with a block that returns a string or array to log.
- # If you return
- #
- # Ex: log if it has been at least 5 minutes since last announcement:
- #
- # periodic_logger = Monkeyshines::Monitor::PeriodicLogger.new(:time => 300)
- # loop do
- # # ... stuff ...
- # periodic_logger.periodically{ [morbenfactor, crunkosity, exuberance] }
- # end
- #
- def periodically &block
- super do
- now = Time.now.utc.to_f
- result = [ "%10d"%iter, "%7.1f"%since, "%7.1f"%inst_rate(now), (block ? block.call : nil) ].flatten.compact
- Log.info result.join("\t")
- end
- end
- end
- end
-end
70 lib/wukong/monitor/periodic_monitor.rb
@@ -1,70 +0,0 @@
-module Wukong::Monitor
- #
- # Accepts a lightweight call every iteration.
- #
- # Once either a time or an iteration criterion is met, executes the block
- # and resets the timer until next execution.
- #
- # Note that the +time_interval+ is measured *excution to execution* and not
- # in multiples of iter_interval. Say I set a time_interval of 300s, and
- # happen to iterate at 297s and 310s after start. Then the monitor will
- # execute at 310s, and the next execution will happen on or after 610s.
- #
- # Also note that when *either* criterion is met, *both* criteria are
- # reset. Say I set a time interval of 300s and an +iter_interval+ of 10_000;
- # and that at 250s I reach iteration 10_000. Then the monitor will execute
- # on or after 20_000 iteration or 550s, whichever happens first.
- #
-class PeriodicMonitor
- attr_accessor :time_interval, :iter_interval
- attr_accessor :last_time, :current_iter, :iter, :started_at
-
- def initialize options={}
- self.started_at = Time.now.utc.to_f
- self.last_time = started_at
- self.iter = 0
- self.current_iter = 0
- self.time_interval = options[:time]
- self.iter_interval = options[:iters]
- end
-
- # True if more than +iter_interval+ has elapsed since last execution.
- def enough_iterations?
- iter % iter_interval == 0 if iter_interval
- end
-
- # True if more than +time_interval+ has elapsed since last execution.
- def enough_time? now
- (now - last_time) > time_interval if time_interval
- end
-
- # Time since monitor was created
- def since
- Time.now.utc.to_f - started_at
- end
- # Overall iterations per second
- def rate
- iter.to_f / since.to_f
- end
- # "Instantaneous" iterations per second
- def inst_rate now
- current_iter.to_f / (now-last_time).to_f
- end
-
- #
- # if the interval conditions are met, executes block; otherwise just does
- # bookkeeping and returns.
- #
- def periodically &block
- self.iter += 1
- self.current_iter += 1
- now = Time.now.utc.to_f
- if enough_iterations? || enough_time?(now)
- block.call(iter, (now-last_time))
- self.last_time = now
- self.current_iter = 0
- end
- end
-end
-
-end
33 lib/wukong/periodic_monitor.rb
@@ -1,4 +1,5 @@
-Settings.define :log_interval, :default => 1000, :type => Integer, :description => 'How many iterations between log statements'
+Settings.define :log_interval, :default => 10_000, :type => Integer, :description => 'How many iterations between log statements'
+Settings.define :log_seconds, :default => 30, :type => Integer, :description => 'How many seconds between log statements'
#
# Periodic monitor
@@ -9,40 +10,48 @@
class PeriodicMonitor
attr_reader :iter, :start_time, :options
attr_accessor :interval
+ attr_accessor :time_interval
def initialize extra_options={}
- @options = {}
+ @options = {}
@options.deep_merge!( extra_options || {} )
- @iter = 0
- @start_time = now
- @interval = (options[:log_interval] || Settings[:log_interval]).to_i
- @interval = 1000 unless @interval >= 1
+ @iter = 0
+ @start_time = now
+ @last_report = @start_time
+ @interval = (options[:log_interval] || Settings[:log_interval]).to_i
+ @interval = 1000 unless @interval >= 1
+ @time_interval = (options[:log_seconds] || Settings[:log_seconds]).to_i
end
def periodically *args, &block
incr!
if ready?
+ @last_report = Time.now
if block
block.call(iter, *args)
else
- $stderr.puts progress(*args)
+ self.emit progress(*args)
end
end
end
+ def emit log_line
+ Log.info log_line
+ end
+
def incr!
@iter += 1
end
def ready?
- iter % @interval == 0
+ (iter % @interval == 0) || (since > time_interval)
end
def progress *stuff
[
"%15d" % iter,
"%7.1f"% elapsed_time, "sec",
- "%7.1f"%(iter.to_f / elapsed_time), "/sec",
+ "%7.1f"% rate, "/sec",
now.to_flat,
*stuff
].flatten.join("\t")
@@ -51,7 +60,13 @@ def progress *stuff
def elapsed_time
now - start_time
end
+ def since
+ now - @last_report
+ end
def now
Time.now.utc
end
+ def rate
+ iter.to_f / elapsed_time
+ end
end
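A sketch of the reworked monitor inside a long-running loop; the :log_interval / :log_seconds keys and the log-when-no-block behaviour of #periodically come from the code above, while the loop itself is invented:

 <pre>
  monitor = PeriodicMonitor.new(:log_interval => 10_000, :log_seconds => 30)
  $stdin.each do |line|
    # ... do the real per-line work here ...
    # every 10_000 lines or 30 seconds, a line with the iteration count,
    # elapsed time and rate goes to the log
    monitor.periodically(line[0..40])
  end
 </pre>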
104 lib/wukong/rdf.rb
@@ -1,104 +0,0 @@
-module Wukong
- #
- # Dump wukong object as RDF triples:
- #
- # <key attr val module Wukong
- #
- # Dump wukong object as RDF triples:
- #
- # <key> <attr> <val> # <extra>
- #
- # Each element of the triple is XML encoded such that it contains no tab,
- # newline or carriage returns, and the three are tab-separated. Any extra
- # fields -- reification info, for instance -- are appended as a comment.
- #
- # This makes the result not only a valid RDF triple file but perfectly
- # palatable to Wukong for further processing.
- #
- module Rdf
-
- #
- # RDF-formatted date
- #
- def self.encode_datetime dt
- DateTime.parse_safely(dt).xmlschema
- end
-
- #
- # Emit a component (subject or object) with the right semantic encoding
- #
- # Use :boolskip if a false property should just be left out.
- #
- def rdf_component val, type
- case type
- when :tweet then %Q{<http://twitter.com/statuses/show/#{val}.xml>}
- when :user then %Q{<http://twitter.com/users/show/#{val}.xml>}
- when :bool then ((!val) || (val==0) || (val=="0")) ? '"false"^^<xsd:boolean>' : '"true"^^<xsd:boolean>'
- when :boolskip then ((!val) || (val==0) || (val=="0")) ? nil : '"true"^^<xsd:boolean>'
- when :int then %Q{"#{val.to_i}"^^<xsd:integer>}
- when :date then %Q{"#{TwitterRdf.encode_datetime(val)}"^^<xsd:dateTime>}
- when :str then %Q{"#{val}"}
- else raise "Don't know how to encode #{type}"
- end
- end
-
- #
- # Express relationship (predicate) in RDF
- #
- def rdf_pred pred
- case pred
- when :created_at then %Q{<http://twitter.com/##{pred}>}
- else %Q{<http://twitter.com/##{pred}>}
- end
- end
-
- #
- # RDF Triple string for the given (subject, object, predicate)
- # http://www.w3.org/TR/rdf-testcases/#ntriples
- #
- def self.rdf_triple subj, pred, obj, comment=nil
- comment = "\t# " + comment.to_s unless comment.blank?
- %Q{%-55s\t%-39s\t%-23s\t.%s} % [subj, pred, obj, comment]
- end
-
- def mutable?(attr)
- false
- end
-
- #
- # Extract [subject, predicate, object, (extra)] tuples.
- #
- # (extra) is set to +scraped at+ for #mutable? attributes, blank otherwise.
- #
- def to_rdf3_tuples
- members_with_types.map do |attr, type|
- next if self[attr].blank?
- subj = rdf_resource
- pred = rdf_pred(attr)
- obj = rdf_component(self[attr], type) or next
- comment = scraped_at if mutable?(attr)
- [subj, pred, obj, comment]
- end.compact
- end
-
- #
- # Convert an object to an rdf triple.
- #
- # Appends scraped at to #mutable? attributes
- #
- def to_rdf3
- to_rdf3_tuples.map do |tuple|
- self.class.rdf_triple tuple
- end.join("\n")
- end
-
- end
-end
->
- #
- #
- module Rdf
- def to_rdf
- end
- end
-end
36 lib/wukong/script.rb
@@ -1,8 +1,10 @@
require 'pathname'
+require 'configliere' ; Configliere.use(:commandline, :env_var, :define)
+require 'wukong'
require 'wukong/script/hadoop_command'
require 'wukong/script/local_command'
-require 'configliere' ; Configliere.use(:commandline, :env_var, :define)
require 'rbconfig' # for uncovering ruby_interpreter_path
+require 'wukong/streamer' ; include Wukong::Streamer
module Wukong
# == How to run a Wukong script
#
@@ -63,7 +65,7 @@ module Wukong
class Script
include Wukong::HadoopCommand
include Wukong::LocalCommand
- attr_reader :mapper_klass, :reducer_klass, :options
+ attr_reader :mapper, :reducer, :options
attr_reader :input_paths, :output_path
# ---------------------------------------------------------------------------
@@ -122,12 +124,12 @@ class Script
# end
# MyScript.new(MyMapper, nil).run
#
- def initialize mapper_klass, reducer_klass=nil, extra_options={}
+ def initialize mapper, reducer=nil, extra_options={}
Settings.resolve!
- @options = Settings.dup
- options.merge! extra_options
- @mapper_klass = mapper_klass
- @reducer_klass = reducer_klass
+ @options = Settings
+ options.merge extra_options
+ @mapper = (case mapper when Class then mapper.new when nil then nil else mapper ; end)
+ @reducer = (case reducer when Class then reducer.new when nil then nil else reducer ; end)
@output_path = options.rest.pop
@input_paths = options.rest.reject(&:blank?)
if (input_paths.blank? || output_path.blank?) && (not options[:dry_run]) && (not ['map', 'reduce'].include?(run_mode))
@@ -142,8 +144,8 @@ def initialize mapper_klass, reducer_klass=nil, extra_options={}
#
def run
case run_mode
- when 'map' then mapper_klass.new(self.options).stream
- when 'reduce' then reducer_klass.new(self.options).stream
+ when 'map' then mapper.stream
+ when 'reduce' then reducer.stream
when 'local' then execute_local_workflow
when 'cassandra' then execute_hadoop_workflow
when 'hadoop', 'mapred' then execute_hadoop_workflow
@@ -172,9 +174,9 @@ def run_mode
# In local mode, it's given to the system() call
#
def mapper_commandline
- if mapper_klass
- # "#{ruby_interpreter_path} #{this_script_filename} --map " + non_wukong_params
- "#{ruby_interpreter_path} #{File.basename(this_script_filename)} --map " + non_wukong_params
+ if mapper
+ "#{ruby_interpreter_path} #{this_script_filename} --map " + non_wukong_params
+ # "#{ruby_interpreter_path} #{File.basename(this_script_filename)} --map " + non_wukong_params
else
options[:map_command]
end
@@ -186,8 +188,9 @@ def mapper_commandline
# In local mode, it's given to the system() call
#
def reducer_commandline
- if reducer_klass
- "#{ruby_interpreter_path} #{File.basename(this_script_filename)} --reduce " + non_wukong_params
+ if reducer
+ "#{ruby_interpreter_path} #{this_script_filename} --reduce " + non_wukong_params
+ # "#{ruby_interpreter_path} #{File.basename(this_script_filename)} --reduce " + non_wukong_params
else
options[:reduce_command]
end
@@ -229,8 +232,9 @@ def execute_command! *args
#
def maybe_overwrite_output_paths! output_path
if (options[:overwrite] || options[:rm]) && (run_mode == 'hadoop')
- Log.info "Removing output file #{output_path}"
- `hdp-rm -r '#{output_path}'`
+ cmd = %Q{#{hadoop_runner} fs -rmr '#{output_path}'}
+ Log.info "Removing output file #{output_path}: #{cmd}"
+ puts `#{cmd}`
end
end
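
A sketch of the two call styles the reworked initializer now accepts -- pass streamer classes (instantiated for you) or ready-made instances. MyMapper and the file name are hypothetical; only Wukong::Script and the Wukong::Streamer bases come from the library.

    #!/usr/bin/env ruby
    require 'wukong'

    class MyMapper < Wukong::Streamer::LineStreamer
      def process line
        yield line.strip.downcase
      end
    end

    # hand the class in (it is instantiated for you) ...
    Wukong::Script.new(MyMapper, nil).run
    # ... or hand in a ready-made instance, which the case statement passes through
    # Wukong::Script.new(MyMapper.new, nil).run

Either way the script is driven from the command line with input and output paths, something like ./my_mapper.rb --run input_dir output_dir.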
59 lib/wukong/script/hadoop_command.rb
@@ -12,27 +12,27 @@ module HadoopCommand
#
# Translate simplified args to their hairy hadoop equivalents
#
- Settings.define :max_node_map_tasks, :jobconf => true, :description => 'mapred.tasktracker.map.tasks.maximum', :wukong => true
- Settings.define :max_node_reduce_tasks, :jobconf => true, :description => 'mapred.tasktracker.reduce.tasks.maximum', :wukong => true
- Settings.define :map_tasks, :jobconf => true, :description => 'mapred.map.tasks', :wukong => true
- Settings.define :reduce_tasks, :jobconf => true, :description => 'mapred.reduce.tasks', :wukong => true
- Settings.define :sort_fields, :jobconf => true, :description => 'stream.num.map.output.key.fields', :wukong => true
- Settings.define :key_field_separator, :jobconf => true, :description => 'map.output.key.field.separator', :wukong => true
- Settings.define :partition_fields, :jobconf => true, :description => 'num.key.fields.for.partition', :wukong => true
- Settings.define :output_field_separator, :jobconf => true, :description => 'stream.map.output.field.separator', :wukong => true
- Settings.define :map_speculative, :jobconf => true, :description => 'mapred.map.tasks.speculative.execution', :wukong => true
- Settings.define :timeout, :jobconf => true, :description => 'mapred.task.timeout', :wukong => true
- Settings.define :reuse_jvms, :jobconf => true, :description => 'mapred.job.reuse.jvm.num.tasks', :wukong => true
- Settings.define :respect_exit_status, :jobconf => true, :description => 'stream.non.zero.exit.is.failure', :wukong => true
Settings.define :io_sort_mb, :jobconf => true, :description => 'io.sort.mb', :wukong => true
Settings.define :io_sort_record_percent, :jobconf => true, :description => 'io.sort.record.percent', :wukong => true
Settings.define :job_name, :jobconf => true, :description => 'mapred.job.name', :wukong => true
- Settings.define :max_reduces_per_node, :jobconf => true, :description => 'mapred.max.reduces.per.node', :wukong => true
- Settings.define :max_reduces_per_cluster,:jobconf => true, :description => 'mapred.max.reduces.per.cluster', :wukong => true
- Settings.define :max_maps_per_node, :jobconf => true, :description => 'mapred.max.maps.per.node', :wukong => true
+ Settings.define :key_field_separator, :jobconf => true, :description => 'map.output.key.field.separator', :wukong => true
+ Settings.define :map_speculative, :jobconf => true, :description => 'mapred.map.tasks.speculative.execution', :wukong => true
+ Settings.define :map_tasks, :jobconf => true, :description => 'mapred.map.tasks', :wukong => true
Settings.define :max_maps_per_cluster, :jobconf => true, :description => 'mapred.max.maps.per.cluster', :wukong => true
+ Settings.define :max_maps_per_node, :jobconf => true, :description => 'mapred.max.maps.per.node', :wukong => true
+ Settings.define :max_node_map_tasks, :jobconf => true, :description => 'mapred.tasktracker.map.tasks.maximum', :wukong => true
+ Settings.define :max_node_reduce_tasks, :jobconf => true, :description => 'mapred.tasktracker.reduce.tasks.maximum', :wukong => true
Settings.define :max_record_length, :jobconf => true, :description => 'mapred.linerecordreader.maxlength', :wukong => true # "Safeguards against corrupted data: lines longer than this (in bytes) are treated as bad records."
+ Settings.define :max_reduces_per_cluster,:jobconf => true, :description => 'mapred.max.reduces.per.cluster', :wukong => true
+ Settings.define :max_reduces_per_node, :jobconf => true, :description => 'mapred.max.reduces.per.node', :wukong => true
Settings.define :min_split_size, :jobconf => true, :description => 'mapred.min.split.size', :wukong => true
+ Settings.define :output_field_separator, :jobconf => true, :description => 'stream.map.output.field.separator', :wukong => true
+ Settings.define :partition_fields, :jobconf => true, :description => 'num.key.fields.for.partition', :wukong => true
+ Settings.define :reduce_tasks, :jobconf => true, :description => 'mapred.reduce.tasks', :wukong => true
+ Settings.define :respect_exit_status, :jobconf => true, :description => 'stream.non.zero.exit.is.failure', :wukong => true
+ Settings.define :reuse_jvms, :jobconf => true, :description => 'mapred.job.reuse.jvm.num.tasks', :wukong => true
+ Settings.define :sort_fields, :jobconf => true, :description => 'stream.num.map.output.key.fields', :wukong => true
+ Settings.define :timeout, :jobconf => true, :description => 'mapred.task.timeout', :wukong => true
Settings.define :noempty, :description => "don't create zero-byte reduce files (hadoop mode only)", :wukong => true
Settings.define :split_on_xml_tag, :description => "Parse XML document by specifying the tag name: 'anything found between <tag> and </tag> will be treated as one record for map tasks'", :wukong => true
@@ -60,7 +60,7 @@ def execute_hadoop_workflow
# Use Settings[:hadoop_home] to set the path your config install.
hadoop_commandline = [
hadoop_runner,
- "jar #{Settings[:hadoop_home]}/contrib/streaming/hadoop-*streaming*.jar",
+ "jar #{options[:hadoop_home]}/contrib/streaming/hadoop-*streaming*.jar",
hadoop_jobconf_options,
"-D mapred.job.name='#{job_name}'",
hadoop_other_args,
@@ -80,8 +80,8 @@ def hadoop_jobconf_options
# Fixup these options
options[:reuse_jvms] = '-1' if (options[:reuse_jvms] == true)
options[:respect_exit_status] = 'false' if (options[:ignore_exit_status] == true)
- # If no reducer_klass and no reduce_command, then skip the reduce phase
- options[:reduce_tasks] = 0 if (! reducer_klass) && (! options[:reduce_command]) && (! options[:reduce_tasks])
+ # If no reducer and no reduce_command, then skip the reduce phase
+ options[:reduce_tasks] = 0 if (! reducer) && (! options[:reduce_command]) && (! options[:reduce_tasks])
# Fields hadoop should use to distribute records to reducers
unless options[:partition_fields].blank?
jobconf_options += [
@@ -90,23 +90,24 @@ def hadoop_jobconf_options
]
end
jobconf_options += [
- :key_field_separator, :sort_fields,
- :map_tasks, :reduce_tasks,
- :max_node_map_tasks, :max_node_reduce_tasks,
- :max_reduces_per_node, :max_reduces_per_cluster,
- :max_maps_per_node, :max_maps_per_cluster,
- :min_split_size,
- :map_speculative,
- :timeout,
- :reuse_jvms, :respect_exit_status
+ :io_sort_mb, :io_sort_record_percent,
+ :map_speculative, :map_tasks,
+ :max_maps_per_cluster, :max_maps_per_node,
+ :max_node_map_tasks, :max_node_reduce_tasks,
+ :max_reduces_per_cluster, :max_reduces_per_node,
+ :max_record_length, :min_split_size,
+ :output_field_separator, :key_field_separator,
+ :partition_fields, :sort_fields,
+ :reduce_tasks, :respect_exit_status,
+ :reuse_jvms, :timeout,
].map{|opt| jobconf(opt)}
jobconf_options.flatten.compact
end
def hadoop_other_args
extra_str_args = [ options[:extra_args] ]
- if Settings.split_on_xml_tag
- extra_str_args << %Q{-inputreader 'StreamXmlRecordReader,begin=<#{Settings.split_on_xml_tag}>,end=</#{Settings.split_on_xml_tag}>'}
+ if options.split_on_xml_tag
+ extra_str_args << %Q{-inputreader 'StreamXmlRecordReader,begin=<#{options.split_on_xml_tag}>,end=</#{options.split_on_xml_tag}>'}
end
extra_str_args << ' -lazyOutput' if options[:noempty] # don't create reduce file if no records
extra_str_args << ' -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner' unless options[:partition_fields].blank?
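
The Settings.define table above is what lets friendly command-line flags stand in for raw Hadoop jobconf keys. A hypothetical invocation (script name, paths and values invented; flag parsing comes from configliere's :commandline support required in script.rb):

    ./word_count.rb --run=hadoop --reduce_tasks=20 --timeout=600000 \
        --split_on_xml_tag=page input_dir output_dir

Each recognized flag lands on the hadoop streaming command roughly as -D mapred.reduce.tasks=20, -D mapred.task.timeout=600000, and so on, per the descriptions above; --split_on_xml_tag additionally adds the StreamXmlRecordReader -inputreader option built in hadoop_other_args.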
46 lib/wukong/streamer/base.rb
@@ -4,13 +4,17 @@ class Base
# Options, initially set from the command-line args -- see
# Script#process_argv!
- attr_accessor :options
+ attr_reader :own_options
#
# Accepts option hash from script runner
#
def initialize options={}
- self.options = options
+ @own_options = options
+ end
+
+ def options
+ Settings.deep_merge own_options
end
#
@@ -24,6 +28,7 @@ def stream
process(*record) do |output_record|
emit output_record
end
+ monitor.periodically(record.to_s[0..1000])
end
after_stream
end
@@ -75,6 +80,43 @@ def bad_record! key, *args
warn "Bad record #{args.inspect[0..400]}"
puts ["bad_record-"+key, *args].join("\t")
end
+
+ # A periodic logger to track progress
+ def monitor
+ @monitor ||= PeriodicMonitor.new
+ end
+
+ # Defines a process method on the fly to execute the given mapper.
+ #
+ # This is still experimental.
+ # Among other limitations, you can't use ++yield++ -- you have to call
+ # emit() directly.
+ def map &mapper_block
+ @mapper_block = mapper_block.to_proc
+ self.instance_eval do
+ def process *args, &block
+ instance_exec(*args, &@mapper_block)
+ end
+ end
+ self
+ end
+
+ # Creates a new object of this class and injects the given block
+ # as the process method
+ def self.map *args, &block
+ self.new.map *args, &block
+ end
+
+ # Delegates back to Wukong to run this instance as a mapper
+ def run options={}
+ Wukong.run(self, nil, options)
+ end
+
+ # Creates a new object of this class and runs it
+ def self.run options={}
+ Wukong.run(self.new, nil, options)
+ end
+
end
end
end
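
A sketch of the experimental block-mapper DSL added above. It leans only on the new map/run class helpers and the existing emit; how recordize splits the line into fields is an assumption, so treat the block signature as illustrative.

    #!/usr/bin/env ruby
    require 'wukong'

    # the block runs via instance_exec, so call emit (not yield) to send output
    Wukong::Streamer::Base.map do |*fields|
      emit fields.reverse.join("\t")
    end.run

run hands the instance straight to Wukong.run(self, nil, options), so the usual command-line contract (input and output paths, --map in the child process) still applies.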
61 lib/wukong/streamer/cassandra_streamer.rb
@@ -1,61 +0,0 @@
-# Defines a base class for streaming data into a cassandra db connection.
-require 'cassandra' ; include Cassandra::Constants
-module Wukong
- module Streamer
-
- class CassandraStreamer < Wukong::Streamer::Base
- attr_accessor :batch_count, :batch_record_count, :batch_size, :column_space, :db_seeds, :cassandra_db
-
- def initialize *args
- super *args
- self.batch_count = 0
- self.batch_record_count = 0
- self.column_space ||= 'Twitter'
- self.batch_size ||= 100
- self.db_seeds ||= %w[10.244.191.178 10.243.19.223 10.243.17.219 10.245.70.85 10.244.206.241].map{ |s| s.to_s+':9160'}
- self.cassandra_db ||= Cassandra.new(self.column_space, self.db_seeds)
- end
-
- def stream
- while still_lines? do
- start_batch do
- while still_lines? && batch_not_full? do
- line = get_line
- record = recordize(line.chomp) or next
- next if record.blank?
- process(*record) do |output_record|
- emit output_record
- end
- self.batch_record_count += 1
- end
- end
- end
- end
-
- def process *args, &blk
- Raise "Overwrite this method to insert into cassandra db"
- end
-
- def start_batch &blk
- self.batch_record_count = 0
- self.batch_count += 1
- self.cassandra_db.batch(&blk)
- end
-
- def get_line
- $stdin.gets
- end
-
- def still_lines?
- !$stdin.eof?
- end
-
- def batch_not_full?
- self.batch_record_count < self.batch_size
- end
-
- end
- end
-
-end
-
11 lib/wukong/streamer/count_keys.rb
@@ -4,25 +4,20 @@ module Streamer
# Emit each unique key and the count of its occurrences
#
class CountKeys < Wukong::Streamer::AccumulatingReducer
- attr_accessor :key_count
-
- def formatted_key_count
- "%10d"%key_count.to_i
- end
# reset the counter to zero
def start! *args
- self.key_count = 0
+ @count = 0
end
# record one more for this key
def accumulate *vals
- self.key_count += 1
+ @count += 1
end
# emit each key field and the count, tab-separated.
def finalize
- yield [key, formatted_key_count]
+ yield [key, @count]
end
end
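
CountKeys now yields the raw integer count instead of the old zero-padded string. A sketch of a complete job built around it (the mapper and file name are hypothetical; CountKeys itself is used as-is):

    #!/usr/bin/env ruby
    require 'wukong'

    # emit just the key field; CountKeys groups identical keys and counts occurrences
    class KeyExtractor < Wukong::Streamer::LineStreamer
      def process line
        yield [line.split("\t").first]
      end
    end

    Wukong::Script.new(KeyExtractor, Wukong::Streamer::CountKeys).run

Output is one key-then-count line per distinct key, tab-separated, with the count no longer padded to ten columns.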
26 lib/wukong/streamer/count_lines.rb
@@ -1,26 +0,0 @@
-module Wukong
- module Streamer
- #
- # For each identical line in the map phase output, emit one representative
- # line followed by the count of occrrences (separated by a tab).
- #
- # (This is the functional equivalent of +'uniq -c'+)
- #
- class CountLines < Wukong::Streamer::Base
- def formatted_count item, key_count
- "%s\t%10d" % [item, key_count.to_i]
- end
-
- #
- # Delegate to +uniq -c+, but put the count last for idempotence.
- #
- def stream
- %x{/usr/bin/uniq -c}.split("\n").each do |line|
- key_count, item = line.chomp.strip.split(/\s+/, 2)
- puts formatted_count(item, key_count)
- end
- end
- end
-
- end
-end
25 lib/wukong/streamer/counting_reducer.rb
@@ -1,25 +0,0 @@
-module Wukong
- module Streamer
-
- #
- # Count the number of records for each key.
- #
- class CountingReducer < AccumulatingReducer
- attr_accessor :count
-
- # start the sum with 0 for each key
- def start! *_
- self.count = 0
- end
- # ... and count the number of records for this key
- def accumulate *_
- self.count += 1
- end
- # emit [key, count]
- def finalize
- yield [key, count].flatten
- end
- end
-
- end
-end
4 lib/wukong/streamer/filter.rb
@@ -12,8 +12,8 @@ module Filter
#
# Subclass and re-define the emit? method
#
- def process *record, &block
- yield record if emit?(record)
+ def process *record
+ yield record if emit?(*record)
end
end
end
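
The signature change is visible to user code: emit? now receives the record fields splatted rather than as a single array. A sketch written against the new convention -- the class name and field layout are invented, and Filter is treated as the mixin the hunk context (module Filter) suggests:

    # keep only records whose second field is non-blank
    class NonBlankFilter < Wukong::Streamer::Base
      include Wukong::Streamer::Filter
      def emit? key, value, *rest
        not value.to_s.strip.empty?
      end
    end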
6 lib/wukong/streamer/list_reducer.rb
@@ -8,13 +8,13 @@ class ListReducer < Wukong::Streamer::AccumulatingReducer
# start with an empty list
def start! *args
- self.values = []
+ @values = []
end
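
The accumulated list now lives in a plain @values ivar rather than the old accessor. A sketch of a subclass that reuses ListReducer's accumulation but customizes the output; it assumes (not shown in this hunk) that accumulate keeps appending each record to @values, so treat the finalize body as illustrative.

    # hypothetical: emit each key with the count of distinct values seen for it
    class DistinctValueCount < Wukong::Streamer::ListReducer
      def finalize
        yield [key, @values.flatten.uniq.size]
      end
    end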