Skip to content
This repository has been archived by the owner on Jan 1, 2020. It is now read-only.

Commit

Permalink
[roundrobin] beginnings of round-robin subscriptions \o/
Browse files Browse the repository at this point in the history
  • Loading branch information
portertech committed May 19, 2015
1 parent 2b7040c commit 580d5a0
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
15 changes: 13 additions & 2 deletions lib/sensu/client/process.rb
Expand Up @@ -233,6 +233,17 @@ def process_check_request(check)
end
end

def transport_subscribe_options(subscription)
_, raw_type = subscription.split(":", 2).reverse
case raw_type
when "direct", "roundrobin"
[:direct, subscription, subscription]
else
funnel = [@settings[:client][:name], VERSION, Time.now.to_i].join("-")
[:fanout, subscription, funnel]
end
end

# Set up Sensu client subscriptions. Subscriptions determine the
# kinds of check requests the client will receive. A unique
# transport funnel is created for the Sensu client, using a
Expand All @@ -245,8 +256,8 @@ def setup_subscriptions
@logger.debug("subscribing to client subscriptions")
@settings[:client][:subscriptions].each do |subscription|
@logger.debug("subscribing to a subscription", :subscription => subscription)
funnel = [@settings[:client][:name], VERSION, Time.now.to_i].join("-")
@transport.subscribe(:fanout, subscription, funnel) do |message_info, message|
options = transport_subscribe_options(subscription)
@transport.subscribe(*options) do |message_info, message|
begin
check = MultiJson.load(message)
@logger.info("received check request", :check => check)
Expand Down
13 changes: 12 additions & 1 deletion lib/sensu/server/process.rb
Expand Up @@ -436,6 +436,16 @@ def setup_results
end
end

def transport_publish_options(subscription, message)
_, raw_type = subscription.split(":", 2).reverse
case raw_type
when "direct", "roundrobin"
[:direct, subscription, message]
else
[:fanout, subscription, message]
end
end

# Publish a check request to the transport. A check request is
# composted of a check `:name`, an `:issued` timestamp, and a
# check `:command` if available. The check request is published
Expand All @@ -456,7 +466,8 @@ def publish_check_request(check)
:subscribers => check[:subscribers]
})
check[:subscribers].each do |subscription|
@transport.publish(:fanout, subscription, MultiJson.dump(payload)) do |info|
options = transport_publish_options(subscription, MultiJson.dump(payload))
@transport.publish(*options) do |info|
if info[:error]
@logger.error("failed to publish check request", {
:subscription => subscription,
Expand Down

0 comments on commit 580d5a0

Please sign in to comment.