diff --git a/README.textile b/README.textile index bffa3d77..51259c8f 100644 --- a/README.textile +++ b/README.textile @@ -1,6 +1,6 @@ h1. Wukong -Wukong makes "Hadoop":http://hadoop.apache.org/core so easy a chimpanzee can use it. +Wukong is Ruby for Hadoop -- it makes "Hadoop":http://hadoop.apache.org/core so easy a chimpanzee can use it. Treat your dataset like a * stream of lines when it's efficient to process by lines diff --git a/lib/wukong/script.rb b/lib/wukong/script.rb index cfe20496..89832215 100644 --- a/lib/wukong/script.rb +++ b/lib/wukong/script.rb @@ -118,11 +118,11 @@ class Script # end # MyScript.new(MyMapper, nil).run # - def initialize mapper_klass, reducer_klass, extra_options={} + def initialize mapper_klass, reducer_klass=nil, extra_options={} self.options = Settings.dup - options.resolve! - options.merge! self.default_options - options.merge! extra_options + self.options.resolve! + self.options.merge! self.default_options + self.options.merge! extra_options self.mapper_klass = mapper_klass self.reducer_klass = reducer_klass # If no reducer_klass and no reduce_command, then skip the reduce phase @@ -141,24 +141,29 @@ def default_options end # - # by default, call this script in --map mode + # Shell command for map phase. By default, calls the script in --map mode + # In hadoop mode, this is given to the hadoop streaming command. + # In local mode, it's given to the system() call # def map_command - case - when mapper_klass - "#{ruby_interpreter_path} #{this_script_filename} --map " + non_wukong_params - else options[:map_command] || options[:default_mapper] end + if mapper_klass + "#{ruby_interpreter_path} #{this_script_filename} --map " + non_wukong_params + else + options[:map_command] || options[:default_mapper] + end end # - # Shell command for reduce phase - # by default, call this script in --reduce mode + # Shell command for reduce phase. By default, calls the script in --reduce mode + # In hadoop mode, this is given to the hadoop streaming command. + # In local mode, it's given to the system() call # def reduce_command - case - when reducer_klass - "#{ruby_interpreter_path} #{this_script_filename} --reduce " + non_wukong_params - else options[:reduce_command] || options[:default_reducer] end + if reducer_klass + "#{ruby_interpreter_path} #{this_script_filename} --map " + non_wukong_params + else + options[:reduce_command] + end end # @@ -187,10 +192,10 @@ def run_mode end def input_output_paths - # input / output paths - input_path, output_path = options.rest[0..1] - raise "You need to specify a parsed input directory and a directory for output. Got #{ARGV.inspect}" if (! options[:dry_run]) && (input_path.blank? || output_path.blank?) - [input_path, output_path] + output_path = options.rest.pop + input_paths = options.rest.reject(&:blank?) + raise "You need to specify a parsed input directory and a directory for output. Got #{ARGV.inspect}" if (! options[:dry_run]) && (input_paths.blank? || output_path.blank?) + [input_paths, output_path] end def maybe_overwrite_output_paths! output_path @@ -205,7 +210,7 @@ def maybe_overwrite_output_paths! output_path def non_wukong_params options. reject{|param, val| options.param_definitions[param][:wukong] }. - map{|param,val| "--#{param}=#{val}" }. + map{|param,val| "--#{param}=\"#{val}\"" }. join(" ") end @@ -218,8 +223,7 @@ def this_script_filename def ruby_interpreter_path Pathname.new( File.join(Config::CONFIG["bindir"], - Config::CONFIG["RUBY_INSTALL_NAME"]+ - Config::CONFIG["EXEEXT"]) + Config::CONFIG["RUBY_INSTALL_NAME"]+Config::CONFIG["EXEEXT"]) ).realpath end @@ -228,9 +232,9 @@ def ruby_interpreter_path # def exec_hadoop_streaming $stderr.puts "Streaming on self" - input_path, output_path = input_output_paths + input_paths, output_path = input_output_paths maybe_overwrite_output_paths! output_path - command = runner_command(input_path, output_path) + command = runner_command(input_paths.join(','), output_path) $stderr.puts command unless options[:dry_run] $stdout.puts `#{command}`