From c7fe56c07b9680f10ff9aca11cdd9e1d4e2c9a36 Mon Sep 17 00:00:00 2001 From: Theo Date: Thu, 19 May 2011 21:27:57 +0200 Subject: [PATCH] Basic wrapping working --- .gitignore | 5 ++ Gemfile | 3 + Rakefile | 6 ++ hot_bunnies.gemspec | 24 +++++++ lib/hot_bunnies.rb | 136 +++++++++++++++++++++++++++++++++++++ lib/hot_bunnies/version.rb | 5 ++ 6 files changed, 179 insertions(+) create mode 100644 .gitignore create mode 100644 Gemfile create mode 100644 Rakefile create mode 100644 hot_bunnies.gemspec create mode 100644 lib/hot_bunnies.rb create mode 100644 lib/hot_bunnies/version.rb diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2f0a9ac --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +*.gem +.bundle +Gemfile.lock +pkg/* +.DS_Store diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..a1b93f3 --- /dev/null +++ b/Gemfile @@ -0,0 +1,3 @@ +source :rubygems + +gemspec diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..5237170 --- /dev/null +++ b/Rakefile @@ -0,0 +1,6 @@ +# encoding: utf-8 + +require 'bundler' + + +Bundler::GemHelper.install_tasks diff --git a/hot_bunnies.gemspec b/hot_bunnies.gemspec new file mode 100644 index 0000000..f269c67 --- /dev/null +++ b/hot_bunnies.gemspec @@ -0,0 +1,24 @@ +# encoding: utf-8 + +$: << File.expand_path('../lib', __FILE__) + +require 'hot_bunnies/version' + + +Gem::Specification.new do |s| + s.name = 'hot_bunnies' + s.version = HotBunnies::VERSION + s.platform = Gem::Platform::RUBY + s.authors = ['Theo Hultberg'] + s.email = ['theo@burtcorp.com'] + s.homepage = '' + s.summary = %q{Ruby wrapper for the RabbitMQ Java driver} + s.description = %q{A object oriented interface to RabbitMQ that uses the Java driver under the hood} + + s.rubyforge_project = 'hot_bunnies' + + s.files = `git ls-files`.split("\n") +# s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n") +# s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) } + s.require_paths = %w(lib) +end diff --git a/lib/hot_bunnies.rb b/lib/hot_bunnies.rb new file mode 100644 index 0000000..ae4ca74 --- /dev/null +++ b/lib/hot_bunnies.rb @@ -0,0 +1,136 @@ +# encoding: utf-8 + +require 'java' +require 'ext/commons-io' +require 'ext/rabbitmq-client' + + +module HotBunnies + VERSION = '1.0.0' + + import com.rabbitmq.client.ConnectionFactory + import com.rabbitmq.client.Connection + import com.rabbitmq.client.Channel + import com.rabbitmq.client.DefaultConsumer + + def self.connect(options={}) + cf = ConnectionFactory.new + cf.host = options[:host] if options[:host] + cf.new_connection + end + + module Channel + def queue(name, options={}) + Queue.new(self, name, options) + end + + def exchange(name, options={}) + Exchange.new(self, name, options) + end + + def qos(options={}) + options = {:prefetch_size => 0, :prefetch_count => 0, :global => true}.merge(options) + end + end + + class Queue + attr_reader :name + + def initialize(channel, name, options={}) + @channel = channel + @name = name + @options = {:durable => false, :exclusive => false, :auto_delete => false}.merge(options) + declare! + end + + def bind(exchange, options={}) + exchange_name = if exchange.respond_to?(:name) then exchange.name else exchange.to_s end + @channel.queue_bind(@name, exchange_name, options.fetch(:routing_key, '')) + end + + def unbind(exchange, options={}) + exchange_name = if exchange.respond_to?(:name) then exchange.name else exchange.to_s end + @channel.queue_unbind(@name, exchange_name, options.fetch(:routing_key, '')) + end + + def delete + @channel.queue_delete(@name) + end + + def purge + @channel.queue_purge(@name) + end + + def subscribe(options={}, &subscriber) + @channel.basic_consume(@name, !options.fetch(:ack, false), ConsumerWrapper.new(@channel, &subscriber)) + end + + private + + class Headers + def initialize(channel, consumer_tag, envelope, properties) + @channel = channel + @consumer_tag = consumer_tag + @envelope = envelope + @properties = properties + end + + def ack(options={}) + @channel.basic_ack(@envelope.delivery_tag, options.fetch(:multiple, false)) + end + + def reject(options={}) + @channel.basic_ack(@envelope.delivery_tag, options.fetch(:requeue, false)) + end + end + + class ConsumerWrapper < DefaultConsumer + def initialize(channel, &subscriber) + super(channel) + @channel = channel + @subscriber = subscriber + end + + def handleDelivery(consumer_tag, envelope, properties, body_bytes) + body = java.lang.String.new(body_bytes).to_s + case @subscriber.arity + when 2 then @subscriber.call(Headers.new(@channel, consumer_tag, envelope, properties), body) + when 1 then @subscriber.call(body) + else raise ArgumentError, 'Consumer callback wants no arguments' + end + end + end + + def declare! + @channel.queue_declare(@name, @options[:durable], @options[:exclusive], @options[:auto_delete], nil) + end + end + + class Exchange + attr_reader :name + + def initialize(channel, name, options={}) + @channel = channel + @name = name + @options = {:type => :fanout, :durable => false, :auto_delete => false, :internal => false}.merge(options) + declare! + end + + def publish(body, options={}) + options = {:routing_key => '', :mandatory => false, :immediate => false}.merge(options) + @channel.basic_publish(@name, options[:routing_key], options[:mandatory], options[:immediate], nil, body.to_java.bytes) + end + + def delete(options={}) + @channel.exchange_delete(@name, options.fetch(:if_unused, false)) + end + + private + + def declare! + unless @name == '' + @channel.exchange_declare(@name, @options[:type].to_s, @options[:durable], @options[:auto_delete], @options[:internal], nil) + end + end + end +end diff --git a/lib/hot_bunnies/version.rb b/lib/hot_bunnies/version.rb new file mode 100644 index 0000000..fbed3d1 --- /dev/null +++ b/lib/hot_bunnies/version.rb @@ -0,0 +1,5 @@ +# encoding: utf-8 + +module HotBunnies + VERSION = '1.0.0' +end