Permalink
Browse files

[parallel-indexing] update the client to publish to parallel queues

The backend consumer is now based on a "vnode" design in which 1024
queues are distributed among N nodes. Publishers need to be aware of
this and route each item to the appropriate queue based on its
database ID.
  • Loading branch information...
1 parent c738308 commit e0bb0f7fdb0d0441bf50850d299deb3414d5c430 @danielsdeleo danielsdeleo committed Sep 3, 2010
@@ -19,6 +19,8 @@
class Chef
module IndexQueue
class AmqpClient
+ VNODES = 1024
+
include Singleton
def initialize
@@ -72,13 +74,21 @@ def disconnected!
reset!
end
- def send_action(action, data)
# Yields the AMQP queue responsible for the object identified by +obj_id+.
#
# The backend indexer uses a "vnode" layout: VNODES (1024) queues are
# distributed among the consumer nodes, and an object is mapped to a
# vnode by taking its UUID (as an integer) modulo VNODES.
#
# On Bunny connection errors the client is reset via disconnected! and
# the block is retried once before the error is re-raised to the caller.
#
# obj_id - String UUID of the object being indexed.
# Raises Bunny::ServerDownError, Bunny::ConnectionError, or
# Errno::ECONNRESET if the broker stays unreachable after one retry.
def queue_for_object(obj_id)
  vnode_tag = UUIDTools::UUID.parse(obj_id).to_i % VNODES
  retries = 0
  begin
    # Fetch the queue inside the retried section: after disconnected!
    # a retry must re-acquire the queue from the fresh connection
    # instead of re-yielding one bound to the dead connection.
    yield amqp_client.queue("vnode-#{vnode_tag}")
  rescue Bunny::ServerDownError, Bunny::ConnectionError, Errno::ECONNRESET
    disconnected!
    if (retries += 1) < 2
      Chef::Log.info("Attempting to reconnect to the AMQP broker")
      retry
    else
      Chef::Log.fatal("Could not re-connect to the AMQP broker, giving up")
      raise
    end
  end
end
@@ -55,20 +55,29 @@ def with_indexer_metadata(indexer_metadata={})
with_metadata["type"] ||= self.index_object_type
with_metadata["id"] ||= self.index_id
with_metadata["database"] ||= Chef::Config[:couchdb_database]
- with_metadata["item"] ||= self
+ with_metadata["item"] ||= self.to_hash
+ with_metadata["enqueued_at"] ||= Time.now.utc.to_i
raise ArgumentError, "Type, Id, or Database missing in index operation: #{with_metadata.inspect}" if (with_metadata["id"].nil? or with_metadata["type"].nil?)
with_metadata
end
# Publishes an "add" action for this object to its vnode queue.
#
# Builds the indexer metadata exactly once and reuses it for logging,
# queue routing, and the published payload (the original computed it
# three times, which also risked the routed id and the payload id
# drifting apart since the metadata stamps enqueued_at from Time.now).
#
# metadata - optional Hash merged into the indexer metadata.
def add_to_index(metadata={})
  object_with_metadata = with_indexer_metadata(metadata)
  Chef::Log.debug("pushing item to index queue for addition: #{object_with_metadata}")
  AmqpClient.instance.queue_for_object(object_with_metadata["id"]) do |queue|
    queue.publish({:action => :add, :payload => object_with_metadata}.to_json)
  end
end
# Publishes a "delete" action for this object to its vnode queue.
#
# Builds the indexer metadata exactly once and reuses it for logging,
# queue routing, and the published payload (the original computed it
# three times, which also risked the routed id and the payload id
# drifting apart since the metadata stamps enqueued_at from Time.now).
#
# metadata - optional Hash merged into the indexer metadata.
def delete_from_index(metadata={})
  object_with_metadata = with_indexer_metadata(metadata)
  Chef::Log.debug("pushing item to index queue for deletion: #{object_with_metadata}")
  AmqpClient.instance.queue_for_object(object_with_metadata["id"]) do |queue|
    queue.publish({:action => :delete, :payload => object_with_metadata}.to_json)
  end
