Skip to content

Commit

Permalink
Add temporarily_release_connection Database extension for multithread…
Browse files Browse the repository at this point in the history
…ed transactional testing

This allows one thread to start a transaction, and then release
the connection back for usage by the connection pool, so that
other threads can operate on the connection object safely inside
the transaction.  This requires the connection pool be limited
to a single connection, to ensure that the released connection
can be reacquired.  It's not perfect, because if the connection
is disconnected and removed from the pool while temporarily
released, there is no way to handle that situation correctly.
  • Loading branch information
jeremyevans committed May 6, 2024
1 parent 1549236 commit f372eeb
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG
@@ -1,3 +1,7 @@
=== master

* Add temporarily_release_connection Database extension for multithreaded transactional testing (jeremyevans)

=== 5.80.0 (2024-05-01)

* Support Dataset#skip_locked on MariaDB 10.6+ (simi) (#2150)
Expand Down
178 changes: 178 additions & 0 deletions lib/sequel/extensions/temporarily_release_connection.rb
@@ -0,0 +1,178 @@
# frozen-string-literal: true
#
# The temporarily_release_connection extension adds support for temporarily
# releasing a checked out connection back to the connection pool. It is
# designed for use in multithreaded transactional integration tests, allowing
# a connection to start a transaction in one thread, but be temporarily
# released back to the connection pool, so it can be operated on safely
# by multiple threads inside a block. For example, the main thread could be
# running tests that send web requests, and a separate thread running a web
# server that is responding to those requests, and the same connection and
# transaction would be used for both.
#
# To load the extension into the database:
#
# DB.extension :temporarily_release_connection
#
# After the extension is loaded, call the +temporarily_release_connection+
# method with the connection object to temporarily release the connection
# back to the pool. Example:
#
# DB.transaction(rollback: :always, auto_savepoint: true) do |conn|
# DB.temporarily_release_connection(conn) do
# # Other threads can operate on connection safely inside the transaction
# yield
# end
# end
#
# For sharded connection pools, the second argument to +temporarily_release_connection+
# is respected, and specifies the server on which to temporarily release the connection.
#
# The temporarily_release_connection extension is only supported with the
# threaded and timed_queue connection pools that ship with Sequel (and the sharded
# versions of each). To make sure that same connection object can be reacquired, it
# is only supported if the maximum connection pool size is 1, so set the Database
# :max_connections option to 1 if you plan to use this extension.
#
# If the +temporarily_release_connection+ method cannot reacquire the same connection
# it released to the pool, it will raise a Sequel::UnableToReacquireConnectionError
# exception. This should only happen if the connection has been disconnected
# while it was temporarily released. If this error is raised, Database#transaction
# will not rollback the transaction, since the connection object is likely no longer
# valid, and on poorly written database drivers, that could cause the process to crash.
#
# Related modules: Sequel::TemporarilyReleaseConnection,
# Sequel::UnableToReacquireConnectionError

#
module Sequel
# Error class raised if the connection pool does not provide the same connection
# object when checking a temporarily released connection out.
class UnableToReacquireConnectionError < Error
end

module TemporarilyReleaseConnection
module DatabaseMethods
# Temporarily release the connection back to the connection pool for the
# duration of the block.
def temporarily_release_connection(conn, server=:default, &block)
pool.temporarily_release_connection(conn, server, &block)
end

private

# Do nothing if UnableToReacquireConnectionError is raised, as it is
# likely the connection is not in a usable state.
def rollback_transaction(conn, opts)
return if UnableToReacquireConnectionError === $!
super
end
end

module PoolMethods
# Temporarily release a currently checked out connection, then yield to the block. Reacquire the same
# connection upon the exit of the block.
def temporarily_release_connection(conn, server)
t = Sequel.current
raise Error, "connection not currently checked out" unless conn.equal?(trc_owned_connection(t, server))

begin
trc_release(t, conn, server)
yield
ensure
c = trc_acquire(t, server)
unless conn.equal?(c)
raise UnableToReacquireConnectionError, "reacquired connection not the same as initial connection"
end
end
end
end

module TimedQueue
private

def trc_owned_connection(t, server)
owned_connection(t)
end

def trc_release(t, conn, server)
release(t)
end

def trc_acquire(t, server)
acquire(t)
end
end

module ShardedTimedQueue
# Normalize the server name for sharded connection pools
def temporarily_release_connection(conn, server)
server = pick_server(server)
super
end

private

def trc_owned_connection(t, server)
owned_connection(t, server)
end

def trc_release(t, conn, server)
release(t, conn, server)
end

def trc_acquire(t, server)
acquire(t, server)
end
end

module ThreadedBase
private

def trc_release(t, conn, server)
sync{super}
end
end

module Threaded
include TimedQueue
include ThreadedBase
end

module ShardedThreaded
include ShardedTimedQueue
include ThreadedBase
end
end

trc = TemporarilyReleaseConnection
trc_map = {
:threaded => trc::Threaded,
:sharded_threaded => trc::ShardedThreaded,
:timed_queue => trc::TimedQueue,
:sharded_timed_queue => trc::ShardedTimedQueue,
}.freeze

Database.register_extension(:temporarily_release_connection) do |db|
unless pool_mod = trc_map[db.pool.pool_type]
raise(Error, "temporarily_release_connection extension not supported for connection pool type #{db.pool.pool_type}")
end

case db.pool.pool_type
when :threaded, :sharded_threaded
if db.opts[:connection_handling] == :disconnect
raise Error, "temporarily_release_connection extension not supported with connection_handling: :disconnect option"
end
end

unless db.pool.max_size == 1
raise Error, "temporarily_release_connection extension not supported unless :max_connections option is 1"
end

