
automatically detect and add elasticsearch jars when necessary, with some options, and some specs. also added a Gemfile.
1 parent 2ceb142, commit 61057e9e169b986c058c26c5727630d62ce9c3a6, committed by @dhruvbansal on Dec 6, 2012
Gemfile (3 changed lines)
@@ -0,0 +1,3 @@
+source :rubygems
+
+gemspec
lib/wonderdog/configuration.rb (1 changed line)
@@ -8,6 +8,7 @@ module Elasticsearch
# @return [Configliere::Param] the newly configured settings
def self.configure settings
settings.define(:es_tmp_dir, :description => "Temporary directory on the HDFS to store job files while reading/writing to ElasticSearch", :default => "/user/#{ENV['USER']}/wukong", :wukong_hadoop => true)
+ settings.define(:es_lib_dir, :description => "Directory containing Elasticsearch, Wonderdog, and other support jars", :default => "/usr/lib/hadoop/lib", :wukong_hadoop => true)
settings.define(:es_config, :description => "Where to find configuration files detailing how to join an ElasticSearch cluster", :wukong_hadoop => true)
settings.define(:es_input_splits, :description => "Number of input splits to target when reading from ElasticSearch", :type => Integer, :wukong_hadoop => true)
settings.define(:es_request_size, :description => "Number of objects requested during each batch read from ElasticSearch", :type => Integer, :wukong_hadoop => true)
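For illustration only, a minimal sketch of how the new es_lib_dir setting resolves; the script and the Wukong::Elasticsearch namespace below are assumptions, not part of this commit:

    require 'configliere'
    require 'wonderdog'

    settings = Configliere::Param.new
    Wukong::Elasticsearch.configure(settings)         # namespace assumed; configure is the method shown above
    settings[:es_lib_dir]                             # => "/usr/lib/hadoop/lib" (the default)
    settings[:es_lib_dir] = '/opt/elasticsearch/lib'  # or pass --es_lib_dir=/opt/elasticsearch/lib on the command line

Because the setting is defined with :wukong_hadoop => true, it rides along with the other Hadoop-related options that Wukong-Hadoop already exposes.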
lib/wonderdog/hadoop_invocation_override.rb (24 changed lines)
@@ -121,6 +121,28 @@ def hadoop_jobconf_options
end.flatten.compact
end
+ # :nodoc:
+ #
+ # Munge the settings object to add necessary jars if
+ # reading/writing to/from Elasticsearch, then call super().
+ def hadoop_files
+ if reads_from_elasticsearch? || writes_to_elasticsearch?
+ settings[:jars] = elasticsearch_jars if settings[:jars].empty?
+ end
+ super()
+ end
+
+ # All Elasticsearch, Wonderdog, and other support jars needed to
+ # connect Hadoop streaming with the
+ # ElasticSearchStreamingInputFormat and
+ # ElasticSearchStreamingOutputFormat provided by the Wonderdog
+ # Java code.
+ #
+ # @return [Array<String>]
+ def elasticsearch_jars
+ Dir[File.join(settings[:es_lib_dir] || '/usr/lib/hadoop/lib', '{elasticsearch,lucene,jna,wonderdog}*.jar')].compact.uniq
+ end
+
# Returns a temporary path on the HDFS in which to store log
# data while the Hadoop job runs.
#
@@ -129,7 +151,7 @@ def hadoop_jobconf_options
def elasticsearch_hdfs_tmp_dir io
cleaner = %r{[^\w/\.\-\+]+}
io_part = [io.index, io.mapping].compact.map { |s| s.gsub(cleaner, '') }.join('/')
- File.join(settings[:es_tmp_dir], io_part, Time.now.strftime("%Y-%m-%d-%H-%M-%S"))
+ File.join(settings[:es_tmp_dir] || '/', io_part || '', Time.now.strftime("%Y-%m-%d-%H-%M-%S"))
end
end
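As a rough sketch, with the default es_lib_dir the jar detection boils down to the single glob above; the directory contents and version numbers below are purely illustrative:

    # Illustrative only: actual jar names and versions in /usr/lib/hadoop/lib will vary.
    Dir[File.join('/usr/lib/hadoop/lib', '{elasticsearch,lucene,jna,wonderdog}*.jar')].compact.uniq
    # => ["/usr/lib/hadoop/lib/elasticsearch-0.20.1.jar",
    #     "/usr/lib/hadoop/lib/lucene-core-3.6.1.jar",
    #     "/usr/lib/hadoop/lib/jna-3.4.0.jar",
    #     "/usr/lib/hadoop/lib/wonderdog-1.0.0.jar"]

These jars are injected into settings[:jars] only when the job reads from or writes to Elasticsearch and no jars were given explicitly, so an explicit --jars option still wins. The tmp-dir change is just a nil guard when building paths of the form /user/<user>/wukong/<index>/<mapping>/<timestamp>.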
spec/wonderdog/hadoop_invocation_override_spec.rb (147 changed lines)
@@ -7,75 +7,104 @@
let(:es_writer) { driver('regexp', 'count', input: '/tmp/input_file', output: 'es:///the_index/the_map') }
let(:es_complex) { driver('regexp', 'count', input: 'es://the_index/the_map', output: 'es:///the_index/the_map', es_query: '{"hi": "there"}', es_request_size: 1000, es_index_field: 'ID') }
- context "not interacting with Elasticsearch" do
- subject { no_es }
- # input
- its(:input_paths) { should == '/tmp/input_file' }
- its(:hadoop_commandline) { should match(%r{-input.*/tmp/input_file}i) }
+ context "passing necessary jars to Hadoop streaming" do
+ before { Dir.stub!(:[]).and_return(["/lib/dir/elasticsearch.jar"], ["/lib/dir/wonderdog.jar"]) }
+ context "when not given explicit jars" do
+ context "and not interacting with Elasticsearch" do
+ it "doesn't add jars" do
+ no_es.hadoop_commandline.should_not match('-libjars')
+ end
+ end
+ context "and reading from Elasticsearch" do
+ it "adds default jars it finds on the local filesystem" do
+ es_reader.hadoop_commandline.should match('-libjars.*elasticsearch')
+ end
+ end
+ context "and writing to Elasticsearch" do
+ it "adds default jars it finds on the local filesystem" do
+ es_writer.hadoop_commandline.should match('-libjars.*elasticsearch')
+ end
+ end
+ context "and reading and writing to Elasticsearch" do
+ it "adds default jars it finds on the local filesystem" do
+ es_complex.hadoop_commandline.should match('-libjars.*elasticsearch')
+ end
+ end
+ end
+ end
+
+ context "handling input and output paths, formats, and options when" do
- # output
- its(:output_path) { should == '/tmp/output_file' }
- its(:hadoop_commandline) { should match(%r{-output.*/tmp/output_file}i) }
+ context "not interacting with Elasticsearch" do
+ subject { no_es }
+ # input
+ its(:input_paths) { should == '/tmp/input_file' }
+ its(:hadoop_commandline) { should match(%r{-input.*/tmp/input_file}i) }
- # no elasticsearch anything
- its(:hadoop_commandline) { should_not match(/elasticsearch/i) }
- end
+ # output
+ its(:output_path) { should == '/tmp/output_file' }
+ its(:hadoop_commandline) { should match(%r{-output.*/tmp/output_file}i) }
- context "reading from Elasticsearch" do
- subject { es_reader }
-
- # input
- its(:input_paths) { should match(%r{/user.*wukong.*the_index.*the_map}) }
- its(:hadoop_commandline) { should match(/-inputformat.*elasticsearch/i) }
- its(:hadoop_commandline) { should match(%r{-input.*/user.*wukong.*the_index.*the_map}i) }
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.index.*the_index/i) }
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.map.*the_map/i) }
-
- # output
- its(:output_path) { should == '/tmp/output_file' }
- its(:hadoop_commandline) { should_not match(/-outputformat/i) }
- its(:hadoop_commandline) { should match(%r{-output.*/tmp/output_file}i) }
- its(:hadoop_commandline) { should_not match(/-D\s+elasticsearch\.output/i) }
- end
+ # no elasticsearch anything
+ its(:hadoop_commandline) { should_not match(/elasticsearch/i) }
+ end
- context "writing to Elasticsearch" do
- subject { es_writer }
+ context "reading from Elasticsearch" do
+ subject { es_reader }
+
+ # input
+ its(:input_paths) { should match(%r{/user.*wukong.*the_index.*the_map}) }
+ its(:hadoop_commandline) { should match(/-inputformat.*elasticsearch/i) }
+ its(:hadoop_commandline) { should match(%r{-input.*/user.*wukong.*the_index.*the_map}i) }
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.index.*the_index/i) }
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.map.*the_map/i) }
+
+ # output
+ its(:output_path) { should == '/tmp/output_file' }
+ its(:hadoop_commandline) { should_not match(/-outputformat/i) }
+ its(:hadoop_commandline) { should match(%r{-output.*/tmp/output_file}i) }
+ its(:hadoop_commandline) { should_not match(/-D\s+elasticsearch\.output/i) }
+ end
- # input
- its(:input_paths) { should == '/tmp/input_file' }
- its(:hadoop_commandline) { should_not match(/-inputformat/i) }
- its(:hadoop_commandline) { should match(%r{-input.*/tmp/input_file}i) }
- its(:hadoop_commandline) { should_not match(/-D\s+elasticsearch\.input/i) }
+ context "writing to Elasticsearch" do
+ subject { es_writer }
- # output
- its(:output_path) { should match(%r{/user.*wukong.*the_index.*the_map}) }
- its(:hadoop_commandline) { should match(/-outputformat.*elasticsearch/i) }
- its(:hadoop_commandline) { should match(%r{-output.*/user.*wukong.*the_index.*the_map}i) }
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.index.*the_index/i) }
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.map.*the_map/i) }
- end
+ # input
+ its(:input_paths) { should == '/tmp/input_file' }
+ its(:hadoop_commandline) { should_not match(/-inputformat/i) }
+ its(:hadoop_commandline) { should match(%r{-input.*/tmp/input_file}i) }
+ its(:hadoop_commandline) { should_not match(/-D\s+elasticsearch\.input/i) }
+
+ # output
+ its(:output_path) { should match(%r{/user.*wukong.*the_index.*the_map}) }
+ its(:hadoop_commandline) { should match(/-outputformat.*elasticsearch/i) }
+ its(:hadoop_commandline) { should match(%r{-output.*/user.*wukong.*the_index.*the_map}i) }
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.index.*the_index/i) }
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.map.*the_map/i) }
+ end
- context "reading and writing with many options" do
- subject { es_complex }
+ context "reading and writing with many options" do
+ subject { es_complex }
- # input
- its(:input_paths) { should match(%r{/user.*wukong.*the_index.*the_map}) }
- its(:hadoop_commandline) { should match(/-inputformat.*elasticsearch/i) }
- its(:hadoop_commandline) { should match(%r{-input.*/user.*wukong.*the_index.*the_map}i) }
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.index.*the_index/i) }
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.map.*the_map/i) }
+ # input
+ its(:input_paths) { should match(%r{/user.*wukong.*the_index.*the_map}) }
+ its(:hadoop_commandline) { should match(/-inputformat.*elasticsearch/i) }
+ its(:hadoop_commandline) { should match(%r{-input.*/user.*wukong.*the_index.*the_map}i) }
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.index.*the_index/i) }
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.map.*the_map/i) }
- # output
- its(:output_path) { should match(%r{/user.*wukong.*the_index.*the_map}) }
- its(:hadoop_commandline) { should match(/-outputformat.*elasticsearch/i) }
- its(:hadoop_commandline) { should match(%r{-output.*/user.*wukong.*the_index.*the_map}i) }
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.index.*the_index/i) }
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.map.*the_map/i) }
+ # output
+ its(:output_path) { should match(%r{/user.*wukong.*the_index.*the_map}) }
+ its(:hadoop_commandline) { should match(/-outputformat.*elasticsearch/i) }
+ its(:hadoop_commandline) { should match(%r{-output.*/user.*wukong.*the_index.*the_map}i) }
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.index.*the_index/i) }
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.map.*the_map/i) }
- # options
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.query.*hi.*there/i) }
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.request_size.*1000/i) }
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.index\.field.*ID/i) }
+ # options
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.query.*hi.*there/i) }
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.request_size.*1000/i) }
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.index\.field.*ID/i) }
+ end
end
end
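The before block in the jar-passing context stubs Dir.[] with two canned return values. A minimal standalone sketch (hypothetical, not part of this commit) of how RSpec 2's and_return hands back one value per successive call:

    describe "stubbing Dir.[] with successive return values" do
      before { Dir.stub!(:[]).and_return(["/lib/dir/elasticsearch.jar"], ["/lib/dir/wonderdog.jar"]) }

      it "returns the next canned value on each call" do
        Dir['*.jar'].should == ["/lib/dir/elasticsearch.jar"]  # first call
        Dir['*.jar'].should == ["/lib/dir/wonderdog.jar"]      # second and subsequent calls
      end
    end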
