diff --git a/Gemfile b/Gemfile index c9a97a1..3938800 100644 --- a/Gemfile +++ b/Gemfile @@ -7,6 +7,7 @@ group :development do gem 'em-http-request', :git => 'git://github.com/igrigorik/em-http-request' gem 'remcached' # gem 'em-mongo', :git => 'https://github.com/bcg/em-mongo.git' + gem 'activerecord', '>= 3.1.0.rc6' gem 'em-mongo', '~> 0.3.6' gem 'bson_ext' gem 'mysql2' diff --git a/lib/em-synchrony/active_record/connection_adapters/em_mysql2_adapter.rb b/lib/em-synchrony/active_record/connection_adapters/em_mysql2_adapter.rb new file mode 100644 index 0000000..0f2c67f --- /dev/null +++ b/lib/em-synchrony/active_record/connection_adapters/em_mysql2_adapter.rb @@ -0,0 +1,18 @@ +# encoding: utf-8 + +# AR adapter for using a fibered mysql2 connection with EM +# This adapter should be used within Thin or Unicorn with the rack-fiber_pool middleware. +# Just update your database.yml's adapter to be 'em_mysql2' + +require 'active_record/connection_adapters/abstract_adapter' +require 'active_record/connection_adapters/mysql2_adapter' + +module ActiveRecord + class Base + def self.em_mysql2_connection(config) + client = Mysql2::EM::Client.new(config.symbolize_keys) + options = [config[:host], config[:username], config[:password], config[:database], config[:port], config[:socket], 0] + ConnectionAdapters::Mysql2Adapter.new(client, logger, options, config) + end + end +end diff --git a/lib/em-synchrony/active_record/patches.rb b/lib/em-synchrony/active_record/patches.rb new file mode 100644 index 0000000..0d3620d --- /dev/null +++ b/lib/em-synchrony/active_record/patches.rb @@ -0,0 +1,132 @@ +# Necessary monkeypatching to make AR fiber-friendly. + +module ActiveRecord + module ConnectionAdapters + + def self.fiber_pools + @fiber_pools ||= [] + end + def self.register_fiber_pool(fp) + fiber_pools << fp + end + + class FiberedMonitor + class Queue + def initialize + @queue = [] + end + + def wait(timeout) + t = timeout || 5 + fiber = Fiber.current + x = EM::Timer.new(t) do + @queue.delete(fiber) + fiber.resume(false) + end + @queue << fiber + Fiber.yield.tap do + x.cancel + end + end + + def signal + fiber = @queue.pop + fiber.resume(true) if fiber + end + end + + def synchronize + yield + end + + def new_cond + Queue.new + end + end + + # ActiveRecord's connection pool is based on threads. Since we are working + # with EM and a single thread, multiple fiber design, we need to provide + # our own connection pool that keys off of Fiber.current so that different + # fibers running in the same thread don't try to use the same connection. + class ConnectionPool + def initialize(spec) + @spec = spec + + # The cache of reserved connections mapped to threads + @reserved_connections = {} + + # The mutex used to synchronize pool access + @connection_mutex = FiberedMonitor.new + @queue = @connection_mutex.new_cond + + # default 5 second timeout unless on ruby 1.9 + @timeout = spec.config[:wait_timeout] || 5 + + # default max pool size to 5 + @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5 + + @connections = [] + @checked_out = [] + @automatic_reconnect = true + @tables = {} + + @columns = Hash.new do |h, table_name| + h[table_name] = with_connection do |conn| + + # Fetch a list of columns + conn.columns(table_name, "#{table_name} Columns").tap do |columns| + + # set primary key information + columns.each do |column| + column.primary = column.name == primary_keys[table_name] + end + end + end + end + + @columns_hash = Hash.new do |h, table_name| + h[table_name] = Hash[columns[table_name].map { |col| + [col.name, col] + }] + end + + @primary_keys = Hash.new do |h, table_name| + h[table_name] = with_connection do |conn| + table_exists?(table_name) ? conn.primary_key(table_name) : 'id' + end + end + end + + def clear_stale_cached_connections! + cache = @reserved_connections + keys = Set.new(cache.keys) + + ActiveRecord::ConnectionAdapters.fiber_pools.each do |pool| + pool.busy_fibers.each_pair do |object_id, fiber| + keys.delete(object_id) + end + end + + keys.each do |key| + next unless cache.has_key?(key) + checkin cache[key] + cache.delete(key) + end + end + + private + + def current_connection_id #:nodoc: + Fiber.current.object_id + end + + def checkout_and_verify(c) + @checked_out << c + c.run_callbacks :checkout + c.verify! + c + end + end + + end +end \ No newline at end of file diff --git a/lib/em-synchrony/activerecord.rb b/lib/em-synchrony/activerecord.rb new file mode 100644 index 0000000..9baecff --- /dev/null +++ b/lib/em-synchrony/activerecord.rb @@ -0,0 +1,5 @@ +require 'active_record' +require 'em-synchrony/mysql2' + +require 'em-synchrony/active_record/connection_adapters/em_mysql2_adapter' +require 'em-synchrony/active_record/patches' diff --git a/spec/activerecord_spec.rb b/spec/activerecord_spec.rb new file mode 100644 index 0000000..2ad9680 --- /dev/null +++ b/spec/activerecord_spec.rb @@ -0,0 +1,50 @@ +require "spec/helper/all" +require "em-synchrony/activerecord" + +# create database widgets; +# use widgets; +# create table widgets (idx INT); + +class Widget < ActiveRecord::Base; end; + +describe "Fiberized ActiveRecord driver for mysql2" do + DELAY = 0.25 + QUERY = "SELECT sleep(#{DELAY})" + + it "should establish AR connection" do + EventMachine.synchrony do + ActiveRecord::Base.establish_connection( + :adapter => 'em_mysql2', + :database => 'widgets', + :username => 'root' + ) + + result = Widget.find_by_sql(QUERY) + result.size.should == 1 + + EventMachine.stop + end + end + + it "should fire sequential, synchronous requests within single fiber" do + EventMachine.synchrony do + ActiveRecord::Base.establish_connection( + :adapter => 'em_mysql2', + :database => 'widgets', + :username => 'root' + ) + + start = now + res = [] + + res.push Widget.find_by_sql(QUERY) + res.push Widget.find_by_sql(QUERY) + + (now - start.to_f).should be_within(DELAY * res.size * 0.15).of(DELAY * res.size) + res.size.should == 2 + + EventMachine.stop + end + end + +end \ No newline at end of file