Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Nearing the finish line. Initial fixed-size connection pool implement…

…ed, more docs
  • Loading branch information...
commit fe575dd4a9f0fa0e71a89fae9f4a951a9fb36058 1 parent 82fcd9d
Nick Sieger nicksieger authored
12 README.rdoc
View
@@ -20,15 +20,5 @@ be more thread-safe, and to add connection pooling features.
Remaining tasks:
-- Fixed-size connection pool
-- Add checkin/checkout API
-- Add a #with_connection API that allows checkin/checkout of a connection outside of a provided block.
-
- Model.with_connection do |conn|
- Thread.new {
- Model.connection = conn
- # do something with conn
- }
- Model.connection.select ....
- end
+- Review and put the thing to real work.
- Look at whether existing clear_* or verify_* methods can be deprecated or removed.
138 activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb
View
@@ -2,14 +2,43 @@
require 'set'
module ActiveRecord
+ # Raised when a connection could not be obtained within the connection
+ # acquisition timeout period.
+ class ConnectionTimeoutError < ConnectionNotEstablished
+ end
+
module ConnectionAdapters
- # Connection pool base class and ActiveRecord database connections.
- class ConnectionPool
+ # Connection pool base class for managing ActiveRecord database
+ # connections.
+ #
+ # Connections can be obtained and used from a connection pool in several
+ # ways:
+ #
+ # 1. Simply use ActiveRecord::Base.connection as in pre-connection-pooled
+ # ActiveRecord. Eventually, when you're done with the connection and
+ # wish it to be returned to the pool, you call
+ # ActiveRecord::Base.connection_pool.release_thread_connection.
+ # 2. Manually check out a connection from the pool with
+ # ActiveRecord::Base.connection_pool.checkout. You are responsible for
+ # returning this connection to the pool when finished by calling
+ # ActiveRecord::Base.connection_pool.checkin(connection).
+ # 3. Use ActiveRecord::Base.connection_pool.with_connection(&block), which
+ # obtains a connection, yields it as the sole argument to the block,
+ # and returns it to the pool after the block completes.
+ class AbstractConnectionPool
# Factory method for connection pools.
- # Determines pool type to use based on contents of connection specification.
- # FIXME: specification configuration TBD.
+ # Determines pool type to use based on contents of connection
+ # specification. Additional options for connection specification:
+ #
+ # * +pool+: number indicating size of fixed connection pool to use
+ # * +wait_timeout+ (optional): number of seconds to block and wait
+ # for a connection before giving up and raising a timeout error.
def self.create(spec)
- ConnectionPerThread.new(spec)
+ if spec.config[:pool] && spec.config[:pool].to_i > 0
+ ConnectionPool.new(spec)
+ else
+ ConnectionPerThread.new(spec)
+ end
end
delegate :verification_timeout, :to => "::ActiveRecord::Base"
@@ -115,11 +144,16 @@ def connections
end
private :connections
- synchronize :connection, :release_thread_connection, :checkout, :checkin,
+ synchronize :connection, :release_thread_connection,
:clear_reloadable_connections!, :verify_active_connections!,
:connected?, :disconnect!, :with => :@connection_mutex
private
+ def new_connection
+ config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency)
+ ActiveRecord::Base.send(spec.adapter_method, config)
+ end
+
def active_connection_name #:nodoc:
Thread.current.object_id
end
@@ -139,7 +173,10 @@ def remove_stale_cached_threads!(cache, &block)
end
end
- class ConnectionPerThread < ConnectionPool
+ # ConnectionPerThread is a simple implementation: always create/disconnect
+ # on checkout/checkin, and use the base class reserved connections hash to
+ # manage the per-thread connections.
+ class ConnectionPerThread < AbstractConnectionPool
def active_connection
@reserved_connections[active_connection_name]
end
@@ -155,12 +192,6 @@ def checkin(conn)
end
private
- # Set the connection for the class.
- def new_connection
- config = spec.config.reverse_merge(:allow_concurrency => ActiveRecord::Base.allow_concurrency)
- ActiveRecord::Base.send(spec.adapter_method, config)
- end
-
def connections
@reserved_connections.values
end
@@ -170,6 +201,70 @@ def remove_connection(conn)
end
end
+ # ConnectionPool provides a full, fixed-size connection pool with timed
+ # waits when the pool is exhausted.
+ class ConnectionPool < AbstractConnectionPool
+ def initialize(spec)
+ super
+ # default 5 second timeout
+ @timeout = spec.config[:wait_timeout] || 5
+ @size = spec.config[:pool].to_i
+ @queue = @connection_mutex.new_cond
+ @connections = []
+ @checked_out = []
+ end
+
+ def checkout
+ # Checkout an available connection
+ conn = @connection_mutex.synchronize do
+ if @connections.length < @size
+ checkout_new_connection
+ elsif @checked_out.size < @connections.size
+ checkout_existing_connection
+ end
+ end
+ return conn if conn
+
+ # No connections available; wait for one
+ @connection_mutex.synchronize do
+ if @queue.wait(@timeout)
+ checkout_existing_connection
+ else
+ raise ConnectionTimeoutError, "could not obtain a database connection in a timely fashion"
+ end
+ end
+ end
+
+ def checkin(conn)
+ @connection_mutex.synchronize do
+ @checked_out -= conn
+ @queue.signal
+ end
+ end
+
+ private
+ def checkout_new_connection
+ c = new_connection
+ @connections << c
+ @checked_out << c
+ c
+ end
+
+ def checkout_existing_connection
+ c = [@connections - @checked_out].first
+ @checked_out << c
+ c
+ end
+
+ def connections
+ @connections
+ end
+
+ def remove_connection(conn)
+ @connections.delete conn
+ end
+ end
+
module ConnectionHandlerMethods
def initialize(pools = {})
@connection_pools = pools
@@ -180,7 +275,7 @@ def connection_pools
end
def establish_connection(name, spec)
- @connection_pools[name] = ConnectionAdapters::ConnectionPool.create(spec)
+ @connection_pools[name] = ConnectionAdapters::AbstractConnectionPool.create(spec)
end
# for internal use only and for testing;
@@ -238,15 +333,14 @@ def remove_connection(klass)
pool.spec.config if pool
end
- private
- def retrieve_connection_pool(klass)
- loop do
- pool = @connection_pools[klass.name]
- return pool if pool
- return nil if ActiveRecord::Base == klass
- klass = klass.superclass
- end
+ def retrieve_connection_pool(klass)
+ loop do
+ pool = @connection_pools[klass.name]
+ return pool if pool
+ return nil if ActiveRecord::Base == klass
+ klass = klass.superclass
end
+ end
end
# This connection handler is not thread-safe, as it does not protect access
4 activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb
View
@@ -116,6 +116,10 @@ def connection
retrieve_connection
end
+ def connection_pool
+ connection_handler.retrieve_connection_pool(self)
+ end
+
def retrieve_connection
connection_handler.retrieve_connection(self)
end
36 activerecord/test/cases/threaded_connections_test.rb
View
@@ -40,4 +40,40 @@ def test_threaded_connections
assert_equal @connections.length, 5
end
end
+
+ class PooledConnectionsTest < ActiveRecord::TestCase
+ def setup
+ @connection = ActiveRecord::Base.remove_connection
+ @connections = []
+ @allow_concurrency = ActiveRecord::Base.allow_concurrency
+ ActiveRecord::Base.allow_concurrency = true
+ end
+
+ def teardown
+ ActiveRecord::Base.clear_all_connections!
+ ActiveRecord::Base.allow_concurrency = @allow_concurrency
+ ActiveRecord::Base.establish_connection(@connection)
+ end
+
+ def gather_connections
+ ActiveRecord::Base.establish_connection(@connection.merge({:pool => 2, :wait_timeout => 0.3}))
+ @timed_out = 0
+
+ 4.times do
+ Thread.new do
+ begin
+ @connections << ActiveRecord::Base.connection_pool.checkout
+ rescue ActiveRecord::ConnectionTimeoutError
+ @timed_out += 1
+ end
+ end.join
+ end
+ end
+
+ def test_threaded_connections
+ gather_connections
+ assert_equal @connections.length, 2
+ assert_equal @timed_out, 2
+ end
+ end
end
Please sign in to comment.
Something went wrong with that request. Please try again.