Skip to content

Commit

Permalink
fiber: make cord_cojoin cancellable
Browse files Browse the repository at this point in the history
We may need to cancel fiber that waits for cord to finish. For this
purpose let's cancel fiber started by cord_costart inside the cord.

Note that there is a race between stopping cancel_event in cord and
triggering it using ev_async_send in joining thread. AFAIU it is safe.

We also need to fix stopping wal cord to address stack-use-after-return
issue shown below. Is arises because we did not stop async which resides
in wal endpoint and endpoint resides on stack. Later when we stop the
introduced cancel_event we access not stopped async which at this moment
gone out of scope.

It is simple, we only need to destroy wal endpoint. But then we got
assertion on deleting endpoint as endpoints in other cords are not
destroyed and when we delete wal endpoint we access rlist links which
reside in freed memory of other endpoints. So if we want to cleanup
cleanly we need to stop vinyl loop properly which is reverting the
commit e463128 ("vinyl: cancel reader and writer threads on
shutdown"). So this issue needs more attention. Let's postpone it by
temporary suppressing ASAN issue.

```
==3224698==ERROR: AddressSanitizer: stack-use-after-return on address 0x7f654b3b0170 at pc 0x555a2817c282 bp 0x7f654ca55b30 sp 0x7f654ca55b28
	WRITE of size 4 at 0x7f654b3b0170 thread T3
	#0 0x555a2817c281 in ev_async_stop /home/shiny/dev/tarantool/third_party/libev/ev.c:5492:37
	tarantool#1 0x555a27827738 in cord_thread_func /home/shiny/dev/tarantool/src/lib/core/fiber.c:1990:2
	tarantool#2 0x7f65574aa9ea in start_thread /usr/src/debug/glibc/glibc/nptl/pthread_create.c:444:8
	tarantool#3 0x7f655752e7cb in clone3 /usr/src/debug/glibc/glibc/misc/../sysdeps/unix/sysv/linux/x86_64/clone3.S:78
```

Part of tarantool#8423

NO_DOC=internal
NO_CHANGELOG=internal
  • Loading branch information
nshy committed Dec 12, 2023
1 parent 21112b0 commit 4f6a788
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 3 deletions.
2 changes: 2 additions & 0 deletions asan/asan.supp
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@
# File format:
#fun:*
#src:*
# TODO(gh-8423) expected to be removed when the issue is solved
fun:ev_async_stop
22 changes: 22 additions & 0 deletions src/lib/core/fiber.c
Original file line number Diff line number Diff line change
Expand Up @@ -1806,6 +1806,20 @@ box_region_truncate(size_t size)
return region_truncate(&fiber()->gc, size);
}

/**
* Callback for cancel_event. This event is used to signal to cord that
* its fiber should be cancelled.
*/
static void
cord_cancel_callback(ev_loop *loop, ev_async *watcher, int revents)
{
(void)loop;
(void)watcher;
(void)revents;
if (cord()->main_fiber)
fiber_cancel(cord()->main_fiber);
}

void
cord_create(struct cord *cord, const char *name)
{
Expand Down Expand Up @@ -1846,6 +1860,7 @@ cord_create(struct cord *cord, const char *name)
* event loop iteration.
*/
ev_async_init(&cord->wakeup_event, fiber_schedule_wakeup);
cord->main_fiber = NULL;

ev_idle_init(&cord->idle_event, fiber_schedule_idle);

Expand Down Expand Up @@ -1942,6 +1957,8 @@ void *cord_thread_func(void *p)
{
struct cord_thread_arg *ct_arg = (struct cord_thread_arg *) p;
cord_create(ct_arg->cord, (ct_arg->name));
ev_async_init(&cord()->cancel_event, cord_cancel_callback);
ev_async_start(loop(), &cord()->cancel_event);
/** Can't possibly be the main thread */
assert(cord()->id != main_thread_id);
tt_pthread_mutex_lock(&ct_arg->start_mutex);
Expand Down Expand Up @@ -1970,6 +1987,7 @@ void *cord_thread_func(void *p)
if (!changed)
handler->callback(handler->argument);

ev_async_stop(loop(), &cord()->cancel_event);
cord_exit(cord());

return res;
Expand Down Expand Up @@ -2110,6 +2128,8 @@ cord_cojoin(struct cord *cord)
*/
do {
assert(cord->id != 0);
if (fiber_is_cancelled())
ev_async_send(cord->loop, &cord->cancel_event);
fiber_yield();
} while (!ctx.task_complete);
}
Expand Down Expand Up @@ -2143,6 +2163,7 @@ cord_costart_thread_func(void *arg)
struct fiber *f = fiber_new("main", ctx.run);
if (f == NULL)
return NULL;
cord()->main_fiber = f;

TRIGGER(break_ev_loop, break_ev_loop_f);
/*
Expand All @@ -2163,6 +2184,7 @@ cord_costart_thread_func(void *arg)
assert(fiber_is_dead(f));
fiber()->f_ret = fiber_join(f);
fiber_check_gc();
cord()->main_fiber = NULL;

return NULL;
}
Expand Down
11 changes: 8 additions & 3 deletions src/lib/core/fiber.h
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,10 @@ struct cord {
/** Slice for current fiber execution in seconds. */
struct fiber_slice slice;
char name[FIBER_NAME_INLINE];
/** Cord main fiber started in case of cord_costart. */
struct fiber *main_fiber;
/** An event triggered to cancel cord main fiber. */
ev_async cancel_event;
};