end
end
@@ -29,6 +29,24 @@ def couchdb_id=(value)
def index_id=(value)
@index_id = value
end
+
+ def to_hash
+ {"ohai_world" => "I am IndexableTestHarness", "object_id" => object_id}
+ end
+
+ end
+end
+
# Test double standing in for a Bunny AMQP queue. Instead of talking to
# a broker it simply records the most recent message handed to #publish
# so specs can assert on it.
class FauxQueue

  # The last message passed to #publish. Starts out as the sentinel
  # :epic_fail! so a spec that never publishes fails loudly.
  attr_reader :published_message

  def initialize
    @published_message = :epic_fail!
  end

  # Capture +message+ for later inspection; returns the captured message.
  def publish(message)
    @published_message = message
  end
end
@@ -57,6 +75,10 @@ def a_uuid
Chef::IndexableTestHarness.reset_index_metadata!
@publisher = Chef::IndexQueue::AmqpClient.instance
@indexable_obj = Chef::IndexableTestHarness.new
+ @item_as_hash = {"ohai_world" => "I am IndexableTestHarness", "object_id" => @indexable_obj.object_id}
+
+ @now = Time.now
+ Time.stub!(:now).and_return(@now)
end
it "downcases the class name for the index_object_type when it's not explicitly set" do
@@ -70,12 +92,13 @@ def a_uuid
it "adds 'database', 'type', and 'id' (UUID) keys to the published object" do
with_metadata = @indexable_obj.with_indexer_metadata(:database => "foo", :id=>UUIDTools::UUID.random_create.to_s)
- with_metadata.should have(4).keys
- with_metadata.keys.should include("type", "id", "item", "database")
+ with_metadata.should have(5).keys
+ with_metadata.keys.should include("type", "id", "item", "database", "enqueued_at")
with_metadata["type"].should == "indexable_test_harness"
with_metadata["database"].should == "foo"
- with_metadata["item"].should == @indexable_obj
+ with_metadata["item"].should == @item_as_hash
with_metadata["id"].should match(a_uuid)
+ with_metadata["enqueued_at"].should == @now.utc.to_i
end
it "uses the couchdb_id if available" do
@@ -85,20 +108,29 @@ def a_uuid
metadata_id.should == expected_uuid
end
- it "sends ``add'' actions" do
- @publisher.should_receive(:send_action).with(:add, {"item" => @indexable_obj,
- "type" => "indexable_test_harness",
- "database" => "couchdb@localhost,etc.",
- "id" => an_instance_of(String)})
- @indexable_obj.add_to_index(:database => "couchdb@localhost,etc.", :id=>UUIDTools::UUID.random_create.to_s)
# Verifies that add_to_index routes the object to its vnode queue and
# publishes a JSON "add" action with the full indexer metadata.
# NOTE: the UUID literal is corrected to a well-formed 8-4-4-4-12 UUID
# (the original first group had only seven zeros), matching the UUIDs
# used by the publishing specs.
it "adds items to the index" do
  @queue = FauxQueue.new
  @publisher.should_receive(:queue_for_object).with("00000000-1111-2222-3333-444444444444").and_yield(@queue)
  @indexable_obj.add_to_index(:database => "couchdb@localhost,etc.", :id => "00000000-1111-2222-3333-444444444444")
  published_message = JSON.parse(@queue.published_message)
  published_message.should == {"action" => "add", "payload" => {"item" => @item_as_hash,
                                                               "type" => "indexable_test_harness",
                                                               "database" => "couchdb@localhost,etc.",
                                                               "id" => "00000000-1111-2222-3333-444444444444",
                                                               "enqueued_at" => @now.utc.to_i}}
