Permalink
Browse files

Added new DSL methods to Wukong::Streamer::Base and Wukong. Now call …

…Wukong.run unless you want a custom script class
  • Loading branch information...
1 parent a52c230 commit deea1a4c1690b2dfce2cfa74026cbf3b3739a48a Philip (flip) Kromer committed Jan 28, 2011
Showing with 48 additions and 8 deletions.
  1. +7 −3 lib/wukong.rb
  2. +8 −5 lib/wukong/script.rb
  3. +33 −0 lib/wukong/streamer/base.rb
View
10 lib/wukong.rb
@@ -1,13 +1,17 @@
+require 'configliere'; Configliere.use :define
require 'wukong/extensions'
require 'wukong/datatypes'
+require 'wukong/periodic_monitor'
require 'wukong/logger'
-require 'wukong/bad_record'
+autoload :BadRecord, 'wukong/bad_record'
autoload :TypedStruct, 'wukong/typed_struct'
-require 'configliere'; Configliere.use :define
module Wukong
- autoload :Dfs, 'wukong/dfs'
autoload :Script, 'wukong/script'
autoload :Streamer, 'wukong/streamer'
autoload :Store, 'wukong/store'
autoload :FilenamePattern, 'wukong/filename_pattern'
+
+ def self.run mapper, reducer=nil, options={}
+ Wukong::Script.new(mapper, reducer, options).run
+ end
end
View
13 lib/wukong/script.rb
@@ -1,8 +1,10 @@
require 'pathname'
+require 'configliere' ; Configliere.use(:commandline, :env_var, :define)
+require 'wukong'
require 'wukong/script/hadoop_command'
require 'wukong/script/local_command'
-require 'configliere' ; Configliere.use(:commandline, :env_var, :define)
require 'rbconfig' # for uncovering ruby_interpreter_path
+require 'wukong/streamer' ; include Wukong::Streamer
module Wukong
# == How to run a Wukong script
#
@@ -122,7 +124,7 @@ class Script
# end
# MyScript.new(MyMapper, nil).run
#
- def initialize mapper, reducer_klass=nil, extra_options={}
+ def initialize mapper, reducer=nil, extra_options={}
Settings.resolve!
@options = Settings
options.merge extra_options
@@ -172,7 +174,7 @@ def run_mode
# In local mode, it's given to the system() call
#
def mapper_commandline
- if @mapper
+ if mapper
"#{ruby_interpreter_path} #{this_script_filename} --map " + non_wukong_params
else
options[:map_command]
@@ -228,8 +230,9 @@ def execute_command! *args
#
def maybe_overwrite_output_paths! output_path
if (options[:overwrite] || options[:rm]) && (run_mode == 'hadoop')
- Log.info "Removing output file #{output_path}"
- `hdp-rm -r '#{output_path}'`
+ cmd = %Q{#{hadoop_runner} fs -rmr '#{output_path}'}
+ Log.info "Removing output file #{output_path}: #{cmd}"
+ puts `#{cmd}`
end
end
View
33 lib/wukong/streamer/base.rb
@@ -81,9 +81,42 @@ def bad_record! key, *args
puts ["bad_record-"+key, *args].join("\t")
end
+ # A periodic logger to track progress
def monitor
@monitor ||= PeriodicMonitor.new
end
+
+ # Defines a process method on the fly to execute the given mapper.
+ #
+ # This is still experimental.
+ # Among other limitations, you can't use ++yield++ -- you have to call
+ # emit() directly.
+ def map &mapper_block
+ @mapper_block = mapper_block.to_proc
+ self.instance_eval do
+ def process *args, &block
+ instance_exec(*args, &@mapper_block)
+ end
+ end
+ self
+ end
+
+ # Creates a new object of this class and injects the given block
+ # as the process method
+ def self.map *args, &block
+ self.new.map *args, &block
+ end
+
+ # Delegates back to Wukong to run this instance as a mapper
+ def run options={}
+ Wukong.run(self, nil, options)
+ end
+
+ # Creates a new object of this class and runs it
+ def self.run options={}
+ Wukong.run(self.new, nil, options)
+ end
+
end
end
end

0 comments on commit deea1a4

Please sign in to comment.