Skip to content

Commit

Permalink
Merge pull request sensu#54 from portertech/client-socket
Browse files Browse the repository at this point in the history
Client socket, receive check results from external sources
  • Loading branch information
portertech committed Oct 31, 2011
2 parents 7664a56 + 34c2fde commit f91b49e
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 43 deletions.
30 changes: 22 additions & 8 deletions lib/sensu/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@ def self.run(options={})
self.setup(options)
self.run!(:port => @settings.api.port)

Signal.trap('INT') do
EM.stop
end

Signal.trap('TERM') do
EM.stop
%w[INT TERM].each do |signal|
Signal.trap(signal) do
EM.warning('[process] -- ' + signal + ' -- stopping sensu api')
EM.add_timer(1) do
EM.stop
end
end
end
end
end

def self.setup(options={})
config = Sensu::Config.new(options)
@settings = config.settings
EM.syslog_setup(@settings.syslog.host, @settings.syslog.port)
EM.debug('[setup] -- connecting to redis')
set :redis, EM::Hiredis.connect('redis://' + @settings.redis.host + ':' + @settings.redis.port.to_s)
EM.debug('[setup] -- connecting to rabbitmq')
connection = AMQP.connect(@settings.rabbitmq.to_hash.symbolize_keys)
set :amq, MQ.new(connection)
end
Expand All @@ -39,6 +43,7 @@ def self.setup(options={})
end

aget '/clients' do
EM.debug('[clients] -- ' + request.ip + ' -- GET -- request for client list')
current_clients = Array.new
conn.redis.smembers('clients').callback do |clients|
unless clients.empty?
Expand All @@ -55,13 +60,15 @@ def self.setup(options={})
end

aget '/client/:name' do |client|
EM.debug('[client] -- ' + request.ip + ' -- GET -- request for client -- ' + client)
conn.redis.get('client:' + client).callback do |client_json|
status 404 if client_json.nil?
body client_json
end
end

adelete '/client/:name' do |client|
EM.debug('[client] -- ' + request.ip + ' -- DELETE -- request for client -- ' + client)
conn.redis.sismember('clients', client).callback do |client_exists|
unless client_exists == 0
conn.redis.exists('events:' + client).callback do |events_exist|
Expand Down Expand Up @@ -93,6 +100,7 @@ def self.setup(options={})
end

aget '/events' do
EM.debug('[events] -- ' + request.ip + ' -- GET -- request for event list')
current_events = Hash.new
conn.redis.smembers('clients').callback do |clients|
unless clients.empty?
Expand All @@ -113,6 +121,7 @@ def self.setup(options={})
end

aget '/event/:client/:check' do |client, check|
EM.debug('[event] -- ' + request.ip + ' -- GET -- request for event -- ' + client + ' -- ' + check)
conn.redis.hgetall('events:' + client).callback do |events|
client_events = Hash[*events]
event = client_events[check]
Expand All @@ -122,9 +131,10 @@ def self.setup(options={})
end

apost '/stash/*' do |path|
EM.debug('[stash] -- ' + request.ip + ' -- POST -- request for stash -- ' + path)
begin
stash = JSON.parse(request.body.read)
rescue JSON::ParserError => e
rescue JSON::ParserError
status 400
body nil
end
Expand All @@ -135,9 +145,10 @@ def self.setup(options={})
end

apost '/stashes' do
EM.debug('[stashes] -- ' + request.ip + ' -- POST -- request for multiple stashes')
begin
paths = JSON.parse(request.body.read)
rescue JSON::ParserError => e
rescue JSON::ParserError
status 400
body nil
end
Expand All @@ -156,13 +167,15 @@ def self.setup(options={})
end

aget '/stash/*' do |path|
EM.debug('[stash] -- ' + request.ip + ' -- GET -- request for stash -- ' + path)
conn.redis.get('stash:' + path).callback do |stash|
status 404 if stash.nil?
body stash
end
end

adelete '/stash/*' do |path|
EM.debug('[stash] -- ' + request.ip + ' -- DELETE -- request for stash -- ' + path)
conn.redis.exists('stash:' + path).callback do |stash_exist|
unless stash_exist == 0
conn.redis.del('stash:' + path).callback do
Expand All @@ -177,6 +190,7 @@ def self.setup(options={})
end

