#!/usr/bin/env ruby
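#
# wonderdog bulkload runner: assembles a `hadoop jar` command line from the
# settings below and launches a job that bulk-loads records into an
# ElasticSearch index.
#
# Example invocation (hypothetical index name and HDFS paths -- adjust to your
# own setup):
#
#   ./wonderdog --index_name=tweets --object_type=tweet --bulk_size=1000 \
#     /data/social/tweets /tmp/wonderdog_bulkload_log
#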
require 'rubygems'
require 'configliere' ; Configliere.use(:commandline, :env_var, :define)
Settings.define :index_name, :required => true, :description => "Index to write data to"
Settings.define :object_type, :default => "tweet", :description => "Type of object we're indexing"
Settings.define :field_names, :default => "rsrc,tweet_id,created_at,user_id,screen_name,search_id,in_reply_to_user_id,in_reply_to_screen_name,in_reply_to_search_id,in_reply_to_status_id,text,source,lang,lat,lng,retweeted_count,rt_of_user_id,rt_of_screen_name,rt_of_tweet_id,contributors", :description => "Comma separated list of field names"
Settings.define :id_field, :default => "1", :description => "Index of the field to use as the object id (counting from 0; default 1); use -1 if there is no id field"
Settings.define :bulk_size, :default => "1000", :description => "Number of records per bulk request"
Settings.define :es_home, :default => "/usr/local/share/elasticsearch", :description => "Path to elasticsearch installation", :env_var => "ES_HOME"
Settings.define :es_config, :default => "/etc/elasticsearch/elasticsearch.yml", :description => "Path to elasticsearch config"
Settings.define :rm, :default => false, :description => "Remove existing output?"
Settings.define :hadoop_home, :default => "/usr/lib/hadoop", :description => "Path to hadoop installation", :env_var => "HADOOP_HOME"
Settings.define :min_split_size, :default => "5000000000", :description => "Min split size for maps"
Settings.define :test_outputfmt, :default => false, :description => "Use this flag to run a job that tests the ElasticSearchOutputFormat"
Settings.resolve!
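# The remaining (non-flag) arguments name the job's input path and output path.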
raise "No input file specified."  if Settings.rest.first.blank?
raise "No output file specified." if Settings.rest.length < 2 || Settings.rest.last.blank?
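# Wraps the settings above and shells out to hadoop to run the bulk-load job.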
class Wonderdog
  attr_accessor :options

  def initialize
    @options = Settings.dup
  end
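  # Optionally removes any existing output, echoes the assembled hadoop command, then runs it.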
  def execute
    output = options.rest.last
    remove_output(output) if options.rm
    system %Q{ echo #{hdp_cmd} }
    system %Q{ #{hdp_cmd} }
  end
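  # The full `hadoop jar` invocation: classpath, main class, job configuration
  # (-D properties), -libjars, and the input/output paths, joined into one
  # multi-line shell command.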
  def hdp_cmd
    [
      "HADOOP_CLASSPATH=#{hadoop_classpath}",
      "#{options.hadoop_home}/bin/hadoop jar #{run_jar}",
      mainclass,
      "-Dmapred.map.tasks.speculative.execution=false",
      "-Dmapred.min.split.size=#{options.min_split_size}",
      "-Dwonderdog.index.name=#{options.index_name}",
      "-Dwonderdog.object.type=#{options.object_type}",
      "-Dwonderdog.id.field=#{options.id_field}",
      "-Dwonderdog.field.names=#{options.field_names}",
      "-Dwonderdog.bulk.size=#{options.bulk_size}",
      "-Dwonderdog.config=#{options.es_config}",
      "-Dwonderdog.plugins.dir=#{options.es_home}/plugins",
      "-libjars #{libjars}",
      "#{options.rest.first}",
      "#{options.rest.last}"
    ].flatten.compact.join(" \t\\\n ")
  end
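  # Runs the ElasticSearchOutputFormat test driver when --test_outputfmt is set,
  # otherwise the regular bulk-load job.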
  def mainclass
    return "com.infochimps.elasticsearch.ElasticTest" if Settings.test_outputfmt
    "com.infochimps.elasticsearch.wonderdog.WonderDog"
  end
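  # Classpath for the launching JVM: the elasticsearch config file plus every
  # jar shipped with the local elasticsearch installation.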
  def hadoop_classpath
    cp = ["."]
    Dir[
      "/etc/elasticsearch/elasticsearch.yml",
      "#{options.es_home}/plugins/*/*.jar",
      "#{options.es_home}/lib/*.jar",
      "#{options.es_home}/lib/sigar/*.jar"
    ].each{ |jar| cp << jar }
    cp.join(':')
  end
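  # Path to the wonderdog job jar, resolved relative to this script.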
  def run_jar
    File.dirname(File.expand_path(__FILE__)) + '/../build/wonderdog.jar'
  end
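  # Files shipped to the tasks via -libjars: the elasticsearch config plus the
  # plugin and library jars.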
  def libjars
    libjars = []
    Dir[
      "/etc/elasticsearch/elasticsearch.yml",
      "#{options.es_home}/plugins/*/*.jar",
      "#{options.es_home}/lib/*.jar"
    ].each{ |jar| libjars << jar }
    libjars.join(',')
  end
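  # Recursively deletes an existing output path via the hdp-rm wrapper.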
  def remove_output(output)
    system %Q{ hdp-rm -r #{output} }
  end
end
runner = Wonderdog.new
runner.execute