Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #3605, Port KernelMachine to OpenMP #3738

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
228 changes: 75 additions & 153 deletions src/shogun/machine/KernelMachine.cpp
Expand Up @@ -18,22 +18,11 @@
#include <shogun/kernel/CustomKernel.h>
#include <shogun/labels/Labels.h>

using namespace shogun;

#ifndef DOXYGEN_SHOULD_SKIP_THIS
/* Argument bundle handed (as void*) to CKernelMachine::apply_helper */
struct S_THREAD_PARAM_KERNEL_MACHINE
{
/* machine whose apply_one() is called for every processed position */
CKernelMachine* kernel_machine;
/* output buffer; apply_helper writes result[vec] for each position vec */
float64_t* result;
/* first position to process (inclusive) */
int32_t start;
/* one past the last position to process (exclusive) */
int32_t end;
#ifdef HAVE_OPENMP
#include <omp.h>
#endif

/* if non-null, start and end correspond to indices in this vector */
index_t* indices;
/* length of indices; set by the callers, not read by apply_helper */
index_t indices_len;
/* when true, apply_helper reports progress through SG_SPROGRESS */
bool verbose;
};
#endif // DOXYGEN_SHOULD_SKIP_THIS
using namespace shogun;

CKernelMachine::CKernelMachine() : CMachine()
{
Expand Down Expand Up @@ -334,63 +323,49 @@ SGVector<float64_t> CKernelMachine::apply_get_outputs(CFeatures* data)
else
{
// TODO: port to use OpenMP backend instead of pthread
#ifdef HAVE_PTHREAD
int32_t num_threads=parallel->get_num_threads();
int32_t num_threads;
int32_t step;
#pragma omp parallel shared(num_threads, step)
{
#ifdef HAVE_OPENMP
#pragma omp single
{
num_threads = omp_get_num_threads();
step = num_vectors/num_threads;
num_threads--;
}
int32_t thread_num = omp_get_thread_num();
#else
int32_t num_threads=1;
num_threads = 0;
step = num_vectors;
int32_t thread_num = 0;
#endif
ASSERT(num_threads>0)

if (num_threads < 2)
{
S_THREAD_PARAM_KERNEL_MACHINE params;
params.kernel_machine=this;
params.result = output.vector;
params.start=0;
params.end=num_vectors;
params.verbose=true;
params.indices = NULL;
params.indices_len = 0;
apply_helper((void*) &params);
}
#ifdef HAVE_PTHREAD
else
{
pthread_t* threads = SG_MALLOC(pthread_t, num_threads-1);
S_THREAD_PARAM_KERNEL_MACHINE* params = SG_MALLOC(S_THREAD_PARAM_KERNEL_MACHINE, num_threads);
int32_t step= num_vectors/num_threads;

int32_t t;
bool verbose=(thread_num == 0);
float64_t* result = output.vector;
index_t* indices = NULL;
index_t indices_len = 0;

int32_t t_start = thread_num*step;
int32_t t_stop = (thread_num == num_threads) ? num_vectors : (thread_num+1)*step;

for (t=0; t<num_threads-1; t++)
#ifdef WIN32
for (int32_t i = t_start; i<t_stop; i++)
#else
for (int32_t i = t_start; i<t_stop && !CSignal::cancel_computations(); i++)
#endif
{
params[t].kernel_machine = this;
params[t].result = output.vector;
params[t].start = t*step;
params[t].end = (t+1)*step;
params[t].verbose = false;
params[t].indices = NULL;
params[t].indices_len = 0;
pthread_create(&threads[t], NULL,
CKernelMachine::apply_helper, (void*)&params[t]);
if (verbose)
{
int32_t num_vectors = t_stop - t_start;
int32_t v = i-t_start;
if ( (v% (num_vectors/100+1)) == 0)
SG_SPROGRESS(v, 0.0, num_vectors-1)
}
/* eventually use index mapping if exists */
index_t idx = indices ? indices[i] : i;
result[i] = this->apply_one(idx);
}

params[t].kernel_machine = this;
params[t].result = output.vector;
params[t].start = t*step;
params[t].end = num_vectors;
params[t].verbose = true;
params[t].indices = NULL;
params[t].indices_len = 0;
apply_helper((void*) &params[t]);

for (t=0; t<num_threads-1; t++)
pthread_join(threads[t], NULL);

SG_FREE(params);
SG_FREE(threads);
}
#endif
}

#ifndef WIN32
Expand Down Expand Up @@ -426,35 +401,6 @@ float64_t CKernelMachine::apply_one(int32_t num)
}
}

