Permalink
Browse files

Add synchronization to connection pool also

  • Loading branch information...
1 parent 50cd4bd commit cab76ce6ac2983f59451e2d53b23746a2873aea0 @nicksieger nicksieger committed with nicksieger Apr 19, 2008
@@ -12,6 +12,9 @@ def initialize(spec)
# The ConnectionSpecification for this pool
@spec = spec
+
+ # The mutex used to synchronize pool access
+ @connection_mutex = Monitor.new
end
def active_connection_name #:nodoc:
@@ -70,7 +73,7 @@ def retrieve_connection #:nodoc:
# Verify the connection.
conn.verify!(verification_timeout)
else
- self.connection = spec
+ self.set_connection spec
conn = active_connections[name]
end
@@ -82,23 +85,28 @@ def connected?
active_connections[active_connection_name] ? true : false
end
+ # Disconnect all connections in the pool.
def disconnect!
clear_cache!(@active_connections) do |name, conn|
conn.disconnect!
end
end
# Set the connection for the class.
- def connection=(spec) #:nodoc:
+ def set_connection(spec) #:nodoc:
if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter)
active_connections[active_connection_name] = spec
elsif spec.kind_of?(ActiveRecord::Base::ConnectionSpecification)
- self.connection = ActiveRecord::Base.send(spec.adapter_method, spec.config)
+ self.set_connection ActiveRecord::Base.send(spec.adapter_method, spec.config)
else
raise ConnectionNotEstablished
end
end
+ synchronize :active_connection, :connection, :clear_active_connections!,
+ :clear_reloadable_connections!, :verify_active_connections!, :retrieve_connection,
+ :connected?, :disconnect!, :set_connection, :with => :@connection_mutex
+
private
def clear_cache!(cache, &block)
cache.each(&block) if block_given?
@@ -27,6 +27,8 @@ def synchronize
@@connection_pools = {}
class << self
+ # Turning on allow_concurrency basically switches a null mutex for a real one, so that
+ # multi-threaded access of the connection pools hash is synchronized.
def allow_concurrency=(flag)
if @@allow_concurrency != flag
if flag
@@ -37,7 +39,7 @@ def allow_concurrency=(flag)
end
end
- # for internal use only
+ # for internal use only and for testing
def active_connections #:nodoc:
@@connection_pools.inject({}) do |hash,kv|
hash[kv.first] = kv.last.active_connection
@@ -18,15 +18,15 @@ def synchronize(*methods)
raise ArgumentError, "Synchronization needs a mutex. Supply an options hash with a :with key as the last argument (e.g. synchronize :hello, :with => :@mutex)."
end
- methods.each do |method|
+ methods.flatten.each do |method|
aliased_method, punctuation = method.to_s.sub(/([?!=])$/, ''), $1
if instance_methods.include?("#{aliased_method}_without_synchronization#{punctuation}")
raise ArgumentError, "#{method} is already synchronized. Double synchronization is not currently supported."
end
module_eval(<<-EOS, __FILE__, __LINE__)
def #{aliased_method}_with_synchronization#{punctuation}(*args, &block)
#{with}.synchronize do
- #{aliased_method}_without_synchronization#{punctuation}(*args,&block)
+ #{aliased_method}_without_synchronization#{punctuation}(*args, &block)
end
end
EOS

0 comments on commit cab76ce

Please sign in to comment.