Skip to content

Commit

Permalink
vinyl: use cbus for communication between scheduler and worker threads
Browse files Browse the repository at this point in the history
We need cbus for forwarding deferred DELETE statements generated in a
worker thread during primary index compaction to the tx thread where
they can be inserted into secondary indexes. Since pthread mutex/cond
and cbus are incompatible by their nature, let's rework the communication
channel between the tx and worker threads using cbus.

Needed for #2129
  • Loading branch information
locker committed Aug 1, 2018
1 parent 46f50aa commit f4625e6
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 106 deletions.
215 changes: 130 additions & 85 deletions src/box/vy_scheduler.c
Expand Up @@ -46,6 +46,7 @@
#include "errinj.h"
#include "fiber.h"
#include "fiber_cond.h"
#include "cbus.h"
#include "salad/stailq.h"
#include "say.h"
#include "vy_lsm.h"
Expand All @@ -55,14 +56,34 @@
#include "vy_run.h"
#include "vy_write_iterator.h"
#include "trivia/util.h"
#include "tt_pthread.h"

/* Min and max values for vy_scheduler::timeout. */
#define VY_SCHEDULER_TIMEOUT_MIN 1
#define VY_SCHEDULER_TIMEOUT_MAX 60

static void *vy_worker_f(void *);
static int vy_worker_f(va_list);
static int vy_scheduler_f(va_list);
static void vy_task_execute_f(struct cmsg *);
static void vy_task_complete_f(struct cmsg *);

/**
 * Single-hop cbus route delivering a task from tx to a worker
 * thread, where vy_task_execute_f() runs it.
 */
static const struct cmsg_hop vy_task_execute_route[] = {
	{ vy_task_execute_f, NULL },
};

/**
 * Single-hop cbus route delivering an executed task back to tx,
 * where vy_task_complete_f() queues it for the scheduler.
 */
static const struct cmsg_hop vy_task_complete_route[] = {
	{ vy_task_complete_f, NULL },
};

/** Vinyl worker thread. */
struct vy_worker {
	/** Thread handle; started by the scheduler. */
	struct cord cord;
	/** Pipe from tx to the worker thread. */
	struct cpipe worker_pipe;
	/** Pipe from the worker thread to tx. */
	struct cpipe tx_pipe;
	/** Link in vy_scheduler::idle_workers. */
	struct stailq_entry in_idle;
};

struct vy_task;

Expand All @@ -89,10 +110,22 @@ struct vy_task_ops {
};

struct vy_task {
/**
* CBus message used for sending the task to/from
* a worker thread.
*/
struct cmsg cmsg;
/** Virtual method table. */
const struct vy_task_ops *ops;
/** Pointer to the scheduler. */
struct vy_scheduler *scheduler;
/** Worker thread this task is assigned to. */
struct vy_worker *worker;
/**
* Fiber that is currently executing this task in
* a worker thread.
*/
struct fiber *fiber;
/** Return code of ->execute. */
int status;
/** If ->execute fails, the error is stored here. */
Expand Down Expand Up @@ -126,8 +159,6 @@ struct vy_task {
*/
double bloom_fpr;
int64_t page_size;
/** Link in vy_scheduler::pending_tasks. */
struct stailq_entry in_pending;
/** Link in vy_scheduler::processed_tasks. */
struct stailq_entry in_processed;
};
Expand Down Expand Up @@ -240,16 +271,6 @@ vy_compact_heap_less(struct heap_node *a, struct heap_node *b)
#undef HEAP_LESS
#undef HEAP_NAME

static void
vy_scheduler_async_cb(ev_loop *loop, struct ev_async *watcher, int events)
{
(void)loop;
(void)events;
struct vy_scheduler *scheduler = container_of(watcher,
struct vy_scheduler, scheduler_async);
fiber_cond_signal(&scheduler->scheduler_cond);
}

static void
vy_scheduler_start_workers(struct vy_scheduler *scheduler)
{
Expand All @@ -260,17 +281,19 @@ vy_scheduler_start_workers(struct vy_scheduler *scheduler)
scheduler->is_worker_pool_running = true;
scheduler->idle_worker_count = scheduler->worker_pool_size;
scheduler->worker_pool = calloc(scheduler->worker_pool_size,
sizeof(struct cord));
sizeof(*scheduler->worker_pool));
if (scheduler->worker_pool == NULL)
panic("failed to allocate vinyl worker pool");

ev_async_start(scheduler->scheduler_loop, &scheduler->scheduler_async);
for (int i = 0; i < scheduler->worker_pool_size; i++) {
char name[FIBER_NAME_MAX];
snprintf(name, sizeof(name), "vinyl.writer.%d", i);
if (cord_start(&scheduler->worker_pool[i], name,
vy_worker_f, scheduler) != 0)
struct vy_worker *worker = &scheduler->worker_pool[i];
if (cord_costart(&worker->cord, name, vy_worker_f, worker) != 0)
panic("failed to start vinyl worker thread");
cpipe_create(&worker->worker_pipe, name);
stailq_add_tail_entry(&scheduler->idle_workers,
worker, in_idle);
}
}

