Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

add srv record lookups #58

Merged
merged 5 commits into from

3 participants

@rphillips
Owner
  • adds a query_endpoints config variable to the config file which contains a list of SRV queries
  • remove :sample() for cleanup
agents/monitoring/lua/monitoring-agent.lua
((5 lines not shown))
local ConnectionStream = require('./lib/client/connection_stream').ConnectionStream
local misc = require('./lib/util/misc')
local States = require('./lib/states')
local stateFile = require('./lib/state_file')
+local SRV_QUERIES = {
@philips
philips added a note

Why put these in the code and not the config file?

@rphillips Owner

i was thinking we don't want the DNS SRV endpoints to be configurable.

@rphillips Owner

Added configurable SRV records.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
agents/monitoring/lua/monitoring-agent.lua
@@ -78,8 +85,29 @@ function MonitoringAgent:sample()
logging.log(logging.CRIT, "Server listening at http://localhost:8080/")
end
+function MonitoringAgent:_queryForEndpoints(callback)
+ local endpoints = ''
+ function iter(domain, callback)
+ dns.resolve(domain, 'SRV', callback)
+ end
+ async.map(SRV_QUERIES, iter, function(err, results)
+ local i, v
+ if err then
@Kami Owner
Kami added a note

Passing err to async.map short-circuits it, right? Should not being able to resolve one of the endpoints be fatal?

@rphillips Owner

we will connect to up to 3 endpoints. I'll add some code to manage the error better.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
agents/monitoring/lua/monitoring-agent.lua
((16 lines not shown))
- for i, address in ipairs(endpoints) do
- if misc.splitAddress(address) == nil then
- logging.log(logging.ERR, "endpoint needs to be specified in the following format ip:port")
+ logging.log(logging.INFO, "'endpoints' is missing from 'config'")
+ logging.log(logging.INFO, "querying for endpoints")
+ self:_queryForEndpoints(function(err, endpoints)
+ if err then
+ callback(err)
+ return
+ end
+ self._config['endpoints'] = endpoints
+ callback()
+ end)
+ else
+ -- Verify that the endpoint addresses are specified in the correct format
+ endpoints = misc.split(self._config['endpoints'], '[^,]+')
@philips
philips added a note

Why go through the trouble of making the list comma separated above just to split it again below?

@rphillips Owner

this was the code that was there before... This code handles the non-srv record lookup.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
agents/monitoring/lua/monitoring-agent.lua
((6 lines not shown))
- end
-
- -- Verify that the endpoint addresses are specified in the correct format
- local endpoints = misc.split(self._config['endpoints'], '[^,]+')
-
- if #endpoints == 0 then
- logging.log(logging.ERR, "at least one endpoint needs to be specified")
- process.exit(1)
- end
-
- for i, address in ipairs(endpoints) do
- if misc.splitAddress(address) == nil then
- logging.log(logging.ERR, "endpoint needs to be specified in the following format ip:port")
+ logging.log(logging.INFO, "'endpoints' is missing from 'config'")
+ logging.log(logging.INFO, "querying for endpoints")
+ self:_queryForEndpoints(function(err, endpoints)
@philips
philips added a note

Do we need a second code path for an endpoint list that is specified without a SRV record for testing?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@philips

Could you comment on the interval unit?

Owner

I globally commented the timeouts/intervals in 30099f1

@philips

Hrm, a unit here would be nice too

@philips

Perhaps print this at the end of the loop? Would be slightly less spammy.

@philips

great! thanks

@philips

lgtm +1

@rphillips rphillips merged commit 3c6dbbe into master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on May 1, 2012
  1. @rphillips

    add srv record lookups

    rphillips authored
Commits on May 2, 2012
  1. @rphillips

    add monitoring srv servers

    rphillips authored
  2. @rphillips
  3. @rphillips
Commits on May 9, 2012
  1. @rphillips

    comments timeouts

    rphillips authored
This page is out of date. Refresh to see the latest.
View
8 agents/monitoring/lua/lib/util/constants.lua
@@ -1,8 +1,16 @@
local exports = {}
+
+-- All intervals and timeouts are in milliseconds
+
+exports.GC_INTERVAL = 5 * 1000
+
exports.CONNECT_TIMEOUT = 6000
exports.SOCKET_TIMEOUT = 10000
exports.PING_INTERVAL_JITTER = 7000
exports.DATACENTER_MAX_DELAY = 5 * 60 * 1000 -- max connection delay
exports.DATACENTER_MAX_DELAY_JITTER = 7000
+
+exports.SRV_RECORD_FAILURE_DELAY = 15 * 1000
+exports.SRV_RECORD_FAILURE_DELAY_JITTER = 15 * 1000
return exports
View
136 agents/monitoring/lua/monitoring-agent.lua
@@ -22,8 +22,10 @@ local Object = require('core').Object
local fmt = require('string').format
local logging = require('logging')
local timer = require('timer')
+local dns = require('dns')
local ConnectionStream = require('./lib/client/connection_stream').ConnectionStream
+local constants = require('./lib/util/constants')
local misc = require('./lib/util/misc')
local States = require('./lib/states')
local stateFile = require('./lib/state_file')
@@ -36,85 +38,84 @@ end
process:on('SIGUSR1', gc)
-- Setup GC
-timer.setInterval(5 * 1000, gc)
+timer.setInterval(constants.GC_INTERVAL, gc)
local MonitoringAgent = Object:extend()
-function MonitoringAgent:sample()
- local HTTP = require("http")
- local Utils = require("utils")
- local logging = require('logging')
- local s = sigar:new()
- local sysinfo = s:sysinfo()
- local cpus = s:cpus()
- local netifs = s:netifs()
- local i = 1;
-
- HTTP.createServer("0.0.0.0", 8080, function (req, res)
- local body = Utils.dump({req=req,headers=req.headers}) .. "\n"
- res:write_head(200, {
- ["Content-Type"] = "text/plain",
- ["Content-Length"] = #body
- })
- res:finish(body)
- end)
-
- print("sigar.sysinfo = ".. Utils.dump(sysinfo))
-
- while i <= #cpus do
- print("sigar.cpus[".. i .."].info = ".. Utils.dump(cpus[i]:info()))
- print("sigar.cpus[".. i .."].data = ".. Utils.dump(cpus[i]:data()))
- i = i + 1
+function MonitoringAgent:_queryForEndpoints(domains, callback)
+ local endpoints = ''
+ function iter(domain, callback)
+ dns.resolve(domain, 'SRV', function(err, results)
+ if err then
+ logging.log(logging.ERR, 'Could not lookup SRV record from ' .. domain)
+ callback()
+ return
+ end
+ callback(nil, results)
+ end)
end
-
- i = 1;
-
- while i <= #netifs do
- print("sigar.netifs[".. i .."].info = ".. Utils.dump(netifs[i]:info()))
- print("sigar.netifs[".. i .."].usage = ".. Utils.dump(netifs[i]:usage()))
- i = i + 1
- end
-
- logging.log(logging.CRIT, "Server listening at http://localhost:8080/")
+ async.map(domains, iter, function(err, results)
+ local i, v, serverPort
+ for i, v in pairs(results) do
+ serverPort = results[i][1].name .. ':' .. results[i][1].port
+ endpoints = endpoints .. serverPort
+ logging.log(logging.INFO, 'found endpoint: ' .. serverPort)
+ if i ~= #results then
+ endpoints = endpoints .. ','
+ end
+ end
+ callback(nil, endpoints)
+ end)
end
function MonitoringAgent:_verifyState(callback)
- callback = callback or function() end
if self._config == nil then
logging.log(logging.ERR, "statefile 'config' missing or invalid")
process.exit(1)
end
- if self._config['id'] == nil then
+ if self._config['monitoring_id'] == nil then
logging.log(logging.ERR, "'id' is missing from 'config'")
process.exit(1)
end
- if self._config['token'] == nil then
+ if self._config['monitoring_token'] == nil then
logging.log(logging.ERR, "'token' is missing from 'config'")
process.exit(1)
end
+ callback()
+end
- if self._config['endpoints'] == nil then
- logging.log(logging.ERR, "'endpoints' is missing from 'config'")
- process.exit(1)
- end
-
- -- Verify that the endpoint addresses are specified in the correct format
- local endpoints = misc.split(self._config['endpoints'], '[^,]+')
-
- if #endpoints == 0 then
- logging.log(logging.ERR, "at least one endpoint needs to be specified")
- process.exit(1)
- end
-
- for i, address in ipairs(endpoints) do
- if misc.splitAddress(address) == nil then
- logging.log(logging.ERR, "endpoint needs to be specified in the following format ip:port")
+function MonitoringAgent:_loadEndpoints(callback)
+ local endpoints
+ local query_endpoints
+ if self._config['monitoring_query_endpoints'] and
+ self._config['monitoring_endpoints'] == nil then
+ -- Verify that the endpoint addresses are specified in the correct format
+ query_endpoints = misc.split(self._config['monitoring_query_endpoints'], '[^,]+')
+ logging.log(logging.INFO, "querying for endpoints")
+ self:_queryForEndpoints(query_endpoints, function(err, endpoints)
+ if err then
+ callback(err)
+ return
+ end
+ self._config['monitoring_endpoints'] = endpoints
+ callback()
+ end)
+ else
+ -- Verify that the endpoint addresses are specified in the correct format
+ endpoints = misc.split(self._config['monitoring_endpoints'], '[^,]+')
+ if #endpoints == 0 then
+ logging.log(logging.ERR, "at least one endpoint needs to be specified")
process.exit(1)
end
+ for i, address in ipairs(endpoints) do
+ if misc.splitAddress(address) == nil then
+ logging.log(logging.ERR, "endpoint needs to be specified in the following format ip:port")
+ process.exit(1)
+ end
+ end
+ logging.log(logging.INFO, "using id " .. self._config['monitoring_id'])
+ callback()
end
-
- logging.log(logging.INFO, "using id " .. self._config['id'])
- callback()
end
function MonitoringAgent:loadStates(callback)
@@ -126,17 +127,26 @@ function MonitoringAgent:loadStates(callback)
-- Verify
function(callback)
self:_verifyState(callback)
- end
+ end,
+ function(callback)
+ self:_loadEndpoints(callback)
+ end
}, callback)
end
function MonitoringAgent:connect(callback)
- local endpoints = misc.split(self._config['endpoints'], '[^,]+')
- self._streams = ConnectionStream:new(self._config['id'], self._config['token'])
+ local endpoints = misc.split(self._config['monitoring_endpoints'], '[^,]+')
+ if #endpoints <= 0 then
+ logging.log(logging.ERR, 'no endpoints')
+ timer.setTimeout(misc.calcJitter(constants.SRV_RECORD_FAILURE_DELAY, constants.SRV_RECORD_FAILURE_DELAY_JITTER), function()
+ process.exit(1)
+ end)
+ return
+ end
+ self._streams = ConnectionStream:new(self._config['monitoring_id'], self._config['monitoring_token'])
self._streams:on('error', function(err)
logging.log(logging.ERR, JSON.stringify(err))
end)
-
self._streams:createConnections(endpoints, callback)
end
@@ -150,7 +160,7 @@ end
function MonitoringAgent.run(options)
if not options then options = {} end
local agent = MonitoringAgent:new(options.stateDirectory, options.configFile)
- async.waterfall({
+ async.series({
function(callback)
agent:loadStates(callback)
end,
View
7 contrib/config.state
@@ -1,3 +1,4 @@
-id agentA
-token aToken
-endpoints 127.0.0.1:50041,127.0.0.1:50051,127.0.0.1:50061
+monitoring_id agentA
+monitoring_token aToken
+monitoring_query_endpoints _monitoring_agent._tcp.lon3.stage.monitoring.api.rackspacecloud.com,_monitoring_agent._tcp.ord1.stage.monitoring.api.rackspacecloud.com
+#monitoring_endpoints 127.0.0.1:50041,127.0.0.1:50051,127.0.0.1:50061
2  deps/luvit
@@ -1 +1 @@
-Subproject commit 0653b7ff7f3f53bcaa282954473c8ef819e61abb
+Subproject commit d3b395317512c061ab0f4fa6e77d43ee45bd655a
View
6 pkg/monitoring/rackspace-monitoring-agent.cfg
@@ -1,3 +1,3 @@
-id agentA
-token aToken
-endpoints 127.0.0.1:50041,127.0.0.1:50051,127.0.0.1:50061
+monitoring_id REPLACE_ID
+monitoring_token REPLACE_TOKEN
+monitoring_query_endpoints _monitoring_agent._tcp.lon3.stage.monitoring.api.rackspacecloud.com,_monitoring_agent._tcp.ord1.stage.monitoring.api.rackspacecloud.com
Something went wrong with that request. Please try again.