Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup. Add ConnectionStream object to funnel all messages #48

Merged
merged 16 commits into from
Apr 13, 2012
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ zip_files = monitoring.zip monitoring-test.zip
sig_files = $(zip_files:%.zip=%.zip.sig)

%.zip:
-ln -fs out/Debug/$@ $@
-ln -fs out/${BUILDTYPE}/$@ $@

%.zip.sig: $(zip_files)
openssl dgst -sign tests/ca/server.key.insecure $(patsubst %.zip.sig, %.zip, $@) > out/Debug/$@
-ln -fs out/Debug/$@ $@
openssl dgst -sign tests/ca/server.key.insecure $(patsubst %.zip.sig, %.zip, $@) > out/${BUILDTYPE}/$@
-ln -fs out/${BUILDTYPE}/$@ $@

all: out/Makefile
$(MAKE) -C out V=1 BUILDTYPE=$(BUILDTYPE) -j4
-ln -fs out/Debug/monitoring-agent monitoring-agent
-ln -fs out/${BUILDTYPE}/monitoring-agent monitoring-agent
$(MAKE) $(sig_files)

out/Release/monitoring-agent: all
Expand Down
72 changes: 35 additions & 37 deletions agents/monitoring/lua/lib/client/client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ local misc = require('../util/misc')
local loggingUtil = require ('../util/logging')
local AgentProtocolConnection = require('../protocol/connection')
local table = require('table')
local Scheduler = require('../schedule').Scheduler

local fmt = require('string').format

Expand All @@ -49,10 +48,15 @@ function AgentClient:initialize(options)--datacenter, id, token, host, port, tim
self._ping_interval = nil
self._sent_ping_count = 0
self._got_pong_count = 0
self._latency = 0

self._log = loggingUtil.makeLogger(fmt('%s:%s', self._host, self._port))
end

--- Accessor for the datacenter associated with this client.
-- @return the value currently stored in self._datacenter
function AgentClient:getDatacenter()
return self._datacenter
end

function AgentClient:_createChecks(manifest)
local checks = {}

Expand All @@ -67,8 +71,12 @@ function AgentClient:_createChecks(manifest)
return checks
end

function AgentClient:getLog()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just have it call self.log(...)? The c:getLog()("STUFF") form is sort of ugly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed in ccfc342

client:log(PRIORITY, message, ...)

Good call... I think this looks cleaner.

return self._log
end

function AgentClient:_socketTimeout()
return misc.createJitterTimeout(self._timeout, 19)
return misc.calcJitter(PING_INTERVAL, 19000)
end

function AgentClient:connect()
Expand All @@ -80,12 +88,12 @@ function AgentClient:connect()

-- setup protocol
self.protocol = AgentProtocolConnection:new(self._log, self._id, self._token, cleartext)

self.protocol:on('error', function(err)
self:emit(err)
end)
-- response to messages
self.protocol:on('message', function(msg)
self:emit('message', msg, self)
self.protocol:execute(msg)
end)

Expand All @@ -95,29 +103,11 @@ function AgentClient:connect()
self:emit('error', err)
else
self._ping_interval = msg.result.ping_interval
self:startPingInterval()

-- retrieve manifest
self.protocol:getManifest(function(err, manifest)
if err then
-- TODO Abort connection?
self._log(logging.ERROR, 'Error while retrieving manifest: ' .. err.message)
else
local checks = self:_createChecks(manifest)
self._scheduler = Scheduler:new('scheduler.state', checks, function()
self._scheduler:start()
end)
self._scheduler:on('check', function(check, checkResults)
self._log(logging.DEBUG, 'Check Results')
self._log(logging.DEBUG, checkResults:toString())
self.protocol:sendMetrics(check, checkResults)
end)
end
end)
self:emit('handshake_success')
end
end)
end)
self._log(logging.DBG, fmt('Using timeout %sms', self._timeout))
self._log(logging.DBG, fmt('Using timeout %sms', self:_socketTimeout()))
self._sock.socket:setTimeout(self:_socketTimeout(), function()
self:emit('timeout')
end)
Expand All @@ -130,36 +120,44 @@ function AgentClient:connect()
end)
end

--- Accessor for the most recently recorded latency value.
-- @return the value currently stored in self._latency
function AgentClient:getLatency()
return self._latency
end

function AgentClient:startPingInterval()
self._log(logging.DEBUG, fmt('Starting ping interval, interval=%dms', self._ping_interval))
function startInterval(this)
local timeout = misc.calcJitter(self._ping_interval, 7000)

