Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: ready background workers before bootstrap #23110

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
36 changes: 18 additions & 18 deletions deps/uv/src/unix/thread.c
Expand Up @@ -83,7 +83,7 @@ int pthread_barrier_init(pthread_barrier_t* barrier,
}

/* Block until all `threshold` threads have reached the barrier.
 *
 * Returns PTHREAD_BARRIER_SERIAL_THREAD in exactly one thread, 0 in the
 * others, or an errno-style error code on failure.  Unlike the previous
 * implementation, the serial (positive) return value is produced by the
 * LAST thread to leave the barrier, not the thread that trips it.  That
 * makes it safe for the serial thread to destroy the barrier immediately:
 * no other thread can still be touching b->mutex / b->cond at that point.
 */
int pthread_barrier_wait(pthread_barrier_t* barrier) {
  int rc, last;
  _uv_barrier* b;

  if (barrier == NULL || barrier->b == NULL)
    return EINVAL;

  /* NOTE(review): the lines between the NULL check and `return rc;` were
     collapsed in the diff view; reconstructed from the standard libuv
     implementation — confirm against the merged file. */
  b = barrier->b;
  if ((rc = pthread_mutex_lock(&b->mutex)) != 0)
    return rc;

  /* Increment the count. If this is the first thread to reach the threshold,
     wake up waiters, unlock the mutex. */
  if (++b->in == b->threshold) {
    b->in = 0;
    /* Every participant, including this one, must still decrement `out`
       before anyone may report serial status. */
    b->out = b->threshold;
    rc = pthread_cond_signal(&b->cond);
    assert(rc == 0);
  } else {
    /* Otherwise, wait for other threads until in is set to 0,
       then return 0 to indicate this is not the first thread. */
    do {
      if ((rc = pthread_cond_wait(&b->cond, &b->mutex)) != 0)
        break;
    } while (b->in != 0);
  }

  /* Mark thread exit. */
  last = (--b->out == 0);
  if (!last)
    pthread_cond_signal(&b->cond); /* not needed on the last thread */

  pthread_mutex_unlock(&b->mutex);

  /* If this is the last thread to exit, return PTHREAD_BARRIER_SERIAL_THREAD. */
  return last ? PTHREAD_BARRIER_SERIAL_THREAD : 0;
}

