Skip to content

Commit

Permalink
Basic wrapping working
Browse files Browse the repository at this point in the history
  • Loading branch information
iconara committed May 19, 2011
0 parents commit c7fe56c
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -0,0 +1,5 @@
*.gem
.bundle
Gemfile.lock
pkg/*
.DS_Store
3 changes: 3 additions & 0 deletions Gemfile
@@ -0,0 +1,3 @@
source :rubygems

gemspec
6 changes: 6 additions & 0 deletions Rakefile
@@ -0,0 +1,6 @@
# encoding: utf-8

require 'bundler'


Bundler::GemHelper.install_tasks
24 changes: 24 additions & 0 deletions hot_bunnies.gemspec
@@ -0,0 +1,24 @@
# encoding: utf-8

$: << File.expand_path('../lib', __FILE__)

require 'hot_bunnies/version'


Gem::Specification.new do |s|
s.name = 'hot_bunnies'
s.version = HotBunnies::VERSION
s.platform = Gem::Platform::RUBY
s.authors = ['Theo Hultberg']
s.email = ['theo@burtcorp.com']
s.homepage = ''
s.summary = %q{Ruby wrapper for the RabbitMQ Java driver}
s.description = %q{A object oriented interface to RabbitMQ that uses the Java driver under the hood}

s.rubyforge_project = 'hot_bunnies'

s.files = `git ls-files`.split("\n")
# s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
# s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
s.require_paths = %w(lib)
end
136 changes: 136 additions & 0 deletions lib/hot_bunnies.rb
@@ -0,0 +1,136 @@
# encoding: utf-8

require 'java'
require 'ext/commons-io'
require 'ext/rabbitmq-client'


module HotBunnies
VERSION = '1.0.0'

import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.Connection
import com.rabbitmq.client.Channel
import com.rabbitmq.client.DefaultConsumer

def self.connect(options={})
cf = ConnectionFactory.new
cf.host = options[:host] if options[:host]
cf.new_connection
end

module Channel
def queue(name, options={})
Queue.new(self, name, options)
end

def exchange(name, options={})
Exchange.new(self, name, options)
end

def qos(options={})
options = {:prefetch_size => 0, :prefetch_count => 0, :global => true}.merge(options)
end
end

class Queue
attr_reader :name

def initialize(channel, name, options={})
@channel = channel
@name = name
@options = {:durable => false, :exclusive => false, :auto_delete => false}.merge(options)
declare!
end

def bind(exchange, options={})
exchange_name = if exchange.respond_to?(:name) then exchange.name else exchange.to_s end
@channel.queue_bind(@name, exchange_name, options.fetch(:routing_key, ''))
end

def unbind(exchange, options={})
exchange_name = if exchange.respond_to?(:name) then exchange.name else exchange.to_s end
@channel.queue_unbind(@name, exchange_name, options.fetch(:routing_key, ''))
end

def delete
@channel.queue_delete(@name)
end

def purge
@channel.queue_purge(@name)
end

def subscribe(options={}, &subscriber)
@channel.basic_consume(@name, !options.fetch(:ack, false), ConsumerWrapper.new(@channel, &subscriber))
end

private

class Headers
def initialize(channel, consumer_tag, envelope, properties)
@channel = channel
@consumer_tag = consumer_tag
@envelope = envelope
@properties = properties
end

def ack(options={})
@channel.basic_ack(@envelope.delivery_tag, options.fetch(:multiple, false))
end

def reject(options={})
@channel.basic_ack(@envelope.delivery_tag, options.fetch(:requeue, false))
end
end

class ConsumerWrapper < DefaultConsumer
def initialize(channel, &subscriber)
super(channel)
@channel = channel
@subscriber = subscriber
end

def handleDelivery(consumer_tag, envelope, properties, body_bytes)
body = java.lang.String.new(body_bytes).to_s
case @subscriber.arity
when 2 then @subscriber.call(Headers.new(@channel, consumer_tag, envelope, properties), body)
when 1 then @subscriber.call(body)
else raise ArgumentError, 'Consumer callback wants no arguments'
end
end
end

def declare!
@channel.queue_declare(@name, @options[:durable], @options[:exclusive], @options[:auto_delete], nil)
end
end

class Exchange
attr_reader :name

def initialize(channel, name, options={})
@channel = channel
@name = name
@options = {:type => :fanout, :durable => false, :auto_delete => false, :internal => false}.merge(options)
declare!
end

def publish(body, options={})
options = {:routing_key => '', :mandatory => false, :immediate => false}.merge(options)
@channel.basic_publish(@name, options[:routing_key], options[:mandatory], options[:immediate], nil, body.to_java.bytes)
end

def delete(options={})
@channel.exchange_delete(@name, options.fetch(:if_unused, false))
end

private

def declare!
unless @name == ''
@channel.exchange_declare(@name, @options[:type].to_s, @options[:durable], @options[:auto_delete], @options[:internal], nil)
end
end
end
end
5 changes: 5 additions & 0 deletions lib/hot_bunnies/version.rb
@@ -0,0 +1,5 @@
# encoding: utf-8

module HotBunnies
VERSION = '1.0.0'
end

0 comments on commit c7fe56c

Please sign in to comment.