-
Notifications
You must be signed in to change notification settings - Fork 56
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Dhruv Bansal
committed
Nov 15, 2012
1 parent
5410c6e
commit 8288f5e
Showing
7 changed files
with
262 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
--format=documentation | ||
--color |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
#!/usr/bin/env ruby | ||
|
||
require 'wonderdog' | ||
|
||
settings = Wukong::Elasticsearch::Configuration | ||
|
||
settings.use(:commandline) | ||
|
||
def settings.usage() | ||
"usage: #{File.basename($0)} [ --param=value | -p value | --param | -p] PROCESSOR|FLOW [PROCESSOR|FLOW]" | ||
end | ||
|
||
settings.description = <<EOF | ||
wu-hadoop-es is a tool that works in combination with wu-hadoop to run | ||
Hadoop jobs powered by Wukong using data read from or written to | ||
ElasticSearch. | ||
The details of how wu-hadoop-es interacts with Hadoop are the same as | ||
for wu-hadoop, so see that tool's reference for more details. | ||
wu-hadoop-es is just a layer over wu-hadoop that makes it easier to | ||
connect to ElasticSearch for reading or writing data. It does this by | ||
providing a simple way to specify ElasticSearch as an output in | ||
Hadoop: | ||
$ wu-hadoop-es mapper.rb reducer.rb --input=/hdfs/input/path --output=es://my_index/my_type | ||
with the es:// scheme-prefix implying that the --output path should be | ||
interpreted as an ElasticSearch index ("my_index") and type | ||
("my_type"). You can have more fine-grained control over how data is | ||
written, see the various "field" options. | ||
You can also easily read data: | ||
$ wu-hadoop-es mapper.rb reducer.rb --input=es://my_index/my_type --output=/hdfs/output/path | ||
and even specify a query | ||
$ wu-hadoop-es mapper.rb reducer.rb --input=es://my_index/my_type --output=/hdfs/output/path --query='{"query_string": {"query": "hi +there"}}' | ||
There are several tuning options available. ElasticSearch | ||
configuration (and how to specify which cluster to find and join) are | ||
handled using ElasticSearch's usual configuration file based approach, | ||
specified by the --es_config option. | ||
EOF | ||
|
||
# These options we want to pass on to wu-hadoop but sometimes we want | ||
# to mess with one first so we'll have to reconstruct them later | ||
# ourselves. | ||
settings.define(:input, :description => "Comma-separated list of input paths: local, HDFS, or ElasticSearch", :es => true) | ||
settings.define(:output, :description => "Output path: local, HDFS, or ElasticSearch", :es => true) | ||
settings.define(:query, :description => "Query to use when defining input splits for ElasticSearch input", :es => true) | ||
|
||
require 'wukong/boot' ; Wukong.boot!(settings) | ||
|
||
if settings.rest.empty? | ||
settings.dump_help | ||
exit(1) | ||
end | ||
|
||
Wukong::Elasticsearch::Driver.run(settings, settings.rest) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
require 'wukong' | ||
require 'wonderdog/configuration' | ||
require 'wonderdog/driver' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
module Wukong | ||
module Elasticsearch | ||
Configuration = Configliere::Param.new unless defined?(Configuration) | ||
|
||
Configuration.define(:tmp_dir, :description => "Temporary directory on the HDFS to store job files while reading/writing to ElasticSearch", :default => "/user/#{ENV['USER']}/wukong", :es => true) | ||
Configuration.define(:config, :description => "Where to find configuration files detailing how to join an ElasticSearch cluster", :es => true) | ||
Configuration.define(:input_splits, :description => "Number of input splits to target when reading from ElasticSearch", :type => Integer, :es => true) | ||
Configuration.define(:request_size, :description => "Number of objects requested during each batch read from ElasticSearch", :type => Integer, :es => true) | ||
Configuration.define(:scroll_timeout, :description => "Amount of time to wait on a scroll", :es => true) | ||
Configuration.define(:index_field, :description => "Field to use from each record to override the default index", :es => true) | ||
Configuration.define(:type_field, :description => "Field to use from each record to override the default type", :es => true) | ||
Configuration.define(:id_field, :description => "If this field is present in a record, make an update request, otherwise make a create request", :es => true) | ||
Configuration.define(:bulk_size, :description => "Number of requests to batch locally before making a request to ElasticSearch", :type => Integer, :es => true) | ||
|
||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
require 'shellwords' | ||
require 'uri' | ||
|
||
module Wukong | ||
module Elasticsearch | ||
class Driver | ||
|
||
STREAMING_INPUT_FORMAT = "com.infochimps.elasticsearch.ElasticSearchStreamingInputFormat" | ||
STREAMING_OUTPUT_FORMAT = "com.infochimps.elasticsearch.ElasticSearchStreamingOutputFormat" | ||
|
||
class IndexAndType | ||
attr_reader :index, :type | ||
def initialize uri | ||
self.uri = uri | ||
end | ||
def self.uri= u | ||
begin | ||
@uri = URI.parse(u) | ||
path = File.join(@uri.host, @uri.path) | ||
@index = path.split('/')[0] | ||
@type = path.split('/')[1] | ||
rescue URI::InvalidURIError => e | ||
raise Wukong::Error.new("Could not parse '#{u}' as an ElasticSearch /index/type specification") | ||
end | ||
end | ||
end | ||
|
||
attr_accessor :settings | ||
|
||
def self.run(settings, *extra_args) | ||
begin | ||
new(settings, *extra_args).run! | ||
rescue Wukong::Error => e | ||
$stderr.puts e.message | ||
exit(127) | ||
end | ||
end | ||
|
||
def initialize(settings, *args) | ||
@settings = settings | ||
end | ||
|
||
def read? | ||
settings[:input] && settings[:input] =~ %r!^es://! | ||
end | ||
|
||
def write? | ||
settings[:output] && settings[:output] =~ %r!^es://! | ||
end | ||
|
||
def input | ||
@input ||= IndexAndType.new(settings[:input]) | ||
end | ||
|
||
def output | ||
@output ||= IndexAndType.new(settings[:output]) | ||
end | ||
|
||
def run! | ||
# puts wu_hadoop_command | ||
exec wu_hadoop_command | ||
end | ||
|
||
def wu_hadoop_command | ||
[ | ||
'wu-hadoop', | ||
wu_hadoop_input, | ||
wu_hadoop_output, | ||
java_opts, | ||
non_es_params, | ||
args | ||
].flatten.compact.join(' ') | ||
end | ||
|
||
def args | ||
settings.rest | ||
end | ||
|
||
def wu_hadoop_input | ||
case | ||
when settings[:input] && read? | ||
"--input=#{tmp_filename(input)} --input_format=#{STREAMING_INPUT_FORMAT}" | ||
when settings[:input] | ||
"--input=#{settings[:input]}" | ||
end | ||
end | ||
|
||
def wu_hadoop_output | ||
case | ||
when settings[:output] && write? | ||
"--output=#{tmp_filename(output)} --output_format=#{STREAMING_OUTPUT_FORMAT}" | ||
when settings[:output] | ||
"--output=#{settings[:output]}" | ||
end | ||
end | ||
|
||
def non_es_params | ||
settings.reject{ |param, val| settings.definition_of(param, :es) }.map{ |param,val| "--#{param}=#{val}" } | ||
end | ||
|
||
def tmp_filename io | ||
cleaner = %r{[^\w/\.\-\+]+} | ||
io_part = [io.index, io.type].compact.map { |s| s.gsub(cleaner, '') }.join('/') | ||
job_part = settings.rest.map { |s| File.basename(s, '.rb').gsub(cleaner,'') }.join('---') | ||
File.join(settings[:tmp_dir], io_part, job_part, Time.now.strftime("%Y-%m-%d-%H-%M-%S")) | ||
end | ||
|
||
def java_opts | ||
return unless read? || write? | ||
|
||
opts = [].tap do |o| | ||
o << java_opt('es.config', settings[:config]) if settings[:config] | ||
|
||
if read? | ||
o << java_opt('elasticsearch.input.index', input.index) | ||
o << java_opt('elasticsearch.input.type', input.type) | ||
o << java_opt('elasticsearch.input.splits', settings[:input_splits]) if settings[:input_splits] | ||
o << java_opt('elasticsearch.input.query', settings[:query]) if settings[:query] | ||
o << java_opt('elasticsearch.input.request_size', settings[:request_size]) if settings[:request_size] | ||
o << java_opt('elasticsearch.input.scroll_timeout', settings[:scroll_timeout]) if settings[:scroll_timeout] | ||
end | ||
|
||
if write? | ||
o << java_opt('elasticsearch.output.index', output.index) | ||
o << java_opt('elasticsearch.output.type', output.type) | ||
o << java_opt('elasticsearch.output.index.field', settings[:index_field]) if settings[:index_field] | ||
o << java_opt('elasticsearch.output.type.field', settings[:type_field]) if settings[:type_field] | ||
o << java_opt('elasticsearch.output.id.field', settings[:id_field]) if settings[:id_field] | ||
o << java_opt('elasticsearch.output.bulk_size', settings[:bulk_size]) if settings[:bulk_size] | ||
end | ||
end | ||
return if opts.empty? | ||
joined = opts.join(' ') | ||
"--java_opts='#{joined}'" | ||
end | ||
|
||
def java_opt name, value | ||
return if value.nil? | ||
"-D #{name}=#{Shellwords.escape(value.to_s)}" | ||
end | ||
end | ||
end | ||
end | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
module Wonderdog | ||
VERSION = '0.0.1' | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
# -*- encoding: utf-8 -*- | ||
require File.expand_path('../lib/wonderdog/version', __FILE__) | ||
|
||
Gem::Specification.new do |gem| | ||
gem.name = 'wonderdog' | ||
gem.homepage = 'https://github.com/infochimps-labs/wonderdog' | ||
gem.licenses = ["Apache 2.0"] | ||
gem.email = 'coders@infochimps.com' | ||
gem.authors = ['Infochimps', 'Philip (flip) Kromer', 'Jacob Perkins', 'Travis Dempsey', 'Dhruv Bansal'] | ||
gem.version = Wonderdog::VERSION | ||
|
||
gem.summary = 'Make Hadoop and ElasticSearch play together nicely.' | ||
gem.description = <<-EOF | ||
Wonderdog provides code in both Ruby and Java to make ElasticSearch | ||
a more fully-fledged member of both the Hadoop and Wukong | ||
ecosystems. | ||
For the Java side, Wonderdog provides InputFormat and OutputFormat | ||
classes for use with Hadoop (esp. Hadoop Streaming) and Pig. | ||
For the Ruby side, Wonderdog provides a simple wrapper for wu-hadoop | ||
to make running Hadoop Streaming jobs written in Wukong against | ||
ElasticSearch easier. | ||
EOF | ||
|
||
gem.files = `git ls-files`.split("\n") | ||
gem.executables = ['wu-hadoop-es'] | ||
gem.test_files = gem.files.grep(/^spec/) | ||
gem.require_paths = ['lib'] | ||
|
||
gem.add_dependency('configliere', '~> 0.4') | ||
end |