Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #19 from portertech/single-channel

[single-channel] single queue per client bound to multiple exchanges, no
  • Loading branch information...
commit 19fe13ab208cc79edcd1acb08630cc472e929ca6 2 parents 257e51e + eb8d00c
@portertech portertech authored
View
13 lib/sensu/client.rb
@@ -40,10 +40,10 @@ def setup_keep_alives
end
def execute_check(check)
+ result = {'client' => @settings['client']['name'], 'check' => check}
if @settings['checks'].has_key?(check['name'])
unless @checks_in_progress.include?(check['name'])
@checks_in_progress.push(check['name'])
- result = {'client' => @settings['client']['name'], 'check' => check}
unmatched_tokens = Array.new
command = @settings['checks'][check['name']]['command'].gsub(/:::(.*?):::/) do
key = $1.to_s
@@ -70,12 +70,13 @@ def execute_check(check)
end
def setup_subscriptions
+ uniq_queue = @amq.queue(UUIDTools::UUID.random_create.to_s, :exclusive => true)
@settings['client']['subscriptions'].each do |exchange|
- uniq_queue_name = UUIDTools::UUID.random_create.to_s
- @amq.queue(uniq_queue_name, :exclusive => true).bind(@amq.fanout(exchange)).subscribe do |check_json|
- check = JSON.parse(check_json)
- execute_check(check)
- end
+ uniq_queue.bind(@amq.fanout(exchange))
+ end
+ uniq_queue.subscribe do |check_json|
+ check = JSON.parse(check_json)
+ execute_check(check)
end
end
end
View
31 lib/sensu/server.rb
@@ -16,7 +16,6 @@ def self.run(options={})
server.setup_handlers
server.setup_results
server.setup_publisher
- server.setup_populator
server.setup_keep_alive_monitor
Signal.trap('INT') do
@@ -129,26 +128,20 @@ def setup_results
end
end
- def setup_publisher
+ def setup_publisher(options={})
exchanges = Hash.new
- @amq.queue('checks').subscribe do |check_json|
- check = JSON.parse(check_json)
- check['subscribers'].each do |exchange|
- if exchanges[exchange].nil?
- exchanges[exchange] = @amq.fanout(exchange)
- end
- exchanges[exchange].publish({'name' => check['name'], 'issued' => Time.now.to_i}.to_json)
- EM.debug('published :: ' + exchange + ' :: ' + check['name'])
- end
- end
- end
-
- def setup_populator
- check_queue = @amq.queue('checks')
+ stagger = options[:test] ? 0 : 7
@settings['checks'].each_with_index do |(name, details), index|
- EM.add_timer(7*index) do
- EM.add_periodic_timer(details['interval']) do
- check_queue.publish({'name' => name, 'subscribers' => details['subscribers']}.to_json)
+ EM.add_timer(stagger*index) do
+ details['subscribers'].each do |exchange|
+ if exchanges[exchange].nil?
+ exchanges[exchange] = @amq.fanout(exchange)
+ end
+ interval = options[:test] ? 0.5 : details['interval']
+ EM.add_periodic_timer(interval) do
+ exchanges[exchange].publish({'name' => name, 'issued' => Time.now.to_i}.to_json)
+ EM.debug('published :: ' + exchange + ' :: ' + name)
+ end
end
end
end
View
15 test/sensu_server_and_client_test.rb
@@ -56,7 +56,7 @@ def test_handlers
event = {
'client' => @settings['client'],
'check' => {
- 'name' => 'test',
+ 'name' => 'test_handlers',
'handler' => 'default',
'issued' => Time.now.to_i,
'status' => 1,
@@ -71,7 +71,7 @@ def test_handlers
end
end
- def test_execute_checks
+ def test_publish_subscribe
server = Sensu::Server.new(@options)
client = Sensu::Client.new(@options)
server.setup_logging
@@ -80,12 +80,11 @@ def test_execute_checks
server.setup_keep_alives
server.setup_handlers
server.setup_results
+ server.redis_connection.flushall
client.setup_amqp
client.setup_keep_alives
- server.redis_connection.flushall
- @settings['checks'].each_key do |name|
- client.execute_check({'name' => name, 'issued' => Time.now.to_i})
- end
+ client.setup_subscriptions
+ server.setup_publisher(:test => true)
client_events = Hash.new
EM.add_timer(1) do
server.redis_connection.hgetall('events:' + @settings['client']['name']).callback do |events|
@@ -97,7 +96,9 @@ def test_execute_checks
end
parallel do
@settings['checks'].each_with_index do |(name, details), index|
- eventually({'status' => index + 1, 'output' => @settings['client']['name'] + "\n", "occurrences" => 1}, :total => 1.5) { client_events[name] }
+ eventually({'status' => index + 1, 'output' => @settings['client']['name'] + "\n", "occurrences" => 1}, :total => 1.5) do
+ client_events[name]
+ end
end
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.