function startInterval()
self._pingTimeout = timer.setTimeout(misc.createJitterTimeout(self._ping_interval, 7000), function()
this._log(logging.DEBUG, fmt('Starting ping interval, interval=%dms', this._ping_interval))

this._pingTimeout = timer.setTimeout(timeout, function()
local timestamp = os.time()

self._log(logging.DEBUG, fmt('Sending ping (timestamp=%d,sent_ping_count=%d,got_pong_count=%d)',
timestamp, self._sent_ping_count, self._got_pong_count))
self._sent_ping_count = self._sent_ping_count + 1
self.protocol:sendPing(timestamp, function(err, msg)
this._log(logging.DEBUG, fmt('Sending ping (timestamp=%d,sent_ping_count=%d,got_pong_count=%d)',
timestamp, this._sent_ping_count, this._got_pong_count))
this._sent_ping_count = this._sent_ping_count + 1
this.protocol:sendPing(timestamp, function(err, msg)
if err then
self._log(logging.DEBUG, 'Got an error while sending ping: ' .. tostring(err))
this._log(logging.DEBUG, 'Got an error while sending ping: ' .. tostring(err))
return
end

if msg.result.timestamp then
self._got_pong_count = self._got_pong_count + 1
self._log(logging.DEBUG, fmt('Got pong (sent_ping_count=%d,got_pong_count=%d)',
self._sent_ping_count, self._got_pong_count))
this._got_pong_count = this._got_pong_count + 1
this._log(logging.DEBUG, fmt('Got pong (sent_ping_count=%d,got_pong_count=%d)',
this._sent_ping_count, this._got_pong_count))
else
self._log(logging.DEBUG, 'Got invalid pong response')
this._log(logging.DEBUG, 'Got invalid pong response')
end

startInterval()
this._latency = os.time() - timestamp

startInterval(this)
end)
end)
end

startInterval()
startInterval(self)
end

function AgentClient:clearPingInterval()
Expand Down
132 changes: 132 additions & 0 deletions agents/monitoring/lua/lib/client/connection_messages.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
local bind = require('utils').bind
local timer = require('timer')
local Emitter = require('core').Emitter
local Object = require('core').Object
local misc = require('../util/misc')
local logging = require('logging')
local loggingUtil = require ('../util/logging')
local table = require('table')
local os = require('os')
local Scheduler = require('../schedule').Scheduler

local fmt = require('string').format

--- Scales a duration and its jitter by 1000 and passes both to misc.calcJitter.
-- Presumably this converts seconds to milliseconds (the name and the factor
-- suggest so) -- confirm against misc.calcJitter.
-- Declared `local` so the helper stays private to this module instead of
-- leaking into the global environment.
-- @param n base duration (before scaling)
-- @param jitter jitter amount (before scaling); its exact meaning is defined
--   by misc.calcJitter
-- @return the result of misc.calcJitter(n * 1000, jitter * 1000)
local function SECONDS(n, jitter)
  return misc.calcJitter(n * 1000, jitter * 1000)
end

-- State

local State = Emitter:extend()

--- Initialises a named state.
-- @param name label kept on the state; also embedded in the logger prefix
-- @param messages object stored as self._messages for use by subclasses
function State:initialize(name, messages)
  self._name = name
  self._messages = messages

  -- Logger prefix has the form "State(<name>)".
  local prefix = fmt('State(%s)', self._name)
  self._log = loggingUtil.makeLogger(prefix)
end

--- Accessor for the state's label.
-- @return the name supplied to initialize
function State:getName()
  return self._name
end

-- RegisterCheckState State
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you describe what a RegisterCheckState is in a few words here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in 02a43ca.


local RegisterCheckState = State:extend()
function RegisterCheckState:initialize(messages)
State.initialize(self, 'RegisterCheckState', messages)
self._timeout = SECONDS(20, 10)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you pull these values out to the top of the file and describe why they were chosen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this code, because we don't use the timeout in this context. c00ec4d

self._lastFetchTime = 0
end

--- Creates checks from a manifest and wires up a scheduler for them.
-- The scheduler is stored on self._scheduler, started from the callback
-- passed to its constructor, and every 'check' event it emits is forwarded
-- as metrics through whichever client the stream currently reports.
-- @param client client whose _createChecks builds the check list
-- @param manifest manifest handed to client:_createChecks
function RegisterCheckState:_scheduleManifest(client, manifest)
  local checks = client:_createChecks(manifest)

  local function startScheduler()
    self._scheduler:start()
  end

  local function forwardMetrics(check, checkResults)
    -- Look the client up again at event time rather than reusing `client`.
    local current = self._messages:getStream():getClient()
    if not current then
      return
    end
    current.protocol:sendMetrics(check, checkResults)
  end

  self._scheduler = Scheduler:new('scheduler.state', checks, startScheduler)
  self._scheduler:on('check', forwardMetrics)
end

--- Handshake hook: requests the manifest and schedules its checks.
-- Work is only queued while self._lastFetchTime is still 0; once it has been
-- set to the current time, later calls do nothing.
function RegisterCheckState:onHandshake()

  -- Declared local so each call does not create/overwrite a global `run`.
  -- Fetches the manifest through the stream's current client (if any) and
  -- schedules the resulting checks; a fetch error is only logged.
  local function run()
    local client = self._messages:getStream():getClient()
    if client then
      client.protocol:getManifest(function(err, manifest)
        if err then
          -- TODO Abort connection?
          client:getLog()(logging.ERROR, 'Error while retrieving manifest: ' .. err.message)
        else
          self:_scheduleManifest(client, manifest)
        end
      end)
    end
  end

  -- TODO at some point we want to add logic to update the manifest
  if self._lastFetchTime == 0 then
    if self._timer then
      timer.clearTimer(self._timer)
    end
    self._log(logging.DEBUG, fmt('registering timeout %d', self._timeout))
    -- NOTE(review): the value returned by process.nextTick is stored as
    -- self._timer and may later be passed to timer.clearTimer -- confirm
    -- that nextTick returns something clearTimer accepts.
    self._timer = process.nextTick(run)
    self._lastFetchTime = os.time()
  end
end

-- Connection Messages

local ConnectionMessages = Emitter:extend()

--- Initialises the message hub for a connection stream.
-- Registers a RegisterCheckState and subscribes this object's own handlers
-- to the 'handshake_success', 'client_end' and 'message' events.
-- @param connectionStream stream kept for later retrieval via getStream
function ConnectionMessages:initialize(connectionStream)
  self._connectionStream = connectionStream
  self._states = {}
  self:_addState(RegisterCheckState:new(self))

  -- Event name -> handler, subscribed in this order.
  local subscriptions = {
    { 'handshake_success', ConnectionMessages.onHandshake },
    { 'client_end', ConnectionMessages.onClientEnd },
    { 'message', ConnectionMessages.onMessage },
  }
  for _, entry in ipairs(subscriptions) do
    self:on(entry[1], bind(entry[2], self))
  end
end

--- Appends a state to the end of the list of states that receive events.
-- @param state state object to register
function ConnectionMessages:_addState(state)
  local states = self._states
  states[#states + 1] = state
end

--- Accessor for the connection stream supplied at construction.
-- @return the value stored in self._connectionStream
function ConnectionMessages:getStream()
return self._connectionStream
end

--- Logs a client disconnect and notifies every state that defines onClientEnd.
-- @param client the client that ended; must provide getLog()
function ConnectionMessages:onClientEnd(client)
  local log = client:getLog()
  log(logging.INFO, 'Detected client disconnect')

  for _, state in ipairs(self._states) do
    -- States without an onClientEnd hook are skipped.
    if state.onClientEnd then
      state:onClientEnd(client)
    end
  end
end

--- Logs the handshake and notifies every state that defines onHandshake.
-- @param client the client that completed the handshake; must provide getLog()
function ConnectionMessages:onHandshake(client)
  local log = client:getLog()
  log(logging.DEBUG, '(onHandshake)')

  for _, state in ipairs(self._states) do
    -- States without an onHandshake hook are skipped.
    if state.onHandshake then
      state:onHandshake(client)
    end
  end
end

--- Logs an incoming message, offers it to every state that defines onMessage,
-- and finally passes it to the client's protocol for processing.
-- @param client the client the message arrived on; must provide getLog()
--   and a protocol field
-- @param msg the message to dispatch
function ConnectionMessages:onMessage(client, msg)
  local log = client:getLog()
  log(logging.DEBUG, '(onMessage)')

  for _, state in ipairs(self._states) do
    -- States without an onMessage hook are skipped.
    if state.onMessage then
      state:onMessage(client, msg)
    end
  end

  -- Protocol processing runs after all states have seen the message.
  client.protocol:processMessage(msg)
end

local exports = {}
exports.State = State
exports.ConnectionMessages = ConnectionMessages
return exports