Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Make it work with latest version of AMQP #2

Merged
merged 2 commits into from

2 participants

@nel
nel commented

New API of AMQP is not backward compatible when it comes to channel management. The current code didn't work anymore because it tried to reopen channel and was not waiting for async creation to work.

@rubenfonseca
Owner

Wow that is awesome, thanks!

@rubenfonseca rubenfonseca merged commit 2ea3282 into rubenfonseca:master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
Showing with 24 additions and 19 deletions.
  1. +2 −0  .gitignore
  2. +8 −6 socket.rb
  3. +14 −13 twitterfeed.rb
View
2  .gitignore
@@ -1,3 +1,5 @@
bin
vendor
.bundle
+Gemfile.lock
+.rbenv-version
View
14 socket.rb
@@ -1,17 +1,19 @@
-require 'vendor/gems/environment'
require 'em-websocket'
require 'uuid'
-require 'mq'
+require 'amqp'
uuid = UUID.new
EventMachine::WebSocket.start(:host => "0.0.0.0", :port => 8080) do |ws|
ws.onopen do
puts "WebSocket opened"
-
- twitter = MQ.new
- twitter.queue(uuid.generate).bind(twitter.fanout('twitter')).subscribe do |t|
- ws.send t
+ AMQP.connect(:host => '127.0.0.1') do |connection, open_ok|
+ AMQP::Channel.new(connection) do |channel, open_ok|
+ channel.queue(uuid.generate, :auto_delete => true).bind(channel.fanout("twitter")).subscribe do |t|
+ puts 'got a tweet'
+ ws.send t
+ end
+ end
end
end
View
27 twitterfeed.rb
@@ -1,20 +1,21 @@
-require 'vendor/gems/environment'
-require 'mq'
+require 'amqp'
require 'twitter/json_stream'
username = ARGV.shift
password = ARGV.shift
raise "need username and password" if !username or !password
-AMQP.start(:host => 'localhost') do
- twitter = MQ.new.fanout('twitter')
-
- stream = Twitter::JSONStream.connect(
- :path => '/1/statuses/filter.json?track=iphone',
- :auth => "#{username}:#{password}"
- )
-
- stream.each_item do |status|
- twitter.publish(status)
+AMQP.start(:host => 'localhost') do |connection, open_ok|
+ AMQP::Channel.new(connection) do |channel, open_ok|
+ twitter = channel.fanout("twitter")
+
+ stream = Twitter::JSONStream.connect(
+ :path => '/1/statuses/filter.json?track=iphone',
+ :auth => "#{username}:#{password}"
+ )
+
+ stream.each_item do |status|
+ twitter.publish(status)
+ end
end
-end
+end
Something went wrong with that request. Please try again.