Skip to content

Commit

Permalink
now takes the last arg as destination, not the second non-option arg
Browse files Browse the repository at this point in the history
  • Loading branch information
Philip (flip) Kromer committed Apr 10, 2010
1 parent 23b507e commit e04f928
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 25 deletions.
2 changes: 1 addition & 1 deletion README.textile
@@ -1,6 +1,6 @@
h1. Wukong

Wukong makes "Hadoop":http://hadoop.apache.org/core so easy a chimpanzee can use it.
Wukong is Ruby for Hadoop -- it makes "Hadoop":http://hadoop.apache.org/core so easy a chimpanzee can use it.

Treat your dataset like a
* stream of lines when it's efficient to process by lines
Expand Down
52 changes: 28 additions & 24 deletions lib/wukong/script.rb
Expand Up @@ -118,11 +118,11 @@ class Script
# end
# MyScript.new(MyMapper, nil).run
#
def initialize mapper_klass, reducer_klass, extra_options={}
def initialize mapper_klass, reducer_klass=nil, extra_options={}
self.options = Settings.dup
options.resolve!
options.merge! self.default_options
options.merge! extra_options
self.options.resolve!
self.options.merge! self.default_options
self.options.merge! extra_options
self.mapper_klass = mapper_klass
self.reducer_klass = reducer_klass
# If no reducer_klass and no reduce_command, then skip the reduce phase
Expand All @@ -141,24 +141,29 @@ def default_options
end

#
# by default, call this script in --map mode
# Shell command for map phase. By default, calls the script in --map mode
# In hadoop mode, this is given to the hadoop streaming command.
# In local mode, it's given to the system() call
#
def map_command
case
when mapper_klass
"#{ruby_interpreter_path} #{this_script_filename} --map " + non_wukong_params
else options[:map_command] || options[:default_mapper] end
if mapper_klass
"#{ruby_interpreter_path} #{this_script_filename} --map " + non_wukong_params
else
options[:map_command] || options[:default_mapper]
end
end

#
# Shell command for reduce phase
# by default, call this script in --reduce mode
# Shell command for reduce phase. By default, calls the script in --reduce mode
# In hadoop mode, this is given to the hadoop streaming command.
# In local mode, it's given to the system() call
#
def reduce_command
case
when reducer_klass
"#{ruby_interpreter_path} #{this_script_filename} --reduce " + non_wukong_params
else options[:reduce_command] || options[:default_reducer] end
if reducer_klass
"#{ruby_interpreter_path} #{this_script_filename} --map " + non_wukong_params
else
options[:reduce_command]
end
end

#
Expand Down Expand Up @@ -187,10 +192,10 @@ def run_mode
end

def input_output_paths
# input / output paths
input_path, output_path = options.rest[0..1]
raise "You need to specify a parsed input directory and a directory for output. Got #{ARGV.inspect}" if (! options[:dry_run]) && (input_path.blank? || output_path.blank?)
[input_path, output_path]
output_path = options.rest.pop
input_paths = options.rest.reject(&:blank?)
raise "You need to specify a parsed input directory and a directory for output. Got #{ARGV.inspect}" if (! options[:dry_run]) && (input_paths.blank? || output_path.blank?)
[input_paths, output_path]
end

def maybe_overwrite_output_paths! output_path
Expand All @@ -205,7 +210,7 @@ def maybe_overwrite_output_paths! output_path
def non_wukong_params
options.
reject{|param, val| options.param_definitions[param][:wukong] }.
map{|param,val| "--#{param}=#{val}" }.
map{|param,val| "--#{param}=\"#{val}\"" }.
join(" ")
end

Expand All @@ -218,8 +223,7 @@ def this_script_filename
def ruby_interpreter_path
Pathname.new(
File.join(Config::CONFIG["bindir"],
Config::CONFIG["RUBY_INSTALL_NAME"]+
Config::CONFIG["EXEEXT"])
Config::CONFIG["RUBY_INSTALL_NAME"]+Config::CONFIG["EXEEXT"])
).realpath
end

Expand All @@ -228,9 +232,9 @@ def ruby_interpreter_path
#
def exec_hadoop_streaming
$stderr.puts "Streaming on self"
input_path, output_path = input_output_paths
input_paths, output_path = input_output_paths
maybe_overwrite_output_paths! output_path
command = runner_command(input_path, output_path)
command = runner_command(input_paths.join(','), output_path)
$stderr.puts command
unless options[:dry_run]
$stdout.puts `#{command}`
Expand Down

0 comments on commit e04f928

Please sign in to comment.