Skip to content

Commit

Permalink
[Feature] Lua_clickhouse: Add optional row callback for large selections
Browse files Browse the repository at this point in the history
  • Loading branch information
vstakhov committed Nov 30, 2020
1 parent 3fc494b commit a092f57
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions lualib/lua_clickhouse.lua
Expand Up @@ -95,7 +95,7 @@ end
exports.row_to_tsv = row_to_tsv
-- Parses JSONEachRow reply from CH
local function parse_clickhouse_response_json_eachrow(params, data)
local function parse_clickhouse_response_json_eachrow(params, data, row_cb)
local ucl = require "ucl"
if data == nil then
Expand Down Expand Up @@ -125,7 +125,11 @@ local function parse_clickhouse_response_json_eachrow(params, data)
if plain_row and #plain_row > 1 then
local parsed_row = parse_string(plain_row)
if parsed_row then
table.insert(parsed_rows, parsed_row)
if row_cb then
row_cb(parsed_row)
else
table.insert(parsed_rows, parsed_row)
end
end
end
end
Expand Down Expand Up @@ -169,7 +173,7 @@ local function parse_clickhouse_response_json(params, data)
end
-- Helper to generate HTTP closure
local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
local function mk_http_select_cb(upstream, params, ok_cb, fail_cb, row_cb)
local function http_cb(err_message, code, data, _)
if code ~= 200 or err_message then
if not err_message then err_message = data end
Expand All @@ -185,7 +189,7 @@ local function mk_http_select_cb(upstream, params, ok_cb, fail_cb)
upstream:fail()
else
upstream:ok()
local rows = parse_clickhouse_response_json_eachrow(params, data)
local rows = parse_clickhouse_response_json_eachrow(params, data, row_cb)
if rows then
if ok_cb then
Expand Down Expand Up @@ -264,16 +268,17 @@ end
-- @param {string} query select query (passed in HTTP body)
-- @param {function} ok_cb callback to be called in case of success
-- @param {function} fail_cb callback to be called in case of some error
-- @param {function} row_cb optional callback to be called on each parsed data row (instead of table insertion)
-- @return {boolean} whether a connection was successful
-- @example
--
--]]
exports.select = function (upstream, settings, params, query, ok_cb, fail_cb)
exports.select = function (upstream, settings, params, query, ok_cb, fail_cb, row_cb)
local http_params = {}
for k,v in pairs(params) do http_params[k] = v end
http_params.callback = mk_http_select_cb(upstream, http_params, ok_cb, fail_cb)
http_params.callback = mk_http_select_cb(upstream, http_params, ok_cb, fail_cb, row_cb)
http_params.gzip = settings.use_gzip
http_params.mime_type = 'text/plain'
http_params.timeout = settings.timeout or default_timeout
Expand Down Expand Up @@ -302,7 +307,7 @@ end
--[[[
-- @function lua_clickhouse.select_sync(upstream, settings, params, query,
ok_cb, fail_cb)
ok_cb, fail_cb, row_cb)
-- Make select request to clickhouse
-- @param {upstream} upstream clickhouse server upstream
-- @param {table} settings global settings table:
Expand All @@ -315,13 +320,14 @@ end
-- @param {string} query select query (passed in HTTP body)
-- @param {function} ok_cb callback to be called in case of success
-- @param {function} fail_cb callback to be called in case of some error
-- @param {function} row_cb optional callback to be called on each parsed data row (instead of table insertion)
-- @return
-- {string} error message if exists
-- nil | {rows} | {http_response}
-- @example
--
--]]
exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_cb)
exports.select_sync = function (upstream, settings, params, query, row_cb)
local http_params = {}
for k,v in pairs(params) do http_params[k] = v end
Expand Down Expand Up @@ -357,7 +363,7 @@ exports.select_sync = function (upstream, settings, params, query, ok_cb, fail_c
return response.content, response
else
lua_util.debugm(N, http_params.log_obj, "clickhouse select response: %1", response)
local rows = parse_clickhouse_response_json_eachrow(params, response.content)
local rows = parse_clickhouse_response_json_eachrow(params, response.content, row_cb)
return nil, rows
end
end
Expand Down

0 comments on commit a092f57

Please sign in to comment.