hi travis

commit 806e4ee38807e2d06811742f37441d12b4d8c618 1 parent 8288f5e
@dhruvbansal authored
10 Rakefile
@@ -0,0 +1,10 @@
+require 'bundler'
+Bundler::GemHelper.install_tasks
+
+require 'rspec/core/rake_task'
+RSpec::Core::RakeTask.new(:specs)
+
+require 'yard'
+YARD::Rake::YardocTask.new
+
+task :default => [:specs]
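
With these tasks in place, the specs and docs run the usual Bundler way (a usage sketch, not part of this commit):

    $ bundle install
    $ bundle exec rake         # default task, runs :specs
    $ bundle exec rake yard    # builds the YARD docs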
6 bin/wu-hadoop-es
@@ -47,9 +47,9 @@ EOF
# These options we want to pass on to wu-hadoop but sometimes we want
# to mess with one first so we'll have to reconstruct them later
# ourselves.
-settings.define(:input, :description => "Comma-separated list of input paths: local, HDFS, or ElasticSearch", :es => true)
-settings.define(:output, :description => "Output path: local, HDFS, or ElasticSearch", :es => true)
-settings.define(:query, :description => "Query to use when defining input splits for ElasticSearch input", :es => true)
+settings.define(:input, :description => "Comma-separated list of input paths: local, HDFS, or ElasticSearch", :wonderdog => true)
+settings.define(:output, :description => "Output path: local, HDFS, or ElasticSearch", :wonderdog => true)
+
require 'wukong/boot' ; Wukong.boot!(settings)
10 lib/wonderdog.rb
@@ -1,3 +1,13 @@
require 'wukong'
+module Wukong
+
+ # Wonderdog provides Java code that couples Hadoop streaming to
+ # Wukong. This module adds some overrides that enable the
+ # <tt>wu-hadoop</tt> program to leverage this code.
+ module Elasticsearch
+ end
+end
+
require 'wonderdog/configuration'
require 'wonderdog/driver'
+
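
With these overrides loaded, a plain wu-hadoop invocation can read from or write to Elasticsearch directly. An illustrative command (the processor and paths here are made up):

    $ wu-hadoop processor.rb --input=es://my_index/my_type --output=/data/output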
28 lib/wonderdog/configuration.rb
@@ -1,16 +1,22 @@
module Wukong
module Elasticsearch
- Configuration = Configliere::Param.new unless defined?(Configuration)
- Configuration.define(:tmp_dir, :description => "Temporary directory on the HDFS to store job files while reading/writing to ElasticSearch", :default => "/user/#{ENV['USER']}/wukong", :es => true)
- Configuration.define(:config, :description => "Where to find configuration files detailing how to join an ElasticSearch cluster", :es => true)
- Configuration.define(:input_splits, :description => "Number of input splits to target when reading from ElasticSearch", :type => Integer, :es => true)
- Configuration.define(:request_size, :description => "Number of objects requested during each batch read from ElasticSearch", :type => Integer, :es => true)
- Configuration.define(:scroll_timeout, :description => "Amount of time to wait on a scroll", :es => true)
- Configuration.define(:index_field, :description => "Field to use from each record to override the default index", :es => true)
- Configuration.define(:type_field, :description => "Field to use from each record to override the default type", :es => true)
- Configuration.define(:id_field, :description => "If this field is present in a record, make an update request, otherwise make a create request", :es => true)
- Configuration.define(:bulk_size, :description => "Number of requests to batch locally before making a request to ElasticSearch", :type => Integer, :es => true)
-
+ # Configure the given +settings+ to work with
+ # Elasticsearch.
+ #
+ # @param [Configliere::Param] settings
+ def self.configure settings
+ settings.define(:es_tmp_dir, :description => "Temporary directory on the HDFS to store job files while reading/writing to ElasticSearch", :default => "/user/#{ENV['USER']}/wukong", :wukong_hadoop => true)
+ settings.define(:es_config, :description => "Where to find configuration files detailing how to join an ElasticSearch cluster", :wukong_hadoop => true)
+ settings.define(:es_input_splits, :description => "Number of input splits to target when reading from ElasticSearch", :type => Integer, :wukong_hadoop => true)
+ settings.define(:es_request_size, :description => "Number of objects requested during each batch read from ElasticSearch", :type => Integer, :wukong_hadoop => true)
+ settings.define(:es_scroll_timeout, :description => "Amount of time to wait on a scroll", :wukong_hadoop => true)
+ settings.define(:es_index_field, :description => "Field to use from each record to override the default index", :wukong_hadoop => true)
+ settings.define(:es_type_field, :description => "Field to use from each record to override the default type", :wukong_hadoop => true)
+ settings.define(:es_id_field, :description => "If this field is present in a record, make an update request, otherwise make a create request", :wukong_hadoop => true)
+ settings.define(:es_bulk_size, :description => "Number of requests to batch locally before making a request to ElasticSearch", :type => Integer, :wukong_hadoop => true)
+ settings.define(:es_query, :description => "Query to use when defining input splits for ElasticSearch input", :wukong_hadoop => true)
+ end
end
+
end
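
A minimal sketch of calling this hook (the surrounding Configliere setup is assumed for illustration; only the configure method is defined by this commit):

    require 'configliere'
    require 'wonderdog/configuration'

    settings = Configliere::Param.new
    Wukong::Elasticsearch.configure(settings)  # defines the es_* options
    settings.resolve!
    settings[:es_bulk_size]  # e.g. passed on the command line as --es_bulk_size=1000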
145 lib/wonderdog/driver.rb
@@ -1,145 +0,0 @@
-require 'shellwords'
-require 'uri'
-
-module Wukong
- module Elasticsearch
- class Driver
-
- STREAMING_INPUT_FORMAT = "com.infochimps.elasticsearch.ElasticSearchStreamingInputFormat"
- STREAMING_OUTPUT_FORMAT = "com.infochimps.elasticsearch.ElasticSearchStreamingOutputFormat"
-
- class IndexAndType
- attr_reader :index, :type
- def initialize uri
- self.uri = uri
- end
- def self.uri= u
- begin
- @uri = URI.parse(u)
- path = File.join(@uri.host, @uri.path)
- @index = path.split('/')[0]
- @type = path.split('/')[1]
- rescue URI::InvalidURIError => e
- raise Wukong::Error.new("Could not parse '#{u}' as an ElasticSearch /index/type specification")
- end
- end
- end
-
- attr_accessor :settings
-
- def self.run(settings, *extra_args)
- begin
- new(settings, *extra_args).run!
- rescue Wukong::Error => e
- $stderr.puts e.message
- exit(127)
- end
- end
-
- def initialize(settings, *args)
- @settings = settings
- end
-
- def read?
- settings[:input] && settings[:input] =~ %r!^es://!
- end
-
- def write?
- settings[:output] && settings[:output] =~ %r!^es://!
- end
-
- def input
- @input ||= IndexAndType.new(settings[:input])
- end
-
- def output
- @output ||= IndexAndType.new(settings[:output])
- end
-
- def run!
- # puts wu_hadoop_command
- exec wu_hadoop_command
- end
-
- def wu_hadoop_command
- [
- 'wu-hadoop',
- wu_hadoop_input,
- wu_hadoop_output,
- java_opts,
- non_es_params,
- args
- ].flatten.compact.join(' ')
- end
-
- def args
- settings.rest
- end
-
- def wu_hadoop_input
- case
- when settings[:input] && read?
- "--input=#{tmp_filename(input)} --input_format=#{STREAMING_INPUT_FORMAT}"
- when settings[:input]
- "--input=#{settings[:input]}"
- end
- end
-
- def wu_hadoop_output
- case
- when settings[:output] && write?
- "--output=#{tmp_filename(output)} --output_format=#{STREAMING_OUTPUT_FORMAT}"
- when settings[:output]
- "--output=#{settings[:output]}"
- end
- end
-
- def non_es_params
- settings.reject{ |param, val| settings.definition_of(param, :es) }.map{ |param,val| "--#{param}=#{val}" }
- end
-
- def tmp_filename io
- cleaner = %r{[^\w/\.\-\+]+}
- io_part = [io.index, io.type].compact.map { |s| s.gsub(cleaner, '') }.join('/')
- job_part = settings.rest.map { |s| File.basename(s, '.rb').gsub(cleaner,'') }.join('---')
- File.join(settings[:tmp_dir], io_part, job_part, Time.now.strftime("%Y-%m-%d-%H-%M-%S"))
- end
-
- def java_opts
- return unless read? || write?
-
- opts = [].tap do |o|
- o << java_opt('es.config', settings[:config]) if settings[:config]
-
- if read?
- o << java_opt('elasticsearch.input.index', input.index)
- o << java_opt('elasticsearch.input.type', input.type)
- o << java_opt('elasticsearch.input.splits', settings[:input_splits]) if settings[:input_splits]
- o << java_opt('elasticsearch.input.query', settings[:query]) if settings[:query]
- o << java_opt('elasticsearch.input.request_size', settings[:request_size]) if settings[:request_size]
- o << java_opt('elasticsearch.input.scroll_timeout', settings[:scroll_timeout]) if settings[:scroll_timeout]
- end
-
- if write?
- o << java_opt('elasticsearch.output.index', output.index)
- o << java_opt('elasticsearch.output.type', output.type)
- o << java_opt('elasticsearch.output.index.field', settings[:index_field]) if settings[:index_field]
- o << java_opt('elasticsearch.output.type.field', settings[:type_field]) if settings[:type_field]
- o << java_opt('elasticsearch.output.id.field', settings[:id_field]) if settings[:id_field]
- o << java_opt('elasticsearch.output.bulk_size', settings[:bulk_size]) if settings[:bulk_size]
- end
- end
- return if opts.empty?
- joined = opts.join(' ')
- "--java_opts='#{joined}'"
- end
-
- def java_opt name, value
- return if value.nil?
- "-D #{name}=#{Shellwords.escape(value.to_s)}"
- end
- end
- end
-end
-
-
155 lib/wonderdog/hadoop_invocation.rb
@@ -0,0 +1,155 @@
+require_relative("index_and_type")
+
+module Wukong
+ module Elasticsearch
+
+ # This module overrides some methods defined in
+ # Wukong::Hadoop::HadoopInvocation. The overrides will only come
+ # into play if the job's input or output paths are URIs beginning
+ # with 'es://', implying reading or writing to/from Elasticsearch
+ # indices.
+ module HadoopInvocationOverride
+
+ # The input format when reading from Elasticsearch as defined in
+ # the Java code accompanying Wonderdog.
+ #
+ # @return [String]
+ STREAMING_INPUT_FORMAT = "com.infochimps.elasticsearch.ElasticSearchStreamingInputFormat"
+
+ # The output format when writing to Elasticsearch as defined in
+ # the Java code accompanying Wonderdog.
+ #
+ # @return [String]
+ STREAMING_OUTPUT_FORMAT = "com.infochimps.elasticsearch.ElasticSearchStreamingOutputFormat"
+
+ # A regular expression that matches URIs describing an
+ # Elasticsearch index and/or type to read/write from/to.
+ #
+ # @return [Regexp]
+ ES_SCHEME_REGEXP = %r{^es://}
+
+ # Does this job read from Elasticsearch?
+ #
+ # @return [true, false]
+ def reads_from_elasticsearch?
+ settings[:input] && settings[:input] =~ ES_SCHEME_REGEXP
+ end
+
+ # The input format to use for this job.
+ #
+ # Will override the default value to STREAMING_INPUT_FORMAT if
+ # reading from Elasticsearch.
+ #
+ # @return [String]
+ def input_format
+ reads_from_elasticsearch? ? STREAMING_INPUT_FORMAT : super()
+ end
+
+ # The input index to use.
+ #
+ # @return [IndexAndType]
+ def input_index
+ @input_index ||= IndexAndType.new(settings[:input])
+ end
+
+ # The input paths to use for this job.
+ #
+ # Will override the default value with a temporary HDFS path
+ # when reading from Elasticsearch.
+ #
+ # @return [String]
+ def input_paths
+ if reads_from_elasticsearch?
+ elasticsearch_hdfs_tmp_dir(input_index)
+ else
+ super()
+ end
+ end
+
+ # Does this job write to Elasticsearch?
+ #
+ # @return [true, false]
+ def writes_to_elasticsearch?
+ settings[:output] && settings[:output] =~ ES_SCHEME_REGEXP
+ end
+
+ # The output format to use for this job.
+ #
+ # Will override the default value to STREAMING_OUTPUT_FORMAT if
+ # writing to Elasticsearch.
+ #
+ # @return [String]
+ def output_format
+ writes_to_elasticsearch? ? STREAMING_OUTPUT_FORMAT : super()
+ end
+
+ # The output index to use.
+ #
+ # @return [IndexAndType]
+ def output_index
+ @output_index ||= IndexAndType.new(settings[:output])
+ end
+
+ # The output path to use for this job.
+ #
+ # Will override the default value with a temporary HDFS path
+ # when writing to Elasticsearch.
+ #
+ # @return [String]
+ def output_path
+ if writes_to_elasticsearch?
+ elasticsearch_hdfs_tmp_dir(output_index)
+ else
+ super()
+ end
+ end
+
+ # Adds Java options required to interact with the input/output
+ # formats defined by the Java code accompanying Wonderdog.
+ #
+ # Will not change the default Hadoop jobconf options unless it
+ # has to.
+ #
+ # @return [Array<String>]
+ def hadoop_jobconf_options
+ super() + [].tap do |o|
+ o << java_opt('es.config', settings[:es_config]) if reads_from_elasticsearch? || writes_to_elasticsearch?
+
+ if reads_from_elasticsearch?
+ o << java_opt('elasticsearch.input.index', input_index.index)
+ o << java_opt('elasticsearch.input.type', input_index.type)
+ o << java_opt('elasticsearch.input.splits', settings[:es_input_splits])
+ o << java_opt('elasticsearch.input.query', settings[:es_query])
+ o << java_opt('elasticsearch.input.request_size', settings[:es_request_size])
+ o << java_opt('elasticsearch.input.scroll_timeout', settings[:es_scroll_timeout])
+ end
+
+ if writes_to_elasticsearch?
+ o << java_opt('elasticsearch.output.index', output_index.index)
+ o << java_opt('elasticsearch.output.type', output_index.type)
+ o << java_opt('elasticsearch.output.index.field', settings[:es_index_field])
+ o << java_opt('elasticsearch.output.type.field', settings[:es_type_field])
+ o << java_opt('elasticsearch.output.id.field', settings[:es_id_field])
+ o << java_opt('elasticsearch.output.bulk_size', settings[:es_bulk_size])
+ end
+ end.flatten.compact
+ end
+
+ # Returns a temporary path on the HDFS in which to store log
+ # data while the Hadoop job runs.
+ #
+ # @param [IndexAndType] io
+ # @return [String]
+ def elasticsearch_hdfs_tmp_dir io
+ cleaner = %r{[^\w/\.\-\+]+}
+ io_part = [io.index, io.type].compact.map { |s| s.gsub(cleaner, '') }.join('/')
+ File.join(settings[:es_tmp_dir], io_part, job_name, Time.now.strftime("%Y-%m-%d-%H-%M-%S"))
+ end
+
+ end
+ end
+
+ if defined?(Hadoop::HadoopInvocation)
+ Hadoop::HadoopInvocation.send(:include, Elasticsearch::HadoopInvocationOverride)
+ end
+end
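
A rough sketch of the override in action, using a hypothetical host class (in real use, wukong-hadoop's HadoopInvocation supplies settings and the java_opt helper):

    class FakeInvocation
      include Wukong::Elasticsearch::HadoopInvocationOverride
      attr_reader :settings
      def initialize(settings)
        @settings = settings
      end
    end

    inv = FakeInvocation.new(:input => 'es://logs/event')
    inv.reads_from_elasticsearch?  # truthy: 'es://logs/event' =~ %r{^es://}
    inv.input_format               # => STREAMING_INPUT_FORMAT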
54 lib/wonderdog/index_and_type.rb
@@ -0,0 +1,54 @@
+require 'uri'
+
+module Wukong
+ module Elasticsearch
+
+ # A convenient class for parsing Elasticsearch index and type URIs
+ # like
+ #
+ # - es://my_index
+ # - es://my_index/my_type
+ # - es://first_index,second_index,third_index
+ # - es://my_index/first_type,second_type,third_type
+ class IndexAndType
+
+ # The Elasticsearch index.
+ #
+ # @return [String]
+ attr_reader :index
+
+ # The Elasticsearch type.
+ #
+ # @return [String]
+ attr_reader :type
+
+ # Create a new index and type specification from the given
+ # +uri+.
+ #
+ # @param [String] uri
+ def initialize uri
+ self.uri = uri
+ end
+
+ # Set the URI of this index and type specification, parsing it
+ # for an index and type.
+ #
+ # Will raise an error if the given URI is malformed.
+ #
+ # @param [String] uri
+ def uri=(uri)
+ begin
+ @uri = URI.parse(uri)
+ path = File.join(@uri.host, @uri.path)
+ @index = path.split('/')[0]
+ @type = path.split('/')[1]
+ rescue URI::InvalidURIError => e
+ raise Wukong::Error.new("Could not parse '#{uri}' as an ElasticSearch /index/type specification")
+ end
+ end
+ end
+ end
+end
+
+
+
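
A sketch of the intended parsing behavior (the URIs are the examples from the class comment):

    io = Wukong::Elasticsearch::IndexAndType.new('es://my_index/my_type')
    io.index  # => "my_index"
    io.type   # => "my_type"
    Wukong::Elasticsearch::IndexAndType.new('es://my_index').type  # => nil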
19 spec/spec_helper.rb
@@ -0,0 +1,19 @@
+require 'wukong-hadoop'
+require_relative('support/integration_helper')
+require 'wukong/spec_helpers'
+
+RSpec.configure do |config|
+
+ config.before(:each) do
+ @orig_reg = Wukong.registry.show
+ end
+
+ config.after(:each) do
+ Wukong.registry.clear!
+ Wukong.registry.merge!(@orig_reg)
+ end
+
+ include Wukong::SpecHelpers
+ include Wukong::Elasticsearch::IntegrationHelper
+end
+
15 spec/support/driver_helper.rb
@@ -0,0 +1,15 @@
+module Wukong
+ module Elasticsearch
+ module DriverHelper
+
+ def driver *args
+ params = ::Wukong::Hadoop.configure(Configliere::Param.new)
+ params.resolve!
+ params.merge!(args.pop) if args.last.is_a?(Hash)
+ Wukong::Hadoop::Driver.new(params, *args)
+ end
+
+ end
+ end
+end
+
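
In a spec this helper might be exercised along these lines (the processor name and expectation are illustrative):

    describe 'es:// input handling' do
      include Wukong::Elasticsearch::DriverHelper

      it 'builds a driver with merged settings' do
        d = driver('my_processor', :input => 'es://logs/event')
        # d is a Wukong::Hadoop::Driver whose params include :input => 'es://logs/event'
      end
    end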
30 spec/support/integration_helper.rb
@@ -0,0 +1,30 @@
+module Wukong
+ module Elasticsearch
+ module IntegrationHelper
+
+ def root
+ @root ||= Pathname.new(File.expand_path('../../..', __FILE__))
+ end
+
+ def lib_dir
+ root.join('lib')
+ end
+
+ def bin_dir
+ root.join('bin')
+ end
+
+ def integration_env
+ {
+ "RUBYLIB" => [lib_dir.to_s, ENV["RUBYLIB"]].compact.join(':')
+ }
+ end
+
+ def integration_cwd
+ root.to_s
+ end
+
+ end
+ end
+end
+
8 wonderdog.gemspec
@@ -11,20 +11,20 @@ Gem::Specification.new do |gem|
gem.summary = 'Make Hadoop and ElasticSearch play together nicely.'
gem.description = <<-EOF
- Wonderdog provides code in both Ruby and Java to make ElasticSearch
+ Wonderdog provides code in both Ruby and Java to make Elasticsearch
a more fully-fledged member of both the Hadoop and Wukong
ecosystems.
For the Java side, Wonderdog provides InputFormat and OutputFormat
classes for use with Hadoop (esp. Hadoop Streaming) and Pig.
- For the Ruby side, Wonderdog provides a simple wrapper for wu-hadoop
- to make running Hadoop Streaming jobs written in Wukong against
+ For the Ruby side, Wonderdog provides extensions for wu-hadoop to
+ make running Hadoop Streaming jobs written in Wukong against
ElasticSearch easier.
EOF
gem.files = `git ls-files`.split("\n")
- gem.executables = ['wu-hadoop-es']
+ gem.executables = []
gem.test_files = gem.files.grep(/^spec/)
gem.require_paths = ['lib']