Skip to content
This repository has been archived by the owner on Dec 15, 2018. It is now read-only.

Commit

Permalink
Bug 1083101 - Win32 implementation of the JobScheduler. r=jrmuizel
Browse files Browse the repository at this point in the history
  • Loading branch information
nical committed Sep 28, 2015
1 parent a5f1897 commit 120cce1
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 106 deletions.
30 changes: 29 additions & 1 deletion gfx/2d/JobScheduler.cpp
Expand Up @@ -23,7 +23,7 @@ bool JobScheduler::Init(uint32_t aNumThreads, uint32_t aNumQueues)
}

for (uint32_t i = 0; i < aNumThreads; ++i) {
sSingleton->mWorkerThreads.push_back(new WorkerThread(sSingleton->mDrawingQueues[i%aNumQueues]));
sSingleton->mWorkerThreads.push_back(WorkerThread::Create(sSingleton->mDrawingQueues[i%aNumQueues]));
}
return true;
}
Expand Down Expand Up @@ -232,5 +232,33 @@ SyncObject::AddSubsequent(Job* aJob)
{
}

WorkerThread::WorkerThread(MultiThreadedJobQueue* aJobQueue)
: mQueue(aJobQueue)
{
aJobQueue->RegisterThread();
}

void
WorkerThread::Run()
{
SetName("gfx worker");

for (;;) {
Job* commands = nullptr;
if (!mQueue->WaitForJob(commands)) {
mQueue->UnregisterThread();
return;
}

JobStatus status = JobScheduler::ProcessJob(commands);

if (status == JobStatus::Error) {
// Don't try to handle errors for now, but that's open to discussions.
// I expect errors to be mostly OOM issues.
MOZ_CRASH();
}
}
}

} //namespace
} //namespace
22 changes: 22 additions & 0 deletions gfx/2d/JobScheduler.h
Expand Up @@ -15,11 +15,14 @@
#include "mozilla/gfx/JobScheduler_posix.h"
#endif

#include <vector>

