Permalink
Browse files

MQ -> Channel.

  • Loading branch information...
1 parent acc059d commit 67815ad9e833db958f5d1c3bcdc47d9fd775d6bf @botanicus botanicus committed Jan 19, 2011
View
@@ -27,4 +27,4 @@
- handle connection.redirect during connect (for rabbitmq in distributed mode) [or just set insist to true]
- add amq.queue('name').size{ |num| "#{num} messages in the queue" } (send declare passive, look at declare-ok response)
-- clean up MQ.default on disconnect
+- clean up AMQP::Channel.default on disconnect
View
@@ -20,7 +20,7 @@ Gem::Specification.new do |s|
# RDoc
s.has_rdoc = true
- s.rdoc_options = '--include=examples'
+ s.rdoc_options = '--include=examples --main README.md'
s.extra_rdoc_files = ["README.md"] + Dir.glob("doc/*")
# Dependencies
View
@@ -9,14 +9,14 @@
Signal.trap('TERM') { AMQP.stop { EM.stop } }
AMQP.start(:host => 'localhost') do
- MQ.queue('awesome').publish('Totally rad 1')
- MQ.queue('awesome').publish('Totally rad 2')
- MQ.queue('awesome').publish('Totally rad 3')
+ AMQP::Channel.queue('awesome').publish('Totally rad 1')
+ AMQP::Channel.queue('awesome').publish('Totally rad 2')
+ AMQP::Channel.queue('awesome').publish('Totally rad 3')
i = 0
# Stopping after the second item was acked will keep the 3rd item in the queue
- MQ.queue('awesome').subscribe(:ack => true) do |h, m|
+ AMQP::Channel.queue('awesome').subscribe(:ack => true) do |h, m|
if (i += 1) == 3
puts 'Shutting down...'
AMQP.stop { EM.stop }
@@ -20,7 +20,7 @@ class Array
EM.run do
connection = AMQP.connect
- mq = MQ.new(connection)
+ mq = AMQP::Channel.new(connection)
show_stopper = Proc.new do
$stdout.puts "Stopping..."
@@ -42,7 +42,7 @@ class Array
queues = [queue1, queue2, queue3]
# Rely on default direct exchange binding, see section 2.1.2.4 Automatic Mode in AMQP 0.9.1 spec.
- exchange = MQ::Exchange.default
+ exchange = AMQP::Channel::Exchange.default
queue1.subscribe do |payload|
puts "Got #{payload} for #{queue1.name}"
@@ -13,7 +13,7 @@
end
@counter = 0
- amq = MQ.new
+ amq = AMQP::Channel.new
3.times do
amq.queue("") do |queue|
View
@@ -18,20 +18,20 @@ def log(*args)
# AMQP.logging = true
- clock = MQ.new.fanout('clock')
+ clock = AMQP::Channel.new.fanout('clock')
EM.add_periodic_timer(1) {
puts
log :publishing, time = Time.now
clock.publish(Marshal.dump(time))
}
- amq = MQ.new
+ amq = AMQP::Channel.new
amq.queue('every second').bind(amq.fanout('clock')).subscribe { |time|
log 'every second', :received, Marshal.load(time)
}
- amq = MQ.new
+ amq = AMQP::Channel.new
amq.queue('every 5 seconds').bind(amq.fanout('clock')).subscribe { |time|
time = Marshal.load(time)
log 'every 5 seconds', :received, time if time.strftime('%S').to_i % 5 == 0
@@ -35,9 +35,9 @@ def keys
end
end
- server = MQ.new.rpc('hash table node', HashTable.new)
+ server = AMQP::Channel.new.rpc('hash table node', HashTable.new)
- client = MQ.new.rpc('hash table node')
+ client = AMQP::Channel.new.rpc('hash table node')
client.set(:now, time = Time.now)
client.get(:now) do |res|
log 'client', :now => res, :eql? => res == time
@@ -10,13 +10,13 @@
connection = AMQP.connect(:host => 'localhost', :logging => false)
# open a channel on the AMQP connection
- channel = MQ.new(connection)
+ channel = AMQP::Channel.new(connection)
# declare a queue on the channel
- queue = MQ::Queue.new(channel, 'queue name')
+ queue = AMQP::Channel::Queue.new(channel, 'queue name')
# create a fanout exchange
- exchange = MQ::Exchange.new(channel, :fanout, 'all queues')
+ exchange = AMQP::Channel::Exchange.new(channel, :fanout, 'all queues')
# bind the queue to the exchange
queue.bind(exchange)
View
@@ -4,12 +4,12 @@
require 'amqp'
require 'amqp/logger'
-Logger = MQ::Logger
+Logger = AMQP::Channel::Logger
AMQP.start(:host => 'localhost') do
if ARGV[0] == 'server'
- MQ.queue('logger').bind(MQ.fanout('logging', :durable => true)).subscribe { |msg|
+ AMQP::Channel.queue('logger').bind(AMQP::Channel.fanout('logging', :durable => true)).subscribe { |msg|
msg = Marshal.load(msg)
require 'pp'
pp(msg)
@@ -19,7 +19,7 @@ def log(*args)
#AMQP.logging = true
- clock = MQ.new.headers('multiformat_clock')
+ clock = AMQP::Channel.new.headers('multiformat_clock')
EM.add_periodic_timer(1) {
puts
@@ -32,7 +32,7 @@ def log(*args)
}
["iso8601", "rfc2822"].each do |format|
- amq = MQ.new
+ amq = AMQP::Channel.new
amq.queue(format.to_s).bind(amq.headers('multiformat_clock'), :arguments => {"format" => format}).subscribe { |time|
log "received #{format}", time
}
@@ -18,21 +18,21 @@ def log(*args)
# AMQP.logging = true
- amq = MQ.new
+ amq = AMQP::Channel.new
EM.add_periodic_timer(1) {
puts
log :sending, 'ping'
amq.queue('one').publish('ping')
}
- amq = MQ.new
+ amq = AMQP::Channel.new
amq.queue('one').subscribe { |msg|
log 'one', :received, msg, :sending, 'pong'
amq.queue('two').publish('pong')
}
- amq = MQ.new
+ amq = AMQP::Channel.new
amq.queue('two').subscribe { |msg|
log 'two', :received, msg
}
View
@@ -8,7 +8,7 @@
Signal.trap('TERM') { AMQP.stop { EM.stop } }
AMQP.start do
- queue = MQ.queue('awesome')
+ queue = AMQP::Channel.queue('awesome')
queue.publish('Totally rad 1')
queue.publish('Totally rad 2')
View
@@ -15,7 +15,7 @@ def log(*args)
workers = ARGV[0] ? (Integer(ARGV[0]) rescue 1) : 1
AMQP.fork(workers) do
- log MQ.id, :started
+ log AMQP::Channel.id, :started
class Fixnum
def prime?
@@ -25,19 +25,19 @@ def prime?
class PrimeChecker
def is_prime? number
- log "prime checker #{MQ.id}", :prime?, number
+ log "prime checker #{AMQP::Channel.id}", :prime?, number
number.prime?
end
end
- MQ.rpc('prime checker', PrimeChecker.new)
+ AMQP::Channel.rpc('prime checker', PrimeChecker.new)
end
# use workers to check which numbers are prime
AMQP.start(:host => 'localhost') do
- prime_checker = MQ.rpc('prime checker')
+ prime_checker = AMQP::Channel.rpc('prime checker')
(10_000...(10_000+MAX)).each do |num|
log :checking, num
View
@@ -17,7 +17,7 @@ def log(*args)
end
def publish_stock_prices
- mq = MQ.new
+ mq = AMQP::Channel.new
EM.add_periodic_timer(1) {
puts
@@ -33,14 +33,14 @@ def publish_stock_prices
end
def watch_appl_stock
- mq = MQ.new
+ mq = AMQP::Channel.new
mq.queue('apple stock').bind(mq.topic('stocks'), :key => 'usd.appl').subscribe { |price|
log 'apple stock', price
}
end
def watch_us_stocks
- mq = MQ.new
+ mq = AMQP::Channel.new
mq.queue('us stocks').bind(mq.topic('stocks'), :key => 'usd.*').subscribe { |info, price|
log 'us stock', info.routing_key, price
}
Oops, something went wrong.

0 comments on commit 67815ad

Please sign in to comment.