Skip to content

Commit

Permalink
net.box: rewrite response decoder in C
Browse files Browse the repository at this point in the history
This patch moves method_decoder table from Lua to C. This is a step
towards rewriting performance-critical parts of net.box in C.

Part of #6241
  • Loading branch information
locker committed Jul 30, 2021
1 parent b0cd298 commit 1087c1e
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 96 deletions.
230 changes: 173 additions & 57 deletions src/box/lua/net_box.c
Expand Up @@ -664,10 +664,72 @@ netbox_encode_method(struct lua_State *L)
return 0;
}

/**
* Decode IPROTO_DATA into tuples array.
* @param L Lua stack to push result on.
* @param data MessagePack.
/*
* This function handles a response that is supposed to have an empty body
* (e.g. IPROTO_PING result). It doesn't decode anything per se. Instead it
* simply pushes nil to Lua stack and advances the data ptr to data_end.
*/
static void
netbox_decode_nil(struct lua_State *L, const char **data,
const char *data_end, struct tuple_format *format)
{
(void)format;
*data = data_end;
lua_pushnil(L);
}

/*
* This helper skips a MessagePack map header and IPROTO_DATA key so that
* *data points to the actual response content.
*/
static void
netbox_skip_to_data(const char **data)
{
assert(mp_typeof(**data) == MP_MAP);
uint32_t map_size = mp_decode_map(data);
/* Until 2.0 body has no keys except DATA. */
assert(map_size == 1);
(void)map_size;
uint32_t key = mp_decode_uint(data);
assert(key == IPROTO_DATA);
(void)key;
}

/*
* Decodes Tarantool response body consisting of single IPROTO_DATA key into
* a Lua table and pushes the table to Lua stack.
*/
static void
netbox_decode_table(struct lua_State *L, const char **data,
const char *data_end, struct tuple_format *format)
{
(void)data_end;
(void)format;
netbox_skip_to_data(data);
luamp_decode(L, cfg, data);
}

/*
* Same as netbox_decode_table, but only decodes the first element of the
* table, skipping the rest.
*/
static void
netbox_decode_value(struct lua_State *L, const char **data,
const char *data_end, struct tuple_format *format)
{
(void)data_end;
(void)format;
netbox_skip_to_data(data);
uint32_t count = mp_decode_array(data);
if (count == 0)
return lua_pushnil(L);
luamp_decode(L, cfg, data);
for (uint32_t i = 1; i < count; ++i)
mp_next(data);
}

/*
* Decodes IPROTO_DATA into a tuple array and pushes the array to Lua stack.
*/
static void
netbox_decode_data(struct lua_State *L, const char **data,
Expand All @@ -687,34 +749,40 @@ netbox_decode_data(struct lua_State *L, const char **data,
}
}

/**
* Decode Tarantool response body consisting of single
* IPROTO_DATA key into array of tuples.
* @param Lua stack[1] Raw MessagePack pointer.
* @retval Tuples array and position of the body end.
/*
* Decodes Tarantool response body consisting of single IPROTO_DATA key into
* tuple array and pushes the array to Lua stack.
*/
static int
netbox_decode_select(struct lua_State *L)
static void
netbox_decode_select(struct lua_State *L, const char **data,
const char *data_end, struct tuple_format *format)
{
uint32_t ctypeid;
assert(lua_gettop(L) == 3);
struct tuple_format *format;
if (lua_type(L, 3) == LUA_TCDATA)
format = lbox_check_tuple_format(L, 3);
else
format = tuple_format_runtime;
const char *data = *(const char **)luaL_checkcdata(L, 1, &ctypeid);
assert(mp_typeof(*data) == MP_MAP);
uint32_t map_size = mp_decode_map(&data);
/* Until 2.0 body has no keys except DATA. */
assert(map_size == 1);
(void) map_size;
uint32_t key = mp_decode_uint(&data);
assert(key == IPROTO_DATA);
(void) key;
netbox_decode_data(L, &data, format);
*(const char **)luaL_pushcdata(L, ctypeid) = data;
return 2;
(void)data_end;
netbox_skip_to_data(data);
netbox_decode_data(L, data, format);
}

/*
* Same as netbox_decode_select, but only decodes the first tuple of the array,
* skipping the rest.
*/
static void
netbox_decode_tuple(struct lua_State *L, const char **data,
const char *data_end, struct tuple_format *format)
{
(void)data_end;
netbox_skip_to_data(data);
uint32_t count = mp_decode_array(data);
if (count == 0)
return lua_pushnil(L);
const char *begin = *data;
mp_next(data);
struct tuple *tuple = box_tuple_new(format, begin, *data);
if (tuple == NULL)
luaT_error(L);
luaT_pushtuple(L, tuple);
for (uint32_t i = 1; i < count; ++i)
mp_next(data);
}

