Skip to content

Commit

Permalink
RUBY-1727 Unscope connection pools from clusters (#1257)
Browse files Browse the repository at this point in the history
* RUBY-1727 Unscope connection pools from clusters

* Move connection pool finalizer to connection pool
  • Loading branch information
p-mongo committed Feb 21, 2019
1 parent 92b9f54 commit 3f84a4a
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 78 deletions.
22 changes: 6 additions & 16 deletions lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
@app_metadata = Server::AppMetadata.new(@options)
@update_lock = Mutex.new
@sdam_flow_lock = Mutex.new
@pool_lock = Mutex.new
@cluster_time = nil
@cluster_time_lock = Mutex.new
@topology = Topology.initial(self, monitoring, options)
Expand Down Expand Up @@ -153,7 +152,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
@periodic_executor = PeriodicExecutor.new(@cursor_reaper, @socket_reaper)
@periodic_executor.run!

ObjectSpace.define_finalizer(self, self.class.finalize(pools, @periodic_executor, @session_pool))
ObjectSpace.define_finalizer(self, self.class.finalize({}, @periodic_executor, @session_pool))

@connecting = false
@connected = true
Expand Down Expand Up @@ -343,13 +342,12 @@ def summary
# @api private
attr_reader :server_selection_semaphore

# Finalize the cluster for garbage collection. Disconnects all the scoped
# connection pools.
# Finalize the cluster for garbage collection.
#
# @example Finalize the cluster.
# Cluster.finalize(pools)
#
# @param [ Hash<Address, Server::ConnectionPool> ] pools The connection pools.
# @param [ Hash<Address, Server::ConnectionPool> ] pools Ignored.
# @param [ PeriodicExecutor ] periodic_executor The periodic executor.
# @param [ SessionPool ] session_pool The session pool.
#
Expand All @@ -360,9 +358,6 @@ def self.finalize(pools, periodic_executor, session_pool)
proc do
session_pool.end_sessions
periodic_executor.stop!
pools.values.each do |pool|
pool.disconnect!
end
end
end

Expand Down Expand Up @@ -523,7 +518,7 @@ def next_primary(ping = true)
@primary_selector.select_server(self)
end

# Get the scoped connection pool for the server.
# Get the connection pool for the server.
#
# @example Get the connection pool.
# cluster.pool(server)
Expand All @@ -533,10 +528,9 @@ def next_primary(ping = true)
# @return [ Server::ConnectionPool ] The connection pool.
#
# @since 2.2.0
# @deprecated
def pool(server)
@pool_lock.synchronize do
pools[server.address] ||= Server::ConnectionPool.get(server)
end
server.pool
end

# Update the max cluster time seen in a response.
Expand Down Expand Up @@ -675,10 +669,6 @@ def sessions_supported?
false
end
end

def pools
@pools ||= {}
end
end
end

Expand Down
9 changes: 8 additions & 1 deletion lib/mongo/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def initialize(address, cluster, monitoring, event_listeners, options = {})
start_monitoring
end
@connected = true
@pool_lock = Mutex.new
end

# @return [ String ] The configured address for the server.
Expand Down Expand Up @@ -280,7 +281,13 @@ def summary
#
# @since 2.0.0
def pool
@pool ||= cluster.pool(self)
@pool_lock.synchronize do
@pool ||= begin
ConnectionPool.new(options) do |generation|
Connection.new(self, options.merge(generation: generation))
end
end
end
end

# Determine if the provided tags are a subset of the server's tags.
Expand Down
30 changes: 7 additions & 23 deletions lib/mongo/server/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,13 @@ class ConnectionPool
#
# @since 2.0.0
def initialize(options = {}, &block)
@options = options.freeze
@queue = Queue.new(options, &block)
@options = options.dup.freeze
@queue = queue = Queue.new(@options, &block)

finalizer = proc do
queue.disconnect!
end
ObjectSpace.define_finalizer(self, finalizer)
end

# @return [ Hash ] options The pool options.
Expand Down Expand Up @@ -120,27 +125,6 @@ def with_connection
protected

attr_reader :queue

private

class << self

# Creates a new connection pool for the provided server.
#
# @example Create a new connection pool.
# Mongo::Server::ConnectionPool.get(server)
#
# @param [ Mongo::Server ] server The server.
#
# @return [ Mongo::Server::ConnectionPool ] The connection pool.
#
# @since 2.0.0
def get(server)
ConnectionPool.new(server.options) do |generation|
Connection.new(server, server.options.merge(generation: generation))
end
end
end
end
end
end
40 changes: 10 additions & 30 deletions spec/mongo/server/connection_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
end

let!(:pool) do
described_class.get(server)
server.pool
end

after do
expect(cluster).to receive(:pool).with(server).and_return(pool)
expect(server).to receive(:pool).and_return(pool)
server.disconnect!
end

Expand Down Expand Up @@ -70,7 +70,7 @@
end

let!(:pool) do
described_class.get(server)
server.pool
end

context 'when no connection is checked out on the same thread' do
Expand Down Expand Up @@ -129,48 +129,28 @@
end

let!(:pool) do
described_class.get(server)
server.pool
end

it 'disconnects the queue' do
expect(cluster).to receive(:pool).with(server).and_return(pool)
expect(server).to receive(:pool).and_return(pool)
expect(pool.send(:queue)).to receive(:disconnect!).once.and_call_original
server.disconnect!
end
end

describe '.get' do

let(:server) do
Mongo::Server.new(address, cluster, monitoring, listeners, options)
end

let!(:pool) do
described_class.get(server)
end

after do
expect(cluster).to receive(:pool).with(server).and_return(pool)
server.disconnect!
end

it 'returns the pool for the server' do
expect(pool).to_not be_nil
end
end

describe '#inspect' do

let(:server) do
Mongo::Server.new(address, cluster, monitoring, listeners, options)
end

let!(:pool) do
described_class.get(server)
server.pool
end

after do
expect(cluster).to receive(:pool).with(server).and_return(pool)
expect(server).to receive(:pool).and_return(pool)
server.disconnect!
end

Expand All @@ -190,7 +170,7 @@
end

let!(:pool) do
described_class.get(server)
server.pool
end

context 'when a connection cannot be checked out' do
Expand All @@ -217,7 +197,7 @@
end

let!(:pool) do
described_class.get(server)
server.pool
end

let(:options) do
Expand Down Expand Up @@ -245,7 +225,7 @@
end

let!(:pool) do
described_class.get(server)
server.pool
end

let(:queue) do
Expand Down
4 changes: 2 additions & 2 deletions spec/mongo/server/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,15 @@ class ConnectionSpecTestException < Exception; end

shared_examples_for 'does not disconnect connection pool' do
it 'does not disconnect non-monitoring sockets' do
allow(cluster).to receive(:pool).with(server).and_return(pool)
allow(server).to receive(:pool).and_return(pool)
expect(pool).not_to receive(:disconnect!)
error
end
end

shared_examples_for 'disconnects connection pool' do
it 'disconnects non-monitoring sockets' do
expect(cluster).to receive(:pool).with(server).and_return(pool)
expect(server).to receive(:pool).and_return(pool)
expect(pool).to receive(:disconnect!).and_return(true)
error
end
Expand Down
12 changes: 6 additions & 6 deletions spec/mongo/server_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
end

let(:pool) do
Mongo::Server::ConnectionPool.get(server)
server.pool
end

describe '#==' do
Expand All @@ -35,7 +35,7 @@
end

after do
expect(cluster).to receive(:pool).with(server).and_return(pool)
expect(server).to receive(:pool).and_return(pool)
server.disconnect!
end

Expand Down Expand Up @@ -88,7 +88,7 @@

it 'stops the monitor instance' do
expect(server.instance_variable_get(:@monitor)).to receive(:stop!).and_return(true)
expect(cluster).to receive(:pool).with(server).and_return(pool)
expect(server).to receive(:pool).and_return(pool)
server.disconnect!
end
end
Expand All @@ -106,7 +106,7 @@
end

after do
expect(cluster).to receive(:pool).with(server).and_return(pool)
expect(server).to receive(:pool).and_return(pool)
server.disconnect!
end

Expand Down Expand Up @@ -150,7 +150,7 @@
end

after do
expect(cluster).to receive(:pool).with(server).and_return(pool)
expect(server).to receive(:pool).and_return(pool)
server.disconnect!
end

Expand All @@ -170,7 +170,7 @@
end

after do
expect(cluster).to receive(:pool).with(server).and_return(pool)
expect(server).to receive(:pool).and_return(pool)
server.disconnect!
end

Expand Down

0 comments on commit 3f84a4a

Please sign in to comment.