Skip to content

Commit

Permalink
Merge pull request #75 from mperham/rollback_timeout_handling
Browse files Browse the repository at this point in the history
Non-local return forces disconnect
  • Loading branch information
mperham committed Apr 11, 2015
2 parents 7c5a548 + 7acf930 commit 6007d32
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 59 deletions.
10 changes: 10 additions & 0 deletions Changes.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
connection\_pool changelog
---------------------------

2.2.0
------

- Rollback `Timeout` handling introduced in 2.1.1 and 2.1.2. It seems
impossible to safely work around the issue. Please never, ever use
`Timeout.timeout` in your code or you will see rare but mysterious bugs. [#75]

2.1.3
------

Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ MongoDB has its own connection pool. ActiveRecord has its own connection pool.
This is a generic connection pool that can be used with anything, e.g. Redis,
Dalli and other Ruby network clients.

**WARNING**: Don't ever use `Timeout.timeout` in your Ruby code or you will see
occasional silent corruption and mysterious errors. The Timeout API is unsafe
and cannot be used correctly, ever. Use proper socket timeout options as
exposed by Net::HTTP, Redis, Dalli, etc.


Usage
-----
Expand Down
37 changes: 24 additions & 13 deletions lib/connection_pool.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require_relative 'connection_pool/version'
require_relative 'connection_pool/timed_stack'


# Generic connection pool class for e.g. sharing a limited number of network connections
# among many threads. Note: Connections are lazily created.
#
Expand Down Expand Up @@ -52,26 +53,36 @@ def initialize(options = {}, &block)
@key = :"current-#{@available.object_id}"
end

if Thread.respond_to?(:handle_interrupt)

# MRI
def with(options = {})
Thread.handle_interrupt(Exception => :never) do
conn = checkout(options)
begin
Thread.handle_interrupt(Exception => :immediate) do
yield conn
end
ensure
checkin
end
end
end

else

# jruby 1.7.x
def with(options = {})
# Connections can become corrupted via Timeout::Error. Discard
# any connection whose usage after checkout does not finish as expected.
# See #67
success = false
conn = checkout(options)
begin
result = yield conn
success = true # means the connection wasn't interrupted
result
yield conn
ensure
if success
# everything is roses, we can safely check the connection back in
checkin
else
@available.discard!(pop_connection)
end
checkin
end
end

end

def checkout(options = {})
conn = if stack.empty?
timeout = options[:timeout] || @timeout
Expand Down
22 changes: 0 additions & 22 deletions lib/connection_pool/timed_stack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,28 +117,6 @@ def length
@max - @created + @que.length
end

##
# Indicates that a connection isn't coming back, allowing a new one to be
# created to replace it.

def discard!(obj)
@mutex.synchronize do
if @shutdown_block
@shutdown_block.call(obj)
else
# try to shut down the connection before throwing it away
if obj.respond_to?(:close) # Dalli::Client
obj.close
elsif obj.respond_to?(:disconnect!) # Redis
obj.disconnect!
end
@created -= 1
end

@resource.broadcast
end
end

private

##
Expand Down
2 changes: 1 addition & 1 deletion lib/connection_pool/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
class ConnectionPool
VERSION = "2.1.3"
VERSION = "2.2.0"
end
52 changes: 29 additions & 23 deletions test/test_connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,38 +109,44 @@ def test_with
assert Thread.new { pool.checkout }.join
end

def test_with_with_dangerous_timeouts
case RUBY_ENGINE.to_sym
when :jruby
skip('JRuby GC dislikes this test')
when :ruby
if RUBY_VERSION == '2.0.0' && RUBY_PATCHLEVEL == 598
skip("#{RUBY_VERSION}p#{RUBY_PATCHLEVEL} GC dislikes this test")
def test_with_timeout
pool = ConnectionPool.new(:timeout => 0, :size => 1) { Object.new }

assert_raises Timeout::Error do
Timeout.timeout(0.01) do
pool.with do |obj|
assert_equal 0, pool.instance_variable_get(:@available).instance_variable_get(:@que).size
sleep 0.015
end
end
end
assert_equal 1, pool.instance_variable_get(:@available).instance_variable_get(:@que).size
end

marker_class = Class.new
pool = ConnectionPool.new(:timeout => 0, :size => 1) { marker_class.new }

# no "connections" allocated yet
assert_equal [], ObjectSpace.each_object(marker_class).to_a
def test_checkout_ignores_timeout
skip("Thread.handle_interrupt not available") unless Thread.respond_to?(:handle_interrupt)

checkin_time = 0.05
pool = ConnectionPool.new(:timeout => 0, :size => 1) { Object.new }
def pool.checkout(options)
sleep 0.015
super
end

did_something = false
assert_raises Timeout::Error do
Timeout.timeout(checkin_time) do
pool.with do
# a "connection" has been allocated
refute_equal [], ObjectSpace.each_object(marker_class).to_a
sleep 2 * checkin_time
Timeout.timeout(0.01) do
pool.with do |obj|
did_something = true
# Timeout::Error will be triggered by any non-trivial Ruby code
# executed here since it couldn't be raised during checkout.
# It looks like setting the local variable above does not trigger
# the Timeout check in MRI 2.2.1.
obj.tap { obj.hash }
end
end
end

GC.start

# no dangling references to this "connection" remain
assert_equal [], ObjectSpace.each_object(marker_class).to_a
assert did_something
assert_equal 1, pool.instance_variable_get(:@available).instance_variable_get(:@que).size
end

def test_explicit_return
Expand Down

0 comments on commit 6007d32

Please sign in to comment.