Skip to content

Commit

Permalink
box: add is_sync option
Browse files Browse the repository at this point in the history
Added a new is_sync parameter to `box.begin()`, `box.commit()`, and
`box.atomic()`. To make the transaction synchronous, set the `is_sync`
option to `true`. If any value other than `true/nil` is set, for example
`is_sync = "some string"`, then an error will be thrown.

Example:

```Lua
-- Sync transactions
box.atomic({is_sync = true}, function() ... end)

box.begin({is_sync = true}) ... box.commit({is_sync = true})

box.begin({is_sync = true}) ... box.commit()

box.begin() ... box.commit({is_sync = true})

-- Async transactions
box.atomic(function() ... end)

box.begin() ... box.commit()
```

Closes #8650

@TarantoolBot document
Title: box.atomic({is_sync = true})

Added the new `is_sync` parameter to `box.atomic()`. To make the
transaction synchronous, set the `is_sync` option to `true`. Setting
`is_sync = false` is prohibited. If to set any value other than true
for example `is_sync = "some string"`, then an error will be thrown.
  • Loading branch information
yanshtunder authored and sergepetrenko committed Nov 29, 2023
1 parent 7fbc1d0 commit c3c0d43
Show file tree
Hide file tree
Showing 15 changed files with 714 additions and 37 deletions.
6 changes: 6 additions & 0 deletions changelogs/unreleased/gh-8650-add-is_sync-option.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
## feature/box

* Added a new `is_sync` parameter to `box.atomic()`. To make the transaction
synchronous, set the `is_sync` option to `true`. Setting `is_sync = false` is
prohibited. If any value other than true/nil is set, for example
`is_sync = "some string"`, then an error will be thrown (gh-8650).
1 change: 1 addition & 0 deletions extra/exports
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ box_txn_begin
box_txn_commit
box_txn_id
box_txn_isolation
box_txn_make_sync
box_txn_rollback
box_txn_rollback_to_savepoint
box_txn_savepoint
Expand Down
14 changes: 13 additions & 1 deletion src/box/applier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,19 @@ apply_plain_tx(uint32_t replica_id, struct stailq *rows,
goto fail;
}

item = stailq_last_entry(rows, struct applier_tx_row, next);

/*
* Look at the flags item->row->flags. If the transaction
* is synchronous, then set is_sync = true (txn.c). This
* should only be done on replicas. The master sets these
* flags and independently decides whether the transaction
* is synchronous or not. All txn meta flags are set only
* for the last txn row.
*/
if ((item->row.flags & IPROTO_FLAG_WAIT_ACK) != 0)
box_txn_make_sync();

if (use_triggers) {
/* We are ready to submit txn to wal. */
struct trigger *on_rollback, *on_wal_write;
Expand Down Expand Up @@ -1464,7 +1477,6 @@ apply_plain_tx(uint32_t replica_id, struct stailq *rows,
* transaction traversed network + remote WAL bundle before
* ack get received.
*/
item = stailq_last_entry(rows, struct applier_tx_row, next);
rcb->replica_id = replica_id;
rcb->txn_last_tm = item->row.tm;

Expand Down
16 changes: 14 additions & 2 deletions src/box/iproto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -377,10 +377,12 @@ struct iproto_msg
struct auth_request auth;
/** Features request. */
struct id_request id;
/* SQL request, if this is the EXECUTE/PREPARE request. */
/** SQL request, if this is the EXECUTE/PREPARE request. */
struct sql_request sql;
/* BEGIN request */
/** BEGIN request */
struct begin_request begin;
/** COMMIT request */
struct commit_request commit;
/** In case of iproto parse error, saved diagnostics. */
struct diag diag;
};
Expand Down Expand Up @@ -1653,6 +1655,8 @@ iproto_msg_decode(struct iproto_msg *msg, struct cmsg_hop **route)
return 0;
case IPROTO_COMMIT:
*route = iproto_thread->commit_route;
if (xrow_decode_commit(&msg->header, &msg->commit) != 0)
return -1;
return 0;
case IPROTO_ROLLBACK:
*route = iproto_thread->rollback_route;
Expand Down Expand Up @@ -2048,6 +2052,7 @@ tx_process_begin(struct cmsg *m)
struct obuf *out;
struct obuf_svp header;
uint32_t txn_isolation = msg->begin.txn_isolation;
bool is_sync = msg->begin.is_sync;

if (tx_check_msg(msg) != 0)
goto error;
Expand All @@ -2068,6 +2073,9 @@ tx_process_begin(struct cmsg *m)
(void)rc;
goto error;
}
if (is_sync)
box_txn_make_sync();

out = msg->connection->tx.p_obuf;
header = obuf_create_svp(out);
iproto_reply_ok(out, msg->header.sync, ::schema_version);
Expand All @@ -2087,10 +2095,14 @@ tx_process_commit(struct cmsg *m)
struct iproto_msg *msg = tx_accept_msg(m);
struct obuf *out;
struct obuf_svp header;
bool is_sync = msg->commit.is_sync;

