From 580d5a0e3c18588b7c99417e921a7d4807879ce9 Mon Sep 17 00:00:00 2001 From: Sean Porter Date: Tue, 19 May 2015 14:17:58 -0700 Subject: [PATCH] [roundrobin] beginnings of round-robin subscriptions \o/ --- lib/sensu/client/process.rb | 15 +++++++++++++-- lib/sensu/server/process.rb | 13 ++++++++++++- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/lib/sensu/client/process.rb b/lib/sensu/client/process.rb index 1eef0eda..cb34d527 100644 --- a/lib/sensu/client/process.rb +++ b/lib/sensu/client/process.rb @@ -233,6 +233,17 @@ def process_check_request(check) end end + def transport_subscribe_options(subscription) + _, raw_type = subscription.split(":", 2).reverse + case raw_type + when "direct", "roundrobin" + [:direct, subscription, subscription] + else + funnel = [@settings[:client][:name], VERSION, Time.now.to_i].join("-") + [:fanout, subscription, funnel] + end + end + # Set up Sensu client subscriptions. Subscriptions determine the # kinds of check requests the client will receive. A unique # transport funnel is created for the Sensu client, using a @@ -245,8 +256,8 @@ def setup_subscriptions @logger.debug("subscribing to client subscriptions") @settings[:client][:subscriptions].each do |subscription| @logger.debug("subscribing to a subscription", :subscription => subscription) - funnel = [@settings[:client][:name], VERSION, Time.now.to_i].join("-") - @transport.subscribe(:fanout, subscription, funnel) do |message_info, message| + options = transport_subscribe_options(subscription) + @transport.subscribe(*options) do |message_info, message| begin check = MultiJson.load(message) @logger.info("received check request", :check => check) diff --git a/lib/sensu/server/process.rb b/lib/sensu/server/process.rb index 3c643b6e..804ccad6 100644 --- a/lib/sensu/server/process.rb +++ b/lib/sensu/server/process.rb @@ -436,6 +436,16 @@ def setup_results end end + def transport_publish_options(subscription, message) + _, raw_type = subscription.split(":", 2).reverse + case raw_type + when "direct", "roundrobin" + [:direct, subscription, message] + else + [:fanout, subscription, message] + end + end + # Publish a check request to the transport. A check request is # composted of a check `:name`, an `:issued` timestamp, and a # check `:command` if available. The check request is published @@ -456,7 +466,8 @@ def publish_check_request(check) :subscribers => check[:subscribers] }) check[:subscribers].each do |subscription| - @transport.publish(:fanout, subscription, MultiJson.dump(payload)) do |info| + options = transport_publish_options(subscription, MultiJson.dump(payload)) + @transport.publish(*options) do |info| if info[:error] @logger.error("failed to publish check request", { :subscription => subscription,