Skip to content

Commit

Permalink
box: add is_sync option
Browse files Browse the repository at this point in the history
Added the new `is_sync` parameter to `box.begin()`, `box.commit()`, and
`box.atomic()`. To make the transaction synchronous, set the `is_sync`
option to `true`. `box.commit()` can't change `is_sync` to `false` if
the transaction was opened as synchronous or has writes to synchronous
spaces. By default, `is_sync = false`.
Examples:
```Lua
-- Sync transactions
box.atomic({is_sync = true}, function() ... end)

box.begin({is_sync = true}) ... box.commit({is_sync = false})

box.begin({is_sync = false}) ... box.commit({is_sync = true})

box.begin({is_sync = true}) ... box.commit({is_sync = true})

-- Async transactions
box.atomic({is_sync = false}, function() ... end)

box.begin({is_sync = false}) ... box.commit({is_sync = false})

box.begin() ... box.commit()

```

Closes tarantool#8650

NO_DOC=internal
  • Loading branch information
yanshtunder committed Jul 31, 2023
1 parent bb74d6c commit d8a981b
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 4 deletions.
7 changes: 7 additions & 0 deletions changelogs/unreleased/gh-8650-add-is_sync-option.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## feature/box

* Added the new `is_sync` parameter to `box.begin()`, `box.commit()`, and
`box.atomic()`. To make the transaction synchronous, set the `is_sync`
option to `true`. `box.commit()` can't change `is_sync` to `false` if
the transaction was opened as synchronous or has writes to synchronous
spaces (gh-8650).
1 change: 1 addition & 0 deletions extra/exports
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ box_txn_rollback_to_savepoint
box_txn_savepoint
box_txn_set_isolation
box_txn_set_timeout
box_txn_set_sync
box_update
box_upsert
clock_monotonic
Expand Down
11 changes: 11 additions & 0 deletions src/box/lua/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,17 @@ static const char *lua_sources[] = {
static int
lbox_commit(lua_State *L)
{
int top = lua_gettop(L);
bool is_sync = false;
if (top >= 1 || lua_istable(L, 1)) {
lua_getfield(L, 1, "is_sync"); /* Get is_sync. */
if (!lua_isboolean(L, -1))
return luaL_error(L, "is_sync must be a boolean");
is_sync = lua_toboolean(L, lua_gettop(L));
lua_pop(L, 1);
}

assert(box_txn_set_sync(is_sync) == 0);
if (box_txn_commit() != 0)
return luaT_error(L);
return 0;
Expand Down
22 changes: 19 additions & 3 deletions src/box/lua/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ ffi.cdef[[
int
box_txn_begin();
int
box_txn_commit();
int
box_txn_set_timeout(double timeout);
int
box_txn_set_sync(bool is_sync);
void
memtx_tx_story_gc_step();
int
Expand Down Expand Up @@ -403,16 +407,25 @@ local begin_options = {
return true
end,
txn_isolation = normalize_txn_isolation_level,
is_sync = function(is_sync)
if type(is_sync) ~= "boolean" then
box.error(box.error.ILLEGAL_PARAMS,
"is_sync must be a boolean")
end
return true
end,
}

box.begin = function(options)
local timeout
local txn_isolation
local is_sync
if options then
check_param_table(options, begin_options)
timeout = options.timeout
txn_isolation = options.txn_isolation and
normalize_txn_isolation_level(options.txn_isolation)
is_sync = options.is_sync
end
if builtin.box_txn_begin() == -1 then
box.error()
Expand All @@ -425,6 +438,9 @@ box.begin = function(options)
box.rollback()
box.error()
end
if is_sync then
assert(builtin.box_txn_set_sync(is_sync) == 0)
end
end

box.is_in_txn = builtin.box_txn
Expand Down Expand Up @@ -464,9 +480,9 @@ local function atomic_tail(status, ...)
if not status then
box.rollback()
error((...), 2)
end
box.commit()
return ...
end
box.commit()
return ...
end

box.atomic = function(arg0, arg1, ...)
Expand Down
34 changes: 33 additions & 1 deletion src/box/txn.c
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ txn_begin(void)
rlist_create(&txn->savepoints);
txn->fiber = NULL;
txn->timeout = TIMEOUT_INFINITY;
txn->is_sync = false;
txn->rollback_timer = NULL;
fiber_set_txn(fiber(), txn);
trigger_create(&txn->fiber_on_yield, txn_on_yield, NULL, NULL);
Expand Down Expand Up @@ -850,10 +851,26 @@ txn_journal_entry_new(struct txn *txn)
if (stmt->row == NULL)
continue;

/*
* Look at the flags stmt->row->flags. If the transaction
* is synchronous, then set is_sync = true. This should
* only be done on replicas. The master sets these flags
* and independently decides whether the transaction is
* synchronous or not.
*/

if (stmt->row->flags == (IPROTO_FLAG_COMMIT |
IPROTO_FLAG_WAIT_SYNC | IPROTO_FLAG_WAIT_ACK)) {
is_sync = true;
}

if (stmt->row->type != IPROTO_NOP) {
is_fully_nop = false;
is_sync = is_sync || (stmt->space != NULL &&
space_is_sync(stmt->space));
space_is_sync(stmt->space)) ||
txn->is_sync;

txn->is_sync = false;
}

if (stmt->row->replica_id == 0)
Expand Down Expand Up @@ -1277,6 +1294,21 @@ box_txn_set_timeout(double timeout)
return 0;
}

int
box_txn_set_sync(bool is_sync)
{
struct txn *txn = in_txn();
/*
* Do nothing if transaction is not started,
* it's the same as BEGIN + COMMIT.
*/
if (!txn)
return 0;
if (is_sync)
txn->is_sync = is_sync;
return 0;
}

/** Wait for a linearization point for a transaction. */
static int
txn_make_linearizable(struct txn *txn)
Expand Down
10 changes: 10 additions & 0 deletions src/box/txn.h
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ struct txn {
bool is_schema_changed;
/** Timeout for transaction, or TIMEOUT_INFINITY if not set. */
double timeout;
/** True in case box.begin() set to true. */
bool is_sync;
/**
* Timer that is alarmed if the transaction did not have time
* to complete within the timeout specified when it was created.
Expand Down Expand Up @@ -1024,6 +1026,14 @@ box_txn_set_timeout(double timeout);
API_EXPORT int
box_txn_set_isolation(uint32_t level);

/**
* Set @a sync for a transaction.
*
* @retval 0 if success
* @retval -1 if failed, diag is set.
*/
API_EXPORT int
box_txn_set_sync(bool is_sync);
/** \endcond public */

typedef struct txn_savepoint box_txn_savepoint_t;
Expand Down
155 changes: 155 additions & 0 deletions test/box-luatest/gh_8650_add_is_sync_option_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
local t = require('luatest')
local cluster = require('luatest.replica_set')
local server = require('luatest.server')

local g = t.group('gh_8650')

g.before_each(function(cg)
cg.cluster = cluster:new({})

local box_cfg = {
replication = {
server.build_listen_uri('master', cg.cluster.id)
},
replication_synchro_timeout = 0.2,
replication_synchro_quorum = 2
}
cg.master = cg.cluster:build_server({alias = 'master',
box_cfg = box_cfg})
local box_cfg = {
replication = {
cg.master.net_box_uri,
server.build_listen_uri('replica', cg.cluster.id)
},
replication_synchro_timeout = 0.2,
replication_synchro_quorum = 2,
read_only = true
}
cg.replica = cg.cluster:build_server({alias = 'replica',
box_cfg = box_cfg})

cg.cluster:add_server(cg.master)
cg.cluster:add_server(cg.replica)
cg.cluster:start()
end)

g.after_each(function(cg)
cg.cluster:drop()
end)

g.test_box_begin_commit_is_sync = function(cg)
cg.master:exec(function()
box.schema.space.create('test')
box.space.test:create_index('pk')
box.ctl.promote()

box.begin({is_sync = false})
box.space.test:insert{1, 'is_sync = true'}
box.commit({is_sync = true})
box.begin({is_sync = false})
box.space.test:insert{2, 'is_sync = false'}
box.commit({is_sync = false})
box.begin({is_sync = true})
box.space.test:insert{3, 'is_sync = true'}
box.commit({is_sync = true})
box.begin({is_sync = true})
box.space.test:insert{4, 'is_sync = true'}
box.commit({is_sync = false})
end)

t.assert_equals(cg.master:exec(function()
return box.space.test:select()
end), {{1, 'is_sync = true'}, {2, 'is_sync = false'},
{3, 'is_sync = true'}, {4, 'is_sync = true'}})

t.assert_equals(cg.replica:exec(function()
return box.space.test:select()
end), {{1, 'is_sync = true'}, {2, 'is_sync = false'},
{3, 'is_sync = true'}, {4, 'is_sync = true'}})

cg.replica:stop()

-- Async transaction
cg.master:exec(function()
box.begin({is_sync = false})
box.space.test:insert{5, 'is_sync = false'}
box.commit({is_sync = false})
end)

-- Sync transaction
t.assert_error_msg_content_equals('Quorum collection for a synchronous ' ..
'transaction is timed out',
function()
cg.master:exec(function()
box.begin({is_sync = true})
box.space.test:insert{6, 'is_sync = true'}
box.commit({is_sync = false})
end)
end)

t.assert_equals(cg.master:exec(function()
return box.space.test:select()
end), {{1, 'is_sync = true'}, {2, 'is_sync = false'},
{3, 'is_sync = true'}, {4, 'is_sync = true'},
{5, "is_sync = false"}})
end

g.test_box_atomic_is_sync = function(cg)
cg.master:exec(function()
box.schema.space.create('test')
box.space.test:create_index('pk')
box.ctl.promote()

box.atomic(function()
box.space.test:insert{1, 'is_sync = false'}
box.space.test:insert{2, 'is_sync = false'}
end)
box.atomic({is_sync = true}, function()
box.space.test:insert{3, 'is_sync = true'}
box.space.test:insert{4, 'is_sync = true'}
end)
box.atomic({is_sync = false}, function()
box.space.test:insert{5, 'is_sync = false'}
box.space.test:insert{6, 'is_sync = false'}
end)
end)

t.assert_equals(cg.master:exec(function()
return box.space.test:select()
end), {{1, 'is_sync = false'}, {2, 'is_sync = false'},
{3, 'is_sync = true'}, {4, 'is_sync = true'},
{5, 'is_sync = false'}, {6, 'is_sync = false'}})

t.assert_equals(cg.replica:exec(function()
return box.space.test:select()
end), {{1, 'is_sync = false'}, {2, 'is_sync = false'},
{3, 'is_sync = true'}, {4, 'is_sync = true'},
{5, 'is_sync = false'}, {6, 'is_sync = false'}})

cg.replica:stop()

-- Async transaction
cg.master:exec(function()
box.atomic({is_sync = false}, function()
box.space.test:insert{7, 'is_sync = false'}
end)
end)

-- Sync transaction
t.assert_error_msg_content_equals('Quorum collection for a synchronous ' ..
'transaction is timed out',
function()
cg.master:exec(function()
box.atomic({is_sync = true}, function()
box.space.test:insert{8, 'is_sync = true'}
end)
end)
end)

t.assert_equals(cg.master:exec(function()
return box.space.test:select()
end), {{1, 'is_sync = false'}, {2, 'is_sync = false'},
{3, 'is_sync = true'}, {4, 'is_sync = true'},
{5, "is_sync = false"}, {6, 'is_sync = false'},
{7, 'is_sync = false'}})
end

0 comments on commit d8a981b

Please sign in to comment.