Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Commit

Permalink
Merge pull request #226 from mongoid/connection_pool_queue
Browse files Browse the repository at this point in the history
Make the connection pool use a Queue instead of a stack
  • Loading branch information
arthurnn committed Nov 3, 2013
2 parents 68a456b + 95f6ac9 commit 664de79
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 35 deletions.
6 changes: 3 additions & 3 deletions lib/moped/connection/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Pool
POOL_SIZE = 5

# The default timeout for getting connections from the queue.
TIMEOUT = 0.25
TIMEOUT = 0.5

# @!attribute host
# @return [ String ] The host the pool is for.
Expand Down Expand Up @@ -81,7 +81,7 @@ def initialize(host, port, options = {})
@mutex = Mutex.new
@resource = ConditionVariable.new
@pinned = {}
@unpinned = Queue.new(max_size) do
@unpinned = Queue.new(max_size, timeout) do
Connection.new(host, port, options[:timeout] || Connection::TIMEOUT, options)
end
reaper.start
Expand Down Expand Up @@ -179,7 +179,7 @@ def lease(connection)

def next_connection
reap if saturated?
unpinned.pop(timeout)
unpinned.shift
end

def saturated?
Expand Down
41 changes: 21 additions & 20 deletions lib/moped/connection/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ class Queue
# @param [ Integer ] size The number of connections in the queue.
#
# @since 2.0.0
def initialize(size)
def initialize(size, timeout)
@timeout = timeout
@queue = Array.new(size) { yield }
@mutex = Mutex.new
@resource = ConditionVariable.new
@monitor = Monitor.new
@cond = @monitor.new_cond
end

# Push a connection on the queue.
Expand All @@ -30,9 +31,9 @@ def initialize(size)
#
# @since 2.0.0
def push(connection)
mutex.synchronize do
queue.push(connection)
resource.broadcast
synchronize do
@queue.push(connection)
@cond.signal
end
end

Expand All @@ -46,9 +47,16 @@ def push(connection)
# @return [ Moped::Connection ] The next connection.
#
# @since 2.0.0
def pop(timeout = 0.5)
mutex.synchronize do
wait_for_next(Time.now + timeout)
def shift
deadline = Time.now + @timeout

synchronize do
loop do
return @queue.shift unless @queue.empty?
wait = deadline - Time.now
raise Timeout::Error, "Waited for #{@timeout} seconds for connection but none was pushed." if wait <= 0
@cond.wait(wait)
end
end
end

Expand All @@ -61,7 +69,7 @@ def pop(timeout = 0.5)
#
# @since 2.0.0
def empty?
queue.empty?
@queue.empty?
end

# Get the current size of the queue.
Expand All @@ -73,20 +81,13 @@ def empty?
#
# @since 2.0.0
def size
queue.size
@queue.size
end

private

attr_reader :queue, :mutex, :resource

def wait_for_next(deadline)
loop do
return queue.pop unless queue.empty?
wait = deadline - Time.now
raise Timeout::Error, "Waited for item but none was pushed." if wait <= 0
resource.wait(mutex, wait)
end
def synchronize
@monitor.synchronize { yield }
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/moped/connection/pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@
end

it "returns the default" do
expect(pool.timeout).to eq(0.25)
expect(pool.timeout).to eq(Moped::Connection::Pool::TIMEOUT)
end
end
end
Expand Down
22 changes: 11 additions & 11 deletions spec/moped/connection/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
describe "#empty?" do

let(:queue) do
described_class.new(1) do
described_class.new(1, 0.5) do
Moped::Connection.new("127.0.0.1", 27017, 5)
end
end
Expand All @@ -20,7 +20,7 @@
context "when the queue has no connections" do

before do
queue.pop
queue.shift
end

it "returns true" do
Expand All @@ -32,7 +32,7 @@
describe "#initialize" do

let(:queue) do
described_class.new(2) do
described_class.new(2, 0.5) do
Moped::Connection.new("127.0.0.1", 27017, 5)
end
end
Expand All @@ -42,18 +42,18 @@
end
end

describe "#pop" do
describe "#shift" do

let(:queue) do
described_class.new(1) do
described_class.new(1, 0.5) do
Moped::Connection.new("127.0.0.1", 27017, 5)
end
end

context "when a connection is available" do

let(:popped) do
queue.pop
queue.shift
end

it "returns the connection" do
Expand All @@ -64,7 +64,7 @@
context "when a connection is not available" do

before do
queue.pop
queue.shift
end

context "when a connection is pushed in the timeout period" do
Expand All @@ -81,14 +81,14 @@
end

it "returns the connection" do
expect(queue.pop(2)).to equal(connection)
expect(queue.shift).to equal(connection)
end
end

context "when no connection is pushed in the timeout period" do

it "raises an error" do
expect { queue.pop(1) }.to raise_error(Timeout::Error)
expect { queue.shift }.to raise_error(Timeout::Error)
end
end
end
Expand All @@ -101,7 +101,7 @@
end

let(:queue) do
described_class.new(1) do
described_class.new(1, 0.5) do
Moped::Connection.new("127.0.0.1", 27017, 5)
end
end
Expand All @@ -118,7 +118,7 @@
describe "#size" do

let(:queue) do
described_class.new(1) do
described_class.new(1, 0.5) do
Moped::Connection.new("127.0.0.1", 27017, 5)
end
end
Expand Down

0 comments on commit 664de79

Please sign in to comment.