Skip to content

Commit

Permalink
box: add is_sync option
Browse files Browse the repository at this point in the history
Added the new is_sync parameter to `box.begin()`, `box.commit()`, and
`box.atomic()`. To make the transaction synchronous, set the `is_sync`
option to `true`. If to set any value other than `true/nil`, for example
`is_sync = "some string"`, then an error will be thrown.

Example:

```Lua
-- Sync transactions
box.atomic({is_sync = true}, function() ... end)

box.begin({is_sync = true}) ... box.commit({is_sync = true})

box.begin({is_sync = true}) ... box.commit()

box.begin() ... box.commit({is_sync = true})

-- Async transactions
box.atomic(function() ... end)

box.begin() ... box.commit()
```

Closes tarantool#8650

@TarantoolBot document
Title: box.atomic({is_sync = true})

Added the new `is_sync` parameter to `box.atomic()`. To make the
transaction synchronous, set the `is_sync` option to `true`. Setting
`is_sync = false` is prohibited. If to set any value other than true
for example `is_sync = "some string"`, then an error will be thrown.
  • Loading branch information
yanshtunder committed Oct 11, 2023
1 parent 4a86856 commit eff43d1
Show file tree
Hide file tree
Showing 14 changed files with 701 additions and 33 deletions.
6 changes: 6 additions & 0 deletions changelogs/unreleased/gh-8650-add-is_sync-option.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
## feature/box

* Added the new `is_sync` parameter to `box.atomic()`. To make the
transaction synchronous, set the `is_sync` option to `true`. Setting
`is_sync = false` is prohibited. If to set any value other than true/nil,
for example `is_sync = "some string"`, then an error will be thrown (gh-8650).
1 change: 1 addition & 0 deletions extra/exports
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ box_txn_rollback_to_savepoint
box_txn_savepoint
box_txn_set_isolation
box_txn_set_timeout
box_txn_make_sync
box_update
box_upsert
clock_monotonic
Expand Down
16 changes: 14 additions & 2 deletions src/box/iproto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,12 @@ struct iproto_msg
struct auth_request auth;
/** Features request. */
struct id_request id;
/* SQL request, if this is the EXECUTE/PREPARE request. */
/** SQL request, if this is the EXECUTE/PREPARE request. */
struct sql_request sql;
/* BEGIN request */
/** BEGIN request */
struct begin_request begin;
/** COMMIT request */
struct commit_request commit;
/** In case of iproto parse error, saved diagnostics. */
struct diag diag;
};
Expand Down Expand Up @@ -1645,6 +1647,8 @@ iproto_msg_decode(struct iproto_msg *msg, struct cmsg_hop **route)
return 0;
case IPROTO_COMMIT:
*route = iproto_thread->commit_route;
if (xrow_decode_commit(&msg->header, &msg->commit) != 0)
return -1;
return 0;
case IPROTO_ROLLBACK:
*route = iproto_thread->rollback_route;
Expand Down Expand Up @@ -2018,6 +2022,7 @@ tx_process_begin(struct cmsg *m)
struct obuf *out;
struct obuf_svp header;
uint32_t txn_isolation = msg->begin.txn_isolation;
bool is_sync = msg->begin.is_sync;

if (tx_check_msg(msg) != 0)
goto error;
Expand All @@ -2038,6 +2043,9 @@ tx_process_begin(struct cmsg *m)
(void)rc;
goto error;
}
if (is_sync)
box_txn_make_sync();

out = msg->connection->tx.p_obuf;
header = obuf_create_svp(out);
iproto_reply_ok(out, msg->header.sync, ::schema_version);
Expand All @@ -2057,10 +2065,14 @@ tx_process_commit(struct cmsg *m)
struct iproto_msg *msg = tx_accept_msg(m);
struct obuf *out;
struct obuf_svp header;
bool is_sync = msg->commit.is_sync;

if (tx_check_msg(msg) != 0)
goto error;

if (is_sync)
box_txn_make_sync();

if (box_txn_commit() != 0)
goto error;

Expand Down
4 changes: 4 additions & 0 deletions src/box/iproto_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ extern const char *iproto_flag_bit_strs[];
* when identifier is present (i.e., the identifier is ignored).
*/ \
_(INDEX_NAME, 0x5f, MP_STR) \
/**
* Flag indicating whether the transaction is synchronous.
*/ \
_(IS_SYNC, 0x60, MP_BOOL) \

#define IPROTO_KEY_MEMBER(s, v, ...) IPROTO_ ## s = v,

