Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

cleanup. Add ConnectionStream object to funnel all messages #48

Merged
merged 16 commits into from

2 participants

@rphillips
Owner
  • Fixes CFLAGS and Makefile for release
  • Adds ConnectionStream object to funnel all messages
agents/monitoring/lua/monitoring-agent.lua
@@ -28,6 +28,8 @@ local stateFile = require('./lib/state_file')
local MonitoringAgent = Object:extend()
+DEFAULT_STATE_DIRECTORY = '/var/run/agent/states'
@philips
philips added a note

Oops, this shouldn't be here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
agents/monitoring/lua/monitoring-agent.lua
@@ -124,7 +126,6 @@ function MonitoringAgent:connect(callback)
self._streams:on('error', function(err)
logging.log(logging.ERR, fmt('%s:%d -> %s', err.host, err.port, err.message))
end)
-
@philips
philips added a note

Hrm, this file's changes are all invalid IMHO

@rphillips Owner

they got pulled in with the last merge from master

@rphillips Owner

fixed in 9fd37fb

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
contrib/config.state
((3 lines not shown))
token aToken
-endpoints 192.168.95.130:50041
+endpoints 192.168.95.130:50041,192.168.95.130:50051,192.168.95.130:50061
@philips
philips added a note

Can you change all of this to 127.0.0.1? No one else has your IP setup.

@rphillips Owner

fixed in fd6581c

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
agents/monitoring/lua/lib/client/connection_messages.lua
((17 lines not shown))
+
+-- State
+
+local State = Emitter:extend()
+
+function State:initialize(name, messages)
+ self._name = name
+ self._messages = messages
+ self._log = loggingUtil.makeLogger(fmt('State(%s)', self._name))
+end
+
+function State:getName()
+ return self._name
+end
+
+-- RegisterCheckState State
@philips
philips added a note

Can you describe what a RegisterCheckState is in a few words here?

@rphillips Owner

Added in 02a43ca.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
agents/monitoring/lua/lib/client/connection_messages.lua
((22 lines not shown))
+function State:initialize(name, messages)
+ self._name = name
+ self._messages = messages
+ self._log = loggingUtil.makeLogger(fmt('State(%s)', self._name))
+end
+
+function State:getName()
+ return self._name
+end
+
+-- RegisterCheckState State
+
+local RegisterCheckState = State:extend()
+function RegisterCheckState:initialize(messages)
+ State.initialize(self, 'RegisterCheckState', messages)
+ self._timeout = SECONDS(20, 10)
@philips
philips added a note

Can you pull these values out to the top of the file and describe why they were chosen?

@rphillips Owner

I removed this code, because we don't use the timeout in this context. c00ec4d

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
agents/monitoring/lua/lib/client/connection_stream.lua
((19 lines not shown))
+ async.forEach(addresses, function(address, callback)
+ local split, client, options
+ split = misc.splitAddress(address)
+ options = {}
+ options.host = split[1]
+ options.port = split[2]
+ options.datacenter = address
+ self:createConnection(options, callback)
+ end)
+ end
+ }, callback)
+end
+
+function ConnectionStream:_setDelay(datacenter)
+ local maxDelay = 5 * 60 * 1000 -- max connection delay in ms
+ local jitter = 7000 -- jitter in ms
@philips
philips added a note

Perhaps we should have a master "tunables" file for all of these delays and jitters?

@rphillips Owner

added a constants file into util/constants.lua

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
agents/monitoring/lua/lib/client/client.lua
@@ -67,8 +71,12 @@ function AgentClient:_createChecks(manifest)
return checks
end
+function AgentClient:getLog()
@philips
philips added a note

Why not just have it call self.log(...)? The c:getLog()("STUFF") form is sort of ugly.

@rphillips Owner

Changed in ccfc342

client:log(PRIORITY, message, ...)

