Skip to content

Commit

Permalink
Merge pull request igrigorik#87 from helios-technologies/amqp
Browse files Browse the repository at this point in the history
AMQP support
  • Loading branch information
igrigorik committed Nov 27, 2011
2 parents 9521261 + 08450e6 commit 0ab0f07
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 0 deletions.
1 change: 1 addition & 0 deletions Gemfile
Expand Up @@ -14,4 +14,5 @@ group :development do
gem 'em-redis', '~> 0.3.0'
gem 'em-hiredis'
gem 'mongo'
gem 'amqp'
end
151 changes: 151 additions & 0 deletions lib/em-synchrony/amqp.rb
@@ -0,0 +1,151 @@
begin
require "amqp"
require "amq/protocol"
rescue LoadError => error
raise "Missing EM-Synchrony dependency: gem install amqp"
end

module EventMachine
module Synchrony
module AMQP
class Error < RuntimeError; end

class << self
def sync &blk
fiber = Fiber.current
blk.call(fiber)
Fiber.yield
end

def sync_cb fiber
Proc.new do |*args|
if fiber == Fiber.current
return *args
else
fiber.resume *args
end
end
end

%w[connect start run].each do |type|
line = __LINE__ + 2
code = <<-EOF
def #{type}(*params)
sync { |f| ::AMQP.#{type}(*params, &sync_cb(f)) }
end
EOF
module_eval(code, __FILE__, line)
end
end

class Channel < ::AMQP::Channel
def initialize(*params, &block)
f = Fiber.current
super(*params, &EM::Synchrony::AMQP.sync_cb(f))
channel, open_ok = Fiber.yield
raise Error.new unless open_ok.is_a?(::AMQ::Protocol::Channel::OpenOk)
channel
end

%w[direct fanout topic headers].each do |type|
line = __LINE__ + 2
code = <<-EOF
alias :a#{type} :#{type}
def #{type}(name = 'amq.#{type}', opts = {})
if exchange = find_exchange(name)
extended_opts = Exchange.add_default_options(:#{type}, name, opts, nil)
validate_parameters_match!(exchange, extended_opts)
exchange
else
register_exchange(Exchange.new(self, :#{type}, name, opts))
end
end
EOF
module_eval(code, __FILE__, line)
end

alias :aqueue! :queue!
def queue!(name, opts = {})
queue = Queue.new(self, name, opts)
register_queue(queue)
end

%w[queue flow prefetch recover tx_select tx_commit tx_rollback reset]
.each do |type|
line = __LINE__ + 2
code = <<-EOF
alias :a#{type} :#{type}
def #{type}(*params)
EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
end
EOF
module_eval(code, __FILE__, line)
end
end

class Exchange < ::AMQP::Exchange
def initialize(channel, type, name, opts = {}, &block)
f = Fiber.current
super(channel, type, name, opts, &EM::Synchrony::AMQP.sync_cb(f))
exchange, declare_ok = Fiber.yield
raise Error.new unless declare_ok.is_a?(::AMQ::Protocol::Exchange::DeclareOk)
exchange
end

%w[publish delete].each do |type|
line = __LINE__ + 2
code = <<-EOF
alias :a#{type} :#{type}
def #{type}(*params)
EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
end
EOF
module_eval(code, __FILE__, line)
end
end

class Queue < ::AMQP::Queue
def initialize(*params)
f = Fiber.current
super(*params, &EM::Synchrony::AMQP.sync_cb(f))
queue, declare_ok = Fiber.yield
raise Error.new unless declare_ok.is_a?(::AMQ::Protocol::Queue::DeclareOk)
queue
end

alias :asubscribe :subscribe
def subscribe &block
Fiber.new do
asubscribe(&EM::Synchrony::AMQP.sync_cb(Fiber.current))
loop { block.call(Fiber.yield) }
end.resume
end

%w[bind rebind unbind delete purge pop unsubscribe status].each do |type|
line = __LINE__ + 2
code = <<-EOF
alias :a#{type} :#{type}
def #{type}(*params)
EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
end
EOF
module_eval(code, __FILE__, line)
end
end

class Session < ::AMQP::Session
%w[disconnect].each do |type|
line = __LINE__ + 2
code = <<-EOF
alias :a#{type} :#{type}
def #{type}(*params)
EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
end
EOF
module_eval(code, __FILE__, line)
end
end

end
end
end
100 changes: 100 additions & 0 deletions spec/amqp_spec.rb
@@ -0,0 +1,100 @@
require "spec/helper/all"
require 'pp'
require 'ruby-debug'

describe EM::Synchrony::AMQP do

it "should yield until connection is ready" do
EM.synchrony do
connection = EM::Synchrony::AMQP.connect
connection.connected?.should be_true
EM.stop
end
end

it "should yield until disconnection is complete" do
EM.synchrony do
connection = EM::Synchrony::AMQP.connect
connection.disconnect
connection.connected?.should be_false
EM.stop
end
end

it "should yield until the channel is created" do
EM.synchrony do
connection = EM::Synchrony::AMQP.connect
channel = EM::Synchrony::AMQP::Channel.new(connection)
channel.should be_kind_of(EM::Synchrony::AMQP::Channel)
EM.stop
end
end

it "should yield until the queue is created" do
EM.synchrony do
connection = EM::Synchrony::AMQP.connect
channel = EM::Synchrony::AMQP::Channel.new(connection)
queue = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queue1", :auto_delete => true)
EM.stop
end
end

it "should yield until the exchange is created" do
EM.synchrony do
connection = EM::Synchrony::AMQP.connect
channel = EM::Synchrony::AMQP::Channel.new(connection)

exchange = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.exchange")
exchange.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)

direct = channel.fanout("test.em-synchrony.direct")
fanout = channel.fanout("test.em-synchrony.fanout")
topic = channel.fanout("test.em-synchrony.topic")
headers = channel.fanout("test.em-synchrony.headers")

direct.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)
fanout.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)
topic.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)
headers.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)
EM.stop
end
end

it "should publish and receive messages" do
publish_number = 10
EM.synchrony do
connection = EM::Synchrony::AMQP.connect
channel = EM::Synchrony::AMQP::Channel.new(connection)
ex = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.fanout")

q1 = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queues.1", :auto_delete => true)
q2 = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queues.2", :auto_delete => true)

q1.bind(ex)
q2.bind(ex)

q1_nb, q2_nb = 0, 0
stop_cb = proc { EM.stop if q1_nb + q2_nb == 2 * publish_number }

q1.subscribe do |meta, msg|
msg.should match(/^Bonjour [0-9]+/)
q1_nb += 1
stop_cb.call
end

q2.subscribe do |meta, msg|
msg.should match(/^Bonjour [0-9]+/)
q2_nb += 1
stop_cb.call
end

Fiber.new do
publish_number.times do |n|
ex.publish("Bonjour #{n}")
EM::Synchrony.sleep(0.1)
end
end.resume
end
end

end
1 change: 1 addition & 0 deletions spec/helper/all.rb
Expand Up @@ -10,6 +10,7 @@
require 'lib/em-synchrony/em-mongo'
require 'lib/em-synchrony/em-redis'
require 'lib/em-synchrony/em-hiredis'
require 'lib/em-synchrony/amqp'

require 'helper/tolerance_matcher'
require 'helper/stub-http-server'
Expand Down

0 comments on commit 0ab0f07

Please sign in to comment.