Permalink
Browse files

all specs pass with new plugin architecture

  • Loading branch information...
1 parent b51794e commit d18cf911859a17adc1c4eefe8fd2694dc5b84921 Dhruv Bansal committed Dec 21, 2012
View
@@ -0,0 +1,6 @@
+--readme README.md
+--markup markdown
+-
+CHANGELOG.md
+LICENSE.md
+README.md
View
@@ -0,0 +1,2 @@
+Wukong.dataflow(:mapper) { identity }
+Wukong.dataflow(:reducer) { identity }
View
@@ -0,0 +1,4 @@
+require 'wonderdog'
+Wukong.dataflow(:mapper) { identity }
+Wukong.dataflow(:reducer) { identity }
+
View
@@ -6,9 +6,37 @@ module Wukong
# Wukong. This module adds some overrides which enables the
# <tt>wu-hadoop</tt> program to leverage this code.
module Elasticsearch
+ include Plugin
+
+ # Configure the given `settings` to be able to work with
+ # Elasticsearch.
+ #
+ # @param [Configliere::Param] settings
+ # @return [Configliere::Param] the newly configured settings
+ def self.configure settings, program
+ return unless program == 'wu-hadoop'
+ settings.define(:es_tmp_dir, :description => "Temporary directory on the HDFS to store job files while reading/writing to ElasticSearch", :default => "/user/#{ENV['USER']}/wukong", :wukong_hadoop => true)
+ settings.define(:es_lib_dir, :description => "Directory containing Elasticsearch, Wonderdog, and other support jars", :default => "/usr/lib/hadoop/lib", :wukong_hadoop => true)
+ settings.define(:es_config, :description => "Where to find configuration files detailing how to join an ElasticSearch cluster", :wukong_hadoop => true)
+ settings.define(:es_input_splits, :description => "Number of input splits to target when reading from ElasticSearch", :type => Integer, :wukong_hadoop => true)
+ settings.define(:es_request_size, :description => "Number of objects requested during each batch read from ElasticSearch", :type => Integer, :wukong_hadoop => true)
+ settings.define(:es_scroll_timeout, :description => "Amount of time to wait on a scroll", :wukong_hadoop => true)
+ settings.define(:es_index_field, :description => "Field to use from each record to override the default index", :wukong_hadoop => true)
+ settings.define(:es_mapping_field, :description => "Field to use from each record to override the default mapping", :wukong_hadoop => true)
+ settings.define(:es_id_field, :description => "If this field is present in a record, make an update request, otherwise make a create request", :wukong_hadoop => true)
+ settings.define(:es_bulk_size, :description => "Number of requests to batch locally before making a request to ElasticSearch", :type => Integer, :wukong_hadoop => true)
+ settings.define(:es_query, :description => "Query to use when defining input splits for ElasticSearch input", :wukong_hadoop => true)
+ end
+
+ # Boot Wonderdog with the given `settings` in the given `dir`.
+ #
+ # @param [Configliere::Param] settings
+ # @param [String] root
+ def self.boot settings, root
+ end
+
end
end
-require 'wonderdog/configuration'
require 'wonderdog/hadoop_invocation_override'
require 'wonderdog/timestamp'
@@ -1,26 +0,0 @@
-module Wukong
- module Elasticsearch
-
- # Configure the given +settings+ to be able to work with
- # Elasticsearch.
- #
- # @param [Configliere::Param] settings
- # @return [Configliere::Param] the newly configured settings
- def self.configure settings
- settings.define(:es_tmp_dir, :description => "Temporary directory on the HDFS to store job files while reading/writing to ElasticSearch", :default => "/user/#{ENV['USER']}/wukong", :wukong_hadoop => true)
- settings.define(:es_lib_dir, :description => "Directory containing Elasticsearch, Wonderdog, and other support jars", :default => "/usr/lib/hadoop/lib", :wukong_hadoop => true)
- settings.define(:es_config, :description => "Where to find configuration files detailing how to join an ElasticSearch cluster", :wukong_hadoop => true)
- settings.define(:es_input_splits, :description => "Number of input splits to target when reading from ElasticSearch", :type => Integer, :wukong_hadoop => true)
- settings.define(:es_request_size, :description => "Number of objects requested during each batch read from ElasticSearch", :type => Integer, :wukong_hadoop => true)
- settings.define(:es_scroll_timeout, :description => "Amount of time to wait on a scroll", :wukong_hadoop => true)
- settings.define(:es_index_field, :description => "Field to use from each record to override the default index", :wukong_hadoop => true)
- settings.define(:es_mapping_field, :description => "Field to use from each record to override the default mapping", :wukong_hadoop => true)
- settings.define(:es_id_field, :description => "If this field is present in a record, make an update request, otherwise make a create request", :wukong_hadoop => true)
- settings.define(:es_bulk_size, :description => "Number of requests to batch locally before making a request to ElasticSearch", :type => Integer, :wukong_hadoop => true)
- settings.define(:es_query, :description => "Query to use when defining input splits for ElasticSearch input", :wukong_hadoop => true)
-
- settings
- end
- end
-
-end
@@ -99,8 +99,8 @@ def output_path
# @return [Array<String>]
def hadoop_jobconf_options
if reads_from_elasticsearch? || writes_to_elasticsearch?
- settings[:map_speculative] = false if settings[:map_speculative].nil?
- settings[:reduce_speculative] = false if settings[:reduce_speculative].nil?
+ settings[:map_speculative] = 'false' if settings[:map_speculative].nil?
+ settings[:reduce_speculative] = 'false' if settings[:reduce_speculative].nil?
end
super() + [].tap do |o|
@@ -164,5 +164,5 @@ def elasticsearch_hdfs_tmp_dir io
end
end
- Hadoop::Driver.class_eval { include Elasticsearch::HadoopInvocationOverride }
+ Hadoop::HadoopRunner.class_eval { include Elasticsearch::HadoopInvocationOverride }
end
@@ -26,7 +26,7 @@ class IndexAndMapping
# @param [String]
attr_reader :mapping
- # Does the given +string+ look like a possible Elasticsearch
+ # Does the given `string` look like a possible Elasticsearch
# /index/mapping specification?
#
# @param [String] string
View
@@ -1,22 +1,29 @@
require 'wonderdog'
require 'wukong/spec_helpers'
-require_relative('support/integration_helper')
-require_relative('support/driver_helper')
-
RSpec.configure do |config|
config.before(:each) do
+ Wukong::Log.level = Log4r::OFF
@orig_reg = Wukong.registry.show
end
config.after(:each) do
Wukong.registry.clear!
Wukong.registry.merge!(@orig_reg)
end
-
+
include Wukong::SpecHelpers
- include Wukong::Elasticsearch::IntegrationHelper
- include Wukong::Elasticsearch::DriverHelper
-end
+
+ def root
+ @root ||= Pathname.new(File.expand_path('../..', __FILE__))
+ end
+ def hadoop_runner *args, &block
+ runner(Wukong::Hadoop::HadoopRunner, *args) do
+ stub!(:execute_command!)
+ instance_eval(&block) if block_given?
+ end
+ end
+
+end
View
No changes.
@@ -1,15 +0,0 @@
-module Wukong
- module Elasticsearch
- module DriverHelper
-
- def driver *args
- params = Elasticsearch.configure(Hadoop.configure(Configliere::Param.new))
- params.resolve!
- params.merge!(args.pop) if args.last.is_a?(Hash)
- Hadoop::Driver.new(params, *args)
- end
-
- end
- end
-end
-
@@ -1,30 +0,0 @@
-module Wukong
- module Elasticsearch
- module IntegrationHelper
-
- def root
- @root ||= Pathname.new(File.expand_path('../../..', __FILE__))
- end
-
- def lib_dir
- root.join('lib')
- end
-
- def bin_dir
- root.join('bin')
- end
-
- def integration_env
- {
- "RUBYLIB" => [lib_dir.to_s, ENV["RUBYLIB"]].compact.join(':')
- }
- end
-
- def integration_cwd
- root.to_s
- end
-
- end
- end
-end
-
@@ -2,10 +2,10 @@
describe Wukong::Elasticsearch::HadoopInvocationOverride do
- let(:no_es) { driver('regexp', 'count', input: '/tmp/input_file', output: '/tmp/output_file') }
- let(:es_reader) { driver('regexp', 'count', input: 'es://the_index/the_map', output: '/tmp/output_file') }
- let(:es_writer) { driver('regexp', 'count', input: '/tmp/input_file', output: 'es:///the_index/the_map') }
- let(:es_complex) { driver('regexp', 'count', input: 'es://the_index/the_map', output: 'es:///the_index/the_map', es_query: '{"hi": "there"}', es_request_size: 1000, es_index_field: 'ID') }
+ let(:no_es) { hadoop_runner('regexp', 'count', input: '/tmp/input_file', output: '/tmp/output_file') }
+ let(:es_reader) { hadoop_runner('regexp', 'count', input: 'es://the_index/the_map', output: '/tmp/output_file') }
+ let(:es_writer) { hadoop_runner('regexp', 'count', input: '/tmp/input_file', output: 'es:///the_index/the_map') }
+ let(:es_complex) { hadoop_runner('regexp', 'count', input: 'es://the_index/the_map', output: 'es:///the_index/the_map', es_query: '{"hi": "there"}', es_request_size: 1000, es_index_field: 'ID', map_speculative: true, reduce_speculative: true) }
context "passing necessary jars to Hadoop streaming" do
before { Dir.stub!(:[]).and_return(["/lib/dir/elasticsearch.jar"], ["/lib/dir/wonderdog.jar"]) }
@@ -36,16 +36,32 @@
context "setting speculative execution" do
context "when not given speculative options" do
context "and not interacting with Elasticsearch" do
- it "doesn't add jars" do
+ it "doesn't add any speculative options" do
no_es.hadoop_commandline.should_not match('speculative')
end
end
context "and reading from Elasticsearch" do
- it "adds default jars it finds on the local filesystem" do
- es_reader.hadoop_commandline.should match('-mapred.map.tasks.speculative.execution.*false')
- es_reader.hadoop_commandline.should match('-mapred.reduce.tasks.speculative.execution.*false')
+ it "disables speculative execution in the mapper" do
+ es_reader.hadoop_commandline.should match(/-D mapred.map.tasks.speculative.execution.*false/)
+ end
+ it "disables speculative execution in the reducer" do
+ es_reader.hadoop_commandline.should match(/-D mapred.reduce.tasks.speculative.execution.*false/)
end
end
+ context "and writing to Elasticsearch" do
+ it "disables speculative execution in the mapper" do
+ es_writer.hadoop_commandline.should match(/-D mapred.map.tasks.speculative.execution.*false/)
+ end
+ it "disables speculative execution in the reducer" do
+ es_writer.hadoop_commandline.should match(/-D mapred.reduce.tasks.speculative.execution.*false/)
+ end
+ end
+ end
+ context "when given speculative options" do
+ it "does not change them" do
+ es_complex.hadoop_commandline.should match(/-D mapred.map.tasks.speculative.execution.*true/)
+ es_complex.hadoop_commandline.should match(/-D mapred.reduce.tasks.speculative.execution.*true/)
+ end
end
end
@@ -0,0 +1,18 @@
+require 'spec_helper'
+
+describe 'wu-hadoop' do
+
+ context "when wonderdog hasn't been required" do
+ let(:script) { examples_dir('no_wonderdog.rb') }
+ it "doesn't recognize Elasticsearch URIs" do
+ command('wu-hadoop', script, '--input=es://foo/bar', '--output=/some/path', '--dry_run').should_not have_stdout('elasticsearch')
+ end
+ end
+
+ context "when wonderdog has been required" do
+ let(:script) { examples_dir('wonderdog.rb') }
+ it "recognizes Elasticsearch URIs" do
+ command('wu-hadoop', script, '--input=es://foo/bar', '--output=/some/path', '--dry_run').should have_stdout('elasticsearch')
+ end
+ end
+end
View
@@ -0,0 +1,5 @@
+require 'spec_helper'
+
+describe Wukong::Elasticsearch do
+ it_behaves_like 'a plugin'
+end

0 comments on commit d18cf91

Please sign in to comment.