Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use modern v8::Platform worker threads APIs. #69

Merged
merged 1 commit into from
May 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ static struct {
}

void DrainVMTasks(Isolate* isolate) {
platform_->DrainBackgroundTasks(isolate);
platform_->DrainTasks(isolate);
}

void CancelVMTasks(Isolate* isolate) {
Expand Down
2 changes: 1 addition & 1 deletion src/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ class Environment;
class MultiIsolatePlatform : public v8::Platform {
public:
virtual ~MultiIsolatePlatform() { }
virtual void DrainBackgroundTasks(v8::Isolate* isolate) = 0;
virtual void DrainTasks(v8::Isolate* isolate) = 0;
virtual void CancelPendingDelayedTasks(v8::Isolate* isolate) = 0;

// These will be called by the `IsolateData` creation/destruction functions.
Expand Down
76 changes: 38 additions & 38 deletions src/node_platform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,50 @@ using v8::Platform;
using v8::Task;
using v8::TracingController;

static void BackgroundRunner(void* data) {
TaskQueue<Task>* background_tasks = static_cast<TaskQueue<Task>*>(data);
while (std::unique_ptr<Task> task = background_tasks->BlockingPop()) {
namespace {

static void WorkerThreadMain(void* data) {
TaskQueue<Task>* pending_worker_tasks = static_cast<TaskQueue<Task>*>(data);
while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
task->Run();
background_tasks->NotifyOfCompletion();
pending_worker_tasks->NotifyOfCompletion();
}
}

BackgroundTaskRunner::BackgroundTaskRunner(int thread_pool_size) {
} // namespace

WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
for (int i = 0; i < thread_pool_size; i++) {
std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
if (uv_thread_create(t.get(), BackgroundRunner, &background_tasks_) != 0)
if (uv_thread_create(t.get(), WorkerThreadMain,
&pending_worker_tasks_) != 0) {
break;
}
threads_.push_back(std::move(t));
}
}

void BackgroundTaskRunner::PostTask(std::unique_ptr<Task> task) {
background_tasks_.Push(std::move(task));
}

void BackgroundTaskRunner::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
UNREACHABLE();
void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
pending_worker_tasks_.Push(std::move(task));
}

void BackgroundTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) {
void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) {
UNREACHABLE();
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to verify: Even though CallDelayedOnWorkerThread() is being added to the API, this is still unreachable code atm?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. This doesn't change the previous logic. Having this be UNREACHABLE() was likely incorrect before and still is (but I'm not going to change that).

In practice the only v8 caller is devtools which I assume doesn't trigger in node so node has been getting away with this UNREACHABLE for now...

Copy link

@gahaas gahaas May 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unfortunate that CallDelayedOnWorkerThread() was used in V8 for the first time just before we were about to remove the API. I guess we can implement it the same as for foreground tasks if we ever have to.


void BackgroundTaskRunner::BlockingDrain() {
background_tasks_.BlockingDrain();
void WorkerThreadsTaskRunner::BlockingDrain() {
pending_worker_tasks_.BlockingDrain();
}

void BackgroundTaskRunner::Shutdown() {
background_tasks_.Stop();
void WorkerThreadsTaskRunner::Shutdown() {
pending_worker_tasks_.Stop();
for (size_t i = 0; i < threads_.size(); i++) {
CHECK_EQ(0, uv_thread_join(threads_[i].get()));
}
}

size_t BackgroundTaskRunner::NumberOfAvailableBackgroundThreads() const {
int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
return threads_.size();
}

Expand Down Expand Up @@ -120,8 +122,8 @@ NodePlatform::NodePlatform(int thread_pool_size,
TracingController* controller = new TracingController();
tracing_controller_.reset(controller);
}
background_task_runner_ =
std::make_shared<BackgroundTaskRunner>(thread_pool_size);
worker_thread_task_runner_ =
std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size);
}

void NodePlatform::RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) {
Expand All @@ -147,16 +149,16 @@ void NodePlatform::UnregisterIsolate(IsolateData* isolate_data) {
}

void NodePlatform::Shutdown() {
background_task_runner_->Shutdown();
worker_thread_task_runner_->Shutdown();

{
Mutex::ScopedLock lock(per_isolate_mutex_);
per_isolate_.clear();
}
}

size_t NodePlatform::NumberOfAvailableBackgroundThreads() {
return background_task_runner_->NumberOfAvailableBackgroundThreads();
int NodePlatform::NumberOfWorkerThreads() {
return worker_thread_task_runner_->NumberOfWorkerThreads();
}

void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
Expand Down Expand Up @@ -188,15 +190,12 @@ void PerIsolatePlatformData::CancelPendingDelayedTasks() {
scheduled_delayed_tasks_.clear();
}

void NodePlatform::DrainBackgroundTasks(Isolate* isolate) {
void NodePlatform::DrainTasks(Isolate* isolate) {
std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate);

do {
// Right now, there is no way to drain only background tasks associated
// with a specific isolate, so this sometimes does more work than
// necessary. In the long run, that functionality is probably going to
// be available anyway, though.
background_task_runner_->BlockingDrain();
// Worker tasks aren't associated with an Isolate.
worker_thread_task_runner_->BlockingDrain();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason for updating the comment?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wanted to remove
"In the long run, that functionality is probably going to be available anyway, though."
I can leave the rest if you think it's better than my shorter version.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can feel free to remove it, but that sentence was more or less a reminder to myself that we might want this, so I was curious if there’s anything inherently wrong with the idea?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be worker threads for each core regardless of the number of isolates. Isolates are bound to a single task runner (usually mapped to a single thread), but worker threads are shared so we removed the Isolate* parameter (both Chrome and node didn't use it).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for context, the original idea was that when an Isolate is being removed (e.g. one for a WebWorker-style thread), we want the tasks associated with it to be drained first before disposing it; so what we’d have to do is to is to somehow keep track of which tasks were created for which Isolate, even the background tasks. I might be totally off about this. :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tasks posted to workers should either not use state bound to the Isolate or be cancellable.

Isolate::DeInit() makes sure to wait for cancelled tasks to be flagged as cancelled or be done running via CancelAndWait() @ https://cs.chromium.org/chromium/src/v8/src/isolate.cc?type=cs&q=file:isolate.cc+CancelAndWait&sq=package:chromium

As such, there's no need to drain tasks before getting rid of an Isolate, I think. It would be a good idea to mention this specifically in v8::Platform, make sure it's true for existing callers, and remove the draining logic in node IMO (but, to be clear, I will not do this -- just looking to clean up the old APIs before transitioning).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this was helpful!

but, to be clear, I will not do this

I can take care of at least documenting it. :)

Copy link

@gahaas gahaas May 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, at the moment every task posted by V8 should be cancelable, so V8 can kind of clean up the task queue by itself when the isolate is tearing down. The reason is that tasks should not own the memory they operate on. This invariant avoids synchronization issues between isolate shutdown and task destruction. Therefore the isolate (directly or indirectly) owns the memory of the task, which is why the task has to be cancelled before the isolate goes away.

I will add some documentation for this in V8 as well.

} while (per_isolate->FlushForegroundTasksInternal());
}

