Skip to content

Commit

Permalink
DRYing up Workflow Job even though it isn't currently (and may never …
Browse files Browse the repository at this point in the history
…be) used
  • Loading branch information
David Snyder committed Jan 13, 2012
1 parent 071e15c commit 98029b4
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 31 deletions.
2 changes: 2 additions & 0 deletions TODO.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
* The next task in a workflow should NOT run if the previous step failed
** this is made difficult by the fact that, sometimes?, when a pig script fails it still returns a 0 exit status
** same for wukong scripts

* Add a @job@ object that implements a @not_if@ function. this way a @workflow@ will be constructed of @job@ objects
** a @job@ will do nothing more than execute the ruby code in it's (run?) block, unless @not_if@ is true
** this way we can put @script@ objects inside a @job@ and only run under certain conditions that the user specifies when
they create the @job@

* Implement FTP filesystem interface
1 change: 1 addition & 0 deletions lib/swineherd/runner/hadoop_jobconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def jobconf_options
config.options_for(:hadoop_jobconf).inject([]){ |options,option| options << jobconf_for(option[0]) }
end

#FIXME: Currently doesn't accept arbitrary jobconf commands, like "mapred.min.split.size".
def jobconf_for option
unless config[option].nil?
"-D%s=%s" % [config.definition_of(option)[:description], config[option]]
Expand Down
9 changes: 5 additions & 4 deletions lib/swineherd/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

module Swineherd
class Workflow
attr_accessor :workdir, :outputs,:flow_id
attr_accessor :workdir,:outputs,:flow_id

#
# Create a new workflow and new namespace for this workflow
Expand All @@ -21,7 +21,8 @@ def initialize flow_id, &blk
#
def next_output taskname
raise "No working directory specified, set #workdir." unless workdir
outputs[taskname] << [workdir,flow_id,taskname].join("/")+"-#{outputs[taskname].count}"
taskcount = outputs[taskname].count
outputs[taskname] << [workdir,flow_id,taskname].join("/")+"-#{taskcount}"
latest_output(taskname)
end

Expand All @@ -37,9 +38,9 @@ def latest_output taskname
#
def run taskname
task = [flow_id,taskname].join(":")
Logger.new(STDOUT).info "Launching workflow task #{task}"
Logger.new(STDOUT).info "Launching workflow task '#{task}'"
Rake::Task[task].invoke
Logger.new(STDOUT).info "Workflow task #{task} finished"
Logger.new(STDOUT).info "Workflow task '#{task}' finished"
end

#
Expand Down
30 changes: 3 additions & 27 deletions lib/swineherd/workflow/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,23 @@ module Swineherd
#
class Job

#
# Initialize job, fill variables, and create rake task
#
attr_accessor :name,:script,:dependencies,:job_id

def initialize job_id, &blk
@job_id = job_id
@name = ''
@dependencies = []
@script = ''
self.instance_eval(&blk)
raketask
handle_dependencies
end

#
# Will be the name of the rake task
#
def name name = nil
return @name unless name
@name = name
end

def script script = nil
return @script unless script
@script = script
end

#
# An array of job names as dependencies
#
def dependencies dependencies = nil
return @dependencies unless dependencies
@dependencies = dependencies
end

def handle_dependencies
return if dependencies.empty?
task name => dependencies
end

def cmd
@script.cmd
@script.runner.command_line
end

#
Expand Down

0 comments on commit 98029b4

Please sign in to comment.