Expand Down
31 changes: 31 additions & 0 deletions src/box/lua/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,37 @@ static const char *lua_sources[] = {
static int
lbox_commit(lua_State *L)
{
int top = lua_gettop(L);
if (top >= 1) {
/* box.commit(nil) */
if (lua_isnil(L, 1))
goto box_commit;

if (lua_type(L, 1) != LUA_TTABLE) {
diag_set(IllegalParams, "Illegal parameters, options "
"should be a table");
return luaT_error(L);
}
/* Get is_sync */
lua_getfield(L, 1, "is_sync");

if (!lua_isboolean(L, -1)) {
diag_set(IllegalParams, "Illegal parameters, is_sync "
"must be a boolean");
return luaT_error(L);
}
bool is_sync = lua_toboolean(L, lua_gettop(L));
lua_pop(L, 1);

if (!is_sync) {
diag_set(IllegalParams, "Illegal parameters, is_sync "
"can only be true");
return luaT_error(L);
}
box_txn_make_sync();
}

box_commit:
if (box_txn_commit() != 0)
return luaT_error(L);
return 0;
Expand Down
50 changes: 31 additions & 19 deletions src/box/lua/net_box.c
Original file line number Diff line number Diff line change
Expand Up @@ -1289,29 +1289,18 @@ netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
return netbox_encode_prepare(L, idx, stream, sync, stream_id);
}

static inline int
netbox_encode_commit_or_rollback(lua_State *L, enum iproto_type type, int idx,
struct mpstream *stream, uint64_t sync,
uint64_t stream_id)
{
(void)L;
(void) idx;
assert(type == IPROTO_COMMIT || type == IPROTO_ROLLBACK);
size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
netbox_end_encode(stream, svp);
return 0;
}

static int
netbox_encode_begin(struct lua_State *L, int idx, struct mpstream *stream,
uint64_t sync, uint64_t stream_id)
{
size_t svp = netbox_begin_encode(stream, sync, IPROTO_BEGIN, stream_id);
bool has_timeout = !lua_isnoneornil(L, idx);
bool has_txn_isolation = !lua_isnoneornil(L, idx + 1);
if (has_timeout || has_txn_isolation) {
bool has_is_sync = !lua_isnoneornil(L, idx + 2);
if (has_timeout || has_txn_isolation || has_is_sync) {
uint32_t map_size = (has_timeout ? 1 : 0) +
(has_txn_isolation ? 1 : 0);
(has_txn_isolation ? 1 : 0) +
(has_is_sync ? 1 : 0);
mpstream_encode_map(stream, map_size);
}
if (has_timeout) {
Expand All @@ -1326,6 +1315,13 @@ netbox_encode_begin(struct lua_State *L, int idx, struct mpstream *stream,
mpstream_encode_uint(stream, IPROTO_TXN_ISOLATION);
mpstream_encode_uint(stream, txn_isolation);
}
if (has_is_sync) {
if (lua_type(L, idx + 2) == LUA_TBOOLEAN) {
bool is_sync = lua_toboolean(L, idx + 2);
mpstream_encode_uint(stream, IPROTO_IS_SYNC);
mpstream_encode_bool(stream, is_sync);
}
}
netbox_end_encode(stream, svp);
return 0;
}
Expand All @@ -1334,16 +1330,32 @@ static int
netbox_encode_commit(struct lua_State *L, int idx, struct mpstream *stream,
uint64_t sync, uint64_t stream_id)
{
return netbox_encode_commit_or_rollback(L, IPROTO_COMMIT, idx, stream,
sync, stream_id);
size_t svp = netbox_begin_encode(stream, sync, IPROTO_COMMIT,
stream_id);
bool has_is_sync = !lua_isnoneornil(L, idx);
if (has_is_sync) {
uint32_t map_size = 1;
mpstream_encode_map(stream, map_size);
if (lua_type(L, idx) == LUA_TBOOLEAN) {
bool is_sync = lua_toboolean(L, idx);
mpstream_encode_uint(stream, IPROTO_IS_SYNC);
mpstream_encode_bool(stream, is_sync);
}
}
netbox_end_encode(stream, svp);
return 0;
}

static int
netbox_encode_rollback(struct lua_State *L, int idx, struct mpstream *stream,
uint64_t sync, uint64_t stream_id)
{
return netbox_encode_commit_or_rollback(L, IPROTO_ROLLBACK, idx, stream,
sync, stream_id);
(void)L;
(void)idx;
size_t svp = netbox_begin_encode(stream, sync, IPROTO_ROLLBACK,
stream_id);
netbox_end_encode(stream, svp);
return 0;
}

/**
Expand Down
79 changes: 74 additions & 5 deletions src/box/lua/net_box.lua
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ local function stream_begin(stream, txn_opts, netbox_opts)
check_param_table(netbox_opts, REQUEST_OPTION_TYPES)
local timeout
local txn_isolation
local is_sync
if txn_opts then
if type(txn_opts) ~= 'table' then
error("txn_opts should be a table")
Expand All @@ -536,19 +537,87 @@ local function stream_begin(stream, txn_opts, netbox_opts)
txn_isolation =
box.internal.normalize_txn_isolation_level(txn_isolation)
end
is_sync = txn_opts.is_sync
if is_sync ~= nil and type(is_sync) ~= "boolean" then
error("is_sync must be a boolean")
end
if is_sync == false then
error("is_sync can only be true")
end
end
local res = stream:_request('BEGIN', netbox_opts, nil,
stream._stream_id, timeout, txn_isolation)
stream._stream_id, timeout, txn_isolation,
is_sync)
if netbox_opts and netbox_opts.is_async then
return res
end
end

local function stream_commit(stream, opts)
-- A function that correctly orders commit options. This is done so that
-- txn_opts is the second parameter (as in begin), and opts is the third
-- parameter. This is done for backward compatibility.
local function order_opts_commit(opts_1, opts_2)
if opts_1 == nil and opts_2 == nil then
return opts_1, opts_2
end
if type(opts_1) ~= 'table' and type(opts_2) ~= 'table' then
box.error(box.error.ILLEGAL_PARAMS, "options should be a table")
end

local new_opts_1 = opts_1
local new_opts_2 = opts_2

if opts_1 ~= nil and type(opts_1) == 'table' then
for i, _ in pairs(opts_1) do
for k, _ in pairs(REQUEST_OPTION_TYPES) do
if i == k then
new_opts_1, new_opts_2 = opts_2, opts_1
break
end
end
end
end

if opts_2 ~= nil and type(opts_2) == 'table' then
for i, _ in pairs(opts_2) do
for k, _ in pairs(REQUEST_OPTION_TYPES) do
if i == k then
new_opts_1, new_opts_2 = opts_1, opts_2
break
end
end
end
end
return new_opts_1, new_opts_2
end

local function stream_commit(stream, txn_opts, opts)
check_remote_arg(stream, 'commit')
check_param_table(opts, REQUEST_OPTION_TYPES)
local res = stream:_request('COMMIT', opts, nil, stream._stream_id)
if opts and opts.is_async then
local new_txn_opts, new_opts = order_opts_commit(txn_opts, opts)
check_param_table(new_opts, REQUEST_OPTION_TYPES)

-- The options are in the wrong order. Example:
-- stream:commit({opts}, {txn_opts})
if new_txn_opts and new_opts and new_txn_opts ~= txn_opts then
error("Incorrect use of options")
end

local is_sync
if new_txn_opts then
if type(new_txn_opts) ~= 'table' then
box.error(box.error.ILLEGAL_PARAMS, "options should be a table")
end
is_sync = new_txn_opts.is_sync
if type(is_sync) ~= "boolean" and is_sync ~= nil then
error("is_sync must be a boolean")
end
if is_sync == false then
error("is_sync can only be true")
end
end
local res = stream:_request('COMMIT', new_opts, nil, stream._stream_id,
is_sync)
if new_opts and new_opts.is_async then
return res
end
end
Expand Down
25 changes: 21 additions & 4 deletions src/box/lua/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ ffi.cdef[[
int
box_txn_set_timeout(double timeout);
void
box_txn_make_sync();
void
memtx_tx_story_gc_step();
int
box_sequence_current(uint32_t seq_id, int64_t *result);
Expand Down Expand Up @@ -296,16 +298,28 @@ local begin_options = {
return true
end,
txn_isolation = normalize_txn_isolation_level,
is_sync = function(is_sync)
if type(is_sync) ~= "boolean" then
box.error(box.error.ILLEGAL_PARAMS,
"is_sync must be a boolean")
end
if is_sync == false then
box.error(box.error.ILLEGAL_PARAMS, "is_sync can only be true")
end
return true
end,
}

box.begin = function(options)
local timeout
local txn_isolation
if options then
local is_sync
if options ~= nil or options == false then
check_param_table(options, begin_options)
timeout = options.timeout
txn_isolation = options.txn_isolation and
normalize_txn_isolation_level(options.txn_isolation)
is_sync = options.is_sync
end
if builtin.box_txn_begin() == -1 then
box.error()
Expand All @@ -318,6 +332,9 @@ box.begin = function(options)
box.rollback()
box.error()
end
if is_sync then
builtin.box_txn_make_sync()
end
end

box.is_in_txn = builtin.box_txn
Expand Down Expand Up @@ -357,9 +374,9 @@ local function atomic_tail(status, ...)
if not status then
box.rollback()
error((...), 2)
end
box.commit()
return ...
end
box.commit()
return ...
end

box.atomic = function(arg0, arg1, ...)
Expand Down

0 comments on commit eff43d1

Please sign in to comment.