Skip to content
This repository has been archived by the owner on Jun 30, 2018. It is now read-only.

Commit

Permalink
[#449] Refactor & cleanup the new bulk support
Browse files Browse the repository at this point in the history
* Use plain `Tire::Index#bulk` method name for the generic method:

    Tire.index('articles').bulk :index, [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ]

* Keep the `bulk_create`, `bulk_store`, `bulk_delete` methods

* Replace dynamically defined tests in index unit test with concrete examples
  • Loading branch information
karmi committed Nov 11, 2012
1 parent 8e81c3a commit d2ed980
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 72 deletions.
28 changes: 17 additions & 11 deletions lib/tire/index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,16 @@ def store(*args)
curl = %Q|curl -X POST "#{url}" -d '#{document}'|
logged([type, id].join('/'), curl)
end

def bulk_api_action(action, documents, options={})

def bulk(action, documents, options={})
# TODO: A more Ruby-like DSL notation should be supported:
#
# Tire.index('myindex').bulk do
# create id: 1, title: 'bar', _routing: 'abc'
# delete id: 1
# # ...
# end
#
payload = documents.map do |document|
type = get_type_from_document(document, :escape => false) # Do not URL-escape the _type
id = get_id_from_document(document)
Expand All @@ -99,9 +107,7 @@ def bulk_api_action(action, documents, options={})

output = []
output << %Q|{"#{action}":{"_index":"#{@name}","_type":"#{type}","_id":"#{id}"}}|
unless action == 'delete' # delete does not require a document, just an _id
output << convert_document_to_json(document)
end
output << convert_document_to_json(document) unless action.to_s == 'delete'
output.join("\n")
end
payload << ""
Expand All @@ -125,21 +131,21 @@ def bulk_api_action(action, documents, options={})

ensure
curl = %Q|curl -X POST "#{url}/_bulk" --data-binary '{... data omitted ...}'|
logged('BULK', curl)
logged('_bulk', curl)
end

end

def bulk_create(documents, options={})
bulk_api_action("create", documents, options)
bulk :create, documents, options
end

def bulk_store(documents, options={})
bulk_api_action("index", documents, options)
bulk :index, documents, options
end

def bulk_delete(documents, options={})
bulk_api_action("delete", documents, options)
bulk :delete, documents, options
end

def import(klass_or_collection, options={})
Expand Down
150 changes: 89 additions & 61 deletions test/unit/index_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,8 @@ class MyDocument;end; document = MyDocument.new
end

context "when performing a bulk api action" do
# Possible api actions are index, create, delete
# Possible Bulk API actions are `index`, `create`, `delete`
#
# The expected JSON looks like this:
#
# {"index":{"_index":"dummy","_type":"document","_id":"1"}}
Expand All @@ -501,102 +502,129 @@ class MyDocument;end; document = MyDocument.new
# {"delete":{"_index":"dummy","_type":"document","_id":"2"}}
#
# See http://www.elasticsearch.org/guide/reference/api/bulk.html

# test json with each type of action
["index","create","delete"].each do |action|
should "serialize Hashes for #{action}" do
Configuration.client.expects(:post).with do |url, json|

url == "#{@index.url}/_bulk" &&
json =~ /"#{action}"/ &&
json =~ /"_index":"dummy"/ &&
json =~ /"_type":"document"/ &&
json =~ /"_id":"1"/ &&
json =~ /"_id":"2"/ &&
if action == 'delete'
# delete should not contain the document contents
true
else
json =~ /"id":"1"/ &&
json =~ /"id":"2"/ &&
json =~ /"title":"One"/ &&
json =~ /"title":"Two"/
end
end.returns(mock_response('{}'), 200)

@index.bulk_api_action action, [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ]
end

should "serialize ActiveModel instances for #{action}" do
Configuration.client.expects(:post).with do |url, json|
url == "#{ActiveModelArticle.index.url}/_bulk" &&
json =~ /"#{action}"/ &&
json =~ /"_index":"active_model_articles"/ &&
json =~ /"_type":"active_model_article"/ &&
json =~ /"_id":"1"/ &&
json =~ /"_id":"2"/ &&
if action == 'delete'
# delete should not contain the document contents
true
else
json =~ /"title":"One"/ &&
json =~ /"title":"Two"/
end

end.returns(mock_response('{}', 200))

one = ActiveModelArticle.new 'title' => 'One'; one.id = '1'
two = ActiveModelArticle.new 'title' => 'Two'; two.id = '2'

ActiveModelArticle.index.bulk_api_action action, [ one, two ]
end

should "serialize payload for index action" do
Configuration.client.
expects(:post).
with do |url, payload|
assert_equal "#{@index.url}/_bulk", url
assert_match /"index"/, payload
assert_match /"_index":"dummy"/, payload
assert_match /"_type":"document"/, payload
assert_match /"_id":"1"/, payload
assert_match /"_id":"2"/, payload
assert_match /"title":"One"/, payload
assert_match /"title":"Two"/, payload
end.
returns(mock_response('{}'), 200)

@index.bulk :index, [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ]
end

should "serialize payload for create action" do
Configuration.client.
expects(:post).
with do |url, payload|
assert_equal "#{@index.url}/_bulk", url
assert_match /"create"/, payload
assert_match /"_index":"dummy"/, payload
assert_match /"_type":"document"/, payload
assert_match /"_id":"1"/, payload
assert_match /"_id":"2"/, payload
assert_match /"title":"One"/, payload
assert_match /"title":"Two"/, payload
end.
returns(mock_response('{}'), 200)

@index.bulk :create, [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ]
end
# use the index action to test common features of the bulk api
context "namespaced models" do

should "serialize payload for delete action" do
Configuration.client.
expects(:post).
with do |url, payload|
assert_equal "#{@index.url}/_bulk", url
assert_match /"delete"/, payload
assert_match /"_index":"dummy"/, payload
assert_match /"_type":"document"/, payload
assert_match /"_id":"1"/, payload
assert_match /"_id":"2"/, payload
assert ! payload.include?('"title"')
end.
returns(mock_response('{}'), 200)

@index.bulk :delete, [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ]
end

should "serialize ActiveModel instances as payload" do
Configuration.client.
expects(:post).
with do |url, payload|
assert_equal "#{ActiveModelArticle.index.url}/_bulk", url
assert_match /"index"/, payload
assert_match /"_index":"active_model_articles"/, payload
assert_match /"_type":"active_model_article"/, payload
assert_match /"_id":"1"/, payload
assert_match /"_id":"2"/, payload
assert_match /"title":"One"/, payload
assert_match /"title":"Two"/, payload
end.
returns(mock_response('{}'), 200)

one = ActiveModelArticle.new 'title' => 'One'; one.id = '1'
two = ActiveModelArticle.new 'title' => 'Two'; two.id = '2'

ActiveModelArticle.index.bulk :index, [ one, two ]
end

context "with namespaced models" do

should "not URL-escape the document_type" do
Configuration.client.expects(:post).with do |url, json|
# puts url, json
url == "#{Configuration.url}/my_namespace_my_models/_bulk" &&
json =~ %r|"_index":"my_namespace_my_models"| &&
json =~ %r|"_type":"my_namespace/my_model"|
end.returns(mock_response('{}', 200))

module MyNamespace
class MyModel
def document_type; "my_namespace/my_model"; end
def to_indexed_json; "{}"; end
end
end
Tire.index('my_namespace_my_models').bulk_api_action "index", [ MyNamespace::MyModel.new ]

Tire.index('my_namespace_my_models').bulk :index, [ MyNamespace::MyModel.new ]
end
end

should "try again when an exception occurs" do
Configuration.client.expects(:post).returns(mock_response('Server error', 503)).at_least(2)

assert !@index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
assert !@index.bulk(:index, [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
end

should "try again and the raise when an exception occurs" do
Configuration.client.expects(:post).returns(mock_response('Server error', 503)).at_least(2)

assert_raise(RuntimeError) do
@index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ], {:raise => true})
@index.bulk :index,
[ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ],
:raise => true
end
end

should "try again when a connection error occurs" do
Configuration.client.expects(:post).raises(Errno::ECONNREFUSED, "Connection refused - connect(2)").at_least(2)

assert !@index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
assert !@index.bulk(:index, [ {:id => '1', :title => 'One'} ])
end

should "retry on SIGINT type of exceptions" do
Configuration.client.expects(:post).raises(Interrupt, "abort then interrupt!")

assert_raise Interrupt do
@index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
@index.bulk :index, [ {:id => '1', :title => 'One'} ]
end
end

Expand All @@ -608,7 +636,7 @@ def to_indexed_json; "{}"; end
STDERR.expects(:puts).once

documents = [ { :title => 'Bogus' }, { :title => 'Real', :id => 1 } ]
ActiveModelArticle.index.bulk_api_action("index", documents)
ActiveModelArticle.index.bulk :index, documents
end

should "log the response code" do
Expand All @@ -619,7 +647,7 @@ def to_indexed_json; "{}"; end
status == 200
end

@index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
@index.bulk :index, [ {:id => '1', :title => 'One'} ]
end

end
Expand Down

0 comments on commit d2ed980

Please sign in to comment.