extern __thread struct cord *cord_ptr;
Expand Down Expand Up @@ -903,14 +907,15 @@ int
cord_costart(struct cord *cord, const char *name, fiber_func f, void *arg);

/**
* Yield until \a cord has terminated.
*
* On success:
* Yield until \a cord has terminated. If fiber is cancelled
* then cancel is progarated to the cord main fiber if cord is started
* using cord_costart.
*
* If \a cord has terminated with an uncaught exception
* the exception is moved to the current fiber's diagnostics
* area, otherwise the current fiber's diagnostics area is
* cleared.
*
* @param cord cord
* @sa pthread_join()
*
Expand Down
46 changes: 46 additions & 0 deletions test/unit/fiber.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,25 @@ waker_f(va_list ap)
return 0;
}

static int
canceller_f(va_list ap)
{
struct fiber *main_fiber = (struct fiber *)fiber()->f_arg;
fiber_cancel(main_fiber);
return 0;
}

static int
watcher_f(va_list ap)
{
fiber_sleep(1);
if (fiber_is_cancelled())
return 0;

fail("watcher timeout", "triggered");
unreachable();
}

static void
fiber_join_test()
{
Expand Down Expand Up @@ -399,6 +418,32 @@ cord_cojoin_test(void)
footer();
}

static void
cord_cojoin_cancel_test(void)
{
header();

struct cord cord;
fail_if(cord_costart(&cord, "cord", wait_cancel_f, NULL) != 0);

struct fiber *canceller_fiber = fiber_new("canceller", canceller_f);
fail_if(canceller_fiber == NULL);
canceller_fiber->f_arg = fiber();
fiber_wakeup(canceller_fiber);

struct fiber *watcher_fiber = fiber_new("watcher", watcher_f);
fail_if(watcher_fiber == NULL);
fiber_set_joinable(watcher_fiber, true);
fiber_wakeup(watcher_fiber);

fail_if(cord_cojoin(&cord) != 0);

fiber_cancel(watcher_fiber);
fiber_join(watcher_fiber);

footer();
}

static void
cord_cancel_and_join_test(void)
{
Expand Down Expand Up @@ -548,6 +593,7 @@ main_f(va_list ap)
fiber_flags_respect_test();
fiber_wait_on_deadline_test();
cord_cojoin_test();
cord_cojoin_cancel_test();
cord_cancel_and_join_test();
fiber_test_defaults();
fiber_test_leak_modes();
Expand Down
2 changes: 2 additions & 0 deletions test/unit/fiber.result
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ OutOfMemory: Failed to allocate 42 bytes in allocator for exception
*** fiber_wait_on_deadline_test: done ***
*** cord_cojoin_test ***
*** cord_cojoin_test: done ***
*** cord_cojoin_cancel_test ***
*** cord_cojoin_cancel_test: done ***
*** cord_cancel_and_join_test ***
*** cord_cancel_and_join_test: done ***
*** fiber_test_defaults ***
Expand Down

0 comments on commit 4f6a788

Please sign in to comment.