void* CKernelMachine::apply_helper(void* p)
{
S_THREAD_PARAM_KERNEL_MACHINE* params = (S_THREAD_PARAM_KERNEL_MACHINE*) p;
float64_t* result = params->result;
CKernelMachine* kernel_machine = params->kernel_machine;

#ifdef WIN32
for (int32_t vec=params->start; vec<params->end; vec++)
#else
for (int32_t vec=params->start; vec<params->end &&
!CSignal::cancel_computations(); vec++)
#endif
{
if (params->verbose)
{
int32_t num_vectors=params->end - params->start;
int32_t v=vec-params->start;
if ( (v% (num_vectors/100+1))== 0)
SG_SPROGRESS(v, 0.0, num_vectors-1)
}

/* eventually use index mapping if exists */
index_t idx=params->indices ? params->indices[vec] : vec;
result[vec] = kernel_machine->apply_one(idx);
}

return NULL;
}

void CKernelMachine::store_model_features()
{
if (!kernel)
Expand Down Expand Up @@ -554,71 +500,47 @@ SGVector<float64_t> CKernelMachine::apply_locked_get_output(

/* a custom kernel never has the batch evaluation property, so don't do this here */
// TODO: port to use OpenMP backend instead of pthread
#ifdef HAVE_PTHREAD
int32_t num_threads=parallel->get_num_threads();
int32_t num_threads;
int32_t step;
#pragma omp parallel shared(num_threads, step)
{
#ifdef HAVE_OPENMP
#pragma omp single
{
num_threads = omp_get_num_threads();
step = num_inds/num_threads;
num_threads--;
}
int32_t thread_num = omp_get_thread_num();
#else
int32_t num_threads=1;
num_threads = 0;
step = num_inds;
int32_t thread_num = 0;
#endif
ASSERT(num_threads>0)
bool verbose=(thread_num == 0);
float64_t* result = output.vector;

if (num_threads<2)
{
S_THREAD_PARAM_KERNEL_MACHINE params;
params.kernel_machine=this;
params.result=output.vector;

/* use the parameter index vector */
params.start=0;
params.end=num_inds;
params.indices=indices.vector;
params.indices_len=indices.vlen;

params.verbose=true;
apply_helper((void*) &params);
}
#ifdef HAVE_PTHREAD
else
{
pthread_t* threads = SG_MALLOC(pthread_t, num_threads-1);
S_THREAD_PARAM_KERNEL_MACHINE* params=SG_MALLOC(S_THREAD_PARAM_KERNEL_MACHINE, num_threads);
int32_t step= num_inds/num_threads;
int32_t t_start = thread_num*step;
int32_t t_stop = (thread_num == num_threads) ? num_inds : (thread_num+1)*step;

int32_t t;
for (t=0; t<num_threads-1; t++)
#ifdef WIN32
for (int32_t i = t_start; i<t_stop; i++)
#else
for (int32_t i = t_start; i<t_stop && !CSignal::cancel_computations(); i++)
#endif
{
params[t].kernel_machine=this;
params[t].result=output.vector;

/* use the parameter index vector */
params[t].start=t*step;
params[t].end=(t+1)*step;
params[t].indices=indices.vector;
params[t].indices_len=indices.vlen;

params[t].verbose=false;
pthread_create(&threads[t], NULL, CKernelMachine::apply_helper,
(void*)&params[t]);
if (verbose)
{
int32_t num_vectors = t_stop - t_start;
int32_t v = i-t_start;
if ( (v% (num_vectors/100+1)) == 0)
SG_SPROGRESS(v, 0.0, num_vectors-1)
}
/* eventually use index mapping if exists */
index_t idx = indices ? indices[i] : i;
result[i] = this->apply_one(idx);
}

params[t].kernel_machine=this;
params[t].result=output.vector;

/* use the parameter index vector */
params[t].start=t*step;
params[t].end=num_inds;
params[t].indices=indices.vector;
params[t].indices_len=indices.vlen;

params[t].verbose=true;
apply_helper((void*) &params[t]);

for (t=0; t<num_threads-1; t++)
pthread_join(threads[t], NULL);

SG_FREE(params);
SG_FREE(threads);
}
#endif

#ifndef WIN32
if ( CSignal::cancel_computations() )
Expand Down
7 changes: 0 additions & 7 deletions src/shogun/machine/KernelMachine.h
Expand Up @@ -228,13 +228,6 @@ class CKernelMachine : public CMachine
*/
virtual float64_t apply_one(int32_t num);

/** apply example helper, used in threads
*
* Evaluates apply_one() for the range of positions described by the
* thread parameters and writes the values into the parameters' result
* buffer.
*
* @param p params of the thread (a S_THREAD_PARAM_KERNEL_MACHINE passed
* as void*)
* @return always NULL
*/
static void* apply_helper(void* p);

#ifndef SWIG // SWIG should skip this part
/** Trains a locked machine on a set of indices. Error if machine is
* not locked
Expand Down