Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial conversion to connection pool

So far so good, tests still run clean. Next steps: synchronize connection pool access
and modification, and change allow_concurrency to simply switch a real lock for a
null lock.
  • Loading branch information...
commit 6edaa267174dfedaf5b152b9eba25b4eb5e34c99 1 parent 1a81f15
@nicksieger nicksieger authored
View
128 activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
@@ -0,0 +1,128 @@
+module ActiveRecord
+ module ConnectionAdapters
+ class ConnectionPool
+ # Check for activity after at least +verification_timeout+ seconds.
+ # Defaults to 0 (always check.)
+ attr_accessor :verification_timeout
+ attr_reader :active_connections, :spec
+
+ def initialize(spec)
+ @verification_timeout = 0
+
+ # The thread id -> adapter cache.
+ @active_connections = {}
+
+ # The ConnectionSpecification for this pool
+ @spec = spec
+ end
+
+ def active_connection_name #:nodoc:
+ Thread.current.object_id
+ end
+
+ def active_connection
+ active_connections[active_connection_name]
+ end
+
+ # Returns the connection currently associated with the class. This can
+ # also be used to "borrow" the connection to do database work unrelated
+ # to any of the specific Active Records.
+ def connection
+ if conn = active_connections[active_connection_name]
+ conn
+ else
+ # retrieve_connection sets the cache key.
+ conn = retrieve_connection
+ active_connections[active_connection_name] = conn
+ end
+ end
+
+ # Clears the cache which maps classes to connections.
+ def clear_active_connections!
+ clear_entries!(@active_connections, [active_connection_name]) do |name, conn|
+ conn.disconnect!
+ end
+ end
+
+ # Clears the cache which maps classes
+ def clear_reloadable_connections!
+ @active_connections.each do |name, conn|
+ if conn.requires_reloading?
+ conn.disconnect!
+ @active_connections.delete(name)
+ end
+ end
+ end
+
+ # Verify active connections.
+ def verify_active_connections! #:nodoc:
+ remove_stale_cached_threads!(@active_connections) do |name, conn|
+ conn.disconnect!
+ end
+ active_connections.each_value do |connection|
+ connection.verify!(@verification_timeout)
+ end
+ end
+
+ def retrieve_connection #:nodoc:
+ # Name is nil if establish_connection hasn't been called for
+ # some class along the inheritance chain up to AR::Base yet.
+ name = active_connection_name
+ if conn = active_connections[name]
+ # Verify the connection.
+ conn.verify!(@verification_timeout)
+ else
+ self.connection = spec
+ conn = active_connections[name]
+ end
+
+ conn or raise ConnectionNotEstablished
+ end
+
+ # Returns true if a connection that's accessible to this class has already been opened.
+ def connected?
+ active_connections[active_connection_name] ? true : false
+ end
+
+ def disconnect!
+ clear_cache!(@active_connections) do |name, conn|
+ conn.disconnect!
+ end
+ end
+
+ # Set the connection for the class.
+ def connection=(spec) #:nodoc:
+ if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter)
+ active_connections[active_connection_name] = spec
+ elsif spec.kind_of?(ActiveRecord::Base::ConnectionSpecification)
+ self.connection = ActiveRecord::Base.send(spec.adapter_method, spec.config)
+ else
+ raise ConnectionNotEstablished
+ end
+ end
+
+ private
+ def clear_cache!(cache, &block)
+ cache.each(&block) if block_given?
+ cache.clear
+ end
+
+ # Remove stale threads from the cache.
+ def remove_stale_cached_threads!(cache, &block)
+ stale = Set.new(cache.keys)
+
+ Thread.list.each do |thread|
+ stale.delete(thread.object_id) if thread.alive?
+ end
+ clear_entries!(cache, stale, &block)
+ end
+
+ def clear_entries!(cache, keys, &block)
+ keys.each do |key|
+ block.call(key, cache[key])
+ cache.delete(key)
+ end
+ end
+ end
+ end
+end
View
198 activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb
@@ -14,156 +14,45 @@ def initialize (config, adapter_method)
cattr_accessor :verification_timeout, :instance_writer => false
@@verification_timeout = 0
- # The class -> [adapter_method, config] map
+ # The class -> connection pool map
@@defined_connections = {}
- # The class -> thread id -> adapter cache. (class -> adapter if not allow_concurrency)
- @@active_connections = {}
-
class << self
- # Retrieve the connection cache.
- def thread_safe_active_connections #:nodoc:
- @@active_connections[Thread.current.object_id] ||= {}
- end
-
- def single_threaded_active_connections #:nodoc:
- @@active_connections
- end
-
- # pick up the right active_connection method from @@allow_concurrency
- if @@allow_concurrency
- alias_method :active_connections, :thread_safe_active_connections
- else
- alias_method :active_connections, :single_threaded_active_connections
- end
-
- # set concurrency support flag (not thread safe, like most of the methods in this file)
- def allow_concurrency=(threaded) #:nodoc:
- logger.debug "allow_concurrency=#{threaded}" if logger
- return if @@allow_concurrency == threaded
- clear_all_cached_connections!
- @@allow_concurrency = threaded
- method_prefix = threaded ? "thread_safe" : "single_threaded"
- sing = (class << self; self; end)
- [:active_connections, :scoped_methods].each do |method|
- sing.send(:alias_method, method, "#{method_prefix}_#{method}")
- end
- log_connections if logger
- end
-
- def active_connection_name #:nodoc:
- @active_connection_name ||=
- if active_connections[name] || @@defined_connections[name]
- name
- elsif self == ActiveRecord::Base
- nil
- else
- superclass.active_connection_name
- end
- end
-
- def clear_active_connection_name #:nodoc:
- @active_connection_name = nil
- subclasses.each { |klass| klass.clear_active_connection_name }
+ # for internal use only
+ def active_connections
+ @@defined_connections.inject([]) {|arr,kv| arr << kv.last.active_connection}.compact.uniq
end
# Returns the connection currently associated with the class. This can
# also be used to "borrow" the connection to do database work unrelated
# to any of the specific Active Records.
def connection
- if defined?(@active_connection_name) && (conn = active_connections[@active_connection_name])
- conn
- else
- # retrieve_connection sets the cache key.
- conn = retrieve_connection
- active_connections[@active_connection_name] = conn
- end
+ retrieve_connection
end
# Clears the cache which maps classes to connections.
def clear_active_connections!
- clear_cache!(@@active_connections) do |name, conn|
- conn.disconnect!
+ clear_cache!(@@defined_connections) do |name, pool|
+ pool.disconnect!
end
end
# Clears the cache which maps classes
def clear_reloadable_connections!
- if @@allow_concurrency
- # With concurrent connections @@active_connections is
- # a hash keyed by thread id.
- @@active_connections.each do |thread_id, conns|
- conns.each do |name, conn|
- if conn.requires_reloading?
- conn.disconnect!
- @@active_connections[thread_id].delete(name)
- end
- end
- end
- else
- @@active_connections.each do |name, conn|
- if conn.requires_reloading?
- conn.disconnect!
- @@active_connections.delete(name)
- end
- end
+ clear_cache!(@@defined_connections) do |name, pool|
+ pool.clear_reloadable_connections!
end
end
# Verify active connections.
def verify_active_connections! #:nodoc:
- if @@allow_concurrency
- remove_stale_cached_threads!(@@active_connections) do |name, conn|
- conn.disconnect!
- end
- end
-
- active_connections.each_value do |connection|
- connection.verify!(@@verification_timeout)
- end
+ @@defined_connections.each_value {|pool| pool.verify_active_connections!}
end
private
- def clear_cache!(cache, thread_id = nil, &block)
- if cache
- if @@allow_concurrency
- thread_id ||= Thread.current.object_id
- thread_cache, cache = cache, cache[thread_id]
- return unless cache
- end
-
- cache.each(&block) if block_given?
- cache.clear
- end
- ensure
- if thread_cache && @@allow_concurrency
- thread_cache.delete(thread_id)
- end
- end
-
- # Remove stale threads from the cache.
- def remove_stale_cached_threads!(cache, &block)
- stale = Set.new(cache.keys)
-
- Thread.list.each do |thread|
- stale.delete(thread.object_id) if thread.alive?
- end
-
- stale.each do |thread_id|
- clear_cache!(cache, thread_id, &block)
- end
- end
-
- def clear_all_cached_connections!
- if @@allow_concurrency
- @@active_connections.each_value do |connection_hash_for_thread|
- connection_hash_for_thread.each_value {|conn| conn.disconnect! }
- connection_hash_for_thread.clear
- end
- else
- @@active_connections.each_value {|conn| conn.disconnect! }
- end
- @@active_connections.clear
+ def clear_cache!(cache, &block)
+ cache.each(&block) if block_given?
+ cache.clear
end
end
@@ -208,9 +97,7 @@ def self.establish_connection(spec = nil)
raise AdapterNotSpecified unless defined? RAILS_ENV
establish_connection(RAILS_ENV)
when ConnectionSpecification
- clear_active_connection_name
- @active_connection_name = name
- @@defined_connections[name] = spec
+ @@defined_connections[name] = ConnectionAdapters::ConnectionPool.new(spec)
when Symbol, String
if configuration = configurations[spec.to_s]
establish_connection(configuration)
@@ -248,26 +135,20 @@ def self.establish_connection(spec = nil)
# opened and set as the active connection for the class it was defined
# for (not necessarily the current class).
def self.retrieve_connection #:nodoc:
- # Name is nil if establish_connection hasn't been called for
- # some class along the inheritance chain up to AR::Base yet.
- if name = active_connection_name
- if conn = active_connections[name]
- # Verify the connection.
- conn.verify!(@@verification_timeout)
- elsif spec = @@defined_connections[name]
- # Activate this connection specification.
- klass = name.constantize
- klass.connection = spec
- conn = active_connections[name]
- end
- end
+ pool = retrieve_connection_pool
+ (pool && pool.connection) or raise ConnectionNotEstablished
+ end
- conn or raise ConnectionNotEstablished
+ def self.retrieve_connection_pool
+ pool = @@defined_connections[name]
+ return pool if pool
+ return nil if ActiveRecord::Base == self
+ superclass.retrieve_connection_pool
end
# Returns true if a connection that's accessible to this class has already been opened.
def self.connected?
- active_connections[active_connection_name] ? true : false
+ retrieve_connection_pool.connected?
end
# Remove the connection for this class. This will close the active
@@ -275,35 +156,10 @@ def self.connected?
# can be used as an argument for establish_connection, for easily
# re-establishing the connection.
def self.remove_connection(klass=self)
- spec = @@defined_connections[klass.name]
- konn = active_connections[klass.name]
- @@defined_connections.delete_if { |key, value| value == spec }
- active_connections.delete_if { |key, value| value == konn }
- konn.disconnect! if konn
- spec.config if spec
- end
-
- # Set the connection for the class.
- def self.connection=(spec) #:nodoc:
- if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter)
- active_connections[name] = spec
- elsif spec.kind_of?(ConnectionSpecification)
- config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency)
- self.connection = self.send(spec.adapter_method, config)
- elsif spec.nil?
- raise ConnectionNotEstablished
- else
- establish_connection spec
- end
- end
-
- # connection state logging
- def self.log_connections #:nodoc:
- if logger
- logger.info "Defined connections: #{@@defined_connections.inspect}"
- logger.info "Active connections: #{active_connections.inspect}"
- logger.info "Active connection name: #{@active_connection_name}"
- end
+ pool = @@defined_connections[klass.name]
+ @@defined_connections.delete_if { |key, value| value == pool }
+ pool.disconnect! if pool
+ pool.spec.config if pool
end
end
end
View
1  activerecord/lib/active_record/connection_adapters/abstract_adapter.rb 100644 → 100755
@@ -8,6 +8,7 @@
require 'active_record/connection_adapters/abstract/database_statements'
require 'active_record/connection_adapters/abstract/quoting'
require 'active_record/connection_adapters/abstract/connection_specification'
+require 'active_record/connection_adapters/abstract/connection_pool'
require 'active_record/connection_adapters/abstract/query_cache'
module ActiveRecord
View
39 activerecord/test/cases/threaded_connections_test.rb
@@ -8,41 +8,32 @@ class ThreadedConnectionsTest < ActiveRecord::TestCase
fixtures :topics
- def setup
- @connection = ActiveRecord::Base.remove_connection
- @connections = []
- @allow_concurrency = ActiveRecord::Base.allow_concurrency
- end
+ def setup
+ @connection = ActiveRecord::Base.remove_connection
+ @connections = []
+ end
- def teardown
- # clear the connection cache
- ActiveRecord::Base.send(:clear_all_cached_connections!)
- # set allow_concurrency to saved value
- ActiveRecord::Base.allow_concurrency = @allow_concurrency
- # reestablish old connection
- ActiveRecord::Base.establish_connection(@connection)
- end
+ def teardown
+ # clear the connection cache
+ ActiveRecord::Base.clear_active_connections!
+ # reestablish old connection
+ ActiveRecord::Base.establish_connection(@connection)
+ end
- def gather_connections(use_threaded_connections)
- ActiveRecord::Base.allow_concurrency = use_threaded_connections
- ActiveRecord::Base.establish_connection(@connection)
+ def gather_connections
+ ActiveRecord::Base.establish_connection(@connection)
5.times do
Thread.new do
Topic.find :first
- @connections << ActiveRecord::Base.active_connections.values.first
+ @connections << ActiveRecord::Base.active_connections.first
end.join
end
end
def test_threaded_connections
- gather_connections(true)
- assert_equal @connections.uniq.length, 5
- end
-
- def test_unthreaded_connections
- gather_connections(false)
- assert_equal @connections.uniq.length, 1
+ gather_connections
+ assert_equal @connections.length, 5
end
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.