Skip to content

Commit

Permalink
fiber: Introduce fiber.join() related methods
Browse files Browse the repository at this point in the history
Introduce two functions
* fiber.new() - create fiber, schedules it into the
ready queue but doesn't call it and doesn't yield.
Signature of the method is the same as for fiber.create
* fiber.join() - waits until the specified fiber finishes
its execution and returns result or error. Applicable only
to joinable fibers.
* fiber.set_joinable() - sets the fiber joinable flag

Closes #1397
  • Loading branch information
ilmarkov authored and kostja committed Mar 22, 2018
1 parent 021ce48 commit 48cad35
Show file tree
Hide file tree
Showing 5 changed files with 370 additions and 19 deletions.
7 changes: 5 additions & 2 deletions src/fiber.c
Expand Up @@ -383,9 +383,12 @@ fiber_join(struct fiber *fiber)

if (! fiber_is_dead(fiber)) {
rlist_add_tail_entry(&fiber->wake, fiber(), state);
fiber_yield();

do {
fiber_yield();
} while (! fiber_is_dead(fiber));
}
assert(fiber_is_dead(fiber));

/* Move exception to the caller */
int ret = fiber->f_ret;
if (ret != 0) {
Expand Down
4 changes: 3 additions & 1 deletion src/fiber.h
Expand Up @@ -105,7 +105,9 @@ enum fiber_key {
/** User global privilege and authentication token */
FIBER_KEY_USER = 3,
FIBER_KEY_MSG = 4,
FIBER_KEY_MAX = 5
/** Storage for lua stack */
FIBER_KEY_LUA_STACK = 5,
FIBER_KEY_MAX = 6
};

/** \cond public */
Expand Down
128 changes: 112 additions & 16 deletions src/lua/fiber.c
Expand Up @@ -291,35 +291,39 @@ lbox_fiber_info(struct lua_State *L)
}

static int
lua_fiber_run_f(va_list ap)
lua_fiber_run_f(MAYBE_UNUSED va_list ap)
{
int result;
int coro_ref = va_arg(ap, int);
struct lua_State *L = va_arg(ap, struct lua_State *);

result = luaT_call(L, lua_gettop(L) - 1, 0);
struct lua_State *L = (struct lua_State *)
fiber_get_key(fiber(), FIBER_KEY_LUA_STACK);
int coro_ref = lua_tointeger(L, -1);
lua_pop(L, 1);
result = luaT_call(L, lua_gettop(L) - 1, LUA_MULTRET);

/* Destroy local storage */
int storage_ref = (int)(intptr_t)
fiber_get_key(fiber(), FIBER_KEY_LUA_STORAGE);
if (storage_ref > 0)
luaL_unref(L, LUA_REGISTRYINDEX, storage_ref);
luaL_unref(L, LUA_REGISTRYINDEX, coro_ref);
/*
* If fiber is not joinable
* We can unref child stack here,
* otherwise we have to unref child stack in join
*/
if (fiber()->flags & FIBER_IS_JOINABLE)
lua_pushinteger(L, coro_ref);
else
luaL_unref(L, LUA_REGISTRYINDEX, coro_ref);

return result;
}

/**
* Create, resume and detach a fiber
* given the function and its arguments.
* Utility function for fiber.create and fiber.new
*/
static int
lbox_fiber_create(struct lua_State *L)
static struct fiber *
fiber_create(struct lua_State *L)
{
if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
luaL_error(L, "fiber.create(function, ...): bad arguments");
if (fiber_checkstack())
luaL_error(L, "fiber.create(): out of fiber stack");

struct lua_State *child_L = lua_newthread(L);
int coro_ref = luaL_ref(L, LUA_REGISTRYINDEX);

Expand All @@ -333,7 +337,45 @@ lbox_fiber_create(struct lua_State *L)
lua_xmove(L, child_L, lua_gettop(L));
/* XXX: 'fiber' is leaked if this throws a Lua error. */
lbox_pushfiber(L, f->fid);
fiber_start(f, coro_ref, child_L);
/* Pass coro_ref via lua stack so that we don't have to pass it
* as an argument of fiber_run function.
* No function will work with child_L until the function is called.
* At that time we can pop coro_ref from stack
*/
lua_pushinteger(child_L, coro_ref);
fiber_set_key(f, FIBER_KEY_LUA_STACK, child_L);
return f;
}

/**
* Create, resume and detach a fiber
* given the function and its arguments.
*/
static int
lbox_fiber_create(struct lua_State *L)
{
if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
luaL_error(L, "fiber.create(function, ...): bad arguments");
if (fiber_checkstack())
luaL_error(L, "fiber.create(): out of fiber stack");
struct fiber *f = fiber_create(L);
fiber_start(f);
return 1;
}

/**
* Create a fiber, schedule it for execution, but not invoke yet
*/
static int
lbox_fiber_new(struct lua_State *L)
{
if (lua_gettop(L) < 1 || !lua_isfunction(L, 1))
luaL_error(L, "fiber.new(function, ...): bad arguments");
if (fiber_checkstack())
luaL_error(L, "fiber.new(): out of fiber stack");

struct fiber *f = fiber_create(L);
fiber_wakeup(f);
return 1;
}

Expand Down Expand Up @@ -567,6 +609,55 @@ lbox_fiber_wakeup(struct lua_State *L)
return 0;
}

