Skip to content

Commit

Permalink
connection pool validator - first draft
Browse files Browse the repository at this point in the history
  • Loading branch information
kares committed Nov 26, 2015
1 parent 507b863 commit 4924e0c
Show file tree
Hide file tree
Showing 3 changed files with 357 additions and 1 deletion.
13 changes: 12 additions & 1 deletion lib/active_record/bogacs/default_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def wait_poll(timeout)
require 'active_record/bogacs/reaper.rb'

attr_accessor :automatic_reconnect, :checkout_timeout
attr_reader :spec, :connections, :size, :reaper
attr_reader :spec, :connections, :size, :reaper, :validator
attr_reader :initial_size

# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
Expand Down Expand Up @@ -225,6 +225,14 @@ def initialize(spec)
initial_size = (@size * initial_size).to_i if initial_size <= 1.0
# NOTE: warn on onitial_size > size !
prefill_initial_connections if ( @initial_size = initial_size.to_i ) > 0

if frequency = spec.config[:validate_frequency]
require 'active_record/bogacs/validator' unless self.class.const_defined?(:Validator)
@validator = Validator.new self, frequency, spec.config[:validate_timeout]
if @validator.run && @reaping
logger && logger.info("pool: validator configured alongside with reaper")
end
end
end

# Retrieve the connection associated with the current thread, or call
Expand Down Expand Up @@ -418,6 +426,9 @@ def reap
def reaper?; (@reaper ||= nil) && @reaper.frequency end
def reaping?; reaper? && @reaper.running? end

def validator?; (@validator ||= nil) && @validator.frequency end
def validating?; validator? && @validator.running? end

#@@logger = nil
def logger; ::ActiveRecord::Base.logger end
#def logger=(logger); @@logger = logger end
Expand Down
169 changes: 169 additions & 0 deletions lib/active_record/bogacs/validator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
begin
require 'concurrent/executors'
require 'concurrent/timer_task'
rescue LoadError => e
warn "activerecord-bogacs' validator feature needs gem 'concurrent-ruby', please install or add it to your Gemfile"
raise e
end

require 'active_record/connection_adapters/adapter_compat'
require 'active_record/bogacs/thread_safe'

module ActiveRecord
module Bogacs

# Every +frequency+ seconds, the reaper will call +reap+ on +pool+.
# A reaper instantiated with a nil frequency will never reap the
# connection pool.
#
# Configure the frequency by setting "reaping_frequency" in your
# database yaml file.
class Validator

attr_reader :pool, :frequency, :timeout

# Validator.new(self, spec.config[:validate_frequency]).run
# @private
def initialize(pool, frequency = 60, timeout = nil)
@pool = pool; PoolAdaptor.adapt! pool
if frequency # validate every 60s by default
frequency = frequency.to_f
@frequency = frequency > 0.0 ? frequency : false
else
@frequency = nil
end
if timeout
timeout = timeout.to_f
@timeout = timeout > 0.0 ? timeout : 0
else
@timeout = @frequency
end
@running = nil
end

def run
return unless frequency
@running = true; start
end

TimerTask = ::Concurrent::TimerTask
private_constant :TimerTask rescue nil

def start
TimerTask.new(:execution_interval => frequency, :timeout_interval => timeout) do
validate_connections
end
end

def running?; @running end

def validate
start = Time.now
conns = connections
logger && logger.debug("[validator] found #{conns.size} candidates to validate")
invalid = 0
conns.each { |connection| invalid += 1 if validate_connection(connection) == false }
logger && logger.info("[validator] validated pool in #{Time.now - start}s (removed #{invalid} connections from pool)")
end

private

def connections
connections = pool.connections.dup
connections.map! do |conn|
if conn
owner = conn.owner
if conn.in_use?
if owner && ! owner.alive? # stale-conn (reaping)
pool.remove conn # remove is synchronized
conn.disconnect! rescue nil
nil
elsif ! owner # NOTE: this is likely a nasty bug
logger && logger.warn("[validator] found in-use connection without owner - removing from pool")
pool.remove_without_owner conn
conn.disconnect! rescue nil
nil
else
nil # owner.alive? ... do not touch
end
else
conn # conn not in-use - candidate for validation
end
end
end
connections.compact
end

def validate_connection(conn)
return nil if conn.in_use?
pool.synchronize do # make sure it won't get checked-out while validating
return nil if conn.in_use?
# NOTE: active? is assumed to behave e.g. connection_alive_timeout used
# on AR-JDBC active? might return false as the JDBC connection is lazy
# ... but that is just fine!
return true if conn.active? # validate the connection - ping the DB

# TODO support last_use - only validate if certain amount since use passed

logger && logger.debug("[validator] found non-active connection - removing from pool")
pool.remove_without_owner conn # not active - remove
conn.disconnect! rescue nil
return false
end
end

#def synchronize(&block); pool.synchronize(&block) end

def logger
@logger ||= ( pool.respond_to?(:logger) ? pool.logger : nil ) rescue nil
end

module PoolAdaptor

def self.adapt!(pool)
unless pool.class.include?(PoolAdaptor)
pool.class.send :include, PoolAdaptor
end

return if pool.respond_to?(:thread_cached_conns)

