Skip to content

Commit

Permalink
Added task-parallel computation support
Browse files Browse the repository at this point in the history
  - Sped up the permutation test
  - Overall speed-up due to
    (a) cache-friendly computation and
    (b) parallel task computation
    is about 2.5X
  - In execution
    [before] https://gist.github.com/lambday/7d73c8863d7acafb8abe6d751a5332af
    [after]  https://gist.github.com/lambday/103c1cc92123a838672bcd92fab17301
  • Loading branch information
lambday committed Jun 28, 2016
1 parent 84305f0 commit 575c377
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 45 deletions.
10 changes: 5 additions & 5 deletions src/shogun/statistical_testing/MMD.cpp
Expand Up @@ -169,11 +169,11 @@ void CMMD::Self::compute_jobs(ComputationManager& cm) const
{
	// Dispatch all enqueued jobs on the selected back-end. The streaming
	// estimator runs the same jobs over many data blocks, so the
	// data-parallel strategy (parallelism over data, jobs sequential) is used.
	if (use_gpu)
	{
		cm.use_gpu().compute_data_parallel_jobs();
	}
	else
	{
		cm.use_cpu().compute_data_parallel_jobs();
	}
}

Expand Down Expand Up @@ -207,8 +207,8 @@ std::pair<float64_t, float64_t> CMMD::Self::compute_statistic_variance()
compute_kernel(cm, blocks, kernel);
compute_jobs(cm);

auto mmds = cm.next_result();
auto vars = cm.next_result();
auto mmds = cm.result(0);
auto vars = cm.result(1);

