Skip to content

Commit

Permalink
[Fix] Clickhouse: Avoid potential races in collection
Browse files Browse the repository at this point in the history
  • Loading branch information
vstakhov committed Nov 5, 2019
1 parent a2af525 commit c9e6e26
Showing 1 changed file with 36 additions and 6 deletions.
42 changes: 36 additions & 6 deletions src/plugins/lua/clickhouse.lua
Expand Up @@ -32,6 +32,7 @@ local custom_rows = {}
local nrows = 0
local used_memory = 0
local last_collection = 0
local final_call = false -- If the final collection has been started
local schema_version = 8 -- Current schema version

local settings = {
Expand Down Expand Up @@ -375,7 +376,7 @@ local function clickhouse_check_symbol(task, settings_field_name, fields_table,
return false
end

local function clickhouse_send_data(task, ev_base, why)
local function clickhouse_send_data(task, ev_base, why, gen_rows, cust_rows)
local log_object = task or rspamd_config
local upstream = settings.upstream:get_upstream_round_robin()
local ip_addr = upstream:get_addr():to_string(true)
Expand Down Expand Up @@ -429,11 +430,11 @@ local function clickhouse_send_data(task, ev_base, why)
clickhouse_groups_row(fields)
end

send_data('generic data', data_rows,
send_data('generic data', gen_rows,
string.format('INSERT INTO rspamd (%s)',
table.concat(fields, ',')))

for k,crows in pairs(custom_rows) do
for k,crows in pairs(cust_rows) do
if #crows > 1 then
send_data('custom data ('..k..')', crows,
settings.custom_rules[k].first_row())
Expand Down Expand Up @@ -929,19 +930,26 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now)
return settings.check_timeout
end

if final_call then
lua_util.debugm(N, cfg, "no need to send data, final call has been issued")
return 0
end

if settings.limits.max_rows > 0 then
if nrows > settings.limits.max_rows then
need_collect = true
reason = string.format('limit of rows has been reached: %d', nrows)
end
end
if settings.limits.max_interval > 0 then

if last_collection > 0 and settings.limits.max_interval > 0 then
if now - last_collection > settings.limits.max_interval then
need_collect = true
reason = string.format('limit of time since last collection has been reached: %d seconds passed',
(now - last_collection) - settings.limits.max_interval)
end
end

if settings.limits.max_memory > 0 then
if used_memory >= settings.limits.max_memory then
need_collect = true
Expand All @@ -950,14 +958,22 @@ local function clickhouse_maybe_send_data_periodic(cfg, ev_base, now)
end
end

if last_collection == 0 then
last_collection = now
end

if need_collect then
clickhouse_send_data(nil, ev_base, reason)
-- Do it atomic
local saved_rows = data_rows
local saved_custom = custom_rows
nrows = 0
last_collection = now
used_memory = 0
data_rows = {}
custom_rows = {}

clickhouse_send_data(nil, ev_base, reason, saved_rows, saved_custom)

if settings.collect_garbadge then
collectgarbage()
end
Expand Down Expand Up @@ -1243,7 +1259,21 @@ if opts then
})
rspamd_config:register_finish_script(function(task)
if nrows > 0 then
clickhouse_send_data(task, nil, 'final collection')
final_call = true
local saved_rows = data_rows
local saved_custom = custom_rows

nrows = 0
data_rows = {}
used_memory = 0
custom_rows = {}

clickhouse_send_data(task, nil, 'final collection',
saved_rows, saved_custom)

if settings.collect_garbadge then
collectgarbage()
end
end
end)
-- Create tables on load
Expand Down

0 comments on commit c9e6e26

Please sign in to comment.