Skip to content

Commit

Permalink
got rid of idempotency at the script level, not all scripts have file…
Browse files Browse the repository at this point in the history
…system output and idempotency should be left to the workflow designer themselves, updated example to reflect this, stripped out pig_classpath and pig_options but added the ability to set env for any script via the "env" method (way more flexible)
  • Loading branch information
Jacob Perkins committed Feb 21, 2011
1 parent 57b23bd commit 2e34180
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 35 deletions.
6 changes: 3 additions & 3 deletions README.textile
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ flow.run(:plot_results)

h3. TODO

* strip out idempotentcy from the scripts themselves, it's confusing. delegate to the higher level task at the moment
** the user can use the filesystem interface to achieve this
** some pig scripts have no filesystem output, still need to be ran idempotently, shouldn't be creating a output path to achieve this
* next task in a workflow should NOT run if the previous step failed
** this is made difficult by the fact that, sometimes?, when a pig script fails it still returns a 0 exit status
** same for wukong scripts
* add a @job@ object that implements a @not_if@ function. this way a @workflow@ will be constructed of @job@ objects
** a @job@ will do nothing more than execute the ruby code in it's (run?) block, unless @not_if@ is true
** this way we can put @script@ objects inside a @job@ and only run under certain conditions that the user specifies when
Expand Down
53 changes: 30 additions & 23 deletions examples/pagerank/pagerank.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
#!/usr/bin/env ruby

$LOAD_PATH << '../../lib'
require 'swineherd' ; include Swineherd
require 'swineherd' ; include Swineherd
require 'swineherd/script' ; include Swineherd::Script
require 'swineherd/filesystem'
require 'swineherd/script/pig_script' ; include Swineherd::Script
require 'swineherd/script/wukong_script'
require 'swineherd/script/r_script'

Settings.define :flow_id, :required => true, :description => "Flow id required to make run of workflow unique"
Settings.define :iterations, :type => Integer, :default => 10, :description => "Number of pagerank iterations to run"
Settings.define :flow_id, :required => true, :description => "Flow id required to make run of workflow unique"
Settings.define :iterations, :type => Integer, :default => 10, :description => "Number of pagerank iterations to run"
Settings.define :hadoop_home, :default => '/usr/local/share/hadoop', :description => "Path to hadoop config"
Settings.resolve!

flow = Workflow.new(Settings.flow_id) do

# The filesystems we're going to be working with
hdfs = Swineherd::FileSystem.get(:hdfs)
localfs = Swineherd::FileSystem.get(:file)

# The scripts we're going to use
initializer = PigScript.new('scripts/pagerank_initialize.pig')
iterator = PigScript.new('scripts/pagerank.pig')
finisher = WukongScript.new('scripts/cut_off_list.rb')
Expand All @@ -26,10 +29,8 @@
# converted into command-line args for the pig interpreter.
#
task :pagerank_initialize do
initializer.pig_classpath = File.join(Settings.hadoop_home, 'conf')
initializer.output << next_output(:pagerank_initialize)
initializer.options = {:adjlist => "/tmp/pagerank_example/seinfeld_network.tsv", :initgrph => latest_output(:pagerank_initialize)}
initializer.run
initializer.options = {:adjlist => "/tmp/pagerank_example/seinfeld_network.tsv", :initgrph => next_output(:pagerank_initialize)}
initializer.run(:hadoop) unless hdfs.exists? latest_output(:pagerank_initialize)
end

#
Expand All @@ -40,9 +41,8 @@
iterator.options[:damp] = '0.85f'
iterator.options[:curr_iter_file] = latest_output(:pagerank_initialize)
Settings.iterations.times do
iterator.output << next_output(:pagerank_iterate)
iterator.options[:next_iter_file] = latest_output(:pagerank_iterate)
iterator.run
iterator.options[:next_iter_file] = next_output(:pagerank_iterate)
iterator.run(:hadoop) unless hdfs.exists? latest_output(:pagerank_iterate)
iterator.refresh!
iterator.options[:curr_iter_file] = latest_output(:pagerank_iterate)
end
Expand All @@ -56,17 +56,26 @@
task :cut_off_adjacency_list => [:pagerank_iterate] do
finisher.input << latest_output(:pagerank_iterate)
finisher.output << next_output(:cut_off_adjacency_list)
finisher.run
finisher.run :hadoop unless hdfs.exists? latest_output(:cut_off_adjacency_list)
end

