Skip to content

Commit

Permalink
Extract a base class for connection pools, start to flesh out reserve…
Browse files Browse the repository at this point in the history
…/release API
  • Loading branch information
nicksieger committed Aug 29, 2008
1 parent 51349ec commit 029952e
Showing 1 changed file with 105 additions and 84 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -3,135 +3,156 @@


module ActiveRecord module ActiveRecord
module ConnectionAdapters module ConnectionAdapters
# Connection pool API for ActiveRecord database connections.
class ConnectionPool class ConnectionPool
delegate :verification_timeout, :to => "::ActiveRecord::Base" # Factory method for connection pools.
attr_reader :active_connections, :spec # Determines pool type to use based on contents of connection specification.
# FIXME: specification configuration TBD.
def self.create(spec)
ConnectionPerThread.new(spec)
end


delegate :verification_timeout, :to => "::ActiveRecord::Base"
attr_reader :spec
def initialize(spec) def initialize(spec)
# The thread id -> adapter cache.
@active_connections = {}

# The ConnectionSpecification for this pool
@spec = spec @spec = spec

# The cache of reserved connections mapped to threads
@reserved_connections = {}
# The mutex used to synchronize pool access # The mutex used to synchronize pool access
@connection_mutex = Monitor.new @connection_mutex = Monitor.new
end end


def active_connection_name #:nodoc: # Retrieve the connection reserved for the current thread, or call #reserve to obtain one
Thread.current.object_id # if necessary.
def open_connection
if conn = @reserved_connections[active_connection_name]
conn.verify!(verification_timeout)
conn
else
@reserved_connections[active_connection_name] = reserve
end
end end
alias connection open_connection


def active_connection def close_connection
active_connections[active_connection_name] conn = @reserved_connections.delete(active_connection_name)
release conn if conn
end end


# Returns the connection currently associated with the class. This can # Returns true if a connection has already been opened.
# also be used to "borrow" the connection to do database work unrelated def connected?
# to any of the specific Active Records. !connections.empty?
def connection
if conn = active_connections[active_connection_name]
conn
else
# retrieve_connection sets the cache key.
conn = retrieve_connection
active_connections[active_connection_name] = conn
end
end end


# Clears the cache which maps classes to connections. # Reserve (check-out) a database connection for the current thread.
def clear_active_connections! def reserve
clear_entries!(@active_connections, [active_connection_name]) do |name, conn| raise NotImplementedError, "reserve is an abstract method"
end
alias checkout reserve

# Release (check-in) a database connection for the current thread.
def release(connection)
raise NotImplementedError, "release is an abstract method"
end
alias checkin release

# Disconnect all connections in the pool.
def disconnect!
@reserved_connections.each do |name,conn|
release(conn)
end
connections.each do |conn|
conn.disconnect! conn.disconnect!
end end
@reserved_connections = {}
end end


# Clears the cache which maps classes # Clears the cache which maps classes
def clear_reloadable_connections! def clear_reloadable_connections!
@active_connections.each do |name, conn| @reserved_connections.each do |name, conn|
release(conn)
end
@reserved_connections = {}
connections.each do |conn|
if conn.requires_reloading? if conn.requires_reloading?
conn.disconnect! conn.disconnect!
@active_connections.delete(name) remove_connection conn
end end
end end
end end


# Verify active connections. # Verify active connections.
def verify_active_connections! #:nodoc: def verify_active_connections! #:nodoc:
remove_stale_cached_threads!(@active_connections) do |name, conn| remove_stale_cached_threads!(@reserved_connections) do |name, conn|
conn.disconnect! release(conn)
end end
active_connections.each_value do |connection| connections.each do |connection|
connection.verify!(verification_timeout) connection.verify!(verification_timeout)
end end
end end


def retrieve_connection #:nodoc: synchronize :open_connection, :close_connection, :reserve, :release,
# Name is nil if establish_connection hasn't been called for :clear_reloadable_connections!, :verify_active_connections!,
# some class along the inheritance chain up to AR::Base yet. :connected?, :disconnect!, :with => :@connection_mutex
name = active_connection_name
if conn = active_connections[name]
# Verify the connection.
conn.verify!(verification_timeout)
else
self.set_connection spec
conn = active_connections[name]
end


conn or raise ConnectionNotEstablished private
def active_connection_name #:nodoc:
Thread.current.object_id
end end


# Returns true if a connection that's accessible to this class has already been opened. def remove_connection(conn)
def connected? raise NotImplementedError, "remove_connection is an abstract method"
active_connections[active_connection_name] ? true : false
end end


# Disconnect all connections in the pool. # Array containing all connections (reserved or available) in the pool.
def disconnect! def connections
clear_cache!(@active_connections) do |name, conn| raise NotImplementedError, "connections is an abstract method"
conn.disconnect!
end
end end


# Set the connection for the class. # Remove stale threads from the cache.
def set_connection(spec) #:nodoc: def remove_stale_cached_threads!(cache, &block)
if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter) keys = Set.new(cache.keys)
active_connections[active_connection_name] = spec
elsif spec.kind_of?(ActiveRecord::Base::ConnectionSpecification) Thread.list.each do |thread|
config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency) keys.delete(thread.object_id) if thread.alive?
self.set_connection ActiveRecord::Base.send(spec.adapter_method, config) end
else keys.each do |key|
raise ConnectionNotEstablished next unless cache.has_key?(key)
block.call(key, cache[key])
cache.delete(key)
end end
end end
end

class ConnectionPerThread < ConnectionPool
def active_connection
@reserved_connections[active_connection_name]
end


synchronize :active_connection, :connection, :clear_active_connections!, def active_connections; @reserved_connections; end
:clear_reloadable_connections!, :verify_active_connections!, :retrieve_connection,
:connected?, :disconnect!, :set_connection, :with => :@connection_mutex


private def reserve
def clear_cache!(cache, &block) new_connection
cache.each(&block) if block_given? end
cache.clear
end


# Remove stale threads from the cache. def release(conn)
def remove_stale_cached_threads!(cache, &block) conn.disconnect!
stale = Set.new(cache.keys) end


Thread.list.each do |thread| private
stale.delete(thread.object_id) if thread.alive? # Set the connection for the class.
end def new_connection
clear_entries!(cache, stale, &block) config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency)
end ActiveRecord::Base.send(spec.adapter_method, config)
end


def clear_entries!(cache, keys, &block) def connections
keys.each do |key| @reserved_connections.values
next unless cache.has_key?(key) end
block.call(key, cache[key])
cache.delete(key) def remove_connection(conn)
end @reserved_connections.delete_if {|k,v| v == conn}
end end
end end


module ConnectionHandlerMethods module ConnectionHandlerMethods
Expand All @@ -144,7 +165,7 @@ def connection_pools
end end


def establish_connection(name, spec) def establish_connection(name, spec)
@connection_pools[name] = ConnectionAdapters::ConnectionPool.new(spec) @connection_pools[name] = ConnectionAdapters::ConnectionPool.create(spec)
end end


# for internal use only and for testing # for internal use only and for testing
Expand All @@ -158,7 +179,7 @@ def active_connections #:nodoc:


# Clears the cache which maps classes to connections. # Clears the cache which maps classes to connections.
def clear_active_connections! def clear_active_connections!
@connection_pools.each_value {|pool| pool.clear_active_connections! } @connection_pools.each_value {|pool| pool.close_connection }
end end


# Clears the cache which maps classes # Clears the cache which maps classes
Expand Down

0 comments on commit 029952e

Please sign in to comment.