end
- it "sends ``delete'' actions" do
- @publisher.should_receive(:send_action).with(:delete, { "item" => @indexable_obj,
- "type" => "indexable_test_harness",
- "database" => "couchdb2@localhost",
- "id" => an_instance_of(String)})
- @indexable_obj.delete_from_index(:database => "couchdb2@localhost", :id=>UUIDTools::UUID.random_create.to_s)
# Verifies that delete_from_index routes the object to its vnode queue
# and publishes a JSON "delete" action with the full indexer metadata.
# NOTE: the UUID literal is corrected to a well-formed 8-4-4-4-12 UUID
# (the original first group had only seven zeros), matching the UUIDs
# used by the publishing specs.
it "removes items from the index" do
  @queue = FauxQueue.new
  @publisher.should_receive(:queue_for_object).with("00000000-1111-2222-3333-444444444444").and_yield(@queue)

  @indexable_obj.delete_from_index(:database => "couchdb2@localhost", :id => "00000000-1111-2222-3333-444444444444")
  published_message = JSON.parse(@queue.published_message)
  published_message.should == {"action" => "delete", "payload" => {"item" => @item_as_hash,
                                                                  "type" => "indexable_test_harness",
                                                                  "database" => "couchdb2@localhost",
                                                                  "id" => "00000000-1111-2222-3333-444444444444",
                                                                  "enqueued_at" => @now.utc.to_i}}
end
end
@@ -195,31 +227,25 @@ def @amqp_client.connected?; false; end # stubbing predicate methods not working
describe "publishing" do
before do
+ @queue = FauxQueue.new
@amqp_client.stub!(:qos)
+ @amqp_client.stub!(:queue).and_return(@queue)
@data = {"some_data" => "in_a_hash"}
end
- it "publishes an action to the exchange" do
- @exchange.should_receive(:publish).with({"action" => "hot_chef_on_queue", "payload" => @data}.to_json)
- @publisher.send_action(:hot_chef_on_queue, @data)
- end
-
it "resets the client upon a Bunny::ServerDownError when publishing" do
- @exchange.should_receive(:publish).and_raise(Bunny::ServerDownError)
- @publisher.should_receive(:disconnected!).twice
- lambda {@publisher.send_action(:hot_chef_on_queue, @data)}.should raise_error(Bunny::ServerDownError)
+ @publisher.should_receive(:disconnected!).at_least(3).times
+ lambda {@publisher.queue_for_object("00000000-1111-2222-3333-444444444444") {|q| raise Bunny::ServerDownError}}.should raise_error(Bunny::ServerDownError)
end
it "resets the client upon a Bunny::ConnectionError when publishing" do
- @exchange.should_receive(:publish).and_raise(Bunny::ConnectionError)
- @publisher.should_receive(:disconnected!).twice
- lambda {@publisher.send_action(:hot_chef_on_queue, @data)}.should raise_error(Bunny::ConnectionError)
+ @publisher.should_receive(:disconnected!).at_least(3).times
+ lambda {@publisher.queue_for_object("00000000-1111-2222-3333-444444444444") {|q| raise Bunny::ConnectionError}}.should raise_error(Bunny::ConnectionError)
end
it "resets the client upon a Errno::ECONNRESET when publishing" do
- @exchange.should_receive(:publish).and_raise(Errno::ECONNRESET)
- @publisher.should_receive(:disconnected!).twice
- lambda {@publisher.send_action(:hot_chef_on_queue, @data)}.should raise_error(Errno::ECONNRESET)
+ @publisher.should_receive(:disconnected!).at_least(3).times
+ lambda {@publisher.queue_for_object("00000000-1111-2222-3333-444444444444") {|q| raise Errno::ECONNRESET}}.should raise_error(Errno::ECONNRESET)
end
end
@@ -258,3 +284,6 @@ def @amqp_client.connected?; false; end # stubbing predicate methods not working
end
end
+
+
+

0 comments on commit e0bb0f7

Please sign in to comment.