#
# We want to pull down one result file, merge the part-000.. files into one file
#
task :merge_results => [:cut_off_adjacency_list] do
merged_results = next_output(:merge_results)
hdfs.merge(latest_output(:cut_off_adjacency_list), merged_results) unless hdfs.exists? merged_results
end

#
# Cat results into a local directory with the same structure eg. #{work_dir}/#{flow_id}/pull_down_results-0.
# Cat results into a local directory with the same structure
# eg. #{work_dir}/#{flow_id}/pull_down_results-0.
#
# FIXME: Bridging filesystems is cludgey.
#
task :pull_down_results => [:cut_off_adjacency_list] do
hdfs = FileSystem.get(:hdfs)
localfs = FileSystem.get(:file)
next if localfs.exists? next_output(:pull_down_results)
hdfs.copy_to_local(latest_output(:cut_off_adjacency_list), latest_output(:pull_down_results))
task :pull_down_results => [:merge_results] do
local_results = next_output(:pull_down_results)
hdfs.copy_to_local(latest_output(:merge_results), local_results) unless localfs.exists? local_results
end

#
Expand All @@ -80,13 +89,11 @@
:plot_file => next_output(:plot_results), # <-- this will be a png...
:raw_rank => "aes(x=d$V2)"
}
plotter.output << latest_output(:plot_result)
script.run :local
plotter.run(:hadoop) unless localfs.exists? latest_output(:plot_results)
end

end

flow.workdir = "/tmp/pagerank_example"
flow.describe
flow.run(:plot_results)
# flow.clean!
19 changes: 14 additions & 5 deletions lib/swineherd/script.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
require 'swineherd/filesystem'
module Swineherd
module Script

autoload :WukongScript, 'swineherd/script/wukong_script'
autoload :PigScript, 'swineherd/script/pig_script'
autoload :RScript, 'swineherd/script/r_script'

module Common
attr_accessor :input, :output, :options, :attributes
def initialize(source, input = [], output = [], options = {}, attributes ={})
Expand All @@ -11,6 +15,13 @@ def initialize(source, input = [], output = [], options = {}, attributes ={})
@attributes = attributes
end

#
# Allows for setting the environment the script will be ran in
#
def env
ENV
end

def script
@script ||= Template.new(@source, @attributes).substitute!
end
Expand Down Expand Up @@ -46,11 +57,9 @@ def local_cmd
def run mode=:hadoop
case mode
when :local then
localfs = FileSystem.get :file
sh local_cmd if localfs.check_paths(@output)
sh local_cmd
when :hadoop then
hdfs = FileSystem.get :hdfs
sh cmd if hdfs.check_paths(@output)
sh cmd
end
end

Expand Down
5 changes: 2 additions & 3 deletions lib/swineherd/script/pig_script.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
module Swineherd::Script
class PigScript
include Common
attr_accessor :pig_options, :pig_classpath

#
# Convert a generic hash of options {:foo => 'bar'} into
Expand All @@ -12,11 +11,11 @@ def pig_args options
end

def local_cmd
"PIG_CLASSPATH=#{@pig_classpath} PIG_OPTS='#{@pig_options}' pig -x local #{pig_args(@options)} #{script}"
"pig -x local #{pig_args(@options)} #{script}"
end

def cmd
"PIG_CLASSPATH=#{@pig_classpath} PIG_OPTS='#{@pig_options}' pig #{pig_args(@options)} #{script}"
"pig #{pig_args(@options)} #{script}"
end

end
Expand Down
1 change: 0 additions & 1 deletion lib/swineherd/script/wukong_script.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ def cmd
"#{ruby_interpreter_path} #{script} #{wukong_args(@options)} --run #{input.join(',')} #{output.join(',')}"
end

# FIXME: wukong's local mode doesn't work?
def local_cmd
inputs = input.map{|path| path += "/*"}.join(',')
"#{ruby_interpreter_path} #{script} #{wukong_args(@options)} --run=local #{inputs} #{output.join(',')}"
Expand Down

0 comments on commit 2e34180

Please sign in to comment.