/** Decode optional (i.e. may be present in response) metadata fields. */
Expand Down Expand Up @@ -833,28 +901,29 @@ netbox_decode_sql_info(struct lua_State *L, const char **data)
}
}

static int
netbox_decode_execute(struct lua_State *L)
static void
netbox_decode_execute(struct lua_State *L, const char **data,
const char *data_end, struct tuple_format *format)
{
uint32_t ctypeid;
const char *data = *(const char **)luaL_checkcdata(L, 1, &ctypeid);
assert(mp_typeof(*data) == MP_MAP);
uint32_t map_size = mp_decode_map(&data);
(void)data_end;
(void)format;
assert(mp_typeof(**data) == MP_MAP);
uint32_t map_size = mp_decode_map(data);
int rows_index = 0, meta_index = 0, info_index = 0;
for (uint32_t i = 0; i < map_size; ++i) {
uint32_t key = mp_decode_uint(&data);
uint32_t key = mp_decode_uint(data);
switch(key) {
case IPROTO_DATA:
netbox_decode_data(L, &data, tuple_format_runtime);
netbox_decode_data(L, data, tuple_format_runtime);
rows_index = i - map_size;
break;
case IPROTO_METADATA:
netbox_decode_metadata(L, &data);
netbox_decode_metadata(L, data);
meta_index = i - map_size;
break;
default:
assert(key == IPROTO_SQL_INFO);
netbox_decode_sql_info(L, &data);
netbox_decode_sql_info(L, data);
info_index = i - map_size;
break;
}
Expand All @@ -871,42 +940,41 @@ netbox_decode_execute(struct lua_State *L)
assert(meta_index == 0);
assert(rows_index == 0);
}
*(const char **)luaL_pushcdata(L, ctypeid) = data;
return 2;
}

