add srv record lookups #58

Merged
merged 5 commits into from May 9, 2012
Jump to file or symbol
Failed to load files and symbols.
+89 −70
Diff settings

Always

Just for now

@@ -1,8 +1,16 @@
local exports = {}
+
+-- All intervals and timeouts are in milliseconds
+
+exports.GC_INTERVAL = 5 * 1000
+
exports.CONNECT_TIMEOUT = 6000
exports.SOCKET_TIMEOUT = 10000
exports.PING_INTERVAL_JITTER = 7000
exports.DATACENTER_MAX_DELAY = 5 * 60 * 1000 -- max connection delay
exports.DATACENTER_MAX_DELAY_JITTER = 7000
+
+exports.SRV_RECORD_FAILURE_DELAY = 15 * 1000
+exports.SRV_RECORD_FAILURE_DELAY_JITTER = 15 * 1000
return exports
@@ -22,8 +22,10 @@ local Object = require('core').Object
local fmt = require('string').format
local logging = require('logging')
local timer = require('timer')
+local dns = require('dns')
local ConnectionStream = require('./lib/client/connection_stream').ConnectionStream
+local constants = require('./lib/util/constants')
local misc = require('./lib/util/misc')
local States = require('./lib/states')
local stateFile = require('./lib/state_file')
@@ -36,85 +38,84 @@ end
process:on('SIGUSR1', gc)
-- Setup GC
-timer.setInterval(5 * 1000, gc)
+timer.setInterval(constants.GC_INTERVAL, gc)
local MonitoringAgent = Object:extend()
-function MonitoringAgent:sample()
- local HTTP = require("http")
- local Utils = require("utils")
- local logging = require('logging')
- local s = sigar:new()
- local sysinfo = s:sysinfo()
- local cpus = s:cpus()
- local netifs = s:netifs()
- local i = 1;
-
- HTTP.createServer("0.0.0.0", 8080, function (req, res)
- local body = Utils.dump({req=req,headers=req.headers}) .. "\n"
- res:write_head(200, {
- ["Content-Type"] = "text/plain",
- ["Content-Length"] = #body
- })
- res:finish(body)
- end)
-
- print("sigar.sysinfo = ".. Utils.dump(sysinfo))
-
- while i <= #cpus do
- print("sigar.cpus[".. i .."].info = ".. Utils.dump(cpus[i]:info()))
- print("sigar.cpus[".. i .."].data = ".. Utils.dump(cpus[i]:data()))
- i = i + 1
+function MonitoringAgent:_queryForEndpoints(domains, callback)
+ local endpoints = ''
+ function iter(domain, callback)
+ dns.resolve(domain, 'SRV', function(err, results)
+ if err then
+ logging.log(logging.ERR, 'Could not lookup SRV record from ' .. domain)
+ callback()
+ return
+ end
+ callback(nil, results)
+ end)
end
-
- i = 1;
-
- while i <= #netifs do
- print("sigar.netifs[".. i .."].info = ".. Utils.dump(netifs[i]:info()))
- print("sigar.netifs[".. i .."].usage = ".. Utils.dump(netifs[i]:usage()))
- i = i + 1
- end
-
- logging.log(logging.CRIT, "Server listening at http://localhost:8080/")
+ async.map(domains, iter, function(err, results)
+ local i, v, serverPort
+ for i, v in pairs(results) do
+ serverPort = results[i][1].name .. ':' .. results[i][1].port
+ endpoints = endpoints .. serverPort
+ logging.log(logging.INFO, 'found endpoint: ' .. serverPort)
+ if i ~= #results then
+ endpoints = endpoints .. ','
+ end
+ end
+ callback(nil, endpoints)
+ end)
end
function MonitoringAgent:_verifyState(callback)
- callback = callback or function() end
if self._config == nil then
logging.log(logging.ERR, "statefile 'config' missing or invalid")
process.exit(1)
end
- if self._config['id'] == nil then
+ if self._config['monitoring_id'] == nil then
logging.log(logging.ERR, "'id' is missing from 'config'")
process.exit(1)
end
- if self._config['token'] == nil then
+ if self._config['monitoring_token'] == nil then
logging.log(logging.ERR, "'token' is missing from 'config'")
process.exit(1)
end
+ callback()
+end
- if self._config['endpoints'] == nil then
- logging.log(logging.ERR, "'endpoints' is missing from 'config'")
- process.exit(1)
- end
-
- -- Verify that the endpoint addresses are specified in the correct format
- local endpoints = misc.split(self._config['endpoints'], '[^,]+')
-
- if #endpoints == 0 then
- logging.log(logging.ERR, "at least one endpoint needs to be specified")
- process.exit(1)
- end
-
- for i, address in ipairs(endpoints) do
- if misc.splitAddress(address) == nil then
- logging.log(logging.ERR, "endpoint needs to be specified in the following format ip:port")
+function MonitoringAgent:_loadEndpoints(callback)
+ local endpoints
+ local query_endpoints
+ if self._config['monitoring_query_endpoints'] and
+ self._config['monitoring_endpoints'] == nil then
+ -- Verify that the endpoint addresses are specified in the correct format
+ query_endpoints = misc.split(self._config['monitoring_query_endpoints'], '[^,]+')
+ logging.log(logging.INFO, "querying for endpoints")
+ self:_queryForEndpoints(query_endpoints, function(err, endpoints)
+ if err then
+ callback(err)
+ return
+ end
+ self._config['monitoring_endpoints'] = endpoints
+ callback()
+ end)
+ else
+ -- Verify that the endpoint addresses are specified in the correct format
+ endpoints = misc.split(self._config['monitoring_endpoints'], '[^,]+')
+ if #endpoints == 0 then
+ logging.log(logging.ERR, "at least one endpoint needs to be specified")
process.exit(1)
end
+ for i, address in ipairs(endpoints) do
+ if misc.splitAddress(address) == nil then
+ logging.log(logging.ERR, "endpoint needs to be specified in the following format ip:port")
+ process.exit(1)
+ end
+ end
+ logging.log(logging.INFO, "using id " .. self._config['monitoring_id'])
+ callback()
end
-
- logging.log(logging.INFO, "using id " .. self._config['id'])
- callback()
end
function MonitoringAgent:loadStates(callback)
@@ -126,17 +127,26 @@ function MonitoringAgent:loadStates(callback)
-- Verify
function(callback)
self:_verifyState(callback)
- end
+ end,
+ function(callback)
+ self:_loadEndpoints(callback)
+ end
}, callback)
end
function MonitoringAgent:connect(callback)
- local endpoints = misc.split(self._config['endpoints'], '[^,]+')
- self._streams = ConnectionStream:new(self._config['id'], self._config['token'])
+ local endpoints = misc.split(self._config['monitoring_endpoints'], '[^,]+')
+ if #endpoints <= 0 then
+ logging.log(logging.ERR, 'no endpoints')
+ timer.setTimeout(misc.calcJitter(constants.SRV_RECORD_FAILURE_DELAY, constants.SRV_RECORD_FAILURE_DELAY_JITTER), function()
+ process.exit(1)
+ end)
+ return
+ end
+ self._streams = ConnectionStream:new(self._config['monitoring_id'], self._config['monitoring_token'])
self._streams:on('error', function(err)
logging.log(logging.ERR, JSON.stringify(err))
end)
-
self._streams:createConnections(endpoints, callback)
end
@@ -150,7 +160,7 @@ end
function MonitoringAgent.run(options)
if not options then options = {} end
local agent = MonitoringAgent:new(options.stateDirectory, options.configFile)
- async.waterfall({
+ async.series({
function(callback)
agent:loadStates(callback)
end,
View
@@ -1,3 +1,4 @@
-id agentA
-token aToken
-endpoints 127.0.0.1:50041,127.0.0.1:50051,127.0.0.1:50061
+monitoring_id agentA
+monitoring_token aToken
+monitoring_query_endpoints _monitoring_agent._tcp.lon3.stage.monitoring.api.rackspacecloud.com,_monitoring_agent._tcp.ord1.stage.monitoring.api.rackspacecloud.com
+#monitoring_endpoints 127.0.0.1:50041,127.0.0.1:50051,127.0.0.1:50061
Submodule luvit updated from 0653b7 to d3b395
@@ -1,3 +1,3 @@
-id agentA
-token aToken
-endpoints 127.0.0.1:50041,127.0.0.1:50051,127.0.0.1:50061
+monitoring_id REPLACE_ID
+monitoring_token REPLACE_TOKEN
+monitoring_query_endpoints _monitoring_agent._tcp.lon3.stage.monitoring.api.rackspacecloud.com,_monitoring_agent._tcp.ord1.stage.monitoring.api.rackspacecloud.com