apost '/test' do
EM.debug('[test] -- ' + request.ip + ' -- POST -- seeding for minitest')
client = '{
"name": "test",
"address": "localhost",
Expand Down
74 changes: 65 additions & 9 deletions lib/sensu/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,48 @@ def self.run(options={})
client.setup_keepalives
client.setup_subscriptions
client.setup_queue_monitor
client.setup_client_socket

Signal.trap('INT') do
EM.stop
end

Signal.trap('TERM') do
EM.stop
%w[INT TERM].each do |signal|
Signal.trap(signal) do
EM.warning('[process] -- ' + signal + ' -- stopping sensu client')
EM.add_timer(1) do
EM.stop
end
end
end
end
end

def initialize(options={})
config = Sensu::Config.new(:config_file => options[:config_file])
@settings = config.settings
EM.syslog_setup(@settings.syslog.host, @settings.syslog.port)
end

def setup_amqp
EM.debug("[amqp] -- connecting to rabbitmq")
connection = AMQP.connect(@settings.rabbitmq.to_hash.symbolize_keys)
@amq = MQ.new(connection)
end

def publish_keepalive
EM.debug('[keepalive] -- publishing keepalive -- ' + @settings.client.timestamp.to_s)
@keepalive_queue ||= @amq.queue('keepalives')
@keepalive_queue.publish(@settings.client.to_json)
end

def setup_keepalives
keepalive_queue = @amq.queue('keepalives')
@settings.client.timestamp = Time.now.to_i
keepalive_queue.publish(@settings.client.to_json)
publish_keepalive
EM.add_periodic_timer(30) do
@settings.client.timestamp = Time.now.to_i
keepalive_queue.publish(@settings.client.to_json)
publish_keepalive
end
end

