Skip to content
Find file
Fetching contributors…
Cannot retrieve contributors at this time
executable file 250 lines (188 sloc) 6.45 KB
#!/usr/bin/env ruby
# An interactive, IRB-like interface for STOMP
require 'yaml'
require 'rubygems'
require 'stomp'
require 'pp'
require 'irb'
require 'optparse'
# Connection-event logger; an instance is passed as the :logger entry of the
# hash given to Stomp::Connection.open. Each callback receives the current
# connection parameters and prints a one-line status message to stdout.
class StompLogger
  # Reports that a connection attempt is starting.
  def on_connecting(params)
    puts "info> Attempting to connect to %s" % stomp_url(params)
  end

  # Reports that a connection attempt failed.
  def on_connectfail(params)
    puts "error> Connection to %s failed" % stomp_url(params)
  end

  # Reports a disconnect. This used to be a second definition of
  # on_connectfail, which silently replaced the real failure handler and left
  # no disconnect callback at all.
  def on_disconnect(params)
    puts "info> Disconnected from %s" % stomp_url(params)
  end

  # Reports that the connection was established.
  def on_connected(params)
    puts "info> Connected to %s" % stomp_url(params)
  end

  # Reports any other connection error together with its description.
  def on_miscerr(params, errstr)
    puts "error> Unknown error on stomp connection %s: %s" % [stomp_url(params), errstr]
  end

  # Builds a display URL such as "stomp://user@host:61613" from the
  # :cur_ssl, :cur_login, :cur_host and :cur_port entries of params.
  def stomp_url(params)
    scheme = params[:cur_ssl] ? "stomp+ssl" : "stomp"
    "%s://%s@%s:%d" % [scheme, params[:cur_login], params[:cur_host], params[:cur_port]]
  end
end
# Runs the caller's setup block, then starts an interactive IRB session and
# stays in it until IRB throws :IRB_EXIT.
def consolize(&block)
  # Let the caller prepare its state before the shell starts.
  yield

  IRB.setup(nil)
  shell = IRB::Irb.new
  IRB.conf[:MAIN_CONTEXT] = shell.context
  shell.context.evaluate("require 'irb/completion'", 0)

  # Make `help` inside the session call stomp_help.
  # NOTE(review): install_alias_method is not defined in this file; presumably
  # it comes from IRB::ExtendCommandBundle once the Irb object exists - confirm.
  install_alias_method :help, :stomp_help, IRB::ExtendCommandBundle::OVERRIDE_ALL

  # Hand Ctrl-C to IRB's own handler.
  trap("SIGINT") { shell.signal_handle }

  catch(:IRB_EXIT) { shell.eval_input }
end
# Prints the usage summary for the commands this shell adds.
# The documented default formats must match the defaults assigned to
# @options[:long_display_format] and @options[:short_display_format].
def stomp_help
  puts <<EOF
Interactive Ruby Shell for Stomp
================================
subscribe :topic, "foo" - subscribes to /topic/foo
unsubscribe :topic, "foo" - unsubscribes from /topic/foo
show_subscriptions - lists the current subscriptions
topic "foo", "hello" - sends "hello" to /topic/foo
queue "foo", "hello" - sends "hello" to /queue/foo
queue "foo", "hello", {:persistent => true} - sends "hello" to /queue/foo
and persist to storage
recv_callback{|f| pp f} - dumps every received message using pp
verbose - shows timestamps and source of
received messages
short_format "<format>" - sets the short display format
long_format "<format>" - sets the long display format
Display Formats:
You can set the display format used for showing incoming messages
the default long format is:
"<<%{time}:%{source}>> %{body}"
the default short format is:
"<<stomp>> %{body}"
EOF
end
# Stores frm as the template used to print incoming messages while verbose
# mode is off. Returns frm.
def short_format(frm)
  @options.store(:short_display_format, frm)
end
# Stores frm as the template used to print incoming messages while verbose
# mode is on. Returns frm.
def long_format(frm)
  @options.store(:long_display_format, frm)
end
# Prints every key of the connection object's own @subscriptions table, one
# per tab-indented line, followed by a blank line.
def show_subscriptions
  puts "Current Subscriptions:"
  # Reads the connection's internal instance variable directly rather than
  # this script's own @subscriptions hash.
  active = @connection.instance_variable_get("@subscriptions")
  active.each_key { |name| puts "\t#{name}" }
  puts
end
# Unsubscribes from /<type>/<q>, sending the id recorded at subscribe time
# merged with any extra headers, then lists the remaining subscriptions.
# Prints an error and returns false when there is no such subscription.
def unsubscribe(type, q, headers={})
  dest = "/#{type}/#{q}"
  entry = @subscriptions[dest]

  unless entry
    puts "error> Not currently subscribed to #{dest}"
    return false
  end

  # Caller-supplied headers win over the stored id on key collisions.
  frame_headers = {:id => entry[:id]}.merge(headers)
  @connection.unsubscribe(dest, frame_headers)
  @subscriptions.delete(dest)
  show_subscriptions
end
# Subscribes to /<type>/<q> using the next free numeric subscription id
# (merged with any extra headers), records it, and lists the subscriptions.
# Prints an error and returns false when the destination is already recorded.
def subscribe(type, q, headers={})
  dest = "/#{type}/#{q}"

  if @subscriptions[dest]
    puts "error> Already subscribed to #{dest}"
    return false
  end

  sub_id = @next_subscription_id
  # Caller-supplied headers win over the generated id on key collisions.
  @connection.subscribe(dest, {:id => sub_id}.merge(headers))
  @subscriptions[dest] = {:id => sub_id}
  @next_subscription_id = sub_id + 1
  show_subscriptions
