Permalink
Browse files

[#449] Extract the bulk behaviour into `bulk_api_action`, allowing ot…

…her actions (create/delete)

* Extracts the bulk logic into a neew method, Tire::Index#bulk_api_action
* Adds wrapper methods: bulk_store, bulk_create, bulk_delete
* Reformats and changes the test suite

See <http://www.elasticsearch.org/guide/reference/api/bulk.html>

Closes #449
  • Loading branch information...
1 parent 4e85109 commit 8e81c3af13cd9dbfe6d2766e7c5f6572ddd43e93 @rhec rhec committed with Aug 6, 2012
Showing with 82 additions and 48 deletions.
  1. +19 −4 lib/tire/index.rb
  2. +63 −44 test/unit/index_test.rb
View
@@ -89,17 +89,19 @@ def store(*args)
curl = %Q|curl -X POST "#{url}" -d '#{document}'|
logged([type, id].join('/'), curl)
end
-
- def bulk_store(documents, options={})
+
+ def bulk_api_action(action, documents, options={})
payload = documents.map do |document|
type = get_type_from_document(document, :escape => false) # Do not URL-escape the _type
id = get_id_from_document(document)
STDERR.puts "[ERROR] Document #{document.inspect} does not have ID" unless id
output = []
- output << %Q|{"index":{"_index":"#{@name}","_type":"#{type}","_id":"#{id}"}}|
- output << convert_document_to_json(document)
+ output << %Q|{"#{action}":{"_index":"#{@name}","_type":"#{type}","_id":"#{id}"}}|
+ unless action == 'delete' # delete does not require a document, just an _id
+ output << convert_document_to_json(document)
+ end
output.join("\n")
end
payload << ""
@@ -125,6 +127,19 @@ def bulk_store(documents, options={})
curl = %Q|curl -X POST "#{url}/_bulk" --data-binary '{... data omitted ...}'|
logged('BULK', curl)
end
+
+ end
+
+ def bulk_create(documents, options={})
+ bulk_api_action("create", documents, options)
+ end
+
+ def bulk_store(documents, options={})
+ bulk_api_action("index", documents, options)
+ end
+
+ def bulk_delete(documents, options={})
+ bulk_api_action("delete", documents, options)
end
def import(klass_or_collection, options={})
View
@@ -490,49 +490,69 @@ class MyDocument;end; document = MyDocument.new
end
- context "when storing in bulk" do
+ context "when performing a bulk api action" do
+ # Possible api actions are index, create, delete
# The expected JSON looks like this:
#
# {"index":{"_index":"dummy","_type":"document","_id":"1"}}
# {"id":"1","title":"One"}
- # {"index":{"_index":"dummy","_type":"document","_id":"2"}}
+ # {"create":{"_index":"dummy","_type":"document","_id":"2"}}
# {"id":"2","title":"Two"}
+ # {"delete":{"_index":"dummy","_type":"document","_id":"2"}}
#
# See http://www.elasticsearch.org/guide/reference/api/bulk.html
+
+ # test json with each type of action
+ ["index","create","delete"].each do |action|
+ should "serialize Hashes for #{action}" do
+ Configuration.client.expects(:post).with do |url, json|
+
+ url == "#{@index.url}/_bulk" &&
+ json =~ /"#{action}"/ &&
+ json =~ /"_index":"dummy"/ &&
+ json =~ /"_type":"document"/ &&
+ json =~ /"_id":"1"/ &&
+ json =~ /"_id":"2"/ &&
+ if action == 'delete'
+ # delete should not contain the document contents
+ true
+ else
+ json =~ /"id":"1"/ &&
+ json =~ /"id":"2"/ &&
+ json =~ /"title":"One"/ &&
+ json =~ /"title":"Two"/
+ end
+ end.returns(mock_response('{}'), 200)
+
+ @index.bulk_api_action action, [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ]
+ end
- should "serialize Hashes" do
- Configuration.client.expects(:post).with do |url, json|
- url == "#{@index.url}/_bulk" &&
- json =~ /"_index":"dummy"/ &&
- json =~ /"_type":"document"/ &&
- json =~ /"_id":"1"/ &&
- json =~ /"_id":"2"/ &&
- json =~ /"id":"1"/ &&
- json =~ /"id":"2"/ &&
- json =~ /"title":"One"/ &&
- json =~ /"title":"Two"/
- end.returns(mock_response('{}'), 200)
-
- @index.bulk_store [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ]
- end
-
- should "serialize ActiveModel instances" do
- Configuration.client.expects(:post).with do |url, json|
- url == "#{ActiveModelArticle.index.url}/_bulk" &&
- json =~ /"_index":"active_model_articles"/ &&
- json =~ /"_type":"active_model_article"/ &&
- json =~ /"_id":"1"/ &&
- json =~ /"_id":"2"/ &&
- json =~ /"title":"One"/ &&
- json =~ /"title":"Two"/
- end.returns(mock_response('{}', 200))
-
- one = ActiveModelArticle.new 'title' => 'One'; one.id = '1'
- two = ActiveModelArticle.new 'title' => 'Two'; two.id = '2'
-
- ActiveModelArticle.index.bulk_store [ one, two ]
+ should "serialize ActiveModel instances for #{action}" do
+ Configuration.client.expects(:post).with do |url, json|
+ url == "#{ActiveModelArticle.index.url}/_bulk" &&
+ json =~ /"#{action}"/ &&
+ json =~ /"_index":"active_model_articles"/ &&
+ json =~ /"_type":"active_model_article"/ &&
+ json =~ /"_id":"1"/ &&
+ json =~ /"_id":"2"/ &&
+ if action == 'delete'
+ # delete should not contain the document contents
+ true
+ else
+ json =~ /"title":"One"/ &&
+ json =~ /"title":"Two"/
+ end
+
+ end.returns(mock_response('{}', 200))
+
+ one = ActiveModelArticle.new 'title' => 'One'; one.id = '1'
+ two = ActiveModelArticle.new 'title' => 'Two'; two.id = '2'
+
+ ActiveModelArticle.index.bulk_api_action action, [ one, two ]
+ end
+
end
-
+ # use the index action to test common features of the bulk api
context "namespaced models" do
should "not URL-escape the document_type" do
Configuration.client.expects(:post).with do |url, json|
@@ -541,43 +561,42 @@ class MyDocument;end; document = MyDocument.new
json =~ %r|"_index":"my_namespace_my_models"| &&
json =~ %r|"_type":"my_namespace/my_model"|
end.returns(mock_response('{}', 200))
-
+
module MyNamespace
class MyModel
def document_type; "my_namespace/my_model"; end
def to_indexed_json; "{}"; end
end
end
-
- Tire.index('my_namespace_my_models').bulk_store [ MyNamespace::MyModel.new ]
+
+ Tire.index('my_namespace_my_models').bulk_api_action "index", [ MyNamespace::MyModel.new ]
end
end
-
should "try again when an exception occurs" do
Configuration.client.expects(:post).returns(mock_response('Server error', 503)).at_least(2)
- assert !@index.bulk_store([ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
+ assert !@index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
end
should "try again and the raise when an exception occurs" do
Configuration.client.expects(:post).returns(mock_response('Server error', 503)).at_least(2)
assert_raise(RuntimeError) do
- @index.bulk_store([ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ], {:raise => true})
+ @index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ], {:raise => true})
end
end
should "try again when a connection error occurs" do
Configuration.client.expects(:post).raises(Errno::ECONNREFUSED, "Connection refused - connect(2)").at_least(2)
- assert !@index.bulk_store([ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
+ assert !@index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
end
should "retry on SIGINT type of exceptions" do
Configuration.client.expects(:post).raises(Interrupt, "abort then interrupt!")
assert_raise Interrupt do
- @index.bulk_store([ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
+ @index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
end
end
@@ -589,7 +608,7 @@ def to_indexed_json; "{}"; end
STDERR.expects(:puts).once
documents = [ { :title => 'Bogus' }, { :title => 'Real', :id => 1 } ]
- ActiveModelArticle.index.bulk_store documents
+ ActiveModelArticle.index.bulk_api_action("index", documents)
end
should "log the response code" do
@@ -600,7 +619,7 @@ def to_indexed_json; "{}"; end
status == 200
end
- @index.bulk_store [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ]
+ @index.bulk_api_action("index", [ {:id => '1', :title => 'One'}, {:id => '2', :title => 'Two'} ])
end
end

0 comments on commit 8e81c3a

Please sign in to comment.