Skip to content

Commit

Permalink
Self-defined parallel for is added.
Browse files Browse the repository at this point in the history
  • Loading branch information
tatsy committed Mar 15, 2016
1 parent e6334d9 commit 64ec47f
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 2 deletions.
2 changes: 2 additions & 0 deletions sources/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ configure_file(${CMAKE_CURRENT_LIST_DIR}/spica_dirs.h.in
@ONLY)

set(SOURCES ${SOURCES}
${CMAKE_CURRENT_LIST_DIR}/parallel.cc
${CMAKE_CURRENT_LIST_DIR}/engine.cc
${CMAKE_CURRENT_LIST_DIR}/memory.cc
${CMAKE_CURRENT_LIST_DIR}/spectrum.cc
Expand All @@ -16,6 +17,7 @@ configure_file(${CMAKE_CURRENT_LIST_DIR}/spica_dirs.h.in

set(HEADERS ${HEADERS}
${CMAKE_CURRENT_LIST_DIR}/common.h
${CMAKE_CURRENT_LIST_DIR}/parallel.h
${CMAKE_CURRENT_LIST_DIR}/engine.h
${CMAKE_CURRENT_LIST_DIR}/memory.h
${CMAKE_CURRENT_LIST_DIR}/forward_decl.h
Expand Down
95 changes: 95 additions & 0 deletions sources/core/parallel.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#include "parallel.h"

#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

class WorkerTask;

static std::vector<int> threadTasks;
static std::mutex workerListMutex;
static std::condition_variable condval;
static WorkerTask* workerTask;

class WorkerTask {
public:
WorkerTask(const std::function<void(int)>& f, int csize, int tasks)
: func{ f }
, chunkSize{ csize }
, nTasks { tasks } {
}

bool finished() const {
return currentIndex >= nTasks;
}

const std::function<void(int)>& func;
const int chunkSize;
const int nTasks;
int currentIndex = 0;
};

class WorkerTaskManager {
public:
WorkerTaskManager(const std::function<void(int)>& f,
int chunkSize, int nTasks) {
if (workerTask == nullptr) {
workerTask = new WorkerTask(f, chunkSize, nTasks);
}
}

~WorkerTaskManager() {
delete workerTask;
workerTask = nullptr;
}
};

static void workerThreadFunc(int threadIndex) {
std::unique_lock<std::mutex> lock(workerListMutex);
if (!workerTask->finished()) {
if (threadTasks.empty()) {
condval.wait(lock);
} else {
int indexStart = workerTask->currentIndex;
int indexEnd = std::min(workerTask->nTasks, indexStart + workerTask->chunkSize);
workerTask->currentIndex = indexEnd;
lock.unlock();

for (int i = indexStart; i < indexEnd; i++) {
workerTask->func(threadTasks[i]);
}
lock.lock();
}
}
}

void parallel_for(int start, int end, const std::function<void(int)>& func,
ParallelSchedule schedule) {
const int nTasks = (end - start);
const int nThreads = numAvailableThreads();
const int chunkSize = (nTasks + nThreads - 1) / nThreads;
WorkerTaskManager manager(func, chunkSize, nTasks);

std::vector<std::thread> threads;
for (int i = 0; i < nThreads; i++) {
threads.emplace_back(workerThreadFunc, i + 1);
}

workerListMutex.lock();
threadTasks.resize(nTasks);
for (int i = 0; i < nTasks; i++) {
threadTasks[i] = start + i;
}
workerListMutex.unlock();
condval.notify_all();

for (auto& t : threads) {
t.join();
}
}

int numAvailableThreads() {
return std::max(1u, std::thread::hardware_concurrency());
}
20 changes: 20 additions & 0 deletions sources/core/parallel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#ifdef _MSC_VER
#pragma once
#endif

#ifndef _SPICA_PARALLEL_H_
#define _SPICA_PARALLEL_H_

#include <functional>

enum class ParallelSchedule {
Static = 0x01,
Dynamic = 0x02
};

void parallel_for(int start, int end, const std::function<void(int)>& func,
ParallelSchedule schedule = ParallelSchedule::Dynamic);

int numAvailableThreads();

#endif // _SPICA_PARALLEL_H_
6 changes: 4 additions & 2 deletions sources/integrator/integrator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "integrator.h"

#include "../core/memory.h"
#include "../core/parallel.h"
#include "../core/interaction.h"

#include "../bxdf/bxdf.h"
Expand Down Expand Up @@ -72,7 +73,8 @@ void SamplerIntegrator::render(const Scene& scene,

auto arenas = std::make_unique<MemoryArena[]>(kNumThreads);
for (int t = 0; t < taskPerThread; t++) {
ompfor (int threadID = 0; threadID < kNumThreads; threadID++) {
// ompfor (int threadID = 0; threadID < kNumThreads; threadID++) {
parallel_for(0, kNumThreads, [&](int threadID) {
samplers[threadID]->startNextSample();
if (t < tasks[threadID].size()) {
const int y = tasks[threadID][t];
Expand All @@ -88,7 +90,7 @@ void SamplerIntegrator::render(const Scene& scene,
}
}
arenas[threadID].reset();
}
});
}
arenas.reset(nullptr);
camera_->film()->save(i + 1);
Expand Down

0 comments on commit 64ec47f

Please sign in to comment.