Permalink
Browse files

made example work with new abstraction, changed script to use new abs…

…traction, added methods for merging hdfs files and copying to local without concatenating them
  • Loading branch information...
1 parent 2c6d19b commit 69b4f1236a86000be8602dd4e8988ceb2ba90966 @thedatachef thedatachef committed Feb 12, 2011
Binary file not shown.
@@ -1,62 +0,0 @@
-#!/usr/bin/env ruby
-
-require 'rake'
-require 'swineherd' ; include Swineherd
-require 'swineherd/workflow'
-
-Settings.define :flow_id, :required => true, :description => "Unique id for the workflow"
-Settings.define :work_dir, :default => "/tmp/pagerank_example", :description => "HDFS path to intermediate and final outputs"
-Settings.define :iterations, :type => Integer, :default => 5, :description => "Number of iterations of pagerank algorithm to run"
-Settings.resolve!
-
-Workflow.new(Settings.flow_id) do |flow_id|
- def one_pagerank_iteration n
- script = PigScript.new('pagerank.pig')
- script.options = {:curr_iter_file => "#{Settings.work_dir}/pagerank_iteration_#{n}", :next_iter_file => "#{Settings.work_dir}/pagerank_iteration_#{n+1}", :damp => "0.85f"}
- script.output << "#{Settings.work_dir}/pagerank_iteration_#{n+1}"
- script.run
- end
-
- task :pagerank_initialize do
- script = PigScript.new('pagerank_initialize.pig')
- script.options = {:adjlist => "#{Settings.work_dir}/seinfeld_network.tsv", :initgrph => "#{Settings.work_dir}/pagerank_iteration_0"}
- script.output << "#{Settings.work_dir}/pagerank_iteration_0"
- script.run
- end
-
- task :pagerank_iterate => [:pagerank_initialize] do
- Settings.iterations.to_i.times do |i|
- one_pagerank_iteration i
- end
- end
-
- task :cut_off_adjacency_list => [:pagerank_iterate] do
- script = WukongScript.new('cut_off_list.rb')
- script.input << "#{Settings.work_dir}/pagerank_iteration_#{Settings.iterations.to_i - 1}"
- script.output << "#{Settings.work_dir}/pagerank_result"
- script.run
- end
-
- #
- # Pull results into local directory with same name
- #
- task :pull_down_results => [:cut_off_adjacency_list] do
- HDFS.cat_to_local("#{Settings.work_dir}/pagerank_result", "#{Settings.work_dir}/pagerank_result.tsv")
- end
-
- #
- # Plot 2nd column as a histogram (requires R and ggplot2)
- #
- task :plot_results => [:pull_down_results] do
- script = RScript.new('histogram.R')
- script.attributes = {
- :pagerank_data => "#{Settings.work_dir}/pagerank_result.tsv",
- :plot_file => "#{Settings.work_dir}/pagerank_plot.png",
- :raw_rank => "aes(x=d$V2)"
-
- }
- script.output << "#{Settings.work_dir}/pagerank_plot.png"
- script.run true # run locally
- end
-
-end.run(:plot_results)
@@ -1,19 +1,23 @@
#!/usr/bin/env ruby
+$LOAD_PATH << '../../lib'
require 'swineherd' ; include Swineherd
+require 'swineherd/filesystem'
require 'swineherd/script/pig_script' ; include Swineherd::Script
require 'swineherd/script/wukong_script'
+require 'swineherd/script/r_script'
-Settings.define :flow_id, :required => true, :description => "Flow id required to make run of workflow unique"
-Settings.define :iterations, :type => Integer, :default => 10, :description => "Number of pagerank iterations to run"
+Settings.define :flow_id, :required => true, :description => "Flow id required to make run of workflow unique"
+Settings.define :iterations, :type => Integer, :default => 10, :description => "Number of pagerank iterations to run"
+Settings.define :hadoop_home, :default => '/usr/local/share/hadoop', :description => "Path to hadoop config"
Settings.resolve!
flow = Workflow.new(Settings.flow_id) do
- initializer = PigScript.new('pagerank_initialize.pig')
- iterator = PigScript.new('pagerank.pig')
- finisher = WukongScript.new('cut_off_list.rb')
- plotter = RScript.new('histogram.R')
+ initializer = PigScript.new('scripts/pagerank_initialize.pig')
+ iterator = PigScript.new('scripts/pagerank.pig')
+ finisher = WukongScript.new('scripts/cut_off_list.rb')
+ plotter = RScript.new('scripts/histogram.R')
#
# Runs simple pig script to initialize pagerank. We must specify the input
@@ -22,6 +26,7 @@
# converted into command-line args for the pig interpreter.
#
task :pagerank_initialize do
+ initializer.pig_classpath = File.join(Settings.hadoop_home, 'conf')
initializer.output << next_output(:pagerank_initialize)
initializer.options = {:adjlist => "/tmp/pagerank_example/seinfeld_network.tsv", :initgrph => latest_output(:pagerank_initialize)}
initializer.run
@@ -55,10 +60,13 @@
end
#
- # Cat results into a local directory with the same structure eg. #{work_dir}/#{flow_id}/pull_down_results-0
+ # Copy results (without concatenating them) into a local directory with the same structure, e.g. #{work_dir}/#{flow_id}/pull_down_results-0.
#
task :pull_down_results => [:cut_off_adjacency_list] do
- HDFS.cat_to_local(latest_output(:pagerank_iterate), next_output(:pull_down_results))
+ hdfs = FileSystem.get(:hdfs)
+ localfs = FileSystem.get(:file)
+ next if localfs.exists? next_output(:pull_down_results)
+ hdfs.copy_to_local(latest_output(:cut_off_adjacency_list), latest_output(:pull_down_results))
end
#
@@ -73,7 +81,7 @@
:raw_rank => "aes(x=d$V2)"
}
plotter.output << latest_output(:plot_result)
- script.run true # <-- run locallly
+ script.run :local
end
end
@@ -76,6 +76,20 @@ def entries dirpath
list.map{|path| path.get_path.to_s}
end
+ # Merge all part files in the directory srcdir into the single file dstfile.
+ # Source and destination are both on @hdfs. The `false` and "" arguments are presumably
+ # copy_merge's delete-source flag and inter-file separator -- TODO confirm against Hadoop FileUtil.
+ def merge srcdir, dstfile
+ FileUtil.copy_merge(@hdfs, Path.new(srcdir), @hdfs, Path.new(dstfile), false, @conf, "")
+ end
+
+ # Copy the hdfs path srcfile to dstfile on the local filesystem.
+ # Both arguments are wrapped in Path and handed to @hdfs.copy_to_local_file.
+ # NOTE(review): behavior when srcfile is a directory of part files is not shown here -- confirm.
+ def copy_to_local srcfile, dstfile
+ @hdfs.copy_to_local_file(Path.new(srcfile), Path.new(dstfile))
+ end
+
#
# Close the wrapped @hdfs handle. Any arguments are accepted and ignored.
#
def close(*args)
  @hdfs.close
end
View
@@ -1,4 +1,4 @@
-require 'swineherd/localfs'
+require 'swineherd/filesystem'
module Swineherd
module Script
module Common
@@ -33,12 +33,14 @@ def cmd
# Run cmd through sh, but only when check_paths(@output) passes on the selected filesystem.
# mode :hadoop (the default) checks against FileSystem.get(:hdfs); :local checks against
# FileSystem.get(:file). Any other mode matches no branch, so nothing is run.
- def run local=false
- puts cmd
- if local
- sh "#{cmd}" if LocalFS.check_paths(@output)
- else
- sh "#{cmd}" if HDFS.check_paths(@output)
+ def run mode=:hadoop
+ case mode
+ when :local then
+ localfs = FileSystem.get :file
+ sh "#{cmd}" if localfs.check_paths(@output)
+ when :hadoop then
+ hdfs = FileSystem.get :hdfs
+ sh "#{cmd}" if hdfs.check_paths(@output)
end
end

0 comments on commit 69b4f12

Please sign in to comment.