Permalink
Browse files

Rename type => mapping throughout to prevent confusion with the _type key

  • Loading branch information...
1 parent 21a7636 commit 16f1e007b0deabbd51d3be46623f94df412d41d8 Dhruv Bansal committed Nov 30, 2012
View
@@ -11,3 +11,4 @@ module Elasticsearch
require 'wonderdog/configuration'
require 'wonderdog/hadoop_invocation_override'
+require 'wonderdog/timestamp'
@@ -13,7 +13,7 @@ def self.configure settings
settings.define(:es_request_size, :description => "Number of objects requested during each batch read from ElasticSearch", :type => Integer, :wukong_hadoop => true)
settings.define(:es_scroll_timeout, :description => "Amount of time to wait on a scroll", :wukong_hadoop => true)
settings.define(:es_index_field, :description => "Field to use from each record to override the default index", :wukong_hadoop => true)
- settings.define(:es_type_field, :description => "Field to use from each record to override the default type", :wukong_hadoop => true)
+ settings.define(:es_mapping_field, :description => "Field to use from each record to override the default mapping", :wukong_hadoop => true)
settings.define(:es_id_field, :description => "If this field is present in a record, make an update request, otherwise make a create request", :wukong_hadoop => true)
settings.define(:es_bulk_size, :description => "Number of requests to batch locally before making a request to ElasticSearch", :type => Integer, :wukong_hadoop => true)
settings.define(:es_query, :description => "Query to use when defining input splits for ElasticSearch input", :wukong_hadoop => true)
@@ -1,4 +1,4 @@
-require_relative("index_and_type")
+require_relative("index_and_mapping")
module Wukong
module Elasticsearch
@@ -26,7 +26,7 @@ module HadoopInvocationOverride
#
# @return [true, false]
def reads_from_elasticsearch?
- IndexAndType.matches?(settings[:input])
+ IndexAndMapping.matches?(settings[:input])
end
# The input format to use for this job.
@@ -41,9 +41,9 @@ def input_format
# The input index to use.
#
- # @return [IndexAndType]
+ # @return [IndexAndMapping]
def input_index
- @input_index ||= IndexAndType.new(settings[:input])
+ @input_index ||= IndexAndMapping.new(settings[:input])
end
# The input paths to use for this job.
@@ -60,7 +60,7 @@ def input_paths
#
# @return [true, false]
def writes_to_elasticsearch?
- IndexAndType.matches?(settings[:output])
+ IndexAndMapping.matches?(settings[:output])
end
# The output format to use for this job.
@@ -75,9 +75,9 @@ def output_format
# The output index to use.
#
- # @return [IndexAndType]
+ # @return [IndexAndMapping]
def output_index
- @output_index ||= IndexAndType.new(settings[:output])
+ @output_index ||= IndexAndMapping.new(settings[:output])
end
# The output path to use for this job.
@@ -103,32 +103,32 @@ def hadoop_jobconf_options
if reads_from_elasticsearch?
o << java_opt('elasticsearch.input.index', input_index.index)
- o << java_opt('elasticsearch.input.type', input_index.type)
+ o << java_opt('elasticsearch.input.mapping', input_index.mapping)
o << java_opt('elasticsearch.input.splits', settings[:es_input_splits])
o << java_opt('elasticsearch.input.query', settings[:es_query])
o << java_opt('elasticsearch.input.request_size', settings[:es_request_size])
o << java_opt('elasticsearch.input.scroll_timeout', settings[:es_scroll_timeout])
end
if writes_to_elasticsearch?
- o << java_opt('elasticsearch.output.index', output_index.index)
- o << java_opt('elasticsearch.output.type', output_index.type)
- o << java_opt('elasticsearch.output.index.field', settings[:es_index_field])
- o << java_opt('elasticsearch.output.type.field', settings[:es_type_field])
- o << java_opt('elasticsearch.output.id.field', settings[:es_id_field])
- o << java_opt('elasticsearch.output.bulk_size', settings[:es_bulk_size])
+ o << java_opt('elasticsearch.output.index', output_index.index)
+ o << java_opt('elasticsearch.output.mapping', output_index.mapping)
+ o << java_opt('elasticsearch.output.index.field', settings[:es_index_field])
+ o << java_opt('elasticsearch.output.mapping.field', settings[:es_mapping_field])
+ o << java_opt('elasticsearch.output.id.field', settings[:es_id_field])
+ o << java_opt('elasticsearch.output.bulk_size', settings[:es_bulk_size])
end
end.flatten.compact
end
# Returns a temporary path on the HDFS in which to store log
# data while the Hadoop job runs.
#
- # @param [IndexAndType] io
+ # @param [IndexAndMapping] io
# @return [String]
def elasticsearch_hdfs_tmp_dir io
cleaner = %r{[^\w/\.\-\+]+}
- io_part = [io.index, io.type].compact.map { |s| s.gsub(cleaner, '') }.join('/')
+ io_part = [io.index, io.mapping].compact.map { |s| s.gsub(cleaner, '') }.join('/')
File.join(settings[:es_tmp_dir], io_part, Time.now.strftime("%Y-%m-%d-%H-%M-%S"))
end
@@ -1,17 +1,17 @@
module Wukong
module Elasticsearch
- # A convenient class for parsing Elasticsearch index and type URIs
+ # A convenient class for parsing Elasticsearch index and mapping URIs
# like
#
# - es://my_index
- # - es://my_index/my_type
+ # - es://my_index/my_mapping
# - es://first_index,second_index,third_index
- # - es://my_index/first_type,second_type,third_type
- class IndexAndType
+ # - es://my_index/first_mapping,second_mapping,third_mapping
+ class IndexAndMapping
# A regular expression that matches URIs describing an
- # Elasticsearch index and/or type to read/write from/to.
+ # Elasticsearch index and/or mapping to read/write from/to.
#
# @param [Regexp]
ES_SCHEME_REGEXP = %r{^es://}
@@ -21,13 +21,13 @@ class IndexAndType
# @param [String]
attr_reader :index
- # The Elasticsearch type.
+ # The Elasticsearch mapping.
#
# @param [String]
- attr_reader :type
+ attr_reader :mapping
# Does the given +string+ look like a possible Elasticsearch
- # /index/type specification?
+ # /index/mapping specification?
#
# @param [String] string
# @return [true, false]
@@ -36,28 +36,28 @@ def self.matches? string
string =~ ES_SCHEME_REGEXP
end
- # Create a new index and type specification from the given
+ # Create a new index and mapping specification from the given
# +uri+.
#
# @param [String] uri
def initialize uri
self.uri = uri
end
- # Set the URI of this index and type specification, parsing it
- # for an index and type.
+ # Set the URI of this index and mapping specification, parsing it
+ # for an index and mapping.
#
# Will raise an error if the given URI is malformed.
#
# @param [String] uri
def uri= uri
- raise Wukong::Error.new("'#{uri}' is not an ElasticSearch es://index/type specification") unless self.class.matches?(uri)
+ raise Wukong::Error.new("'#{uri}' is not an ElasticSearch es://index/mapping specification") unless self.class.matches?(uri)
parts = uri.gsub(ES_SCHEME_REGEXP, '').gsub(/^\/+/,'').gsub(/\/+$/,'').split('/')
- raise Wukong::Error.new("'#{uri}' is not an ElasticSearch es://index/type specification") unless parts.size.between?(1,2)
+ raise Wukong::Error.new("'#{uri}' is not an ElasticSearch es://index/mapping specification") unless parts.size.between?(1,2)
- @index = parts[0]
- @type = parts[1]
+ @index = parts[0]
+ @mapping = parts[1]
end
end
end
View
@@ -0,0 +1,41 @@
+module Wukong
+ module Elasticsearch
+
+ # A class that makes Ruby's Time class serialize the way
+ # Elasticsearch expects.
+ #
+ # Elasticsearch's date parsing engine [expects to
+ # receive](http://www.elasticsearch.org/guide/reference/mapping/date-format.html)
+ # a date in a format accepted by the
+ # [ISODateTimeFormat.dateOptionalTimeParser](http://joda-time.sourceforge.net/api-release/org/joda/time/format/ISODateTimeFormat.html#dateOptionalTimeParser())
+ # method of the Java library
+ # [Joda-Time](http://joda-time.sourceforge.net/).
+ #
+ # This format looks like this: `2012-11-30T01:15:23`.
+ #
+ # @see http://www.elasticsearch.org/guide/reference/mapping/date-format.html The Elasticsearch guide's Date Format entry
+ # @see http://joda-time.sourceforge.net/api-release/org/joda/time/format/ISODateTimeFormat.html#dateOptionalTimeParser() The Joda class's API documentation
+ class Timestamp < Time
+
+ # Parses the given `string` into a Timestamp instance.
+ #
+ # @param [String] string
+ # @return [Timestamp, nil] nil when `string` is nil, empty, or cannot be parsed
+ def self.receive string
+ return if string.nil? || string.empty?
+ begin
+ t = Time.parse(string)
+ rescue ArgumentError => e
+ return
+ end
+ new(t.year, t.month, t.day, t.hour, t.min, t.sec, t.utc_offset)
+ end
+
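+ # Formats the Timestamp in UTC as `YYYY-MM-DDTHH:MM:SS`, e.g. `2012-11-30T01:15:23`.
+ # NOTE(review): `utc` converts the receiver to UTC in place; `getutc` would avoid mutating it.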
+ def to_wire(options={})
+ utc.strftime("%Y-%m-%dT%H:%M:%S")
+ end
+ end
+ end
+end
@@ -2,10 +2,10 @@
describe Wukong::Elasticsearch::HadoopInvocationOverride do
- let(:no_es) { driver('regexp', 'count', input: '/tmp/input_file', output: '/tmp/output_file') }
- let(:es_reader) { driver('regexp', 'count', input: 'es://the_index/the_type', output: '/tmp/output_file') }
- let(:es_writer) { driver('regexp', 'count', input: '/tmp/input_file', output: 'es:///the_index/the_type') }
- let(:es_complex) { driver('regexp', 'count', input: 'es://the_index/the_type', output: 'es:///the_index/the_type', es_query: '{"hi": "there"}', es_request_size: 1000, es_index_field: 'ID') }
+ let(:no_es) { driver('regexp', 'count', input: '/tmp/input_file', output: '/tmp/output_file') }
+ let(:es_reader) { driver('regexp', 'count', input: 'es://the_index/the_map', output: '/tmp/output_file') }
+ let(:es_writer) { driver('regexp', 'count', input: '/tmp/input_file', output: 'es:///the_index/the_map') }
+ let(:es_complex) { driver('regexp', 'count', input: 'es://the_index/the_map', output: 'es:///the_index/the_map', es_query: '{"hi": "there"}', es_request_size: 1000, es_index_field: 'ID') }
context "not interacting with Elasticsearch" do
subject { no_es }
@@ -25,11 +25,11 @@
subject { es_reader }
# input
- its(:input_paths) { should match(%r{/user.*wukong.*the_index.*the_type}) }
+ its(:input_paths) { should match(%r{/user.*wukong.*the_index.*the_map}) }
its(:hadoop_commandline) { should match(/-inputformat.*elasticsearch/i) }
- its(:hadoop_commandline) { should match(%r{-input.*/user.*wukong.*the_index.*the_type}i) }
+ its(:hadoop_commandline) { should match(%r{-input.*/user.*wukong.*the_index.*the_map}i) }
its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.index.*the_index/i) }
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.type.*the_type/i) }
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.map.*the_map/i) }
# output
its(:output_path) { should == '/tmp/output_file' }
@@ -48,29 +48,29 @@
its(:hadoop_commandline) { should_not match(/-D\s+elasticsearch\.input/i) }
# output
- its(:output_path) { should match(%r{/user.*wukong.*the_index.*the_type}) }
+ its(:output_path) { should match(%r{/user.*wukong.*the_index.*the_map}) }
its(:hadoop_commandline) { should match(/-outputformat.*elasticsearch/i) }
- its(:hadoop_commandline) { should match(%r{-output.*/user.*wukong.*the_index.*the_type}i) }
+ its(:hadoop_commandline) { should match(%r{-output.*/user.*wukong.*the_index.*the_map}i) }
its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.index.*the_index/i) }
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.type.*the_type/i) }
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.map.*the_map/i) }
end
context "reading and writing with many options" do
subject { es_complex }
# input
- its(:input_paths) { should match(%r{/user.*wukong.*the_index.*the_type}) }
+ its(:input_paths) { should match(%r{/user.*wukong.*the_index.*the_map}) }
its(:hadoop_commandline) { should match(/-inputformat.*elasticsearch/i) }
- its(:hadoop_commandline) { should match(%r{-input.*/user.*wukong.*the_index.*the_type}i) }
+ its(:hadoop_commandline) { should match(%r{-input.*/user.*wukong.*the_index.*the_map}i) }
its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.index.*the_index/i) }
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.type.*the_type/i) }
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.map.*the_map/i) }
# output
- its(:output_path) { should match(%r{/user.*wukong.*the_index.*the_type}) }
+ its(:output_path) { should match(%r{/user.*wukong.*the_index.*the_map}) }
its(:hadoop_commandline) { should match(/-outputformat.*elasticsearch/i) }
- its(:hadoop_commandline) { should match(%r{-output.*/user.*wukong.*the_index.*the_type}i) }
+ its(:hadoop_commandline) { should match(%r{-output.*/user.*wukong.*the_index.*the_map}i) }
its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.index.*the_index/i) }
- its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.type.*the_type/i) }
+ its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.output\.map.*the_map/i) }
# options
its(:hadoop_commandline) { should match(/-D\s+elasticsearch\.input\.query.*hi.*there/i) }
@@ -1,24 +1,24 @@
require 'spec_helper'
-describe Wukong::Elasticsearch::IndexAndType do
+describe Wukong::Elasticsearch::IndexAndMapping do
- subject { Wukong::Elasticsearch::IndexAndType }
+ subject { Wukong::Elasticsearch::IndexAndMapping }
let(:filesystem_path) { '/some/path' }
let(:filesystem_paths) { '/some/path,/some/other/path' }
let(:hdfs_path) { 'hdfs://some/hdfs/path' }
let(:hdfs_paths) { 'hdfs://some/hdfs/path,hdfs://some/other/hdfs/path' }
- let(:es_index_and_type) { 'es://index/type' }
- let(:es_indices_and_type) { 'es://index1,index2/type' }
- let(:es_index_and_types) { 'es://index/type1,type2' }
- let(:es_indices_and_types) { 'es://index1,index2/type1,type2' }
+ let(:es_index_and_mapping) { 'es://index/mapping' }
+ let(:es_indices_and_mapping) { 'es://index1,index2/mapping' }
+ let(:es_index_and_mappings) { 'es://index/mapping1,mapping2' }
+ let(:es_indices_and_mappings) { 'es://index1,index2/mapping1,mapping2' }
fails = %w[filesystem_path filesystem_paths hdfs_path hdfs_paths]
- passes = %w[es_index_and_type es_indices_and_type es_index_and_types es_indices_and_types]
+ passes = %w[es_index_and_mapping es_indices_and_mapping es_index_and_mappings es_indices_and_mappings]
- context 'recognizing possible es://index/type specifications' do
+ context 'recognizing possible es://index/mapping specifications' do
fails.each do |name|
it "doesn't recognize a #{name}" do
subject.matches?(self.send(name)).should be_false
@@ -31,40 +31,40 @@
end
end
- context "parsing es://index/type specifications" do
+ context "parsing es://index/mapping specifications" do
fails.each do |name|
it "raises an error on a #{name}" do
- lambda { subject.new(self.send(name)) }.should raise_error(Wukong::Error, /not an elasticsearch.*index\/type/i)
+ lambda { subject.new(self.send(name)) }.should raise_error(Wukong::Error, /not an elasticsearch.*index\/mapping/i)
end
end
it "raises an error on a specification with too many parts" do
- lambda { subject.new('es://index/type/extra') }.should raise_error(Wukong::Error, /not an elasticsearch.*index\/type/i)
+ lambda { subject.new('es://index/mapping/extra') }.should raise_error(Wukong::Error, /not an elasticsearch.*index\/mapping/i)
end
it "raises an error on a specification with too few parts" do
- lambda { subject.new('es://') }.should raise_error(Wukong::Error, /not an elasticsearch.*index\/type/i)
+ lambda { subject.new('es://') }.should raise_error(Wukong::Error, /not an elasticsearch.*index\/mapping/i)
end
- context "on an index and type" do
- subject { Wukong::Elasticsearch::IndexAndType.new(es_index_and_type) }
- its(:index) { should == 'index'}
- its(:type) { should == 'type' }
+ context "on an index and mapping" do
+ subject { Wukong::Elasticsearch::IndexAndMapping.new(es_index_and_mapping) }
+ its(:index) { should == 'index' }
+ its(:mapping) { should == 'mapping' }
end
- context "on indices and a type" do
- subject { Wukong::Elasticsearch::IndexAndType.new(es_indices_and_type) }
- its(:index) { should == 'index1,index2'}
- its(:type) { should == 'type' }
+ context "on indices and a mapping" do
+ subject { Wukong::Elasticsearch::IndexAndMapping.new(es_indices_and_mapping) }
+ its(:index) { should == 'index1,index2' }
+ its(:mapping) { should == 'mapping' }
end
- context "on an index and types" do
- subject { Wukong::Elasticsearch::IndexAndType.new(es_index_and_types) }
- its(:index) { should == 'index' }
- its(:type) { should == 'type1,type2' }
+ context "on an index and mappings" do
+ subject { Wukong::Elasticsearch::IndexAndMapping.new(es_index_and_mappings) }
+ its(:index) { should == 'index' }
+ its(:mapping) { should == 'mapping1,mapping2' }
end
- context "on indices and types" do
- subject { Wukong::Elasticsearch::IndexAndType.new(es_indices_and_types) }
- its(:index) { should == 'index1,index2'}
- its(:type) { should == 'type1,type2' }
+ context "on indices and mappings" do
+ subject { Wukong::Elasticsearch::IndexAndMapping.new(es_indices_and_mappings) }
+ its(:index) { should == 'index1,index2' }
+ its(:mapping) { should == 'mapping1,mapping2' }
end
Oops, something went wrong.

0 comments on commit 16f1e00

Please sign in to comment.