Permalink
Browse files

try using bunnies on 1.9.3

  • Loading branch information...
1 parent 575d71f commit 136a54e212784926e2592b647eaed219e726d468 @svenfuchs svenfuchs committed Sep 3, 2012
Showing with 147 additions and 11 deletions.
  1. +12 −8 Gemfile
  2. +20 −1 Gemfile.lock
  3. +74 −0 lib/consumer.rb
  4. +6 −2 lib/travis/logs.rb
  5. +29 −0 play/bunny.rb
  6. +6 −0 play/hot_bunnies.rb
View
20 Gemfile
@@ -8,15 +8,19 @@ gem 'travis-support', :git => 'git://github.com/travis-ci/travis-support'
gem 'hubble', :git => 'git://github.com/roidrage/hubble'
gem 'newrelic_rpm', '~> 3.3.2'
-gem 'bunny'
-gem 'pg'
+platforms :mri do
+ gem 'bunny'
+ gem 'pg'
+end
-# can't be removed yet, even though we're on jruby 1.6.7 everywhere
-# this is due to Invalid gemspec errors
-# gem 'rollout', :git => 'git://github.com/jamesgolick/rollout', :ref => 'v1.1.0'
-# gem 'hot_bunnies', '~> 1.3.4'
-# gem 'jruby-openssl', '~> 0.7.4'
-# gem 'activerecord-jdbcpostgresql-adapter', '~> 1.2.2'
+platforms :jruby do
+ # can't be removed yet, even though we're on jruby 1.6.7 everywhere
+ # this is due to Invalid gemspec errors
+ gem 'rollout', :git => 'git://github.com/jamesgolick/rollout', :ref => 'v1.1.0'
+ gem 'hot_bunnies', '~> 1.3.4'
+ gem 'jruby-openssl', '~> 0.7.4'
+ gem 'activerecord-jdbcpostgresql-adapter', '~> 1.2.2'
+end
group :test do
gem 'rspec', '~> 2.7.0'
View
@@ -1,3 +1,10 @@
+GIT
+ remote: git://github.com/jamesgolick/rollout
+ revision: 3e7312cc018061b1ac6a8aeb9f11bbd0331da889
+ ref: v1.1.0
+ specs:
+ rollout (1.1.0)
+
GIT
remote: git://github.com/roidrage/hubble
revision: 5220415d5542a2868d54f7be9f35fc1d66126b8e
@@ -56,6 +63,10 @@ GEM
activesupport (= 3.2.8)
arel (~> 3.0.2)
tzinfo (~> 0.3.29)
+ activerecord-jdbc-adapter (1.2.2)
+ activerecord-jdbcpostgresql-adapter (1.2.2)
+ activerecord-jdbc-adapter (~> 1.2.2)
+ jdbc-postgres (>= 9.0, < 9.2)
activesupport (3.2.8)
i18n (~> 0.6)
multi_json (~> 1.0)
@@ -65,6 +76,7 @@ GEM
atomic (1.0.1-java)
avl_tree (1.1.3)
backports (2.6.4)
+ bouncy-castle-java (1.5.0146.1)
builder (3.0.0)
bunny (0.8.0)
crack (0.3.1)
@@ -92,8 +104,12 @@ GEM
hike (1.2.1)
hitimes (1.1.1)
hitimes (1.1.1-java)
+ hot_bunnies (1.3.8-java)
i18n (0.6.1)
+ jdbc-postgres (9.1.901)
journey (1.0.4)
+ jruby-openssl (0.7.7)
+ bouncy-castle-java (>= 1.5.0146.1)
json (1.6.7)
json (1.6.7-java)
listen (0.5.0)
@@ -144,7 +160,6 @@ GEM
rdoc (3.12)
json (~> 1.4)
redis (3.0.1)
- rollout (1.1.0)
rspec (2.7.0)
rspec-core (~> 2.7.0)
rspec-expectations (~> 2.7.0)
@@ -176,14 +191,18 @@ PLATFORMS
ruby
DEPENDENCIES
+ activerecord-jdbcpostgresql-adapter (~> 1.2.2)
bunny
database_cleaner (~> 0.7.1)
guard
guard-rspec
+ hot_bunnies (~> 1.3.4)
hubble!
+ jruby-openssl (~> 0.7.4)
mocha (~> 0.10.0)
newrelic_rpm (~> 3.3.2)
pg
+ rollout!
rspec (~> 2.7.0)
travis-core!
travis-support!
View
@@ -0,0 +1,74 @@
+require 'bunny'
+require 'hashr'
+
+module Travis
+ module Amqp
+ class Consumer
+ class Message
+ attr_reader :headers, :queue
+
+ def initialize(data)
+ @headers = data
+ end
+
+ def ack
+ # already done by the bunny subscription
+ end
+ end
+
+ class << self
+ def threads
+ @threads ||= []
+ end
+
+ def wait
+ threads.each(&:join)
+ end
+ end
+
+ include Logging
+
+ DEFAULTS = {
+ :subscribe => { :ack => false },
+ :queue => { :prefetch_count => 1, :durable => true },
+ }
+
+ attr_reader :name, :options, :subscription
+
+ def initialize(name, options = {})
+ @name = name
+ @options = Hashr.new(DEFAULTS.deep_merge(options))
+ end
+
+ def subscribe(options = {}, &block)
+ options = deep_merge(self.options.subscribe, options)
+ debug "subscribing to #{name.inspect} with #{options.inspect}"
+
+ self.class.threads << Thread.new {
+ begin
+ queue.subscribe(options) do |data|
+ block.call(Message.new(data), data[:payload])
+ end
+ rescue Exception => e
+ puts e.message, e.backtrace
+ end
+ }
+ end
+
+ def unsubscribe
+ debug "unsubscribing from #{name.inspect}"
+ queue.unsubscribe
+ end
+
+ protected
+
+ def queue
+ @queue ||= Amqp.connection.queue(name, options.queue)
+ end
+
+ def deep_merge(hash, other)
+ hash.merge(other, &(merger = proc { |key, v1, v2| Hash === v1 && Hash === v2 ? v1.merge(v2, &merger) : v2 }))
+ end
+ end
+ end
+end
View
@@ -3,6 +3,7 @@
require 'travis'
require 'travis/support'
require 'timeout'
+require 'consumer'
$stdout.sync = true
@@ -38,16 +39,19 @@ def setup
def subscribe
info 'Subscribing to amqp ...'
info "Subscribing to reporting.jobs.logs"
- Travis::Amqp::Consumer.jobs('logs').subscribe(:ack => true) do |msg, payload|
+
+ Travis::Amqp::Consumer.jobs('logs').subscribe do |msg, payload|
receive(:route, msg, payload)
end
0.upto(Travis.config.logs.shards - 1).each do |shard|
info "Subscribing to reporting.jobs.logs.#{shard}"
- Travis::Amqp::Consumer.jobs("logs.#{shard}").subscribe(:ack => true) do |msg, payload|
+ Travis::Amqp::Consumer.jobs("logs.#{shard}").subscribe do |msg, payload|
receive(:log, msg, payload)
end
end
+
+ Travis::Amqp::Consumer.wait
end
def receive(type, message, payload)
View
@@ -0,0 +1,29 @@
+$: << 'lib'
+
+require 'travis'
+require 'bunny'
+require 'consumer'
+
+b = Travis::Amqp.connection
+b.qos
+
+consumer = Travis::Amqp::Consumer.new('jobs')
+consumer.subscribe(:ack => true) do |msg, payload|
+ p payload
+ # p msg
+ msg.ack
+end
+
+# q = b.queue('jobs')
+#
+# q.subscribe(:ack => true) do |msg|
+# msg_cnt = q.message_count
+# p msg
+# q.ack
+#
+# # if msg_cnt < 1
+# # q.unsubscribe
+# # q.ack
+# # break
+# # end
+# end
View
@@ -0,0 +1,6 @@
+require 'travis'
+
+publisher = Travis::Amqp::Publisher.new('jobs')
+3.times do
+ p publisher.publish('fooo!')
+end

0 comments on commit 136a54e

Please sign in to comment.