if (tx_check_msg(msg) != 0)
goto error;

if (is_sync)
box_txn_make_sync();

if (box_txn_commit() != 0)
goto error;

Expand Down
6 changes: 5 additions & 1 deletion src/box/iproto_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,11 @@ extern const char *iproto_flag_bit_strs[];
* Mapping of format identifier to format clause consisting of field
* names and field types.
*/ \
_(TUPLE_FORMATS, 0x60, MP_MAP)
_(TUPLE_FORMATS, 0x60, MP_MAP) \
/**
* Flag indicating whether the transaction is synchronous.
*/ \
_(IS_SYNC, 0x61, MP_BOOL)

#define IPROTO_KEY_MEMBER(s, v, ...) IPROTO_ ## s = v,

Expand Down
34 changes: 34 additions & 0 deletions src/box/lua/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,40 @@ static const char *lua_sources[] = {
static int
lbox_commit(lua_State *L)
{
int top = lua_gettop(L);
if (top >= 1) {
/* box.commit(nil) */
if (lua_isnil(L, 1))
goto box_commit;

if (lua_type(L, 1) != LUA_TTABLE) {
diag_set(IllegalParams, "Illegal parameters, options "
"should be a table");
return luaT_error(L);
}
/* Get is_sync */
lua_getfield(L, 1, "is_sync");
/* box.commit({is_sync = nil}) */
if (lua_isnil(L, -1))
goto box_commit;

if (!lua_isboolean(L, -1)) {
diag_set(IllegalParams, "Illegal parameters, is_sync "
"must be a boolean");
return luaT_error(L);
}
bool is_sync = lua_toboolean(L, lua_gettop(L));
lua_pop(L, 1);

if (!is_sync) {
diag_set(IllegalParams, "Illegal parameters, is_sync "
"can only be true");
return luaT_error(L);
}
box_txn_make_sync();
}

box_commit:
if (box_txn_commit() != 0)
return luaT_error(L);
return 0;
Expand Down
52 changes: 31 additions & 21 deletions src/box/lua/net_box.c
Original file line number Diff line number Diff line change
Expand Up @@ -1361,19 +1361,6 @@ netbox_encode_unprepare(lua_State *L, int idx,
return netbox_encode_prepare(L, idx, ctx);
}

static inline int
netbox_encode_commit_or_rollback(lua_State *L, enum iproto_type type, int idx,
struct mpstream *stream, uint64_t sync,
uint64_t stream_id)
{
(void)L;
(void) idx;
assert(type == IPROTO_COMMIT || type == IPROTO_ROLLBACK);
size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
netbox_end_encode(stream, svp);
return 0;
}

/**
* Encode an `IPROTO_BEGIN` request and write it to the provided MsgPack stream.
*/
Expand All @@ -1385,9 +1372,11 @@ netbox_encode_begin(struct lua_State *L, int idx,
ctx->stream_id);
bool has_timeout = !lua_isnoneornil(L, idx);
bool has_txn_isolation = !lua_isnoneornil(L, idx + 1);
if (has_timeout || has_txn_isolation) {
bool has_is_sync = !lua_isnoneornil(L, idx + 2);
if (has_timeout || has_txn_isolation || has_is_sync) {
uint32_t map_size = (has_timeout ? 1 : 0) +
(has_txn_isolation ? 1 : 0);
(has_txn_isolation ? 1 : 0) +
(has_is_sync ? 1 : 0);
mpstream_encode_map(ctx->stream, map_size);
}
if (has_timeout) {
Expand All @@ -1402,6 +1391,13 @@ netbox_encode_begin(struct lua_State *L, int idx,
mpstream_encode_uint(ctx->stream, IPROTO_TXN_ISOLATION);
mpstream_encode_uint(ctx->stream, txn_isolation);
}
if (has_is_sync) {
if (lua_type(L, idx + 2) == LUA_TBOOLEAN) {
bool is_sync = lua_toboolean(L, idx + 2);
mpstream_encode_uint(ctx->stream, IPROTO_IS_SYNC);
mpstream_encode_bool(ctx->stream, is_sync);
}
}
netbox_end_encode(ctx->stream, svp);
return 0;
}
Expand All @@ -1410,18 +1406,32 @@ static int
netbox_encode_commit(struct lua_State *L, int idx,
struct netbox_method_encode_ctx *ctx)
{
return netbox_encode_commit_or_rollback(L, IPROTO_COMMIT, idx,
ctx->stream, ctx->sync,
ctx->stream_id);
size_t svp = netbox_begin_encode(ctx->stream, ctx->sync, IPROTO_COMMIT,
ctx->stream_id);
bool has_is_sync = !lua_isnoneornil(L, idx);
if (has_is_sync) {
uint32_t map_size = 1;
mpstream_encode_map(ctx->stream, map_size);
if (lua_type(L, idx) == LUA_TBOOLEAN) {
bool is_sync = lua_toboolean(L, idx);
mpstream_encode_uint(ctx->stream, IPROTO_IS_SYNC);
mpstream_encode_bool(ctx->stream, is_sync);
}
}
netbox_end_encode(ctx->stream, svp);
return 0;
}

static int
netbox_encode_rollback(struct lua_State *L, int idx,
struct netbox_method_encode_ctx *ctx)
{
return netbox_encode_commit_or_rollback(L, IPROTO_ROLLBACK, idx,
ctx->stream, ctx->sync,
ctx->stream_id);
(void)L;
(void)idx;
size_t svp = netbox_begin_encode(ctx->stream, ctx->sync,
IPROTO_ROLLBACK, ctx->stream_id);
netbox_end_encode(ctx->stream, svp);
return 0;
}

/**
Expand Down
86 changes: 81 additions & 5 deletions src/box/lua/net_box.lua
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ local function stream_begin(stream, txn_opts, netbox_opts)
check_param_table(netbox_opts, REQUEST_OPTION_TYPES)
local timeout
local txn_isolation
local is_sync
if txn_opts then
if type(txn_opts) ~= 'table' then
error("txn_opts should be a table")
Expand All @@ -536,19 +537,94 @@ local function stream_begin(stream, txn_opts, netbox_opts)
txn_isolation =
box.internal.normalize_txn_isolation_level(txn_isolation)
end
is_sync = txn_opts.is_sync
if is_sync ~= nil and type(is_sync) ~= "boolean" then
error("is_sync must be a boolean")
end
if is_sync == false then
error("is_sync can only be true")
end
end
local res = stream:_request('BEGIN', netbox_opts, nil,
stream._stream_id, timeout, txn_isolation)
stream._stream_id, timeout, txn_isolation,
is_sync)
if netbox_opts and netbox_opts.is_async then
return res
end
end

local function stream_commit(stream, opts)
-- A function that correctly orders commit options. This is done so that
-- txn_opts is the second parameter (as in begin), and opts is the third
-- parameter. This is done for backward compatibility.
local function order_opts_commit(opts_1, opts_2)
if opts_1 == nil and opts_2 == nil then
return opts_1, opts_2
end
if type(opts_1) ~= 'table' and type(opts_2) ~= 'table' then
box.error(box.error.ILLEGAL_PARAMS, "options should be a table")
end

local new_opts_1 = opts_1
local new_opts_2 = opts_2

if opts_1 ~= nil and type(opts_1) == 'table' then
for i, _ in pairs(opts_1) do
for k, _ in pairs(REQUEST_OPTION_TYPES) do
if i == k then
new_opts_1, new_opts_2 = opts_2, opts_1
break
end
end
end
end

if opts_2 ~= nil and type(opts_2) == 'table' then
for i, _ in pairs(opts_2) do
for k, _ in pairs(REQUEST_OPTION_TYPES) do
if i == k then
new_opts_1, new_opts_2 = opts_1, opts_2
break
end
end
end
end
return new_opts_1, new_opts_2
end

local function stream_commit(stream, txn_opts, opts)
check_remote_arg(stream, 'commit')
check_param_table(opts, REQUEST_OPTION_TYPES)
local res = stream:_request('COMMIT', opts, nil, stream._stream_id)
if opts and opts.is_async then
local new_txn_opts, new_opts = order_opts_commit(txn_opts, opts)
check_param_table(new_opts, REQUEST_OPTION_TYPES)

-- The options are in the wrong order or mixed up. Example:
-- stream:commit({opts}) => ok
-- stream:commit({txn_opts}) => ok
-- stream:commit({txn_opts}, {opts}) => ok
-- stream:commit({opts}, {txn_opts}) => error
-- stream:commit({txn_opts, opts}) => error
-- stream:commit({opts, txn_opts}) => error
-- The opts is netbox options. The txn_opts is transaction options.
if new_txn_opts and new_opts and new_txn_opts ~= txn_opts then
box.error(box.error.ILLEGAL_PARAMS, "options are either in the " ..
"wrong order or mixed up")
end

local is_sync
if new_txn_opts then
if type(new_txn_opts) ~= 'table' then
box.error(box.error.ILLEGAL_PARAMS, "options should be a table")
end
is_sync = new_txn_opts.is_sync
if type(is_sync) ~= "boolean" and is_sync ~= nil then
box.error(box.error.ILLEGAL_PARAMS, "is_sync must be a boolean")
end
if is_sync == false then
box.error(box.error.ILLEGAL_PARAMS, "is_sync can only be true")
end
end
local res = stream:_request('COMMIT', new_opts, nil, stream._stream_id,
is_sync)
if new_opts and new_opts.is_async then
return res
end
end
Expand Down

0 comments on commit c3c0d43

Please sign in to comment.