namespace mozilla {
namespace gfx {

class MultiThreadedJobQueue;
class SyncObject;
class WorkerThread;

class JobScheduler {
public:
Expand Down Expand Up @@ -224,6 +227,25 @@ struct MutexAutoLock {
Mutex* mMutex;
};

/// Base class for worker threads.
class WorkerThread
{
public:
static WorkerThread* Create(MultiThreadedJobQueue* aJobQueue);

virtual ~WorkerThread() {}

void Run();

MultiThreadedJobQueue* GetJobQueue() { return mQueue; }

protected:
explicit WorkerThread(MultiThreadedJobQueue* aJobQueue);

virtual void SetName(const char* aName) {}

MultiThreadedJobQueue* mQueue;
};

} // namespace
} // namespace
Expand Down
102 changes: 46 additions & 56 deletions gfx/2d/JobScheduler_posix.cpp
Expand Up @@ -11,6 +11,52 @@ using namespace std;
namespace mozilla {
namespace gfx {

void* ThreadCallback(void* threadData);

class WorkerThreadPosix : public WorkerThread {
public:
explicit WorkerThreadPosix(MultiThreadedJobQueue* aJobQueue)
: WorkerThread(aJobQueue)
{
pthread_create(&mThread, nullptr, ThreadCallback, static_cast<WorkerThread*>(this));
}

~WorkerThreadPosix()
{
pthread_join(mThread, nullptr);
}

virtual void SetName(const char* aName) override
{
// Call this from the thread itself because of Mac.
#ifdef XP_MACOSX
pthread_setname_np(aName);
#elif defined(__DragonFly__) || defined(__FreeBSD__) || defined(__OpenBSD__)
pthread_set_name_np(mThread, aName);
#elif defined(__NetBSD__)
pthread_setname_np(mThread, "%s", (void*)aName);
#else
pthread_setname_np(mThread, aName);
#endif
}

protected:
pthread_t mThread;
};

void* ThreadCallback(void* threadData)
{
WorkerThread* thread = static_cast<WorkerThread*>(threadData);
thread->Run();
return nullptr;
}

WorkerThread*
WorkerThread::Create(MultiThreadedJobQueue* aJobQueue)
{
return new WorkerThreadPosix(aJobQueue);
}

MultiThreadedJobQueue::MultiThreadedJobQueue()
: mThreadsCount(0)
, mShuttingDown(false)
Expand Down Expand Up @@ -108,62 +154,6 @@ MultiThreadedJobQueue::UnregisterThread()
}
}

void* ThreadCallback(void* threadData)
{
WorkerThread* thread = (WorkerThread*)threadData;
thread->Run();
return nullptr;
}

WorkerThread::WorkerThread(MultiThreadedJobQueue* aJobQueue)
: mQueue(aJobQueue)
{
aJobQueue->RegisterThread();
pthread_create(&mThread, nullptr, ThreadCallback, this);
}

WorkerThread::~WorkerThread()
{
pthread_join(mThread, nullptr);
}

void
WorkerThread::SetName(const char* aName)
{
// Call this from the thread itself because of Mac.
#ifdef XP_MACOSX
pthread_setname_np(aName);
#elif defined(__DragonFly__) || defined(__FreeBSD__) || defined(__OpenBSD__)
pthread_set_name_np(mThread, aName);
#elif defined(__NetBSD__)
pthread_setname_np(mThread, "%s", (void*)aName);
#else
pthread_setname_np(mThread, aName);
#endif
}

void
WorkerThread::Run()
{
SetName("gfx worker");

for (;;) {
Job* commands = nullptr;
if (!mQueue->WaitForJob(commands)) {
mQueue->UnregisterThread();
return;
}

JobStatus status = JobScheduler::ProcessJob(commands);

if (status == JobStatus::Error) {
// Don't try to handle errors for now, but that's open to discussions.
// I expect errors to be mostly OOM issues.
MOZ_CRASH();
}
}
}

EventObject::EventObject()
: mIsSet(false)
{}
Expand Down
22 changes: 1 addition & 21 deletions gfx/2d/JobScheduler_posix.h
Expand Up @@ -22,6 +22,7 @@ namespace gfx {

class Job;
class PosixCondVar;
class WorkerThread;

class Mutex {
public:
Expand Down Expand Up @@ -131,27 +132,6 @@ class MultiThreadedJobQueue {
friend class WorkerThread;
};

/// Worker thread that continuously dequeues Jobs from a MultiThreadedJobQueue
/// and process them.
///
/// The public interface of this class must remain identical to its equivalent
/// in JobScheduler_win32.h
class WorkerThread {
public:
explicit WorkerThread(MultiThreadedJobQueue* aJobQueue);

~WorkerThread();

void Run();

MultiThreadedJobQueue* GetJobQueue() { return mQueue; }
protected:
void SetName(const char* name);

MultiThreadedJobQueue* mQueue;
pthread_t mThread;
};

/// An object that a thread can synchronously wait on.
/// Usually set by a SetEventJob.
class EventObject : public external::AtomicRefCounted<EventObject>
Expand Down
143 changes: 143 additions & 0 deletions gfx/2d/JobScheduler_win32.cpp
@@ -0,0 +1,143 @@
/* -*- Mode: C++; tab-width: 20; indent-tabs-mode: nil; c-basic-offset: 2 -*-
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "JobScheduler.h"
#include "mozilla/gfx/Logging.h"

using namespace std;

namespace mozilla {
namespace gfx {

DWORD __stdcall ThreadCallback(void* threadData);

class WorkerThreadWin32 : public WorkerThread {
public:
explicit WorkerThreadWin32(MultiThreadedJobQueue* aJobQueue)
: WorkerThread(aJobQueue)
{
mThread = ::CreateThread(nullptr, 0, ThreadCallback, static_cast<WorkerThread*>(this), 0, nullptr);
}

~WorkerThreadWin32()
{
::WaitForSingleObject(mThread, INFINITE);
::CloseHandle(mThread);
}

protected:
HANDLE mThread;
};

DWORD __stdcall ThreadCallback(void* threadData)
{
WorkerThread* thread = static_cast<WorkerThread*>(threadData);
thread->Run();
return 0;
}

WorkerThread*
WorkerThread::Create(MultiThreadedJobQueue* aJobQueue)
{
return new WorkerThreadWin32(aJobQueue);
}

bool
MultiThreadedJobQueue::PopJob(Job*& aOutJob, AccessType aAccess)
{
for (;;) {
while (aAccess == BLOCKING && mJobs.empty()) {
{
MutexAutoLock lock(&mMutex);
if (mShuttingDown) {
return false;
}
}

HANDLE handles[] = { mAvailableEvent, mShutdownEvent };
::WaitForMultipleObjects(2, handles, FALSE, INFINITE);
}

MutexAutoLock lock(&mMutex);

if (mShuttingDown) {
return false;
}

if (mJobs.empty()) {
if (aAccess == NON_BLOCKING) {
return false;
}
continue;
}

Job* task = mJobs.front();
MOZ_ASSERT(task);

mJobs.pop_front();

if (mJobs.empty()) {
::ResetEvent(mAvailableEvent);
}

aOutJob = task;
return true;
}
}

void
MultiThreadedJobQueue::SubmitJob(Job* aJob)
{
MOZ_ASSERT(aJob);
MutexAutoLock lock(&mMutex);
mJobs.push_back(aJob);
::SetEvent(mAvailableEvent);
}

void
MultiThreadedJobQueue::ShutDown()
{
{
MutexAutoLock lock(&mMutex);
mShuttingDown = true;
}
while (mThreadsCount) {
::SetEvent(mAvailableEvent);
::WaitForSingleObject(mShutdownEvent, INFINITE);
}
}

size_t
MultiThreadedJobQueue::NumJobs()
{
MutexAutoLock lock(&mMutex);
return mJobs.size();
}

bool
MultiThreadedJobQueue::IsEmpty()
{
MutexAutoLock lock(&mMutex);
return mJobs.empty();
}

void
MultiThreadedJobQueue::RegisterThread()
{
mThreadsCount += 1;
}

void
MultiThreadedJobQueue::UnregisterThread()
{
MutexAutoLock lock(&mMutex);
mThreadsCount -= 1;
if (mThreadsCount == 0) {
::SetEvent(mShutdownEvent);
}
}

} // namespace
} // namespace

0 comments on commit 120cce1

Please sign in to comment.