diff --git a/lib/moped/connection/pool.rb b/lib/moped/connection/pool.rb index 07d01a6..1c587fe 100644 --- a/lib/moped/connection/pool.rb +++ b/lib/moped/connection/pool.rb @@ -11,7 +11,7 @@ class Pool POOL_SIZE = 5 # The default timeout for getting connections from the queue. - TIMEOUT = 0.25 + TIMEOUT = 0.5 # @!attribute host # @return [ String ] The host the pool is for. @@ -81,7 +81,7 @@ def initialize(host, port, options = {}) @mutex = Mutex.new @resource = ConditionVariable.new @pinned = {} - @unpinned = Queue.new(max_size) do + @unpinned = Queue.new(max_size, timeout) do Connection.new(host, port, options[:timeout] || Connection::TIMEOUT, options) end reaper.start @@ -179,7 +179,7 @@ def lease(connection) def next_connection reap if saturated? - unpinned.pop(timeout) + unpinned.shift end def saturated? diff --git a/lib/moped/connection/queue.rb b/lib/moped/connection/queue.rb index fe3a336..6ee67ec 100644 --- a/lib/moped/connection/queue.rb +++ b/lib/moped/connection/queue.rb @@ -15,10 +15,11 @@ class Queue # @param [ Integer ] size The number of connections in the queue. # # @since 2.0.0 - def initialize(size) + def initialize(size, timeout) + @timeout = timeout @queue = Array.new(size) { yield } - @mutex = Mutex.new - @resource = ConditionVariable.new + @monitor = Monitor.new + @cond = @monitor.new_cond end # Push a connection on the queue. @@ -30,9 +31,9 @@ def initialize(size) # # @since 2.0.0 def push(connection) - mutex.synchronize do - queue.push(connection) - resource.broadcast + synchronize do + @queue.push(connection) + @cond.signal end end @@ -46,9 +47,16 @@ def push(connection) # @return [ Moped::Connection ] The next connection. # # @since 2.0.0 - def pop(timeout = 0.5) - mutex.synchronize do - wait_for_next(Time.now + timeout) + def shift + deadline = Time.now + @timeout + + synchronize do + loop do + return @queue.shift unless @queue.empty? + wait = deadline - Time.now + raise Timeout::Error, "Waited for #{@timeout} seconds for connection but none was pushed." if wait <= 0 + @cond.wait(wait) + end end end @@ -61,7 +69,7 @@ def pop(timeout = 0.5) # # @since 2.0.0 def empty? - queue.empty? + @queue.empty? end # Get the current size of the queue. @@ -73,20 +81,13 @@ def empty? # # @since 2.0.0 def size - queue.size + @queue.size end private - attr_reader :queue, :mutex, :resource - - def wait_for_next(deadline) - loop do - return queue.pop unless queue.empty? - wait = deadline - Time.now - raise Timeout::Error, "Waited for item but none was pushed." if wait <= 0 - resource.wait(mutex, wait) - end + def synchronize + @monitor.synchronize { yield } end end end diff --git a/spec/moped/connection/pool_spec.rb b/spec/moped/connection/pool_spec.rb index 9e07185..1385a7a 100644 --- a/spec/moped/connection/pool_spec.rb +++ b/spec/moped/connection/pool_spec.rb @@ -262,7 +262,7 @@ end it "returns the default" do - expect(pool.timeout).to eq(0.25) + expect(pool.timeout).to eq(Moped::Connection::Pool::TIMEOUT) end end end diff --git a/spec/moped/connection/queue_spec.rb b/spec/moped/connection/queue_spec.rb index b055326..7fb8e5b 100644 --- a/spec/moped/connection/queue_spec.rb +++ b/spec/moped/connection/queue_spec.rb @@ -5,7 +5,7 @@ describe "#empty?" do let(:queue) do - described_class.new(1) do + described_class.new(1, 0.5) do Moped::Connection.new("127.0.0.1", 27017, 5) end end @@ -20,7 +20,7 @@ context "when the queue has no connections" do before do - queue.pop + queue.shift end it "returns true" do @@ -32,7 +32,7 @@ describe "#initialize" do let(:queue) do - described_class.new(2) do + described_class.new(2, 0.5) do Moped::Connection.new("127.0.0.1", 27017, 5) end end @@ -42,10 +42,10 @@ end end - describe "#pop" do + describe "#shift" do let(:queue) do - described_class.new(1) do + described_class.new(1, 0.5) do Moped::Connection.new("127.0.0.1", 27017, 5) end end @@ -53,7 +53,7 @@ context "when a connection is available" do let(:popped) do - queue.pop + queue.shift end it "returns the connection" do @@ -64,7 +64,7 @@ context "when a connection is not available" do before do - queue.pop + queue.shift end context "when a connection is pushed in the timeout period" do @@ -81,14 +81,14 @@ end it "returns the connection" do - expect(queue.pop(2)).to equal(connection) + expect(queue.shift).to equal(connection) end end context "when no connection is pushed in the timeout period" do it "raises an error" do - expect { queue.pop(1) }.to raise_error(Timeout::Error) + expect { queue.shift }.to raise_error(Timeout::Error) end end end @@ -101,7 +101,7 @@ end let(:queue) do - described_class.new(1) do + described_class.new(1, 0.5) do Moped::Connection.new("127.0.0.1", 27017, 5) end end @@ -118,7 +118,7 @@ describe "#size" do let(:queue) do - described_class.new(1) do + described_class.new(1, 0.5) do Moped::Connection.new("127.0.0.1", 27017, 5) end end