forked from igrigorik/em-synchrony
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
206 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
18 changes: 18 additions & 0 deletions
18
lib/em-synchrony/active_record/connection_adapters/em_mysql2_adapter.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
# encoding: utf-8 | ||
|
||
# AR adapter for using a fibered mysql2 connection with EM | ||
# This adapter should be used within Thin or Unicorn with the rack-fiber_pool middleware. | ||
# Just update your database.yml's adapter to be 'em_mysql2' | ||
|
||
require 'active_record/connection_adapters/abstract_adapter' | ||
require 'active_record/connection_adapters/mysql2_adapter' | ||
|
||
module ActiveRecord | ||
class Base | ||
def self.em_mysql2_connection(config) | ||
client = Mysql2::EM::Client.new(config.symbolize_keys) | ||
options = [config[:host], config[:username], config[:password], config[:database], config[:port], config[:socket], 0] | ||
ConnectionAdapters::Mysql2Adapter.new(client, logger, options, config) | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
# Necessary monkeypatching to make AR fiber-friendly. | ||
|
||
module ActiveRecord | ||
module ConnectionAdapters | ||
|
||
def self.fiber_pools | ||
@fiber_pools ||= [] | ||
end | ||
def self.register_fiber_pool(fp) | ||
fiber_pools << fp | ||
end | ||
|
||
class FiberedMonitor | ||
class Queue | ||
def initialize | ||
@queue = [] | ||
end | ||
|
||
def wait(timeout) | ||
t = timeout || 5 | ||
fiber = Fiber.current | ||
x = EM::Timer.new(t) do | ||
@queue.delete(fiber) | ||
fiber.resume(false) | ||
end | ||
@queue << fiber | ||
Fiber.yield.tap do | ||
x.cancel | ||
end | ||
end | ||
|
||
def signal | ||
fiber = @queue.pop | ||
fiber.resume(true) if fiber | ||
end | ||
end | ||
|
||
def synchronize | ||
yield | ||
end | ||
|
||
def new_cond | ||
Queue.new | ||
end | ||
end | ||
|
||
# ActiveRecord's connection pool is based on threads. Since we are working | ||
# with EM and a single thread, multiple fiber design, we need to provide | ||
# our own connection pool that keys off of Fiber.current so that different | ||
# fibers running in the same thread don't try to use the same connection. | ||
class ConnectionPool | ||
def initialize(spec) | ||
@spec = spec | ||
|
||
# The cache of reserved connections mapped to threads | ||
@reserved_connections = {} | ||
|
||
# The mutex used to synchronize pool access | ||
@connection_mutex = FiberedMonitor.new | ||
@queue = @connection_mutex.new_cond | ||
|
||
# default 5 second timeout unless on ruby 1.9 | ||
@timeout = spec.config[:wait_timeout] || 5 | ||
|
||
# default max pool size to 5 | ||
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5 | ||
|
||
@connections = [] | ||
@checked_out = [] | ||
@automatic_reconnect = true | ||
@tables = {} | ||
|
||
@columns = Hash.new do |h, table_name| | ||
h[table_name] = with_connection do |conn| | ||
|
||
# Fetch a list of columns | ||
conn.columns(table_name, "#{table_name} Columns").tap do |columns| | ||
|
||
# set primary key information | ||
columns.each do |column| | ||
column.primary = column.name == primary_keys[table_name] | ||
end | ||
end | ||
end | ||
end | ||
|
||
@columns_hash = Hash.new do |h, table_name| | ||
h[table_name] = Hash[columns[table_name].map { |col| | ||
[col.name, col] | ||
}] | ||
end | ||
|
||
@primary_keys = Hash.new do |h, table_name| | ||
h[table_name] = with_connection do |conn| | ||
table_exists?(table_name) ? conn.primary_key(table_name) : 'id' | ||
end | ||
end | ||
end | ||
|
||
def clear_stale_cached_connections! | ||
cache = @reserved_connections | ||
keys = Set.new(cache.keys) | ||
|
||
ActiveRecord::ConnectionAdapters.fiber_pools.each do |pool| | ||
pool.busy_fibers.each_pair do |object_id, fiber| | ||
keys.delete(object_id) | ||
end | ||
end | ||
|
||
keys.each do |key| | ||
next unless cache.has_key?(key) | ||
checkin cache[key] | ||
cache.delete(key) | ||
end | ||
end | ||
|
||
private | ||
|
||
def current_connection_id #:nodoc: | ||
Fiber.current.object_id | ||
end | ||
|
||
def checkout_and_verify(c) | ||
@checked_out << c | ||
c.run_callbacks :checkout | ||
c.verify! | ||
c | ||
end | ||
end | ||
|
||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
require 'active_record' | ||
require 'em-synchrony/mysql2' | ||
|
||
require 'em-synchrony/active_record/connection_adapters/em_mysql2_adapter' | ||
require 'em-synchrony/active_record/patches' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
require "spec/helper/all" | ||
require "em-synchrony/activerecord" | ||
|
||
# create database widgets; | ||
# use widgets; | ||
# create table widgets (idx INT); | ||
|
||
class Widget < ActiveRecord::Base; end; | ||
|
||
describe "Fiberized ActiveRecord driver for mysql2" do | ||
DELAY = 0.25 | ||
QUERY = "SELECT sleep(#{DELAY})" | ||
|
||
it "should establish AR connection" do | ||
EventMachine.synchrony do | ||
ActiveRecord::Base.establish_connection( | ||
:adapter => 'em_mysql2', | ||
:database => 'widgets', | ||
:username => 'root' | ||
) | ||
|
||
result = Widget.find_by_sql(QUERY) | ||
result.size.should == 1 | ||
|
||
EventMachine.stop | ||
end | ||
end | ||
|
||
it "should fire sequential, synchronous requests within single fiber" do | ||
EventMachine.synchrony do | ||
ActiveRecord::Base.establish_connection( | ||
:adapter => 'em_mysql2', | ||
:database => 'widgets', | ||
:username => 'root' | ||
) | ||
|
||
start = now | ||
res = [] | ||
|
||
res.push Widget.find_by_sql(QUERY) | ||
res.push Widget.find_by_sql(QUERY) | ||
|
||
(now - start.to_f).should be_within(DELAY * res.size * 0.15).of(DELAY * res.size) | ||
res.size.should == 2 | ||
|
||
EventMachine.stop | ||
end | ||
end | ||
|
||
end |