Skip to content

Commit

Permalink
Merge pull request #2 from cjbottaro/master
Browse files Browse the repository at this point in the history
Various fixes for fiber pool
  • Loading branch information
mperham committed Jul 27, 2011
2 parents 08b83d4 + 035e344 commit 8d9b0d4
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 8 deletions.
70 changes: 70 additions & 0 deletions examples/standalone.rb
@@ -0,0 +1,70 @@
# This program demonstrates that the connection pool actual works, as does the wait_timeout option.
# You need to provide your own configuration to #establish_connection.

$LOAD_PATH << File.dirname(__FILE__) + '/../lib'

require "eventmachine"
require "fiber"
require "active_record"
require "benchmark"

ActiveRecord::Base.logger = Logger.new(STDOUT)
ActiveRecord::Base.establish_connection :adapter => "em_postgresql",
:port => 5432,
:pool => 2,
:username => "cjbottaro",
:host => "localhost",
:database => "editor-ui_development",
:wait_timeout => 2

EM.run do
Fiber.new do
fibers = []
time = Benchmark.realtime do

fibers = 5.times.collect do
Fiber.new do
begin
ActiveRecord::Base.connection.execute "select pg_sleep(1)"
ActiveRecord::Base.clear_active_connections!
rescue => e
puts e.inspect
end
end.tap{ |fiber| fiber.resume }
end

fibers.each do |fiber|
while fiber.alive?
current_fiber = Fiber.current
EM.next_tick{ current_fiber.resume }
Fiber.yield
end
end

puts "first batch done"

# This is a copy/paste job.
fibers = 5.times.collect do
Fiber.new do
begin
ActiveRecord::Base.connection.execute "select pg_sleep(1)"
ActiveRecord::Base.clear_active_connections!
rescue => e
puts e.inspect
end
end.tap{ |fiber| fiber.resume }
end

fibers.each do |fiber|
while fiber.alive?
current_fiber = Fiber.current
EM.next_tick{ current_fiber.resume }
Fiber.yield
end
end

end
puts time
EM.stop
end.resume
end
Expand Up @@ -60,7 +60,7 @@ def connect
# See: http://www.postgresql.org/docs/current/static/runtime-config-compatible.html
# If PostgreSQL doesn't know the standard_conforming_strings parameter then it doesn't
# support escape string syntax. Don't override the inherited quoted_string_prefix.
if supports_standard_conforming_strings?
if respond_to?(:supports_standard_conforming_strings?) and supports_standard_conforming_strings?
self.class.instance_eval do
define_method(:quoted_string_prefix) { 'E' }
end
Expand Down Expand Up @@ -124,4 +124,4 @@ def self.em_postgresql_connection(config) # :nodoc:
end
end

end
end
42 changes: 36 additions & 6 deletions lib/active_record/patches.rb
Expand Up @@ -22,9 +22,7 @@ def wait(timeout)
fiber.resume(false)
end
@queue << fiber
returning Fiber.yield do
x.cancel
end
Fiber.yield.tap{ x.cancel }
end

def signal
Expand Down Expand Up @@ -75,28 +73,60 @@ def current_connection_id #:nodoc:

# Remove stale fibers from the cache.
def remove_stale_cached_threads!(cache, &block)
return if ActiveRecord::ConnectionAdapters.fiber_pools.empty?

keys = Set.new(cache.keys)

ActiveRecord::ConnectionAdapters.fiber_pools.each do |pool|
pool.busy_fibers.each_pair do |object_id, fiber|
keys.delete(object_id)
end
end
# puts "Pruning stale connections: #{f.busy_fibers.size} #{f.fibers.size} #{keys.inspect}"

# puts "Pruning stale connections: #{f.busy_fibers.size} #{f.fibers.size} #{keys.inspect}"
keys.each do |key|
next unless cache.has_key?(key)
block.call(key, cache[key])
cache.delete(key)
end
end

def checkout_and_verify(c)
# The next three methods (#checkout_new_connection, #checkout_existing_connection and #checkout_and_verify) require modification.
# The reason is because @connection_mutex.synchronize was modified to do nothing, which means #checkout is unguarded. It was
# assumed that was ok because the current fiber wouldn't yield during execution of #checkout, but that is untrue. Both #new_connection
# and #checkout_and_verify will yield the current fiber, thus allowing the body of #checkout to be accessed by multiple fibers at once.
# So if we want this to work without a lock, we need to make sure that the variables used to test the conditions in #checkout are
# modified *before* the current fiber is yielded and the next fiber enters #checkout.

def checkout_new_connection

# #new_connection will yield the current fiber, thus we need to fill @connections and @checked_out with placeholders so
# that the next fiber to enter #checkout will take the appropriate action. Once we actually have our connection, we
# replace the placeholders with it.

@connections << current_connection_id
@checked_out << current_connection_id

c = new_connection

@connections[@connections.index(current_connection_id)] = c
@checked_out[@checked_out.index(current_connection_id)] = c

checkout_and_verify(c)
end

def checkout_existing_connection
c = (@connections - @checked_out).first
@checked_out << c
checkout_and_verify(c)
end

def checkout_and_verify(c)
c.run_callbacks :checkout
c.verify!
c
end
end

end
end
end

0 comments on commit 8d9b0d4

Please sign in to comment.