Good call... I think this looks cleaner.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
agents/monitoring/lua/lib/client/connection_stream.lua
@@ -90,6 +100,29 @@ function ConnectionStream:reconnect(options, callback)
end)
end
+function ConnectionStream:getClient()
+ local client, min_latency, latency
+ for k, v in pairs(self._clients) do
+ latency = self._clients[k]:getLatency()
+ if client == nil or min_latency > latency then
@philips
philips added a note

It looks like self._latency = 0 is the initializer. Shouldn't it be initialized to max int or null?

Also, what is min_latency initialized to? Shouldn't it be MAX_LATENCY too?

@rphillips Owner

fixed in 67b2375

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@philips

LGTM+1

@rphillips rphillips merged commit 11e3232 into master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
8 Makefile
@@ -4,15 +4,15 @@ zip_files = monitoring.zip monitoring-test.zip
sig_files = $(zip_files:%.zip=%.zip.sig)
%.zip:
- -ln -fs out/Debug/$@ $@
+ -ln -fs out/${BUILDTYPE}/$@ $@
%.zip.sig: $(zip_files)
- openssl dgst -sign tests/ca/server.key.insecure $(patsubst %.zip.sig, %.zip, $@) > out/Debug/$@
- -ln -fs out/Debug/$@ $@
+ openssl dgst -sign tests/ca/server.key.insecure $(patsubst %.zip.sig, %.zip, $@) > out/${BUILDTYPE}/$@
+ -ln -fs out/${BUILDTYPE}/$@ $@
all: out/Makefile
$(MAKE) -C out V=1 BUILDTYPE=$(BUILDTYPE) -j4
- -ln -fs out/Debug/monitoring-agent monitoring-agent
+ -ln -fs out/${BUILDTYPE}/monitoring-agent monitoring-agent
$(MAKE) $(sig_files)
out/Release/monitoring-agent: all
View
73 agents/monitoring/lua/lib/client/client.lua
@@ -15,6 +15,7 @@ limitations under the License.
--]]
local os = require('os')
+local consts = require('../util/constants')
local tls = require('tls')
local timer = require('timer')
local Error = require('core').Error
@@ -26,7 +27,6 @@ local misc = require('../util/misc')
local loggingUtil = require ('../util/logging')
local AgentProtocolConnection = require('../protocol/connection')
local table = require('table')
-local Scheduler = require('../schedule').Scheduler
local fmt = require('string').format
@@ -49,10 +49,15 @@ function AgentClient:initialize(options)--datacenter, id, token, host, port, tim
self._ping_interval = nil
self._sent_ping_count = 0
self._got_pong_count = 0
+ self._latency = nil
self._log = loggingUtil.makeLogger(fmt('%s:%s', self._host, self._port))
end
+function AgentClient:getDatacenter() -- accessor for the datacenter this client is associated with
+ return self._datacenter -- NOTE(review): presumably assigned from options in initialize (not visible in this hunk) — confirm
+end
+
function AgentClient:_createChecks(manifest)
local checks = {}
@@ -67,8 +72,12 @@ function AgentClient:_createChecks(manifest)
return checks
end
+--[[
+Log a message through this client's logger (self._log).
+
+priority - Logging priority constant, e.g. logging.DEBUG.
+...      - Message and any further arguments, forwarded unchanged.
+]]--
+function AgentClient:log(priority, ...)
+  -- Forward the varargs directly: unpack({...}) allocates a throwaway table
+  -- and can silently drop arguments when one of them is nil.
+  self._log(priority, ...)
+end
+
function AgentClient:_socketTimeout() -- timeout value handed to the socket's setTimeout in connect
- return misc.createJitterTimeout(self._timeout, 19)
+ return misc.calcJitter(PING_INTERVAL, consts.SOCKET_TIMEOUT) -- NOTE(review): PING_INTERVAL is not defined in any visible hunk; if it is nil, calcJitter raises an arithmetic error. SOCKET_TIMEOUT is passed as the jitter argument — confirm both are intended
end
function AgentClient:connect()
@@ -80,12 +89,12 @@ function AgentClient:connect()
-- setup protocol
self.protocol = AgentProtocolConnection:new(self._log, self._id, self._token, cleartext)
-
self.protocol:on('error', function(err)
self:emit(err)
end)
-- response to messages
self.protocol:on('message', function(msg)
+ self:emit('message', msg, self)
self.protocol:execute(msg)
end)
@@ -95,29 +104,11 @@ function AgentClient:connect()
self:emit('error', err)
else
self._ping_interval = msg.result.ping_interval
- self:startPingInterval()
-
- -- retrieve manifest
- self.protocol:getManifest(function(err, manifest)
- if err then
- -- TODO Abort connection?
- self._log(logging.ERROR, 'Error while retrieving manifest: ' .. err.message)
- else
- local checks = self:_createChecks(manifest)
- self._scheduler = Scheduler:new('scheduler.state', checks, function()
- self._scheduler:start()
- end)
- self._scheduler:on('check', function(check, checkResults)
- self._log(logging.DEBUG, 'Check Results')
- self._log(logging.DEBUG, checkResults:toString())
- self.protocol:sendMetrics(check, checkResults)
- end)
- end
- end)
+ self:emit('handshake_success')
end
end)
end)
- self._log(logging.DBG, fmt('Using timeout %sms', self._timeout))
+ self._log(logging.DBG, fmt('Using timeout %sms', self:_socketTimeout()))
self._sock.socket:setTimeout(self:_socketTimeout(), function()
self:emit('timeout')
end)
@@ -130,36 +121,44 @@ function AgentClient:connect()
end)
end
+function AgentClient:getLatency() -- returns nil until a ping response has been handled
+ return self._latency -- written in startPingInterval as os.time() - timestamp
+end
+
function AgentClient:startPingInterval()
- self._log(logging.DEBUG, fmt('Starting ping interval, interval=%dms', self._ping_interval))
+ -- Arms a jittered timer that pings the server for `this` client; a handled
+ -- response records the latency and re-arms the timer. A ping error returns
+ -- early and does not re-arm.
+ -- Declared local: a bare `function startInterval` is a global shared by all
+ -- clients, so one client's timer could run another client's closure. For the
+ -- same reason the helper reads only `this`, never the enclosing `self`.
+ local function startInterval(this)
+ local timeout = misc.calcJitter(this._ping_interval, consts.PING_INTERVAL_JITTER)
- function startInterval()
- self._pingTimeout = timer.setTimeout(misc.createJitterTimeout(self._ping_interval, 7000), function()
+ this._log(logging.DEBUG, fmt('Starting ping interval, interval=%dms', this._ping_interval))
+
+ this._pingTimeout = timer.setTimeout(timeout, function()
local timestamp = os.time()
- self._log(logging.DEBUG, fmt('Sending ping (timestamp=%d,sent_ping_count=%d,got_pong_count=%d)',
- timestamp, self._sent_ping_count, self._got_pong_count))
- self._sent_ping_count = self._sent_ping_count + 1
- self.protocol:sendPing(timestamp, function(err, msg)
+ this._log(logging.DEBUG, fmt('Sending ping (timestamp=%d,sent_ping_count=%d,got_pong_count=%d)',
+ timestamp, this._sent_ping_count, this._got_pong_count))
+ this._sent_ping_count = this._sent_ping_count + 1
+ this.protocol:sendPing(timestamp, function(err, msg)
if err then
- self._log(logging.DEBUG, 'Got an error while sending ping: ' .. tostring(err))
+ this._log(logging.DEBUG, 'Got an error while sending ping: ' .. tostring(err))
return
end
if msg.result.timestamp then
- self._got_pong_count = self._got_pong_count + 1
- self._log(logging.DEBUG, fmt('Got pong (sent_ping_count=%d,got_pong_count=%d)',
- self._sent_ping_count, self._got_pong_count))
+ this._got_pong_count = this._got_pong_count + 1
+ this._log(logging.DEBUG, fmt('Got pong (sent_ping_count=%d,got_pong_count=%d)',
+ this._sent_ping_count, this._got_pong_count))
else
- self._log(logging.DEBUG, 'Got invalid pong response')
+ this._log(logging.DEBUG, 'Got invalid pong response')
end
- startInterval()
+ this._latency = os.time() - timestamp
+
+ startInterval(this)
end)
end)
end
- startInterval()
+ startInterval(self)
end
function AgentClient:clearPingInterval()
View
133 agents/monitoring/lua/lib/client/connection_messages.lua
@@ -0,0 +1,133 @@
+local bind = require('utils').bind
+local timer = require('timer')
+local Emitter = require('core').Emitter
+local Object = require('core').Object
+local misc = require('../util/misc')
+local logging = require('logging')
+local loggingUtil = require ('../util/logging')
+local table = require('table')
+local os = require('os')
+local Scheduler = require('../schedule').Scheduler
+
+local fmt = require('string').format
+
+-- State
+
+local State = Emitter:extend()
+
+-- Base constructor for a connection state.
+-- name     - label used by getName() and in the logger prefix.
+-- messages - object stored as self._messages for use by subclasses.
+function State:initialize(name, messages)
+  self._name, self._messages = name, messages
+  local prefix = fmt('State(%s)', name)
+  self._log = loggingUtil.makeLogger(prefix)
+end
+
+function State:getName() -- accessor for the name passed to State:initialize
+ return self._name
+end
+
+--[[ RegisterCheckState State
+ Whenever a client handshake is successful the onHandshake function gets
+ called. We only want to load the manifest once for now, so this small
+ state machine handles this.
+
+ We may want to add more advanced logic to pull the manifest at certain
+ intervals.
+]]--
+
+local RegisterCheckState = State:extend()
+function RegisterCheckState:initialize(messages) -- messages: the ConnectionMessages instance that constructs this state
+ State.initialize(self, 'RegisterCheckState', messages)
+ self._lastFetchTime = 0 -- 0 means no manifest fetch has been scheduled yet; onHandshake tests for this value
+end
+
+-- Build checks from the manifest and run them on a scheduler.
+-- client   - client whose _createChecks turns the manifest into checks.
+-- manifest - manifest handed to client:_createChecks.
+function RegisterCheckState:_scheduleManifest(client, manifest)
+  local checks = client:_createChecks(manifest)
+
+  -- invoked by the scheduler's constructor callback
+  local function startScheduler()
+    self._scheduler:start()
+  end
+
+  -- results go out on whichever client the stream selects at that moment
+  local function forwardMetrics(check, checkResults)
+    local sender = self._messages:getStream():getClient()
+    if not sender then
+      return
+    end
+    sender.protocol:sendMetrics(check, checkResults)
+  end
+
+  self._scheduler = Scheduler:new('scheduler.state', checks, startScheduler)
+  self._scheduler:on('check', forwardMetrics)
+end
+
+--[[
+Handshake hook. Schedules a single manifest fetch the first time it is
+called; later calls do nothing because _lastFetchTime is no longer 0.
+]]--
+function RegisterCheckState:onHandshake()
+
+  -- Local on purpose: a bare `function run()` would create a global that
+  -- other modules could overwrite or call by accident.
+  local function run()
+    local client = self._messages:getStream():getClient()
+    if client then
+      client.protocol:getManifest(function(err, manifest)
+        if err then
+          -- TODO Abort connection?
+          client:log(logging.ERROR, 'Error while retrieving manifest: ' .. err.message)
+        else
+          self:_scheduleManifest(client, manifest)
+        end
+      end)
+    end
+  end
+
+  -- TODO at some point we want to add logic to update the manifest
+  if self._lastFetchTime == 0 then
+    if self._timer then
+      timer.clearTimer(self._timer)
+    end
+    -- NOTE(review): assumes process.nextTick returns a handle that
+    -- timer.clearTimer accepts — confirm against the luvit version in use.
+    self._timer = process.nextTick(run)
+    self._lastFetchTime = os.time()
+  end
+end
+
+-- Connection Messages
+
+local ConnectionMessages = Emitter:extend()
+-- connectionStream - stream object returned later by getStream().
+function ConnectionMessages:initialize(connectionStream)
+  self._connectionStream = connectionStream
+  self._states = {}
+  self:_addState(RegisterCheckState:new(self))
+
+  -- event name -> handler method; each handler is bound to this instance
+  local routes = {
+    { 'handshake_success', ConnectionMessages.onHandshake },
+    { 'client_end', ConnectionMessages.onClientEnd },
+    { 'message', ConnectionMessages.onMessage },
+  }
+  for _, route in ipairs(routes) do
+    self:on(route[1], bind(route[2], self))
+  end
+end
+
+-- Append a state to the list that the on* dispatchers walk.
+function ConnectionMessages:_addState(state)
+  local states = self._states
+  states[#states + 1] = state
+end
+
+function ConnectionMessages:getStream() -- accessor for the connection stream passed to initialize
+ return self._connectionStream
+end
+
+-- Handler for 'client_end': notify every state that defines onClientEnd.
+function ConnectionMessages:onClientEnd(client)
+  client:log(logging.INFO, 'Detected client disconnect')
+  for _, state in ipairs(self._states) do
+    local handler = state.onClientEnd
+    if handler then
+      handler(state, client)
+    end
+  end
+end
+
+-- Handler for 'handshake_success': notify every state that defines onHandshake.
+function ConnectionMessages:onHandshake(client)
+  client:log(logging.DEBUG, '(onHandshake)')
+  for _, state in ipairs(self._states) do
+    local handler = state.onHandshake
+    if handler then
+      handler(state, client)
+    end
+  end
+end
+
+-- Handler for 'message': let every state that defines onMessage see it first,
+-- then hand the message to the client's protocol.
+function ConnectionMessages:onMessage(client, msg)
+  client:log(logging.DEBUG, '(onMessage)')
+  for _, state in ipairs(self._states) do
+    local handler = state.onMessage
+    if handler then
+      handler(state, client, msg)
+    end
+  end
+  client.protocol:processMessage(msg)
+end
+
+-- Module interface: the State base class and the ConnectionMessages emitter.
+local exports = {
+  State = State,
+  ConnectionMessages = ConnectionMessages,
+}
+return exports
View
116 agents/monitoring/lua/lib/client/connection_stream.lua
@@ -22,17 +22,21 @@ local fmt = require('string').format
local async = require('async')
local AgentClient = require('./client').AgentClient
+local ConnectionMessages = require('./connection_messages').ConnectionMessages
local logging = require('logging')
+local consts = require('../util/constants')
local misc = require('../util/misc')
-local CONNECT_TIMEOUT = 6000
+local fmt = require('string').format
local ConnectionStream = Emitter:extend()
function ConnectionStream:initialize(id, token)
self._id = id
self._token = token
self._clients = {} -- clients keyed by datacenter; entries are added by _promoteClient
+ self._unauthedClients = {} -- clients keyed by datacenter that have not been promoted yet
self._delays = {} -- per-datacenter reconnect delay maintained by _setDelay
+ self._messages = ConnectionMessages:new(self) -- receives 'handshake_success' and 'message' events emitted in createConnection
end
--[[
@@ -43,18 +47,37 @@ callback - Callback called with (err) when all the connections have been
established.
--]]
function ConnectionStream:createConnections(addresses, callback)
- local client
+  async.series({
+    -- connect: open one connection per endpoint address
+    function(seriesCallback)
+      async.forEach(addresses, function(address, addressCallback)
+        local split = misc.splitAddress(address)
+        local options = {
+          host = split[1],
+          port = split[2],
+          datacenter = address
+        }
+        self:createConnection(options, addressCallback)
+      -- The completion callback is required: without it this series step
+      -- never finishes and the caller's callback is never invoked.
+      end, seriesCallback)
+    end
+  }, callback)
+end
+
+function ConnectionStream:_setDelay(datacenter) -- computes, stores and returns the next reconnect delay for a datacenter
+ local maxDelay = consts.DATACENTER_MAX_DELAY
+ local jitter = consts.DATACENTER_MAX_DELAY_JITTER
+ local previousDelay = self._delays[datacenter]
+ local delay
+
+ if previousDelay == nil then -- no delay recorded yet for this datacenter
+ self._delays[datacenter] = 0
+ previousDelay = 0
+ end
- async.forEach(addresses, function(address, callback)
- local client, split, host, port, options
+ delay = math.min(previousDelay, maxDelay) + (jitter * math.random()) -- previous delay capped at maxDelay, plus a random amount in [0, jitter)
+ self._delays[datacenter] = delay -- stored so the next call builds on this value
- split = misc.splitAddress(address)
- options = {}
- options.host = split[1]
- options.port = split[2]
- options.datacenter = address
- client = self:createConnection(options, callback)
- end, callback)
+ return delay
end
--[[
@@ -67,22 +90,8 @@ options - datacenter, host, port
callback - Callback called with (err)
]]--
function ConnectionStream:reconnect(options, callback)
- local previous_delay, delay, max_delay, jitter, value
local datacenter = options.datacenter
-
- max_delay = 5 * 60 * 1000 -- max connection delay in ms
- jitter = 7000 -- jitter in ms
-
- previous_delay = self._delays[datacenter]
-
- -- First reconnection attempt
- if self._delays[datacenter] == nil then
- self._delays[datacenter] = 0
- previous_delay = 0
- end
-
- delay = math.min(previous_delay, max_delay) + (jitter * math.random())
- self._delays[datacenter] = delay
+ local delay = self:_setDelay(datacenter)
logging.log(logging.INFO, fmt('%s:%d -> Retrying connection in %dms', options.host, options.port, delay))
timer.setTimeout(delay, function()
@@ -90,6 +99,33 @@ function ConnectionStream:reconnect(options, callback)
end)
end
+--[[
+Pick the client to use from the authenticated clients.
+
+Returns the client with the lowest measured latency. A client whose latency
+is still nil is returned only when no client has a measured latency.
+Returns nil when there are no clients.
+]]--
+function ConnectionStream:getClient()
+  local best, bestLatency
+  local unmeasured
+  for _, candidate in pairs(self._clients) do
+    local latency = candidate:getLatency()
+    if latency == nil then
+      -- Keep as a fallback only: letting it replace the current choice would
+      -- make the result depend on pairs() iteration order.
+      unmeasured = unmeasured or candidate
+    elseif bestLatency == nil or latency < bestLatency then
+      best = candidate
+      bestLatency = latency
+    end
+  end
+  return best or unmeasured
+end
+
+--[[
+Move an unauthenticated client to the list of clients that have been authenticated.
+client - the client.
+]]--
+function ConnectionStream:_promoteClient(client)
+  local dc = client:getDatacenter()
+  local message = fmt('Connection has been authenticated to %s', dc)
+  client:log(logging.INFO, message)
+  -- register under the authenticated set, then drop the pending entry
+  self._clients[dc] = client
+  self._unauthedClients[dc] = nil
+end
+
--[[
Create and establish a connection to the endpoint.
@@ -99,18 +135,17 @@ port - Port.
callback - Callback called with (err)
]]--
function ConnectionStream:createConnection(options, callback)
-
local opts = misc.merge({
id = self._id,
token = self._token,
- timeout = CONNECT_TIMEOUT
+ timeout = consts.CONNECT_TIMEOUT
}, options)
local client = AgentClient:new(opts)
client:on('error', function(err)
- err.host = host
- err.port = port
- err.datacenter = datacenter
+ err.host = opts.host
+ err.port = opts.port
+ err.datacenter = opts.datacenter
client:destroy()
self:reconnect(opts, callback)
@@ -120,16 +155,29 @@ function ConnectionStream:createConnection(options, callback)
end)
client:on('timeout', function()
+ logging.log(logging.DEBUG, fmt('%s:%d -> Client Timeout', opts.host, opts.port))
client:destroy()
self:reconnect(opts, callback)
end)
client:on('end', function()
- logging.log(logging.DEBUG, fmt('%s:%d -> Remote endpoint closed the connection', host, port))
+ self:emit('client_end', client)
+ logging.log(logging.DEBUG, fmt('%s:%d -> Remote endpoint closed the connection', opts.host, opts.port))
client:destroy()
self:reconnect(opts, callback)
end)
+ client:on('handshake_success', function()
+ self:_promoteClient(client)
+ self._delays[options.datacenter] = 0
+ client:startPingInterval()
+ self._messages:emit('handshake_success', client)
+ end)
+
+ client:on('message', function(msg)
+ self._messages:emit('message', client, msg)
+ end)
+
client:connect(function(err)
if err then
client:destroy()
@@ -139,10 +187,8 @@ function ConnectionStream:createConnection(options, callback)
end
client.datacenter = datacenter
- self._clients[datacenter] = client
+ self._unauthedClients[datacenter] = client
- -- TODO should do this after auth
- self._delays[datacenter] = 0
callback();
end)
View
8 agents/monitoring/lua/lib/util/constants.lua
@@ -0,0 +1,8 @@
+local exports = {} -- shared tunables for connection timing
+exports.CONNECT_TIMEOUT = 6000 -- passed as the `timeout` option when ConnectionStream creates a client; presumably ms — confirm
+exports.SOCKET_TIMEOUT = 10000 -- used as the jitter argument of misc.calcJitter in AgentClient:_socketTimeout; presumably ms — confirm
+exports.PING_INTERVAL_JITTER = 7000 -- random amount added to the ping interval, which is logged in ms
+
+exports.DATACENTER_MAX_DELAY = 5 * 60 * 1000 -- max connection delay in ms
+exports.DATACENTER_MAX_DELAY_JITTER = 7000 -- jitter in ms added to the reconnect delay
+return exports
View
6 agents/monitoring/lua/lib/util/misc.lua
@@ -90,8 +90,8 @@ function toString(tbl)
end
end
-function createJitterTimeout(n, jitter)
- return n + (jitter * math.random())
+function calcJitter(n, jitter) -- n: base value; jitter: upper bound of the random amount added
+ return math.floor(n + (jitter * math.random())) -- adds a random amount in [0, jitter) and floors the sum
end
-- merge tables
@@ -113,7 +113,7 @@ end
--[[ Exports ]]--
local exports = {}
-exports.createJitterTimeout = createJitterTimeout
+exports.calcJitter = calcJitter
exports.merge = merge
exports.splitAddress = splitAddress
exports.split = split
View
4 common.gypi
@@ -36,8 +36,7 @@
},
},
'Release': {
- 'defines': [ 'NDEBUG' ],
- 'cflags': [ '-Os' ],
+ 'cflags': [ '-g', '-O3', '-fdata-sections', '-ffunction-sections' ],
'msvs_settings': {
'VCCLCompilerTool': {
'target_conditions': [
@@ -150,6 +149,7 @@
'GCC_THREADSAFE_STATICS': 'NO', # -fno-threadsafe-statics
'GCC_VERSION': '4.2',
'GCC_WARN_ABOUT_MISSING_NEWLINE': 'YES', # -Wnewline-eof
+ 'GCC_OPTIMIZATION_LEVEL': '3', # -O3
'MACOSX_DEPLOYMENT_TARGET': '10.4', # -mmacosx-version-min=10.4
'PREBINDING': 'NO', # No -Wl,-prebind
'USE_HEADERMAP': 'NO',
View
4 contrib/config.state
@@ -1,3 +1,3 @@
-id testing
+id agentA
token aToken
-endpoints 192.168.95.130:50041
+endpoints 127.0.0.1:50041,127.0.0.1:50051,127.0.0.1:50061
2  deps/luvit
@@ -1 +1 @@
-Subproject commit eebd234fa7fbd33e9994f2a28ddd8be30b3da41b
+Subproject commit feb76375f3c64033312e3227aa1df76250bb0645
Something went wrong with that request. Please try again.