Merge pull request Kong#471 from Mashape/hotfix/mashape-analytics
[hotfix/analytics] more robust buffer

Former-commit-id: 2c9ae153921962683cab1e3149b6c91e9fe28da3
thibaultcha committed Aug 13, 2015
2 parents e000908 + c60fcfa commit a873061
Showing 6 changed files with 102 additions and 25 deletions.
5 changes: 3 additions & 2 deletions kong/plugins/log_serializers/alf.lua
@@ -65,6 +65,7 @@ function _M.serialize_entry(ngx)

local alf_req_body = analytics_data.req_body or ""
local alf_res_body = analytics_data.res_body or ""
local alf_req_post_args = analytics_data.req_post_args or {}

-- timers
local proxy_started_at, proxy_ended_at = ngx.ctx.proxy_started_at, ngx.ctx.proxy_ended_at
@@ -128,7 +129,7 @@ function _M.serialize_entry(ngx)
bodySize = string.len(alf_req_body),
postData = {
mimeType = alf_req_mimeType,
params = dic_to_array(ngx.req.get_post_args()),
params = dic_to_array(alf_req_post_args),
text = alf_req_body
}
},
@@ -182,7 +183,7 @@ function _M.new_alf(ngx, token, environment)
version = "1.2",
creator = {
name = "mashape-analytics-agent-kong",
version = "1.0.1"
version = "1.0.2"
},
entries = {_M.serialize_entry(ngx)}
}
48 changes: 33 additions & 15 deletions kong/plugins/mashape-analytics/buffer.lua
@@ -1,7 +1,7 @@
-- ALF buffer module
--
-- This module contains a buffered array of ALF objects. When the buffer is full (max number of entries
-- or max payload size), it is converted to a JSON payload and moved to another buffer of payloads to be
-- or max payload size), it is converted to a JSON payload and moved to a queue of payloads to be
-- sent to the server.
--
-- 1 buffer of ALFs (gets flushed once it reaches the max size)
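
The comment above describes a two-stage design: ALFs accumulate in a buffer, and once the buffer is full it is serialized into a single JSON payload that joins a queue of payloads waiting to be sent. Below is a minimal sketch of that idea, assuming illustrative names (Buffer, max_entries, max_size) that do not match the plugin's internals.

-- Illustrative sketch only: ALFs accumulate in `entries`; a full buffer is
-- serialized to one JSON payload and pushed onto `sending_queue`.
local cjson = require "cjson"

local Buffer = {}
Buffer.__index = Buffer

function Buffer.new(max_entries, max_size)
  return setmetatable({
    entries = {},       -- ALFs waiting to be serialized
    entries_size = 0,   -- running size of the serialized entries
    sending_queue = {}, -- JSON payloads waiting to be sent
    max_entries = max_entries,
    max_size = max_size
  }, Buffer)
end

function Buffer:flush()
  -- Convert the current entries into one JSON payload and queue it for sending.
  table.insert(self.sending_queue, cjson.encode(self.entries))
  self.entries = {}
  self.entries_size = 0
end

function Buffer:add(alf)
  local alf_size = #cjson.encode(alf)
  if alf_size > self.max_size then
    return nil, "ALF is larger than the maximum payload size, dropping it"
  end
  -- Flush first if adding this ALF would exceed either limit.
  if #self.entries + 1 > self.max_entries
     or self.entries_size + alf_size > self.max_size then
    self:flush()
  end
  table.insert(self.entries, alf)
  self.entries_size = self.entries_size + alf_size
  return true
end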
@@ -30,6 +30,7 @@ local ANALYTICS_SOCKET = {

local buffer_mt = {}
buffer_mt.__index = buffer_mt
buffer_mt.MAX_BUFFER_SIZE = MAX_BUFFER_SIZE

-- A handler for delayed batch sending. When no call has been made for X seconds
-- (X being conf.delay), we send the batch to keep analytics as close to real-time
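
The comment above refers to a delayed-send handler that is not part of this hunk: when no ALF has been added for conf.delay seconds, the current batch is sent so analytics stay close to real-time. Here is a minimal sketch of that debounce idea using ngx.timer.at; the field names (last_added_at, delay) are assumptions for illustration, not the plugin's actual code.

-- Illustrative only: flush and send when the buffer has been idle for
-- `delay` seconds (the field names here are hypothetical).
local function delayed_send_handler(premature, buffer)
  if premature then
    return -- the nginx worker is shutting down
  end
  if ngx.now() - buffer.last_added_at >= buffer.delay then
    buffer:flush()
    local ok, err = ngx.timer.at(0, buffer.send_batch, buffer)
    if not ok then
      ngx.log(ngx.ERR, "[mashape-analytics] could not schedule send_batch: ", err)
    end
  end
end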
@@ -85,8 +86,15 @@ function buffer_mt:add_alf(alf)
local next_n_entries = #self.entries + 1
local alf_size = string.len(str)

-- If the alf_size exceeds the payload limit by itself, we have a big problem
if alf_size > self.MAX_SIZE then
ngx.log(ngx.ERR, string.format("[mashape-analytics] ALF size exceeded the maximum size (%sMB) accepted by the socket server. Dropping it.",
self.MAX_SIZE / MB))
return
end

-- If size or entries exceed the max limits
local full = next_n_entries > self.MAX_ENTRIES or self:get_size() > self.MAX_SIZE
local full = next_n_entries > self.MAX_ENTRIES or (self:get_size() + alf_size) > self.MAX_SIZE
if full then
self:flush()
-- Batch size reached, let's send the data
@@ -129,7 +137,11 @@ end
-- 3. Empty the buffer and reset the current buffer size
function buffer_mt:flush()
local payload = self:payload_string()
table.insert(self.sending_queue, payload)
table.insert(self.sending_queue, {
payload = payload,
n_entries = #self.entries,
size = self:get_size()
})
self.entries = {}
self.entries_size = 0
end
@@ -145,23 +157,29 @@ function buffer_mt.send_batch(premature, self)
return
end

-- Let's send the oldest payload in our buffer
local message = self.sending_queue[1]
-- Let's send the oldest batch in our queue
local batch_to_send = table.remove(self.sending_queue, 1)

local batch_saved = false
local drop_batch = false
local client = http:new()
client:set_timeout(50000) -- 50 seconds, in milliseconds

local ok, err = client:connect(ANALYTICS_SOCKET.host, ANALYTICS_SOCKET.port)
if ok then
local res, err = client:request({path = ANALYTICS_SOCKET.path, body = message})
local res, err = client:request({path = ANALYTICS_SOCKET.path, body = batch_to_send.payload})
if not res then
ngx.log(ngx.ERR, "[mashape-analytics] failed to send batch: "..err)
ngx.log(ngx.ERR, string.format("[mashape-analytics] failed to send batch (%s ALFs %s bytes): %s",
batch_to_send.n_entries, batch_to_send.size, err))
elseif res.status == 200 then
batch_saved = true
drop_batch = true
ngx.log(ngx.DEBUG, string.format("[mashape-analytics] successfully saved the batch. (%s)", res.body))
elseif res.status == 400 then
ngx.log(ngx.ERR, string.format("[mashape-analytics] socket server refused the batch (%s ALFs %s bytes). Dropping batch. Status: (%s) Error: (%s)",
batch_to_send.n_entries, batch_to_send.size, res.status, res.body))
drop_batch = true
else
ngx.log(ngx.ERR, string.format("[mashape-analytics] socket server refused the batch. Status: (%s) Error: (%s)", res.status, res.body))
ngx.log(ngx.ERR, string.format("[mashape-analytics] socket server could not save the batch (%s ALFs %s bytes). Status: (%s) Error: (%s)",
batch_to_send.n_entries, batch_to_send.size, res.status, res.body))
end

-- close connection, or put it into the connection pool
@@ -177,16 +195,16 @@ function buffer_mt.send_batch(premature, self)
ngx.log(ngx.ERR, "[mashape-analytics] failed to connect to the socket server: "..err)
end

if batch_saved then
-- Remove the payload that was sent
table.remove(self.sending_queue, 1)
if not drop_batch then
-- If the batch should not be dropped, add it back to the end of the queue to be retried later
table.insert(self.sending_queue, batch_to_send)
end

self.lock_sending = false

-- Keep sending data if the buffer is not yet emptied
-- Keep sending data if the queue is not yet emptied
if #self.sending_queue > 0 then
local ok, err = ngx.timer.at(0, self.send_batch, self)
local ok, err = ngx.timer.at(2, self.send_batch, self)
if not ok then
ngx.log(ngx.ERR, "[mashape-analytics] failed to create batch retry timer: ", err)
end
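
Taken together, the buffer.lua changes above queue each payload as a small record ({payload, n_entries, size}), drop a batch only when the server answers 200 (saved) or 400 (refused, so retrying would not help), re-queue it at the back otherwise, and schedule the next attempt two seconds later while the queue is non-empty. A condensed sketch of that retry loop follows, with a hypothetical post_to_socket_server helper standing in for the resty HTTP calls.

-- Condensed, illustrative retry loop; `post_to_socket_server` is hypothetical.
local function send_batch(premature, buffer)
  if premature or #buffer.sending_queue == 0 then
    return
  end

  local batch = table.remove(buffer.sending_queue, 1) -- oldest batch first
  local status = post_to_socket_server(batch.payload)

  -- 200: the batch was saved. 400: the server refused it; retrying won't help.
  local drop = status == 200 or status == 400
  if not drop then
    table.insert(buffer.sending_queue, batch) -- retry later, from the back
  end

  if #buffer.sending_queue > 0 then
    ngx.timer.at(2, send_batch, buffer) -- next attempt in 2 seconds
  end
end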
17 changes: 15 additions & 2 deletions kong/plugins/mashape-analytics/handler.lua
@@ -32,11 +32,24 @@ function AnalyticsHandler:access(conf)
-- Retrieve and keep in memory the bodies for this request
ngx.ctx.analytics = {
req_body = "",
res_body = ""
res_body = "",
req_post_args = {}
}

ngx.req.read_body()

local status, res = pcall(ngx.req.get_post_args)
if not status then
if res == "requesty body in temp file not supported" then
ngx.log(ngx.ERR, "[mashape-analytics] cannot read request body from temporary file. Try increasing the client_body_buffer_size directive.")
else
ngx.log(ngx.ERR, res)
end
else
ngx.ctx.analytics.req_post_args = res
end

if conf.log_body then
ngx.req.read_body()
ngx.ctx.analytics.req_body = ngx.req.get_body_data()
end
end
2 changes: 1 addition & 1 deletion spec/plugins/mashape-analytics/alf_serializer_spec.lua
@@ -90,7 +90,7 @@ describe("ALF serializer", function()
assert.equal("1.2", alf.har.log.version)
assert.truthy(alf.har.log.creator)
assert.equal("mashape-analytics-agent-kong", alf.har.log.creator.name)
assert.equal("1.0.1", alf.har.log.creator.version)
assert.equal("1.0.2", alf.har.log.creator.version)
assert.truthy(alf.har.log.entries)
assert.equal(1, #(alf.har.log.entries))
end)
47 changes: 46 additions & 1 deletion spec/plugins/mashape-analytics/buffer_spec.lua
@@ -64,11 +64,56 @@ describe("ALFBuffer", function()
alf_buffer.flush:revert()
end)
end)
it("should call :flush() when reaching its max size", function()
-- How many stubs to reach the limit?
local COMMA_LEN = string.len(",")
local JSON_ARR_LEN = string.len("[]")
local max_n_stubs = math.ceil(ALFBuffer.MAX_BUFFER_SIZE / (STUB_LEN + COMMA_LEN)) -- + the comma after each ALF in the JSON payload

-- Create a new buffer
local buffer = ALFBuffer.new({batch_size = max_n_stubs + 100, delay = 2})

local s = spy.on(buffer, "flush")

-- Add max_n_stubs - 1 entries
for i = 1, max_n_stubs - 1 do
buffer:add_alf(ALF_STUB)
end

assert.spy(s).was_not_called()

-- The current buffer size should be `(max_n_stubs - 1) * (STUB_LEN + COMMA_LEN) + JSON_ARR_LEN - COMMA_LEN`
-- (no trailing comma after the last object).
assert.equal((max_n_stubs - 1) * (STUB_LEN + COMMA_LEN) + JSON_ARR_LEN - COMMA_LEN, buffer:get_size())

-- adding one more entry
buffer:add_alf(ALF_STUB)
assert.spy(s).was.called()
end)
it("should drop an ALF if it is too big by itself", function()
local str = string.rep(".", ALFBuffer.MAX_BUFFER_SIZE)
local huge_alf = {foo = str}

local buffer = ALFBuffer.new(CONF_STUB)

local s = spy.on(_G.ngx, "log")

buffer:add_alf(huge_alf)

assert.spy(s).was.called()
assert.equal(0, buffer.entries_size)
assert.equal(0, #buffer.entries)

finally(function()
_G.ngx.log:revert()
end)
end)
describe(":flush()", function()
it("should have emptied the current buffer and added a payload to be sent", function()
assert.equal(1, #alf_buffer.entries)
assert.equal(1, #alf_buffer.sending_queue)
assert.equal("string", type(alf_buffer.sending_queue[1]))
assert.equal("table", type(alf_buffer.sending_queue[1]))
assert.equal("string", type(alf_buffer.sending_queue[1].payload))
assert.equal(STUB_LEN, alf_buffer.entries_size)
end)
end)
8 changes: 4 additions & 4 deletions spec/plugins/mashape-analytics/fixtures/requests.lua
@@ -8,8 +8,7 @@ return {
get_method = function() return "GET" end,
http_version = function() return 1.1 end,
get_headers = function() return {["Accept"] = "/*/", ["Host"] = "mockbin.com"} end,
get_uri_args = function() return {["hello"] = "world", ["foo"] = "bar", ["number"] = 2} end,
get_post_args = function() return {["hello"] = {"world", "earth"}} end
get_uri_args = function() return {["hello"] = "world", ["foo"] = "bar", ["number"] = 2} end
},
resp = {
get_headers = function() return {["Connection"] = "close", ["Content-Type"] = "application/json", ["Content-Length"] = "934"} end
@@ -30,6 +29,7 @@ return {
analytics = {
req_body = "hello=world&hello=earth",
res_body = "{\"message\":\"response body\"}",
req_post_args = {["hello"] = {"world", "earth"}},
response_received = 143284457211
}
}
@@ -100,8 +100,7 @@ return {
get_method = function() return "GET" end,
http_version = function() return 1.1 end,
get_headers = function() return {["Accept"] = "/*/", ["Host"] = "mockbin.com"} end,
get_uri_args = function() return {["hello"] = "world", ["foo"] = "bar"} end,
get_post_args = function() return {["hello"] = {"world", "earth"}} end
get_uri_args = function() return {["hello"] = "world", ["foo"] = "bar"} end
},
resp = {
get_headers = function() return {["Connection"] = "close", ["Content-Type"] = "application/json", ["Content-Length"] = "934"} end
@@ -122,6 +121,7 @@ return {
analytics = {
req_body = "hello=world&hello=earth",
res_body = "{\"message\":\"response body\"}",
req_post_args = {["hello"] = {"world", "earth"}},
response_received = 143284457211
}
}