static int
netbox_decode_prepare(struct lua_State *L)
static void
netbox_decode_prepare(struct lua_State *L, const char **data,
const char *data_end, struct tuple_format *format)
{
uint32_t ctypeid;
const char *data = *(const char **)luaL_checkcdata(L, 1, &ctypeid);
assert(mp_typeof(*data) == MP_MAP);
uint32_t map_size = mp_decode_map(&data);
(void)data_end;
(void)format;
assert(mp_typeof(**data) == MP_MAP);
uint32_t map_size = mp_decode_map(data);
int stmt_id_idx = 0, meta_idx = 0, bind_meta_idx = 0,
bind_count_idx = 0;
uint32_t stmt_id = 0;
for (uint32_t i = 0; i < map_size; ++i) {
uint32_t key = mp_decode_uint(&data);
uint32_t key = mp_decode_uint(data);
switch(key) {
case IPROTO_STMT_ID: {
stmt_id = mp_decode_uint(&data);
stmt_id = mp_decode_uint(data);
luaL_pushuint64(L, stmt_id);
stmt_id_idx = i - map_size;
break;
}
case IPROTO_METADATA: {
netbox_decode_metadata(L, &data);
netbox_decode_metadata(L, data);
meta_idx = i - map_size;
break;
}
case IPROTO_BIND_METADATA: {
netbox_decode_metadata(L, &data);
netbox_decode_metadata(L, data);
bind_meta_idx = i - map_size;
break;
}
default: {
assert(key == IPROTO_BIND_COUNT);
uint32_t bind_count = mp_decode_uint(&data);
uint32_t bind_count = mp_decode_uint(data);
luaL_pushuint64(L, bind_count);
bind_count_idx = i - map_size;
break;
Expand All @@ -926,8 +994,58 @@ netbox_decode_prepare(struct lua_State *L)
lua_pushvalue(L, meta_idx - 1);
lua_setfield(L, -2, "metadata");
}
}

*(const char **)luaL_pushcdata(L, ctypeid) = data;
/*
* Decodes a response body for the specified method. Pushes the result and the
* end of the decoded data to Lua stack.
*
* Takes the following arguments:
* - method: a value from the netbox_method enumeration
* - data: pointer to the data to decode (char ptr)
* - data_end: pointer to the end of the data (char ptr)
* - format: tuple format to use for decoding the body or nil
*/
static int
netbox_decode_method(struct lua_State *L)
{
typedef void (*method_decoder_f)(struct lua_State *L, const char **data,
const char *data_end,
struct tuple_format *format);
static method_decoder_f method_decoder[] = {
[NETBOX_PING] = netbox_decode_nil,
[NETBOX_CALL_16] = netbox_decode_select,
[NETBOX_CALL_17] = netbox_decode_table,
[NETBOX_EVAL] = netbox_decode_table,
[NETBOX_INSERT] = netbox_decode_tuple,
[NETBOX_REPLACE] = netbox_decode_tuple,
[NETBOX_DELETE] = netbox_decode_tuple,
[NETBOX_UPDATE] = netbox_decode_tuple,
[NETBOX_UPSERT] = netbox_decode_nil,
[NETBOX_SELECT] = netbox_decode_select,
[NETBOX_EXECUTE] = netbox_decode_execute,
[NETBOX_PREPARE] = netbox_decode_prepare,
[NETBOX_UNPREPARE] = netbox_decode_nil,
[NETBOX_GET] = netbox_decode_tuple,
[NETBOX_MIN] = netbox_decode_tuple,
[NETBOX_MAX] = netbox_decode_tuple,
[NETBOX_COUNT] = netbox_decode_value,
[NETBOX_INJECT] = netbox_decode_table,
};
enum netbox_method method = lua_tointeger(L, 1);
assert(method < netbox_method_MAX);
uint32_t ctypeid;
const char *data = *(const char **)luaL_checkcdata(L, 2, &ctypeid);
assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
const char *data_end = *(const char **)luaL_checkcdata(L, 3, &ctypeid);
assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
struct tuple_format *format;
if (!lua_isnil(L, 4))
format = lbox_check_tuple_format(L, 4);
else
format = tuple_format_runtime;
method_decoder[method](L, &data, data_end, format);
*(const char **)luaL_pushcdata(L, CTID_CONST_CHAR_PTR) = data;
return 2;
}

Expand All @@ -938,10 +1056,8 @@ luaopen_net_box(struct lua_State *L)
{ "encode_auth", netbox_encode_auth },
{ "encode_method", netbox_encode_method },
{ "decode_greeting",netbox_decode_greeting },
{ "decode_method", netbox_decode_method },
{ "communicate", netbox_communicate },
{ "decode_select", netbox_decode_select },
{ "decode_execute", netbox_decode_execute },
{ "decode_prepare", netbox_decode_prepare },
{ NULL, NULL}
};
/* luaL_register_module polutes _G */
Expand Down
43 changes: 4 additions & 39 deletions src/box/lua/net_box.lua
Expand Up @@ -26,6 +26,7 @@ local communicate = internal.communicate
local encode_auth = internal.encode_auth
local encode_method = internal.encode_method
local decode_greeting = internal.decode_greeting
local decode_method = internal.decode_method

local TIMEOUT_INFINITY = 500 * 365 * 86400
local VSPACE_ID = 281
Expand Down Expand Up @@ -83,21 +84,6 @@ error_unref(struct error *e);
-- utility tables
local is_final_state = {closed = 1, error = 1}

local function decode_nil(raw_data, raw_data_end) -- luacheck: no unused args
return nil, raw_data_end
end
local function decode_data(raw_data)
local response, raw_end = decode(raw_data)
return response[IPROTO_DATA_KEY], raw_end
end
local function decode_tuple(raw_data, raw_data_end, format) -- luacheck: no unused args
local response, raw_end = internal.decode_select(raw_data, nil, format)
return response[1], raw_end
end
local function decode_count(raw_data)
local response, raw_end = decode(raw_data)
return response[IPROTO_DATA_KEY][1], raw_end
end
local function decode_push(raw_data)
local response, raw_end = decode(raw_data)
return response[IPROTO_DATA_KEY][1], raw_end
Expand All @@ -111,27 +97,6 @@ local function version_at_least(peer_version_id, major, minor, patch)
return peer_version_id >= version_id(major, minor, patch)
end

local method_decoder = {
[M_PING] = decode_nil,
[M_CALL_16] = internal.decode_select,
[M_CALL_17] = decode_data,
[M_EVAL] = decode_data,
[M_INSERT] = decode_tuple,
[M_REPLACE] = decode_tuple,
[M_DELETE] = decode_tuple,
[M_UPDATE] = decode_tuple,
[M_UPSERT] = decode_nil,
[M_SELECT] = internal.decode_select,
[M_EXECUTE] = internal.decode_execute,
[M_PREPARE] = internal.decode_prepare,
[M_UNPREPARE] = decode_nil,
[M_GET] = decode_tuple,
[M_MIN] = decode_tuple,
[M_MAX] = decode_tuple,
[M_COUNT] = decode_count,
[M_INJECT] = decode_data,
}

local function decode_error(raw_data)
local ptr = ffi.new('const char *[1]', raw_data)
local err = ffi.C.error_unpack_unsafe(ptr)
Expand Down Expand Up @@ -642,9 +607,9 @@ local function create_transport(host, port, user, password, callback,
local real_end
-- Decode xrow.body[DATA] to Lua objects
if status == IPROTO_OK_KEY then
request.response, real_end =
method_decoder[request.method](body_rpos, body_end,
request.format)
request.response, real_end = decode_method(request.method,
body_rpos, body_end,
request.format)
assert(real_end == body_end, "invalid body length")
requests[id] = nil
request.id = nil
Expand Down

0 comments on commit 1087c1e

Please sign in to comment.