static int
lbox_fiber_join(struct lua_State *L)
{
struct fiber *fiber = lbox_checkfiber(L, 1);
struct lua_State *child_L = fiber_get_key(fiber, FIBER_KEY_LUA_STACK);
fiber_join(fiber);
struct error *e = NULL;
int num_ret = 0;
int coro_ref = 0;
if (child_L != NULL) {
coro_ref = lua_tointeger(child_L, -1);
lua_pop(child_L, 1);
}
if (fiber->f_ret != 0) {
/*
* After fiber_join the error of fiber being joined was moved to
* current fiber diag so we have to get it from there.
*/
assert(!diag_is_empty(&fiber()->diag));
e = diag_last_error(&fiber()->diag);
lua_pushboolean(L, false);
luaT_pusherror(L, e);
diag_clear(&fiber()->diag);
num_ret = 1;
} else {
lua_pushboolean(L, true);
if (child_L != NULL) {
num_ret = lua_gettop(child_L);
lua_xmove(child_L, L, num_ret);
}
}
if (child_L != NULL)
luaL_unref(L, LUA_REGISTRYINDEX, coro_ref);
return num_ret + 1;
}

static int
lbox_fiber_set_joinable(struct lua_State *L)
{

if (lua_gettop(L) != 2) {
luaL_error(L, "fiber.set_joinable(id, yesno): bad arguments");
}
struct fiber *fiber = lbox_checkfiber(L, 1);
bool yesno = lua_toboolean(L, 2);
fiber_set_joinable(fiber, yesno);
return 0;
}

static const struct luaL_Reg lbox_fiber_meta [] = {
{"id", lbox_fiber_id},
{"name", lbox_fiber_name},
Expand All @@ -575,6 +666,8 @@ static const struct luaL_Reg lbox_fiber_meta [] = {
{"testcancel", lbox_fiber_testcancel},
{"__serialize", lbox_fiber_serialize},
{"__tostring", lbox_fiber_tostring},
{"join", lbox_fiber_join},
{"set_joinable", lbox_fiber_set_joinable},
{"wakeup", lbox_fiber_wakeup},
{"__index", lbox_fiber_index},
{NULL, NULL}
Expand All @@ -589,9 +682,12 @@ static const struct luaL_Reg fiberlib[] = {
{"find", lbox_fiber_find},
{"kill", lbox_fiber_cancel},
{"wakeup", lbox_fiber_wakeup},
{"join", lbox_fiber_join},
{"set_joinable", lbox_fiber_set_joinable},
{"cancel", lbox_fiber_cancel},
{"testcancel", lbox_fiber_testcancel},
{"create", lbox_fiber_create},
{"new", lbox_fiber_new},
{"status", lbox_fiber_status},
{"name", lbox_fiber_name},
{NULL, NULL}
Expand Down
168 changes: 168 additions & 0 deletions test/app/fiber.result
Expand Up @@ -975,6 +975,174 @@ session_type
session_type = nil
---
...
-- gh-1397 fiber.new, fiber.join
test_run:cmd("setopt delimiter ';'")
---
- true
...
function err() box.error(box.error.ILLEGAL_PARAMS, 'oh my') end;
---
...
function test1()
f = fiber.new(err)
f:set_joinable(true)
local st, e = f:join()
return st, e
end;
---
...
st, e = test1();
---
...
st;
---
- false
...
e:unpack();
---
- type: ClientError
code: 1
message: Illegal parameters, oh my
trace:
- file: '[string "function err() box.error(box.error.ILLEGAL_PA..."]'
line: 1
...
flag = false;
---
...
function test2()
f = fiber.new(function() flag = true end)
fiber.set_joinable(f, true)
fiber.join(f)
end;
---
...
test2();
---
...
flag;
---
- true
...
function test3()
f = fiber.new(function() return "hello" end)
fiber.set_joinable(f, true)
return fiber.join(f)
end;
---
...
test3();
---
- true
- hello
...
function test4()
f = fiber.new(function (i) return i + 1 end, 1)
fiber.set_joinable(f, true)
return f:join()
end;
---
...
test4();
---
- true
- 2
...
function test_double_join()
f = fiber.new(function (i) return i + 1 end, 1)
fiber.set_joinable(f, true)
f:join()
return f:join()
end;
---
...
test_double_join();
---
- error: the fiber is dead
...
function test5()
f = fiber.new(function() end)
f:set_joinable(true)
return f, f:status()
end;
---
...
local status;
---
...
f, status = test5();
---
...
status;
---
- suspended
...
f:status();
---
- suspended
...
f:join();
---
- true
...
f:status();
---
- dead
...
function test6()
f = fiber.new(function() end)
f:set_joinable(true)
f:set_joinable(false)
return f, f:status()
end;
---
...
f, status = test6();
---
...
status;
---
- suspended
...
f:status();
---
- dead
...
-- test side fiber in transaction
s = box.schema.space.create("test");
---
...
_ = s:create_index("prim", {parts={1, 'number'}});
---
...
flag = false;
---
...
function test7(i)
box.begin()
s:put{i}
fiber.new(function(inc) s:put{inc + 1} flag = true end, i)
box.rollback()
end;
---
...
f = fiber.create(test7, 1);
---
...
while flag ~= true do fiber.sleep(0.001) end;
---
...
s:select{};
---
- - [2]
...
s:drop();
---
...
test_run:cmd("setopt delimiter ''");
---
- true
...
fiber = nil
---
...
Expand Down

0 comments on commit 48cad35

Please sign in to comment.