Expand Down Expand Up @@ -230,11 +229,17 @@ bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
return did_work;
}

void NodePlatform::CallOnBackgroundThread(Task* task,
ExpectedRuntime expected_runtime) {
background_task_runner_->PostTask(std::unique_ptr<Task>(task));
void NodePlatform::CallOnWorkerThread(std::unique_ptr<v8::Task> task) {
worker_thread_task_runner_->PostTask(std::move(task));
}

void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr<v8::Task> task,
double delay_in_seconds) {
worker_thread_task_runner_->PostDelayedTask(std::move(task),
delay_in_seconds);
}


std::shared_ptr<PerIsolatePlatformData>
NodePlatform::ForIsolate(Isolate* isolate) {
Mutex::ScopedLock lock(per_isolate_mutex_);
Expand Down Expand Up @@ -264,11 +269,6 @@ void NodePlatform::CancelPendingDelayedTasks(v8::Isolate* isolate) {

bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; }

std::shared_ptr<v8::TaskRunner>
NodePlatform::GetBackgroundTaskRunner(Isolate* isolate) {
return background_task_runner_;
}

std::shared_ptr<v8::TaskRunner>
NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
return ForIsolate(isolate);
Expand Down
30 changes: 14 additions & 16 deletions src/node_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,22 @@ class PerIsolatePlatformData :
std::vector<DelayedTaskPointer> scheduled_delayed_tasks_;
};