def publish_result(check)
EM.info('[result] -- publishing check result -- ' + check.name)
@result_queue ||= @amq.queue('results')
@result_queue.publish({
:client => @settings.client.name,
Expand Down Expand Up @@ -72,6 +82,7 @@ def execute_check(check)
end
EM.defer(execute, publish)
else
EM.warning('[execute] -- missing client attributes -- ' + unmatched_tokens.join(', ') + ' -- ' + check.name)
check.status = 3
check.output = 'Missing client attributes: ' + unmatched_tokens.join(', ')
check.internal = true
Expand All @@ -80,6 +91,7 @@ def execute_check(check)
end
end
else
EM.warning('[execute] -- unkown check -- ' + check.name)
check.status = 3
check.output = 'Unknown check'
check.internal = true
Expand All @@ -92,22 +104,66 @@ def setup_subscriptions
@check_queue = @amq.queue(UUIDTools::UUID.random_create.to_s, :exclusive => true)
@settings.client.subscriptions.each do |exchange|
@check_queue.bind(@amq.fanout(exchange))
EM.debug('[subscribe] -- queue bound to exchange -- ' + exchange)
end
@check_queue.subscribe do |check_json|
check = Hashie::Mash.new(JSON.parse(check_json))
EM.info('[subscribe] -- received check -- ' + check.name)
execute_check(check)
end
end

def setup_queue_monitor
EM.add_periodic_timer(5) do
unless @check_queue.subscribed?
EM.warning('[monitor] -- reconnecting to rabbitmq')
@check_queue.delete
EM.add_timer(1) do
setup_subscriptions
end
end
end
end

def setup_client_socket
EM.debug('[socket] -- starting up socket server')
EM.start_server('127.0.0.1', 3030, ClientSocket) do |socket|
socket.client_name = @settings.client.name
socket.result_queue = @amq.queue('results')
end
end
end

class ClientSocket < EM::Connection
attr_accessor :client_name, :result_queue

def post_init
EM.debug('[socket] -- client connected')
end

def receive_data(data)
begin
check = Hashie::Mash.new(JSON.parse(data))
validates = %w[name status output].all? do |key|
check.key?(key)
end
if validates
EM.info('[socket] -- publishing check result -- ' + check.name)
@result_queue.publish({
:client => @client_name,
:check => check.to_hash
}.to_json)
else
EM.warning('[socket] -- a check name, exit status, and output are required -- e.g. {name: x, status: 0, output: "y"}')
end
rescue JSON::ParserError
EM.warning('[socket] -- could not parse check result -- expecting JSON')
end
close_connection
end

def unbind
EM.debug('[socket] -- client disconnected')
end
end
end
45 changes: 27 additions & 18 deletions lib/sensu/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@
module Sensu
class Server
attr_accessor :redis, :is_worker
alias :redis_connection :redis

def self.run(options={})
EM.threadpool_size = 15
EM.threadpool_size = 16
EM.run do
server = self.new(options)
server.setup_logging
server.setup_redis
server.setup_amqp
server.setup_keepalives
Expand All @@ -21,12 +19,13 @@ def self.run(options={})
end
server.setup_queue_monitor

Signal.trap('INT') do
EM.stop
end

Signal.trap('TERM') do
EM.stop
%w[INT TERM].each do |signal|
Signal.trap(signal) do
EM.warning('[process] -- ' + signal + ' -- stopping sensu server')
EM.add_timer(1) do
EM.stop
end
end
end
end
end
Expand All @@ -35,27 +34,27 @@ def initialize(options={})
config = Sensu::Config.new(:config_file => options[:config_file])
@settings = config.settings
@is_worker = options[:worker]
end

def setup_logging
EM.syslog_setup(@settings.syslog.host, @settings.syslog.port)
end

def setup_redis
EM.debug('[redis] -- connecting to redis')
@redis = EM::Hiredis.connect('redis://' + @settings.redis.host + ':' + @settings.redis.port.to_s)
end

def setup_amqp
EM.debug('[amqp] -- connecting to rabbitmq')
connection = AMQP.connect(@settings.rabbitmq.to_hash.symbolize_keys)
@amq = MQ.new(connection)
end

def setup_keepalives
@keepalive_queue = @amq.queue('keepalives')
@keepalive_queue.subscribe do |keepalive_json|
client_id = JSON.parse(keepalive_json)['name']
@redis.set('client:' + client_id, keepalive_json).callback do
@redis.sadd('clients', client_id)
client = Hashie::Mash.new(JSON.parse(keepalive_json))
EM.debug('[keepalive] -- received keepalive -- ' + client.name)
@redis.set('client:' + client.name, keepalive_json).callback do
@redis.sadd('clients', client.name)
end
end
end
Expand All @@ -72,10 +71,15 @@ def handle_event(event)
end
report = proc do |output|
output.split(/\n+/).each do |line|
EM.debug(line)
EM.info('[handler] -- ' + line)
end
end
EM.defer(handler, report)
if @settings.handlers.key?(event.check.handler)
EM.debug('[event] -- handling event -- ' + [event.check.handler, event.client.name, event.check.name].join(' -- '))
EM.defer(handler, report)
else
EM.warning('[event] -- handler does not exist -- ' + event.check.handler)
end
end

def process_result(result)
Expand Down Expand Up @@ -132,6 +136,7 @@ def process_result(result)
end
end
else
EM.debug('[result] -- check is flapping -- ' + client.name + ' -- ' + check.name)
@redis.hset('events:' + client.name, check.name, previous_event.merge({'flapping' => true}).to_json)
end
elsif check['status'] != 0
Expand Down Expand Up @@ -163,6 +168,7 @@ def setup_results
@result_queue = @amq.queue('results')
@result_queue.subscribe do |result_json|
result = Hashie::Mash.new(JSON.parse(result_json))
EM.info('[result] -- received result -- ' + result.client + ' -- ' + result.check.name)
process_result(result)
end
end
Expand All @@ -179,8 +185,8 @@ def setup_publisher(options={})
end
interval = options[:test] ? 0.5 : details.interval
EM.add_periodic_timer(interval) do
EM.info('[publisher] -- publishing check -- ' + name + ' -- ' + exchange)
exchanges[exchange].publish({'name' => name, 'issued' => Time.now.to_i}.to_json)
EM.debug('name="Published Check" event_id=server action="Published check ' + name + ' to the ' + exchange + ' exchange"')
end
end
end
Expand All @@ -190,6 +196,7 @@ def setup_publisher(options={})

def setup_keepalive_monitor
EM.add_periodic_timer(30) do
EM.debug('[keepalive] -- checking for stale clients')
@redis.smembers('clients').callback do |clients|
clients.each do |client_id|
@redis.get('client:' + client_id).callback do |client_json|
Expand Down Expand Up @@ -229,9 +236,11 @@ def setup_keepalive_monitor
def setup_queue_monitor
EM.add_periodic_timer(5) do
unless @keepalive_queue.subscribed?
EM.warning('[monitor] -- reconnecting to rabbitmq')
setup_keepalives
end
unless @result_queue.subscribed?
EM.warning('[monitor] -- reconnecting to rabbitmq')
setup_results
end
end
Expand Down
Loading

0 comments on commit f91b49e

Please sign in to comment.