int pthread_barrier_destroy(pthread_barrier_t* barrier) {
Expand Down
43 changes: 43 additions & 0 deletions deps/uv/test/test-barrier.c
Expand Up @@ -104,3 +104,46 @@ TEST_IMPL(barrier_3) {

return 0;
}

static void serial_worker(void* data) {
uv_barrier_t* barrier = data;
if (uv_barrier_wait(barrier) > 0) {
uv_barrier_destroy(barrier);
}
uv_sleep(100); /* wait a bit before terminating. */
}

/* Ensure that uv_barrier_wait returns positive only after all threads have
   exited the barrier. If this value is returned too early and the barrier is
   destroyed prematurely, then this test may see a crash. */
TEST_IMPL(barrier_serial_thread) {
  uv_barrier_t barrier;
  uv_thread_t threads[4];
  unsigned long i;

  /* The main thread participates too, hence the +1. */
  ASSERT(0 == uv_barrier_init(&barrier, ARRAY_SIZE(threads) + 1));

  for (i = 0; i < ARRAY_SIZE(threads); ++i)
    ASSERT(0 == uv_thread_create(&threads[i], serial_worker, &barrier));

  /* Only the serial thread (positive return) may destroy the barrier. */
  if (uv_barrier_wait(&barrier) > 0)
    uv_barrier_destroy(&barrier);

  for (i = 0; i < ARRAY_SIZE(threads); ++i)
    ASSERT(0 == uv_thread_join(&threads[i]));

  return 0;
}

/* Single thread uv_barrier_wait should return correct return value. */
TEST_IMPL(barrier_serial_thread_single) {
  uv_barrier_t b;

  ASSERT(0 == uv_barrier_init(&b, 1));
  /* With threshold 1 the sole waiter is necessarily the serial thread. */
  ASSERT(uv_barrier_wait(&b) > 0);
  uv_barrier_destroy(&b);

  return 0;
}
4 changes: 4 additions & 0 deletions deps/uv/test/test-list.h
Expand Up @@ -37,6 +37,8 @@ TEST_DECLARE (default_loop_close)
TEST_DECLARE (barrier_1)
TEST_DECLARE (barrier_2)
TEST_DECLARE (barrier_3)
TEST_DECLARE (barrier_serial_thread)
TEST_DECLARE (barrier_serial_thread_single)
TEST_DECLARE (condvar_1)
TEST_DECLARE (condvar_2)
TEST_DECLARE (condvar_3)
Expand Down Expand Up @@ -456,6 +458,8 @@ TASK_LIST_START
TEST_ENTRY (barrier_1)
TEST_ENTRY (barrier_2)
TEST_ENTRY (barrier_3)
TEST_ENTRY (barrier_serial_thread)
TEST_ENTRY (barrier_serial_thread_single)
TEST_ENTRY (condvar_1)
TEST_ENTRY (condvar_2)
TEST_ENTRY (condvar_3)
Expand Down
31 changes: 29 additions & 2 deletions src/node_platform.cc
Expand Up @@ -18,10 +18,23 @@ using v8::TracingController;

namespace {

// Per-thread bundle handed to PlatformWorkerThread via its void* argument.
struct PlatformWorkerData {
  // Queue the worker pops tasks from (the pool's pending_worker_tasks_).
  TaskQueue<Task>* task_queue;
  // Startup barrier shared with the main thread; heap-allocated, freed by
  // whichever participant gets the serial (positive) uv_barrier_wait return.
  uv_barrier_t* barrier;
  // Index of this worker in the creation loop (0-based).
  int id;
};

static void PlatformWorkerThread(void* data) {
PlatformWorkerData* worker_data = static_cast<PlatformWorkerData*>(data);
TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue;
TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
"PlatformWorkerThread");
TaskQueue<Task>* pending_worker_tasks = static_cast<TaskQueue<Task>*>(data);

if (uv_barrier_wait(worker_data->barrier) > 0) {
uv_barrier_destroy(worker_data->barrier);
delete worker_data->barrier;
worker_data->barrier = nullptr;
}
while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
task->Run();
pending_worker_tasks->NotifyOfCompletion();
Expand Down Expand Up @@ -148,17 +161,31 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
};

// Spins up the delayed-task scheduler plus `thread_pool_size` worker
// threads, then blocks until every worker has started (rendezvous on a
// heap-allocated barrier) so background workers are ready before bootstrap
// continues.  The barrier is freed by whichever participant observes the
// serial (positive) uv_barrier_wait return.
WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
  uv_barrier_t* barrier = new uv_barrier_t;
  uv_barrier_init(barrier, thread_pool_size + 1);

  delayed_task_scheduler_.reset(
      new DelayedTaskScheduler(&pending_worker_tasks_));
  threads_.push_back(delayed_task_scheduler_->Start());

  for (int i = 0; i < thread_pool_size; i++) {
    // FIXME(ofrobots): need to delete upon shutdown.
    PlatformWorkerData* worker_data = new PlatformWorkerData{
      &pending_worker_tasks_, barrier, i
    };
    std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
    if (uv_thread_create(t.get(), PlatformWorkerThread,
                         worker_data) != 0) {
      // Thread never started, so nothing will consume worker_data.
      delete worker_data;
      // NOTE(review): breaking out leaves the barrier expecting
      // thread_pool_size + 1 participants while fewer threads exist, so the
      // uv_barrier_wait below could block forever — confirm whether create
      // failure is treated as fatal elsewhere.
      break;
    }
    threads_.push_back(std::move(t));
  }

  // Wait for all the worker threads to be initialized.
  if (uv_barrier_wait(barrier) > 0) {
    uv_barrier_destroy(barrier);
    delete barrier;
  }
}

void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
Expand Down