Expand All @@ -280,16 +303,12 @@ vy_scheduler_stop_workers(struct vy_scheduler *scheduler)
assert(scheduler->is_worker_pool_running);
scheduler->is_worker_pool_running = false;

/* Wake up worker threads. */
tt_pthread_mutex_lock(&scheduler->mutex);
pthread_cond_broadcast(&scheduler->worker_cond);
tt_pthread_mutex_unlock(&scheduler->mutex);

/* Wait for worker threads to exit. */
for (int i = 0; i < scheduler->worker_pool_size; i++)
cord_join(&scheduler->worker_pool[i]);
ev_async_stop(scheduler->scheduler_loop, &scheduler->scheduler_async);

for (int i = 0; i < scheduler->worker_pool_size; i++) {
struct vy_worker *worker = &scheduler->worker_pool[i];
cbus_stop_loop(&worker->worker_pipe);
cpipe_destroy(&worker->worker_pipe);
cord_join(&worker->cord);
}
free(scheduler->worker_pool);
scheduler->worker_pool = NULL;
}
Expand All @@ -310,19 +329,14 @@ vy_scheduler_create(struct vy_scheduler *scheduler, int write_threads,
if (scheduler->scheduler_fiber == NULL)
panic("failed to allocate vinyl scheduler fiber");

scheduler->scheduler_loop = loop();
fiber_cond_create(&scheduler->scheduler_cond);
ev_async_init(&scheduler->scheduler_async, vy_scheduler_async_cb);

scheduler->worker_pool_size = write_threads;
mempool_create(&scheduler->task_pool, cord_slab_cache(),
sizeof(struct vy_task));
stailq_create(&scheduler->pending_tasks);
stailq_create(&scheduler->idle_workers);
stailq_create(&scheduler->processed_tasks);

tt_pthread_cond_init(&scheduler->worker_cond, NULL);
tt_pthread_mutex_init(&scheduler->mutex, NULL);

vy_dump_heap_create(&scheduler->dump_heap);
vy_compact_heap_create(&scheduler->compact_heap);

Expand All @@ -344,9 +358,6 @@ vy_scheduler_destroy(struct vy_scheduler *scheduler)
if (scheduler->is_worker_pool_running)
vy_scheduler_stop_workers(scheduler);

tt_pthread_cond_destroy(&scheduler->worker_cond);
tt_pthread_mutex_destroy(&scheduler->mutex);

diag_destroy(&scheduler->diag);
mempool_destroy(&scheduler->task_pool);
fiber_cond_destroy(&scheduler->dump_cond);
Expand Down Expand Up @@ -647,6 +658,8 @@ vy_run_discard(struct vy_run *run)
static int
vy_task_write_run(struct vy_task *task)
{
enum { YIELD_LOOPS = 32 };

struct vy_lsm *lsm = task->lsm;
struct vy_stmt_stream *wi = task->wi;

Expand All @@ -668,6 +681,7 @@ vy_task_write_run(struct vy_task *task)
if (wi->iface->start(wi) != 0)
goto fail_abort_writer;
int rc;
int loops = 0;
struct tuple *stmt = NULL;
while ((rc = wi->iface->next(wi, &stmt)) == 0 && stmt != NULL) {
inj = errinj(ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT, ERRINJ_DOUBLE);
Expand All @@ -678,7 +692,9 @@ vy_task_write_run(struct vy_task *task)
if (rc != 0)
break;

if (!task->scheduler->is_worker_pool_running) {
if (++loops % YIELD_LOOPS == 0)
fiber_sleep(0);
if (fiber_is_cancelled()) {
diag_set(FiberIsCancelled);
rc = -1;
break;
Expand Down Expand Up @@ -1315,6 +1331,62 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
return -1;
}

/**
 * Fiber function that actually executes a vinyl task in a worker
 * thread. After finishing the task, it sends it back to tx over
 * the worker's tx_pipe.
 */
static int
vy_task_f(va_list va)
{
	struct vy_task *task = va_arg(va, struct vy_task *);
	task->status = task->ops->execute(task);
	if (task->status != 0) {
		/* ->execute must have set an error on failure. */
		struct diag *diag = diag_get();
		assert(!diag_is_empty(diag));
		diag_move(diag, &task->diag);
	}
	/*
	 * Clear the fiber reference *before* pushing the completion
	 * message: once the message is pushed, the tx thread may
	 * process and free the task concurrently, so the task must
	 * not be accessed after cpipe_push().
	 */
	task->fiber = NULL;
	cmsg_init(&task->cmsg, vy_task_complete_route);
	cpipe_push(&task->worker->tx_pipe, &task->cmsg);
	return 0;
}

/**
 * Callback invoked by a worker thread upon receiving a task.
 * It schedules a fiber which actually executes the task, so
 * as not to block the event loop.
 */
static void
vy_task_execute_f(struct cmsg *cmsg)
{
	struct vy_task *task = container_of(cmsg, struct vy_task, cmsg);
	assert(task->fiber == NULL);
	struct fiber *f = fiber_new("task", vy_task_f);
	task->fiber = f;
	if (f != NULL) {
		/* Run the task in its own fiber. */
		fiber_start(f, task);
		return;
	}
	/*
	 * Failed to create a fiber - report the error to tx
	 * right away via the completion route.
	 */
	task->status = -1;
	diag_move(diag_get(), &task->diag);
	cmsg_init(&task->cmsg, vy_task_complete_route);
	cpipe_push(&task->worker->tx_pipe, &task->cmsg);
}

/**
* Callback invoked by the tx thread upon receiving an executed
* task from a worker thread. It adds the task to the processed
* task queue and wakes up the scheduler so that it can complete
* it.
*/
static void
vy_task_complete_f(struct cmsg *cmsg)
{
struct vy_task *task = container_of(cmsg, struct vy_task, cmsg);
stailq_add_tail_entry(&task->scheduler->processed_tasks,
task, in_processed);
fiber_cond_signal(&task->scheduler->scheduler_cond);
}

/**
* Create a task for dumping an LSM tree. The new task is returned
* in @ptask. If there's no LSM tree that needs to be dumped @ptask
Expand Down Expand Up @@ -1503,13 +1575,10 @@ vy_scheduler_f(va_list va)
struct stailq processed_tasks;
struct vy_task *task, *next;
int tasks_failed = 0, tasks_done = 0;
bool was_empty;

/* Get the list of processed tasks. */
stailq_create(&processed_tasks);
tt_pthread_mutex_lock(&scheduler->mutex);
stailq_concat(&processed_tasks, &scheduler->processed_tasks);
tt_pthread_mutex_unlock(&scheduler->mutex);

/* Complete and delete all processed tasks. */
stailq_foreach_entry_safe(task, next, &processed_tasks,
Expand All @@ -1518,6 +1587,8 @@ vy_scheduler_f(va_list va)
tasks_failed++;
else
tasks_done++;
stailq_add_entry(&scheduler->idle_workers,
task->worker, in_idle);
vy_task_delete(task);
scheduler->idle_worker_count++;
assert(scheduler->idle_worker_count <=
Expand Down Expand Up @@ -1553,15 +1624,13 @@ vy_scheduler_f(va_list va)
goto wait;

/* Queue the task and notify workers if necessary. */
tt_pthread_mutex_lock(&scheduler->mutex);
was_empty = stailq_empty(&scheduler->pending_tasks);
stailq_add_tail_entry(&scheduler->pending_tasks,
task, in_pending);
if (was_empty)
tt_pthread_cond_signal(&scheduler->worker_cond);
tt_pthread_mutex_unlock(&scheduler->mutex);

assert(!stailq_empty(&scheduler->idle_workers));
task->worker = stailq_shift_entry(&scheduler->idle_workers,
struct vy_worker, in_idle);
scheduler->idle_worker_count--;
cmsg_init(&task->cmsg, vy_task_execute_route);
cpipe_push(&task->worker->worker_pipe, &task->cmsg);

fiber_reschedule();
continue;
error:
Expand Down Expand Up @@ -1597,41 +1666,17 @@ vy_scheduler_f(va_list va)
return 0;
}

static void *
vy_worker_f(void *arg)
static int
vy_worker_f(va_list ap)
{
struct vy_scheduler *scheduler = arg;
struct vy_task *task = NULL;

tt_pthread_mutex_lock(&scheduler->mutex);
while (scheduler->is_worker_pool_running) {
/* Wait for a task */
if (stailq_empty(&scheduler->pending_tasks)) {
/* Wake scheduler up if there are no more tasks */
ev_async_send(scheduler->scheduler_loop,
&scheduler->scheduler_async);
tt_pthread_cond_wait(&scheduler->worker_cond,
&scheduler->mutex);
continue;
}
task = stailq_shift_entry(&scheduler->pending_tasks,
struct vy_task, in_pending);
tt_pthread_mutex_unlock(&scheduler->mutex);
assert(task != NULL);

/* Execute task */
task->status = task->ops->execute(task);
if (task->status != 0) {
struct diag *diag = diag_get();
assert(!diag_is_empty(diag));
diag_move(diag, &task->diag);
}

/* Return processed task to scheduler */
tt_pthread_mutex_lock(&scheduler->mutex);
stailq_add_tail_entry(&scheduler->processed_tasks,
task, in_processed);
}
tt_pthread_mutex_unlock(&scheduler->mutex);
return NULL;
struct vy_worker *worker = va_arg(ap, struct vy_worker *);
struct cbus_endpoint endpoint;

cpipe_create(&worker->tx_pipe, "tx");
cbus_endpoint_create(&endpoint, cord_name(&worker->cord),
fiber_schedule_cb, fiber());
cbus_loop(&endpoint);
cbus_endpoint_destroy(&endpoint, cbus_process);
cpipe_destroy(&worker->tx_pipe);
return 0;
}

0 comments on commit f4625e6

Please sign in to comment.