if pool.instance_variable_get :@reserved_connections
class << pool
attr_reader :reserved_connections
alias_method :thread_cached_conns, :reserved_connections
end
elsif pool.instance_variable_get :@thread_cached_conns
class << pool
attr_reader :thread_cached_conns
end
else
raise NotImplementedError, "could not adapt pool: #{pool}"
end
end

def cached_conn_owner_id(conn)
thread_cached_conns.keys.each do |owner_id|
if thread_cached_conns[ owner_id ] == conn
return owner_id
end
end
nil
end

def remove_without_owner(conn)
remove conn # release(conn, nil) owner.object_id should do fine
release_without_owner conn
end

def release_without_owner(conn)
if owner_id = cached_conn_owner_id(conn)
thread_cached_conns.delete owner_id; return true
end
end

end

end

end
end
176 changes: 176 additions & 0 deletions test/active_record/bogacs/validator_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
require File.expand_path('../../test_helper', File.dirname(__FILE__))
require 'stringio'

module ActiveRecord
module Bogacs
class ValidatorTest < Test::Unit::TestCase

def self.startup
ConnectionAdapters::ConnectionHandler.connection_pool_class = DefaultPool
end

def self.shutdown
ConnectionAdapters::ConnectionHandler.connection_pool_class = nil
end

def config; AR_CONFIG end

def teardown; @pool.disconnect! if (@pool ||= nil) end

def test_null_validator
pool = new_pool :validate_frequency => nil

assert ! pool.validator?
sleep 0.05
assert ! pool.validating?
end

def test_parse_frequency
pool = new_pool :validate_frequency => '0'

assert ! pool.validator?
sleep 0.05
assert ! pool.validating?

assert ! Validator.new(pool, '').frequency
assert_equal 50, Validator.new(pool, '50').frequency
assert_equal 5.5, Validator.new(pool, '5.5').frequency
end

def test_validator?
assert pool.validator?
sleep 0.1
assert pool.validating?
end

require 'concurrent/atomic/atomic_fixnum.rb'
AtomicFixnum = ::Concurrent::AtomicFixnum

require 'concurrent/atomic/semaphore.rb'
Semaphore = ::Concurrent::Semaphore

def test_selects_non_used_connections
assert_equal [], validator.send(:connections)

count = AtomicFixnum.new
semaphore = Semaphore.new(2); semaphore.drain_permits
Thread.new {
pool.with_connection { |conn| assert conn; count.increment; semaphore.acquire }
}
Thread.new {
pool.with_connection { |conn| assert conn; count.increment; semaphore.acquire }
}
while count.value < 2; sleep 0.01 end

released_conn = nil
Thread.new {
pool.with_connection { |conn| assert released_conn = conn }
}.join


assert_equal 3, pool.connections.size
assert_equal 1, validator.send(:connections).size
assert_equal [ released_conn ], validator.send(:connections)

semaphore.release 2
end

def test_validate_connection
conn = connection; pool.remove conn
conn.expire; assert ! conn.in_use?
# lazy on AR-JDBC :
conn.tables; assert conn.active?

def conn.active_called?; @_active_called ||= false end
def conn.active?; @_active_called = true; super end

result = validator.send :validate_connection, conn
assert_true result

assert conn.active_called?
end

def test_validate_connection_non_valid
conn = connection; pool.remove conn
conn.expire; assert ! conn.in_use?

def conn.active?; false end

result = validator.send :validate_connection, conn
assert_false result
end

def test_validate_connection_in_use
conn = connection
assert conn.in_use?
def conn.active?; raise 'active? should not be called for a used connection' end

result = validator.send :validate_connection, conn
assert_nil result
end

def test_validate_connection_removes_invalid_connection_from_pool
conn = connection
puts pool.connections.map(&:object_id).inspect
Thread.new { pool.with_connection { |conn| assert conn } }.join
puts pool.connections.map(&:object_id).inspect
assert_equal 2, pool.connections.size

conn.expire; assert ! conn.in_use?

def conn.active?; false end

result = validator.send :validate_connection, conn
assert_false result

assert_equal 1, pool.connections.size
assert ! pool.send(:connections).include?(conn)
end

# def test_reap_error_restart
# logger = Logger.new str = StringIO.new
# @pool.reaper.instance_variable_set :@logger, logger
# def @pool.reap; raise RuntimeError, 'test_reap_error' end
#
# assert @pool.reaper?
# sleep 0.3
# assert_true @pool.reaping?
# assert_match /WARN.*reaping failed:.* test_reap_error.* restarting after/, str.string
# end

private

def connection
pool; ActiveRecord::Base.connection
end

def validator; pool.validator end

def pool
# self.startup: connection_pool_class = DefaultPool
@pool ||= (establish_connection; Base.connection_pool)
end

DEFAULT_OPTS = { :size => 5, :validate_frequency => 1 }

def establish_connection(opts = DEFAULT_OPTS)
ActiveRecord::Base.establish_connection config.merge opts
end

def new_pool(opts = DEFAULT_OPTS)
establish_connection config.merge opts
DefaultPool.new Base.connection_pool.spec
end

class TimerTaskStub

# :execution_interval => frequency, :timeout_interval => timeout
def self.new(opts, &block)
raise 'noop'
end

end

end
end
end

0 comments on commit 4924e0c

Please sign in to comment.