// This acts as the single background task runner for all Isolates.
class BackgroundTaskRunner : public v8::TaskRunner {
// This acts as the single worker thread task runner for all Isolates.
class WorkerThreadsTaskRunner {
public:
explicit BackgroundTaskRunner(int thread_pool_size);
explicit WorkerThreadsTaskRunner(int thread_pool_size);

void PostTask(std::unique_ptr<v8::Task> task) override;
void PostIdleTask(std::unique_ptr<v8::IdleTask> task) override;
void PostTask(std::unique_ptr<v8::Task> task);
void PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) override;
bool IdleTasksEnabled() override { return false; };
double delay_in_seconds);

void BlockingDrain();
void Shutdown();

size_t NumberOfAvailableBackgroundThreads() const;
int NumberOfWorkerThreads() const;

private:
TaskQueue<v8::Task> background_tasks_;
TaskQueue<v8::Task> pending_worker_tasks_;
std::vector<std::unique_ptr<uv_thread_t>> threads_;
};

Expand All @@ -114,14 +113,15 @@ class NodePlatform : public MultiIsolatePlatform {
NodePlatform(int thread_pool_size, v8::TracingController* tracing_controller);
virtual ~NodePlatform() {}

void DrainBackgroundTasks(v8::Isolate* isolate) override;
void DrainTasks(v8::Isolate* isolate) override;
void CancelPendingDelayedTasks(v8::Isolate* isolate) override;
void Shutdown();

// v8::Platform implementation.
size_t NumberOfAvailableBackgroundThreads() override;
void CallOnBackgroundThread(v8::Task* task,
ExpectedRuntime expected_runtime) override;
int NumberOfWorkerThreads() override;
void CallOnWorkerThread(std::unique_ptr<v8::Task> task) override;
void CallDelayedOnWorkerThread(std::unique_ptr<v8::Task> task,
double delay_in_seconds) override;
void CallOnForegroundThread(v8::Isolate* isolate, v8::Task* task) override;
void CallDelayedOnForegroundThread(v8::Isolate* isolate, v8::Task* task,
double delay_in_seconds) override;
Expand All @@ -135,8 +135,6 @@ class NodePlatform : public MultiIsolatePlatform {
void RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) override;
void UnregisterIsolate(IsolateData* isolate_data) override;

std::shared_ptr<v8::TaskRunner> GetBackgroundTaskRunner(
v8::Isolate* isolate) override;
std::shared_ptr<v8::TaskRunner> GetForegroundTaskRunner(
v8::Isolate* isolate) override;

Expand All @@ -148,7 +146,7 @@ class NodePlatform : public MultiIsolatePlatform {
std::shared_ptr<PerIsolatePlatformData>> per_isolate_;

std::unique_ptr<v8::TracingController> tracing_controller_;
std::shared_ptr<BackgroundTaskRunner> background_task_runner_;
std::shared_ptr<WorkerThreadsTaskRunner> worker_thread_task_runner_;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be a shared_ptr now.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean? It is an std::shared_ptr..?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry. I meant, can it be a unique_ptr now?

};

} // namespace node
Expand Down