db.extend(trc::DatabaseMethods)
db.pool.extend(trc::PoolMethods)
db.pool.extend(pool_mod)
end

private_constant :TemporarilyReleaseConnection
end
26 changes: 26 additions & 0 deletions spec/adapters/sqlite_spec.rb
Expand Up @@ -1058,3 +1058,29 @@ def setup_db(opts)
db[:names].where(name: /^J/).select_order_map(:name).must_equal %w[Jane John]
end if RUBY_VERSION >= '3.3'
end if DB.adapter_scheme == :sqlite

# Force a separate Database object for these tests, so temporarily_release_connection
# extension is always tested if testing the sqlite adapter.
describe 'temporarily_release_connection plugin' do
it "should temporarily release a connection" do
db = Sequel.sqlite
db.extension :temporarily_release_connection

db.create_table(:i){Integer :i}

db.transaction(:rollback=>:always) do |c|
db.temporarily_release_connection(c) do
4.times.map do |i|
Thread.new do
db.synchronize do |conn|
_(conn).must_be_same_as c
end
db[:i].insert(i)
end
end.map(&:join)
end
db[:i].count.must_equal 4
end
db[:i].count.must_equal 0
end
end if DB.adapter_scheme == :sqlite
110 changes: 110 additions & 0 deletions spec/extensions/temporarily_release_connection_spec.rb
@@ -0,0 +1,110 @@
require_relative "spec_helper"

pool_types = [ :threaded, :sharded_threaded]
pool_types += [ :timed_queue, :sharded_timed_queue] if RUBY_VERSION >= '3.2'

pool_types.each do |pool_type|
describe "temporarily_release_connection extension with pool class #{pool_type}" do
before do
opts = {:max_connections=>1, :pool_class=>pool_type}
if pool_type.to_s.start_with?('sharded')
opts[:servers] = {:foo=>{}, :bar=>{}}
end
@db = Sequel.mock(opts).extension(:temporarily_release_connection)
end

it "should temporarily release connection during block so it can be acquired by other threads" do
conns = []
@db.transaction(:rollback=>:always) do |c|
@db.temporarily_release_connection(c) do
4.times.map do |i|
Thread.new do
@db.synchronize do |conn|
conns << conn
end
end
end.map(&:join)
end
end

c = @db.synchronize{|conn| conn}
conns.size.must_equal 4
conns.each do |conn|
conn.must_be_same_as c
end

@db.sqls.must_equal ['BEGIN', 'ROLLBACK']
end

it "should temporarily release connection for specific shard during block so it can be acquired by other threads" do
conns = []
@db.transaction(:rollback=>:always, :server=>:foo) do |c|
@db.temporarily_release_connection(c, :foo) do
@db.transaction(:rollback=>:always, :server=>:bar) do |c2|
@db.temporarily_release_connection(c2, :bar) do
4.times.map do |i|
Thread.new do
@db.synchronize(:foo) do |conn|
@db.synchronize(:bar) do |conn2|
conns << [conn, conn2]
end
end
end
end.map(&:join)
end
end
end
end

c = @db.synchronize(:foo){|conn| conn}
c2 = @db.synchronize(:bar){|conn| conn}
conns.size.must_equal 4
conns.each do |conn, conn2|
conn.must_be_same_as c
conn2.must_be_same_as c2
end

@db.sqls.must_equal ["BEGIN -- foo", "BEGIN -- bar", "ROLLBACK -- bar", "ROLLBACK -- foo"]
end if pool_type.to_s.start_with?('sharded')

it "should raise UnableToReacquireConnectionError if unable to reacquire the same connection it released" do
proc do
@db.transaction(rollback: :always) do |conn|
@db.temporarily_release_connection(conn) do
@db.disconnect
end
end
end.must_raise Sequel::UnableToReacquireConnectionError
@db.sqls.must_equal ['BEGIN']
end

it "should raise if provided a connection that is not checked out" do
proc do
@db.temporarily_release_connection(@db.synchronize{|conn| conn})
end.must_raise Sequel::Error
end

it "should raise if pool max_size is not 1" do
db = Sequel.mock(:pool_type=>pool_type)
proc do
db.extension(:temporarily_release_connection)
end.must_raise Sequel::Error
end
end
end

describe "temporarily_release_connection extension" do
it "should raise if pool uses connection_handling: :disconnect option" do
db = Sequel.mock(:connection_handling=>:disconnect)
proc do
db.extension(:temporarily_release_connection)
end.must_raise Sequel::Error
end

it "should raise if pool uses unsupported pool type" do
db = Sequel.mock(:pool_class=>:single)
proc do
db.extension(:temporarily_release_connection)
end.must_raise Sequel::Error
end
end
4 changes: 4 additions & 0 deletions www/pages/plugins.html.erb
Expand Up @@ -771,6 +771,10 @@
<span class="ul__span">Normalizes SQL before logging, helpful for analytics and sensitive data.</span>
</li>
<li class="ul__li ul__li--grid">
<a class="a" href="rdoc-plugins/files/lib/sequel/extensions/temporarily_release_connection_rb.html">temporarily_release_connection </a>
<span class="ul__span">Allows for multithreaded transactional testing by temporarily releasing checked-out connections back to the pool.</span>
</li>
<li class="ul__li ul__li--grid">
<a class="a" href="rdoc-plugins/files/lib/sequel/extensions/transaction_connection_validator_rb.html">transaction_connection_validator </a>
<span class="ul__span">Handle disconnect failures detected when starting a new transaction using a new connection transparently.</span>
</li>
Expand Down

0 comments on commit f372eeb

Please sign in to comment.