for (size_t i = 0; i < mmds.size(); ++i)
{
Expand Down Expand Up @@ -284,7 +284,7 @@ SGVector<float64_t> CMMD::Self::sample_null()
for (auto j = 0; j < num_null_samples; ++j)
{
compute_jobs(cm);
auto mmds = cm.next_result();
auto mmds = cm.result(0);
for (size_t i = 0; i < mmds.size(); ++i)
{
auto delta = mmds[i] - statistic[j];
Expand Down
17 changes: 10 additions & 7 deletions src/shogun/statistical_testing/QuadraticTimeMMD.cpp
Expand Up @@ -127,9 +127,9 @@ void CQuadraticTimeMMD::Self::compute_jobs(ComputationManager& cm) const
{
	SG_SDEBUG("Entering\n");
	// Quadratic-time MMD enqueues one independent job per null sample
	// (see sample_null), so the task-parallel strategy (one task per job)
	// is the profitable one here.
	if (owner.use_gpu())
		cm.use_gpu().compute_task_parallel_jobs();
	else
		cm.use_cpu().compute_task_parallel_jobs();
	SG_SDEBUG("Leaving\n");
}

Expand Down Expand Up @@ -188,8 +188,8 @@ std::pair<float64_t, float64_t> CQuadraticTimeMMD::Self::compute_statistic_varia
cm.data(0)=kernel_matrix;

compute_jobs(cm);
auto mmd=cm.next_result();
auto var=cm.next_result();
auto mmd=cm.result(0);
auto var=cm.result(1);
float64_t statistic=mmd[0];
float64_t variance=var[0];
cm.done();
Expand All @@ -211,14 +211,17 @@ SGVector<float64_t> CQuadraticTimeMMD::Self::sample_null()

ComputationManager cm;
create_computation_jobs();
cm.enqueue_job(permutation_job);
cm.num_data(1);
cm.data(0)=kernel_matrix;

for (auto i=0; i<null_samples.vlen; ++i)
cm.enqueue_job(permutation_job);

compute_jobs(cm);

for (auto i=0; i<null_samples.vlen; ++i)
{
compute_jobs(cm);
auto mmd=cm.next_result();
auto mmd=cm.result(i);
float64_t statistic=mmd[0];
null_samples[i]=owner.normalize_statistic(statistic);
}
Expand Down
64 changes: 39 additions & 25 deletions src/shogun/statistical_testing/internals/ComputationManager.cpp
Expand Up @@ -33,66 +33,80 @@ ComputationManager::~ComputationManager()

void ComputationManager::num_data(index_t n)
{
	// Allocate n slots for data (kernel) matrices; fill them via data(i).
	data_array.resize(n);
}

SGMatrix<float64_t>& ComputationManager::data(index_t i)
{
	// Mutable access to the i-th data matrix. No bounds checking:
	// callers must have sized the storage with num_data() first.
	return data_array[i];
}

void ComputationManager::enqueue_job(std::function<float64_t(SGMatrix<float64_t>)> job)
{
	// Append a job; each enqueued job is later evaluated on every data
	// matrix by one of the compute_*_jobs() methods. The parameter is a
	// by-value sink, so move it into the queue instead of copying.
	job_array.push_back(std::move(job));
}

void ComputationManager::compute()
void ComputationManager::compute_data_parallel_jobs()
{
for(auto job = jobq.begin(); job != jobq.end(); ++job)
result_array.resize(job_array.size());
for(size_t j=0; j<job_array.size(); ++j)
{
const auto& operation = *job;
std::vector<float64_t> results;
const auto& compute_job=job_array[j];
std::vector<float64_t> current_job_results(data_array.size());
if (gpu)
{
// TODO results = operation.compute_using_gpu(kernel_matrices);
// TODO current_job_results = compute_job.compute_using_gpu(data_array);
}
else
{
results.resize(kernel_matrices.size());
#pragma omp parallel for
for (size_t i = 0; i < kernel_matrices.size(); ++i)
{
results[i] = operation(kernel_matrices[i]);
}
for (size_t i=0; i<data_array.size(); ++i)
current_job_results[i]=compute_job(data_array[i]);
}
resultq.push(results);
result_array[j]=current_job_results;
}
}

void ComputationManager::compute_task_parallel_jobs()
{
	// Task-parallel strategy: run the enqueued jobs in parallel (one task
	// per job), each job processing the data matrices sequentially.
	// Suited to many independent jobs (e.g. permutation-test null samples).
	result_array.resize(job_array.size());
	// Each iteration touches only result_array[j] and reads shared state,
	// so no synchronization is required inside the parallel region.
#pragma omp parallel for
	for (size_t j=0; j<job_array.size(); ++j)
	{
		const auto& compute_job=job_array[j];
		std::vector<float64_t> current_job_results(data_array.size());
		if (gpu)
		{
			// TODO current_job_results = compute_job.compute_using_gpu(data_array);
		}
		else
		{
			for (size_t i=0; i<data_array.size(); ++i)
				current_job_results[i]=compute_job(data_array[i]);
		}
		// Move instead of copy: the local buffer is dead after this line.
		result_array[j]=std::move(current_job_results);
	}
}

void ComputationManager::done()
{
	// Discard all enqueued jobs and their results. Data matrices are kept
	// so the manager can be reused with a fresh batch of jobs.
	job_array.resize(0);
	result_array.resize(0);
}

std::vector<float64_t> ComputationManager::next_result()
std::vector<float64_t>& ComputationManager::result(index_t i)
{
std::vector<float64_t> result;
if (!resultq.empty())
{
result = resultq.front();
resultq.pop();
}
return result;
return result_array[i];
}

ComputationManager& ComputationManager::use_gpu()
{
	// Select the GPU back-end for subsequent compute_*_jobs() calls.
	// Returns *this so the selection can be chained, e.g.
	// cm.use_gpu().compute_task_parallel_jobs().
	gpu=true;
	return *this;
}

ComputationManager& ComputationManager::use_cpu()
{
	// Select the CPU back-end for subsequent compute_*_jobs() calls.
	// Returns *this for call chaining.
	gpu=false;
	return *this;
}
14 changes: 6 additions & 8 deletions src/shogun/statistical_testing/internals/ComputationManager.h
Expand Up @@ -20,7 +20,6 @@
#define COMPUTATION_MANAGER_H__

#include <vector>
#include <queue>
#include <functional>
#include <shogun/lib/common.h>

Expand All @@ -42,19 +41,18 @@ class ComputationManager
SGMatrix<float64_t>& data(index_t i);

void enqueue_job(std::function<float64_t(SGMatrix<float64_t>)> job);
void compute_data_parallel_jobs();
void compute_task_parallel_jobs();
void done();

void compute();

std::vector<float64_t> next_result();
std::vector<float64_t>& result(index_t i);

ComputationManager& use_cpu();
ComputationManager& use_gpu();
private:
bool gpu;
std::vector<SGMatrix<float64_t>> kernel_matrices;
std::vector<std::function<float64_t(SGMatrix<float64_t>)>> jobq;
std::queue<std::vector<float64_t>> resultq;
std::vector<SGMatrix<float64_t>> data_array;
std::vector<std::function<float64_t(SGMatrix<float64_t>)>> job_array;
std::vector<std::vector<float64_t>> result_array;
};

} // namespace internal
Expand Down

0 comments on commit 575c377

Please sign in to comment.