Commit

added ruby gem layer
Dhruv Bansal committed Nov 15, 2012
1 parent 641d9a9 commit 0e3edc6
Showing 7 changed files with 262 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .rspec
@@ -0,0 +1,2 @@
--format=documentation
--color
61 changes: 61 additions & 0 deletions bin/wu-hadoop-es
@@ -0,0 +1,61 @@
#!/usr/bin/env ruby

require 'wonderdog'

settings = Wukong::Elasticsearch::Configuration

settings.use(:commandline)

def settings.usage()
  "usage: #{File.basename($0)} [ --param=value | -p value | --param | -p] PROCESSOR|FLOW [PROCESSOR|FLOW]"
end

settings.description = <<EOF
wu-hadoop-es is a tool that works in combination with wu-hadoop to run
Hadoop jobs powered by Wukong using data read from or written to
ElasticSearch.
The details of how wu-hadoop-es interacts with Hadoop are the same as
for wu-hadoop, so see that tool's reference for more details.
wu-hadoop-es is just a layer over wu-hadoop that makes it easier to
connect to ElasticSearch for reading or writing data. It does this by
providing a simple way to specify ElasticSearch as an output in
Hadoop:
$ wu-hadoop-es mapper.rb reducer.rb --input=/hdfs/input/path --output=es://my_index/my_type
with the es:// scheme-prefix implying that the --output path should be
interpreted as an ElasticSearch index ("my_index") and type
("my_type"). You can have more fine-grained control over how data is
written, see the various "field" options.
You can also easily read data:
$ wu-hadoop-es mapper.rb reducer.rb --input=es://my_index/my_type --output=/hdfs/output/path
and even specify a query:
$ wu-hadoop-es mapper.rb reducer.rb --input=es://my_index/my_type --output=/hdfs/output/path --query='{"query_string": {"query": "hi +there"}}'
There are several tuning options available. ElasticSearch
configuration (including how to specify which cluster to find and join) is
handled using ElasticSearch's usual configuration-file-based approach,
specified by the --config option.
EOF

# These options are passed on to wu-hadoop, but we sometimes want to
# mess with one first, so we reconstruct them ourselves later.
settings.define(:input, :description => "Comma-separated list of input paths: local, HDFS, or ElasticSearch", :es => true)
settings.define(:output, :description => "Output path: local, HDFS, or ElasticSearch", :es => true)
settings.define(:query, :description => "Query to use when defining input splits for ElasticSearch input", :es => true)

require 'wukong/boot' ; Wukong.boot!(settings)

if settings.rest.empty?
  settings.dump_help
  exit(1)
end

Wukong::Elasticsearch::Driver.run(settings, settings.rest)
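
As an aside, the es:// convention described in the usage text above (and handled by the IndexAndType helper in lib/wonderdog/driver.rb below) is ordinary URI parsing. A minimal, self-contained sketch using only the Ruby standard library; the index and type names are invented for illustration:

require 'uri'

# Decompose an es://index/type URI the same way the driver does.
uri  = URI.parse('es://my_index/my_type')
path = File.join(uri.host, uri.path)   # => "my_index/my_type"
index, type = path.split('/')
puts index   # => "my_index"
puts type    # => "my_type"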
3 changes: 3 additions & 0 deletions lib/wonderdog.rb
@@ -0,0 +1,3 @@
require 'wukong'
require 'wonderdog/configuration'
require 'wonderdog/driver'
16 changes: 16 additions & 0 deletions lib/wonderdog/configuration.rb
@@ -0,0 +1,16 @@
module Wukong
  module Elasticsearch
    Configuration = Configliere::Param.new unless defined?(Configuration)

    Configuration.define(:tmp_dir,        :description => "Temporary directory on the HDFS to store job files while reading/writing to ElasticSearch", :default => "/user/#{ENV['USER']}/wukong", :es => true)
    Configuration.define(:config,         :description => "Where to find configuration files detailing how to join an ElasticSearch cluster", :es => true)
    Configuration.define(:input_splits,   :description => "Number of input splits to target when reading from ElasticSearch", :type => Integer, :es => true)
    Configuration.define(:request_size,   :description => "Number of objects requested during each batch read from ElasticSearch", :type => Integer, :es => true)
    Configuration.define(:scroll_timeout, :description => "Amount of time to wait on a scroll", :es => true)
    Configuration.define(:index_field,    :description => "Field to use from each record to override the default index", :es => true)
    Configuration.define(:type_field,     :description => "Field to use from each record to override the default type", :es => true)
    Configuration.define(:id_field,       :description => "If this field is present in a record, make an update request, otherwise make a create request", :es => true)
    Configuration.define(:bulk_size,      :description => "Number of requests to batch locally before making a request to ElasticSearch", :type => Integer, :es => true)
  end
end
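
For context, a hedged sketch of how one of these Configliere definitions behaves at the command line; the flag value below is illustrative, not part of the commit. The :es => true key is just a tag the driver later uses to decide which options it handles itself rather than passing through to wu-hadoop.

require 'configliere'

settings = Configliere::Param.new
settings.define(:bulk_size, :description => "Number of requests to batch locally", :type => Integer, :es => true)
settings.use(:commandline)
settings.resolve!             # given `--bulk_size=500` on the command line...
puts settings[:bulk_size]     # => 500, cast to Integer because of :type => Integer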
145 changes: 145 additions & 0 deletions lib/wonderdog/driver.rb
@@ -0,0 +1,145 @@
require 'shellwords'
require 'uri'

module Wukong
  module Elasticsearch
    class Driver

      STREAMING_INPUT_FORMAT  = "com.infochimps.elasticsearch.ElasticSearchStreamingInputFormat"
      STREAMING_OUTPUT_FORMAT = "com.infochimps.elasticsearch.ElasticSearchStreamingOutputFormat"

      # Wraps an es://index/type URI, exposing its index and type.
      class IndexAndType
        attr_reader :index, :type

        def initialize uri
          self.uri = uri
        end

        def uri=(u)
          begin
            @uri   = URI.parse(u)
            path   = File.join(@uri.host, @uri.path)
            @index = path.split('/')[0]
            @type  = path.split('/')[1]
          rescue URI::InvalidURIError => e
            raise Wukong::Error.new("Could not parse '#{u}' as an ElasticSearch /index/type specification")
          end
        end
      end

      attr_accessor :settings

      def self.run(settings, *extra_args)
        begin
          new(settings, *extra_args).run!
        rescue Wukong::Error => e
          $stderr.puts e.message
          exit(127)
        end
      end

      def initialize(settings, *args)
        @settings = settings
      end

      def read?
        settings[:input] && settings[:input] =~ %r!^es://!
      end

      def write?
        settings[:output] && settings[:output] =~ %r!^es://!
      end

      def input
        @input ||= IndexAndType.new(settings[:input])
      end

      def output
        @output ||= IndexAndType.new(settings[:output])
      end

      def run!
        # puts wu_hadoop_command
        exec wu_hadoop_command
      end

      def wu_hadoop_command
        [
          'wu-hadoop',
          wu_hadoop_input,
          wu_hadoop_output,
          java_opts,
          non_es_params,
          args
        ].flatten.compact.join(' ')
      end

      def args
        settings.rest
      end

      def wu_hadoop_input
        case
        when settings[:input] && read?
          "--input=#{tmp_filename(input)} --input_format=#{STREAMING_INPUT_FORMAT}"
        when settings[:input]
          "--input=#{settings[:input]}"
        end
      end

      def wu_hadoop_output
        case
        when settings[:output] && write?
          "--output=#{tmp_filename(output)} --output_format=#{STREAMING_OUTPUT_FORMAT}"
        when settings[:output]
          "--output=#{settings[:output]}"
        end
      end

      def non_es_params
        settings.reject{ |param, val| settings.definition_of(param, :es) }.map{ |param, val| "--#{param}=#{val}" }
      end

      def tmp_filename io
        cleaner  = %r{[^\w/\.\-\+]+}
        io_part  = [io.index, io.type].compact.map { |s| s.gsub(cleaner, '') }.join('/')
        job_part = settings.rest.map { |s| File.basename(s, '.rb').gsub(cleaner, '') }.join('---')
        File.join(settings[:tmp_dir], io_part, job_part, Time.now.strftime("%Y-%m-%d-%H-%M-%S"))
      end

      def java_opts
        return unless read? || write?

        opts = [].tap do |o|
          o << java_opt('es.config', settings[:config]) if settings[:config]

          if read?
            o << java_opt('elasticsearch.input.index',          input.index)
            o << java_opt('elasticsearch.input.type',           input.type)
            o << java_opt('elasticsearch.input.splits',         settings[:input_splits])   if settings[:input_splits]
            o << java_opt('elasticsearch.input.query',          settings[:query])          if settings[:query]
            o << java_opt('elasticsearch.input.request_size',   settings[:request_size])   if settings[:request_size]
            o << java_opt('elasticsearch.input.scroll_timeout', settings[:scroll_timeout]) if settings[:scroll_timeout]
          end

          if write?
            o << java_opt('elasticsearch.output.index',       output.index)
            o << java_opt('elasticsearch.output.type',        output.type)
            o << java_opt('elasticsearch.output.index.field', settings[:index_field]) if settings[:index_field]
            o << java_opt('elasticsearch.output.type.field',  settings[:type_field])  if settings[:type_field]
            o << java_opt('elasticsearch.output.id.field',    settings[:id_field])    if settings[:id_field]
            o << java_opt('elasticsearch.output.bulk_size',   settings[:bulk_size])   if settings[:bulk_size]
          end
        end
        return if opts.empty?
        joined = opts.join(' ')
        "--java_opts='#{joined}'"
      end

      def java_opt name, value
        return if value.nil?
        "-D #{name}=#{Shellwords.escape(value.to_s)}"
      end
    end
  end
end
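
To make the --java_opts assembly above concrete, here is a small self-contained sketch of the -D flag construction; the property names mirror java_opts, while the index and query values are made up for illustration:

require 'shellwords'

# Build a single -D property flag, shell-escaping the value.
def java_opt name, value
  return if value.nil?
  "-D #{name}=#{Shellwords.escape(value.to_s)}"
end

opts = [
  java_opt('elasticsearch.input.index', 'my_index'),
  java_opt('elasticsearch.input.query', '{"query_string": {"query": "hi +there"}}')
].compact

puts "--java_opts='#{opts.join(' ')}'"
# => --java_opts='-D elasticsearch.input.index=my_index -D elasticsearch.input.query=\{\"query_string\"...'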


3 changes: 3 additions & 0 deletions lib/wonderdog/version.rb
@@ -0,0 +1,3 @@
module Wonderdog
  VERSION = '0.0.1'
end
32 changes: 32 additions & 0 deletions wonderdog.gemspec
@@ -0,0 +1,32 @@
# -*- encoding: utf-8 -*-
require File.expand_path('../lib/wonderdog/version', __FILE__)

Gem::Specification.new do |gem|
  gem.name     = 'wonderdog'
  gem.homepage = 'https://github.com/infochimps-labs/wonderdog'
  gem.licenses = ["Apache 2.0"]
  gem.email    = 'coders@infochimps.com'
  gem.authors  = ['Infochimps', 'Philip (flip) Kromer', 'Jacob Perkins', 'Travis Dempsey', 'Dhruv Bansal']
  gem.version  = Wonderdog::VERSION

  gem.summary     = 'Make Hadoop and ElasticSearch play together nicely.'
  gem.description = <<-EOF
Wonderdog provides code in both Ruby and Java to make ElasticSearch
a more fully-fledged member of both the Hadoop and Wukong
ecosystems.
For the Java side, Wonderdog provides InputFormat and OutputFormat
classes for use with Hadoop (esp. Hadoop Streaming) and Pig.
For the Ruby side, Wonderdog provides a simple wrapper for wu-hadoop
to make running Hadoop Streaming jobs written in Wukong against
ElasticSearch easier.
  EOF

  gem.files         = `git ls-files`.split("\n")
  gem.executables   = ['wu-hadoop-es']
  gem.test_files    = gem.files.grep(/^spec/)
  gem.require_paths = ['lib']

  gem.add_dependency('configliere', '~> 0.4')
end