end
# Toggles verbose display and returns the new setting (true or false).
def verbose
  @options[:verbose] = !@options[:verbose]
end
# Sends message (converted with to_s) to dest with the given headers and
# prints a confirmation line.
def send_msg(dest, message, headers={})
  payload = message.to_s

  # Use #publish when the connection offers it, otherwise fall back to #send.
  # NOTE(review): presumably older stomp gem versions only provide #send - confirm.
  if @connection.respond_to?(:publish)
    @connection.publish(dest, payload, headers)
  else
    @connection.send(dest, payload, headers)
  end

  header_text = headers.pretty_inspect.chomp
  puts "Sent #{payload} to #{dest} with headers #{header_text}"
end
# Sends message to the topic named dest, i.e. to /topic/<dest>.
def topic(dest, message, headers={})
  target = "/topic/#{dest}"
  send_msg(target, message, headers)
end
# Sends message to the queue named dest, i.e. to /queue/<dest>.
def queue(dest, message, headers={})
  target = "/queue/#{dest}"
  send_msg(target, message, headers)
end
# Registers blk as the callback invoked with every received message.
# Without a block it only prints an error and leaves the callback unchanged.
def recv_callback(&blk)
  unless block_given?
    puts "ERROR: Need to pass a block into the callback"
    return
  end

  @options[:callback] = blk
end
# Starts a background thread that receives messages from @connection forever,
# prints each one using the long (verbose) or short display format, and then
# passes it to @options[:callback]. Returns the Thread.
#
# Any StandardError raised while receiving, formatting or running the callback
# is reported on stdout and the loop carries on with the next message.
def receive_and_print_loop
  Thread.new(@connection) do |conn|
    loop do
      begin
        msg = conn.receive

        # to_s guards against a missing destination header or an empty body.
        fields = {
          "%{time}"   => Time.now.strftime('%H:%M:%S'),
          "%{source}" => msg.headers["destination"].to_s,
          "%{body}"   => msg.body.to_s.chomp
        }
        template = @options[:verbose] ? @options[:long_display_format] : @options[:short_display_format]

        # Single pass with a block: substituted text is neither re-scanned for
        # placeholders nor parsed for backslash escapes (\0, \1, \\), which a
        # string replacement argument to gsub would interpret.
        txt = template.gsub(/%\{(?:time|source|body)\}/) { |token| fields[token] }

        # The leading \r overwrites the IRB prompt currently on the line.
        puts "\r#{txt}\n"
        @options[:callback].call(msg)
      rescue StandardError => e
        # This must read @options; a bare `options` raises NameError here and
        # kills the receiver thread on the first error.
        puts "error> Failed to receive from #{@options[:stompserver]}: #{e.class}: #{e}"
      end
    end
  end
end
# Script entry point: build the defaults, apply command-line overrides, open
# the STOMP connection and start the receiver thread. consolize runs this
# block first and then starts the interactive session.
consolize do
  # Defaults; the server, port, user and password can come from the environment.
  @options = {
    :stompserver          => ENV["STOMP_SERVER"] || "localhost",
    :stompport            => (ENV["STOMP_PORT"] || 61613).to_i,
    :stompuser            => ENV["STOMP_USER"] || "guest",
    :stomppass            => ENV["STOMP_PASSWORD"] || "guest",
    :verbose              => false,
    :long_display_format  => "<<%{time}:%{source}>> %{body}",
    :short_display_format => "<<stomp>> %{body}",
    :callback             => Proc.new { true },
    :heartbeat_interval   => "30000"
  }

  # Bookkeeping used by subscribe/unsubscribe.
  @subscriptions = {}
  @next_subscription_id = 0

  # Command-line switches override the defaults above.
  parser = OptionParser.new
  parser.on("--server [SERVER]", "-s", "Server to connect to") { |val| @options[:stompserver] = val }
  parser.on("--port [PORT]", "-p", Integer, "Port to connect to") { |val| @options[:stompport] = val }
  parser.on("--user [USER]", "-u", "User to connect as") { |val| @options[:stompuser] = val }
  parser.on("--password [PASSWORD]", "-P", "Password to connect with") { |val| @options[:stomppass] = val }
  parser.on("--heartbeat-interval [INTERVAL]", "-i", "Interval for Stomp 1.1 heartbeats") do |val|
    @options[:heartbeat_interval] = val
  end
  parser.parse!

  # The same interval is used for both halves of the heart-beat header.
  heartbeat = "#{@options[:heartbeat_interval]},#{@options[:heartbeat_interval]}"
  broker = {
    :login    => @options[:stompuser],
    :passcode => @options[:stomppass],
    :host     => @options[:stompserver],
    :port     => @options[:stompport]
  }
  settings = {
    :hosts           => [broker],
    :logger          => StompLogger.new,
    :connect_headers => {
      :"heart-beat"     => heartbeat,
      :host             => @options[:stompserver],
      :"accept-version" => "1.1,1.0"
    }
  }

  puts "Interactive Ruby shell for STOMP"
  puts
  @connection = Stomp::Connection.open(settings)
  puts
  puts "Type 'help' for usage instructions"
  puts

  receive_and_print_loop
end
Jump to Line
Something went wrong with that request. Please try again.