Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Make the connection pool use a Queue instead of a stack #226

Merged
merged 1 commit into from
Nov 3, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/moped/connection/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Pool
POOL_SIZE = 5

# The default timeout for getting connections from the queue.
TIMEOUT = 0.25
TIMEOUT = 0.5

# @!attribute host
# @return [ String ] The host the pool is for.
Expand Down Expand Up @@ -81,7 +81,7 @@ def initialize(host, port, options = {})
@mutex = Mutex.new
@resource = ConditionVariable.new
@pinned = {}
@unpinned = Queue.new(max_size) do
@unpinned = Queue.new(max_size, timeout) do
Connection.new(host, port, options[:timeout] || Connection::TIMEOUT, options)
end
reaper.start
Expand Down Expand Up @@ -179,7 +179,7 @@ def lease(connection)

def next_connection
reap if saturated?
unpinned.pop(timeout)
unpinned.shift
end

def saturated?
Expand Down
41 changes: 21 additions & 20 deletions lib/moped/connection/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ class Queue
# @param [ Integer ] size The number of connections in the queue.
#
# @since 2.0.0
def initialize(size)
def initialize(size, timeout)
@timeout = timeout
@queue = Array.new(size) { yield }
@mutex = Mutex.new
@resource = ConditionVariable.new
@monitor = Monitor.new
@cond = @monitor.new_cond
end

# Push a connection on the queue.
Expand All @@ -30,9 +31,9 @@ def initialize(size)
#
# @since 2.0.0
def push(connection)
mutex.synchronize do
queue.push(connection)
resource.broadcast
synchronize do
@queue.push(connection)
@cond.signal
end
end

Expand All @@ -46,9 +47,16 @@ def push(connection)
# @return [ Moped::Connection ] The next connection.
#
# @since 2.0.0
def pop(timeout = 0.5)
mutex.synchronize do
wait_for_next(Time.now + timeout)
def shift
deadline = Time.now + @timeout

synchronize do
loop do
return @queue.shift unless @queue.empty?
wait = deadline - Time.now
raise Timeout::Error, "Waited for #{@timeout} seconds for connection but none was pushed." if wait <= 0
@cond.wait(wait)
end
end
end

Expand All @@ -61,7 +69,7 @@ def pop(timeout = 0.5)
#
# @since 2.0.0
def empty?
queue.empty?
@queue.empty?
end

# Get the current size of the queue.
Expand All @@ -73,20 +81,13 @@ def empty?
#
# @since 2.0.0
def size
queue.size
@queue.size
end

private

attr_reader :queue, :mutex, :resource

def wait_for_next(deadline)
loop do
return queue.pop unless queue.empty?
wait = deadline - Time.now
raise Timeout::Error, "Waited for item but none was pushed." if wait <= 0
resource.wait(mutex, wait)
end
def synchronize
@monitor.synchronize { yield }
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/moped/connection/pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@
end

it "returns the default" do
expect(pool.timeout).to eq(0.25)
expect(pool.timeout).to eq(Moped::Connection::Pool::TIMEOUT)
end
end
end
Expand Down
22 changes: 11 additions & 11 deletions spec/moped/connection/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
describe "#empty?" do

let(:queue) do
described_class.new(1) do
described_class.new(1, 0.5) do
Moped::Connection.new("127.0.0.1", 27017, 5)
end
end
Expand All @@ -20,7 +20,7 @@
context "when the queue has no connections" do

before do
queue.pop
queue.shift
end

it "returns true" do
Expand All @@ -32,7 +32,7 @@
describe "#initialize" do

let(:queue) do
described_class.new(2) do
described_class.new(2, 0.5) do
Moped::Connection.new("127.0.0.1", 27017, 5)
end
end
Expand All @@ -42,18 +42,18 @@
end
end

describe "#pop" do
describe "#shift" do

let(:queue) do
described_class.new(1) do
described_class.new(1, 0.5) do
Moped::Connection.new("127.0.0.1", 27017, 5)
end
end

context "when a connection is available" do

let(:popped) do
queue.pop
queue.shift
end

it "returns the connection" do
Expand All @@ -64,7 +64,7 @@
context "when a connection is not available" do

before do
queue.pop
queue.shift
end

context "when a connection is pushed in the timeout period" do
Expand All @@ -81,14 +81,14 @@
end

it "returns the connection" do
expect(queue.pop(2)).to equal(connection)
expect(queue.shift).to equal(connection)
end
end

context "when no connection is pushed in the timeout period" do

it "raises an error" do
expect { queue.pop(1) }.to raise_error(Timeout::Error)
expect { queue.shift }.to raise_error(Timeout::Error)
end
end
end
Expand All @@ -101,7 +101,7 @@
end

let(:queue) do
described_class.new(1) do
described_class.new(1, 0.5) do
Moped::Connection.new("127.0.0.1", 27017, 5)
end
end
Expand All @@ -118,7 +118,7 @@
describe "#size" do

let(:queue) do
described_class.new(1) do
described_class.new(1, 0.5) do
Moped::Connection.new("127.0.0.1", 27017, 5)
end
end
Expand Down