Skip to content

Commit

Permalink
Only run threading if pthread is available
Browse files Browse the repository at this point in the history
Make sure that wherever pthread is used for parallel
computing, the threaded code path is only taken when pthread
is actually available.
Mark all of these implementations as needing a fix, i.e. they
should be ported to use OpenMP.
  • Loading branch information
vigsterkr committed Jan 25, 2017
1 parent 91c79df commit 39d0271
Show file tree
Hide file tree
Showing 15 changed files with 134 additions and 72 deletions.
4 changes: 0 additions & 4 deletions src/shogun/classifier/svm/SVM.cpp
Expand Up @@ -19,10 +19,6 @@

#include <string.h>

#ifdef HAVE_PTHREAD
#include <pthread.h>
#endif

using namespace shogun;

CSVM::CSVM(int32_t num_sv)
Expand Down
69 changes: 46 additions & 23 deletions src/shogun/classifier/svm/SVMLight.cpp
Expand Up @@ -1049,7 +1049,13 @@ void CSVMLight::compute_matrices_for_optimization_parallel(
float64_t *a, float64_t *lin, float64_t *c, int32_t varnum, int32_t totdoc,
float64_t *aicache, QP *qp)
{
if (parallel->get_num_threads()<=1)
// TODO: port to use OpenMP backend instead of pthread
#ifdef HAVE_PTHREAD
int32_t num_threads=parallel->get_num_threads();
#else
int32_t num_threads=1;
#endif
if (num_threads < 2)
{
compute_matrices_for_optimization(docs, label, exclude_from_eq_const, eq_target,
chosen, active2dnum, key, a, lin, c,
Expand Down Expand Up @@ -1079,7 +1085,7 @@ void CSVMLight::compute_matrices_for_optimization_parallel(
qp->opt_g0[i]=lin[key[i]];
}

ASSERT(parallel->get_num_threads()>1)
ASSERT(num_threads>1)
int32_t *KI=SG_MALLOC(int32_t, varnum*varnum);
int32_t *KJ=SG_MALLOC(int32_t, varnum*varnum);
int32_t Knum=0 ;
Expand All @@ -1099,11 +1105,11 @@ void CSVMLight::compute_matrices_for_optimization_parallel(
}
ASSERT(Knum<=varnum*(varnum+1)/2)

pthread_t* threads = SG_MALLOC(pthread_t, parallel->get_num_threads()-1);
S_THREAD_PARAM_KERNEL* params = SG_MALLOC(S_THREAD_PARAM_KERNEL, parallel->get_num_threads()-1);
int32_t step= Knum/parallel->get_num_threads();
pthread_t* threads = SG_MALLOC(pthread_t, num_threads-1);
S_THREAD_PARAM_KERNEL* params = SG_MALLOC(S_THREAD_PARAM_KERNEL, num_threads-1);
int32_t step= Knum/num_threads;
//SG_DEBUG("\nkernel-step size: %i\n", step)
for (int32_t t=0; t<parallel->get_num_threads()-1; t++)
for (int32_t t=0; t<num_threads-1; t++)
{
params[t].svmlight = this;
params[t].start = t*step;
Expand All @@ -1113,10 +1119,10 @@ void CSVMLight::compute_matrices_for_optimization_parallel(
params[t].Kval=Kval ;
pthread_create(&threads[t], NULL, CSVMLight::compute_kernel_helper, (void*)&params[t]);
}
for (i=params[parallel->get_num_threads()-2].end; i<Knum; i++)
for (i=params[num_threads-2].end; i<Knum; i++)
Kval[i]=compute_kernel(KI[i],KJ[i]) ;

for (int32_t t=0; t<parallel->get_num_threads()-1; t++)
for (int32_t t=0; t<num_threads-1; t++)
pthread_join(threads[t], NULL);

SG_FREE(params);
Expand Down Expand Up @@ -1458,7 +1464,13 @@ void CSVMLight::update_linear_component(

if (num_working>0)
{
if (parallel->get_num_threads() < 2)
// TODO: port to use OpenMP backend instead of pthread
#ifdef HAVE_PTHREAD
int32_t num_threads=parallel->get_num_threads();
#else
int32_t num_threads=1;
#endif
if (num_threads < 2)
{
for (jj=0;(j=active2dnum[jj])>=0;jj++) {
lin[j]+=kernel->compute_optimized(docs[j]);
Expand All @@ -1470,13 +1482,13 @@ void CSVMLight::update_linear_component(
int32_t num_elem = 0 ;
for (jj=0;(j=active2dnum[jj])>=0;jj++) num_elem++ ;

pthread_t* threads = SG_MALLOC(pthread_t, parallel->get_num_threads()-1);
S_THREAD_PARAM_SVMLIGHT* params = SG_MALLOC(S_THREAD_PARAM_SVMLIGHT, parallel->get_num_threads()-1);
pthread_t* threads = SG_MALLOC(pthread_t, num_threads-1);
S_THREAD_PARAM_SVMLIGHT* params = SG_MALLOC(S_THREAD_PARAM_SVMLIGHT, num_threads-1);
int32_t start = 0 ;
int32_t step = num_elem/parallel->get_num_threads();
int32_t step = num_elem/num_threads;
int32_t end = step ;

for (int32_t t=0; t<parallel->get_num_threads()-1; t++)
for (int32_t t=0; t<num_threads-1; t++)
{
params[t].kernel = kernel ;
params[t].lin = lin ;
Expand All @@ -1489,11 +1501,11 @@ void CSVMLight::update_linear_component(
pthread_create(&threads[t], NULL, update_linear_component_linadd_helper, (void*)&params[t]) ;
}

for (jj=params[parallel->get_num_threads()-2].end;(j=active2dnum[jj])>=0;jj++) {
for (jj=params[num_threads-2].end;(j=active2dnum[jj])>=0;jj++) {
lin[j]+=kernel->compute_optimized(docs[j]);
}
void* ret;
for (int32_t t=0; t<parallel->get_num_threads()-1; t++)
for (int32_t t=0; t<num_threads-1; t++)
pthread_join(threads[t], &ret) ;

SG_FREE(params);
Expand Down Expand Up @@ -1631,8 +1643,14 @@ void CSVMLight::update_linear_component_mkl_linadd(
kernel->add_to_normal(docs[i], (a[i]-a_old[i])*(float64_t)label[i]);
}
}
// TODO: port to use OpenMP backend instead of pthread
#ifdef HAVE_PTHREAD
int32_t num_threads=parallel->get_num_threads();
#else
int32_t num_threads=1;
#endif

if (parallel->get_num_threads() < 2)
if (num_threads < 2)
{
// determine contributions of different kernels
for (int32_t i=0; i<num; i++)
Expand All @@ -1641,11 +1659,11 @@ void CSVMLight::update_linear_component_mkl_linadd(
#ifdef HAVE_PTHREAD
else
{
pthread_t* threads = SG_MALLOC(pthread_t, parallel->get_num_threads()-1);
S_THREAD_PARAM_SVMLIGHT* params = SG_MALLOC(S_THREAD_PARAM_SVMLIGHT, parallel->get_num_threads()-1);
int32_t step= num/parallel->get_num_threads();
pthread_t* threads = SG_MALLOC(pthread_t, num_threads-1);
S_THREAD_PARAM_SVMLIGHT* params = SG_MALLOC(S_THREAD_PARAM_SVMLIGHT, num_threads-1);
int32_t step= num/num_threads;

for (int32_t t=0; t<parallel->get_num_threads()-1; t++)
for (int32_t t=0; t<num_threads-1; t++)
{
params[t].kernel = kernel;
params[t].W = W;
Expand All @@ -1654,10 +1672,10 @@ void CSVMLight::update_linear_component_mkl_linadd(
pthread_create(&threads[t], NULL, CSVMLight::update_linear_component_mkl_linadd_helper, (void*)&params[t]);
}

for (int32_t i=params[parallel->get_num_threads()-2].end; i<num; i++)
for (int32_t i=params[num_threads-2].end; i<num; i++)
kernel->compute_by_subkernel(i,&W[i*num_kernels]);

for (int32_t t=0; t<parallel->get_num_threads()-1; t++)
for (int32_t t=0; t<num_threads-1; t++)
pthread_join(threads[t], NULL);

SG_FREE(params);
Expand Down Expand Up @@ -2128,7 +2146,12 @@ void CSVMLight::reactivate_inactive_examples(

if (num_modified>0)
{
int32_t num_threads=parallel->get_num_threads();
// TODO: port to use OpenMP backend instead of pthread
#ifdef HAVE_PTHREAD
int32_t num_threads=parallel->get_num_threads();
#else
int32_t num_threads=1;
#endif
ASSERT(num_threads>0)
if (num_threads < 2)
{
Expand Down
4 changes: 0 additions & 4 deletions src/shogun/clustering/Hierarchical.cpp
Expand Up @@ -15,10 +15,6 @@
#include <shogun/mathematics/Math.h>
#include <shogun/base/Parallel.h>

#ifdef HAVE_PTHREAD
#include <pthread.h>
#endif

using namespace shogun;

#ifndef DOXYGEN_SHOULD_SKIP_THIS
Expand Down
4 changes: 0 additions & 4 deletions src/shogun/converter/KernelLocallyLinearEmbedding.cpp
Expand Up @@ -12,10 +12,6 @@
#include <shogun/io/SGIO.h>
#include <shogun/lib/tapkee/tapkee_shogun.hpp>

#ifdef HAVE_PTHREAD
#include <pthread.h>
#endif

using namespace shogun;

CKernelLocallyLinearEmbedding::CKernelLocallyLinearEmbedding() :
Expand Down
1 change: 1 addition & 0 deletions src/shogun/distance/Distance.cpp
Expand Up @@ -349,6 +349,7 @@ SGMatrix<T> CDistance::get_distance_matrix()

result=SG_MALLOC(T, total_num);

// TODO: port this to use OpenMP
#ifdef HAVE_PTHREAD
int32_t num_threads=parallel->get_num_threads();
#else
Expand Down
18 changes: 12 additions & 6 deletions src/shogun/features/DotFeatures.cpp
Expand Up @@ -77,15 +77,18 @@ void CDotFeatures::dense_dot_range(float64_t* output, int32_t start, int32_t sto
int32_t num_vectors=stop-start;
ASSERT(num_vectors>0)

// TODO: port to use OpenMP backend instead of pthread
#ifdef HAVE_PTHREAD
int32_t num_threads=parallel->get_num_threads();
#else
int32_t num_threads=1;
#endif
ASSERT(num_threads>0)

CSignal::clear_cancel();

#ifdef HAVE_PTHREAD
if (num_threads < 2)
{
#endif
DF_THREAD_PARAM params;
params.df=this;
params.sub_index=NULL;
Expand All @@ -98,8 +101,8 @@ void CDotFeatures::dense_dot_range(float64_t* output, int32_t start, int32_t sto
params.bias=b;
params.progress=false; //true;
dense_dot_range_helper((void*) &params);
#ifdef HAVE_PTHREAD
}
#ifdef HAVE_PTHREAD
else
{
pthread_t* threads = SG_MALLOC(pthread_t, num_threads-1);
Expand Down Expand Up @@ -155,15 +158,18 @@ void CDotFeatures::dense_dot_range_subset(int32_t* sub_index, int32_t num, float
ASSERT(sub_index)
ASSERT(output)

// TODO: port to use OpenMP backend instead of pthread
#ifdef HAVE_PTHREAD
int32_t num_threads=parallel->get_num_threads();
#else
int32_t num_threads=1;
#endif
ASSERT(num_threads>0)

CSignal::clear_cancel();

#ifdef HAVE_PTHREAD
if (num_threads < 2)
{
#endif
DF_THREAD_PARAM params;
params.df=this;
params.sub_index=sub_index;
Expand All @@ -176,8 +182,8 @@ void CDotFeatures::dense_dot_range_subset(int32_t* sub_index, int32_t num, float
params.bias=b;
params.progress=false; //true;
dense_dot_range_helper((void*) &params);
#ifdef HAVE_PTHREAD
}
#ifdef HAVE_PTHREAD
else
{
pthread_t* threads = SG_MALLOC(pthread_t, num_threads-1);
Expand Down
18 changes: 12 additions & 6 deletions src/shogun/features/hashed/HashedWDFeaturesTransposed.cpp
Expand Up @@ -211,18 +211,21 @@ void CHashedWDFeaturesTransposed::dense_dot_range(float64_t* output, int32_t sta
int32_t num_vectors=stop-start;
ASSERT(num_vectors>0)

// TODO: port to use OpenMP backend instead of pthread
#ifdef HAVE_PTHREAD
int32_t num_threads=parallel->get_num_threads();
#else
int32_t num_threads=1;
#endif
ASSERT(num_threads>0)

CSignal::clear_cancel();

if (dim != w_dim)
SG_ERROR("Dimensions don't match, vec_len=%d, w_dim=%d\n", dim, w_dim)

#ifdef HAVE_PTHREAD
if (num_threads < 2)
{
#endif
HASHEDWD_THREAD_PARAM params;
params.hf=this;
params.sub_index=NULL;
Expand All @@ -235,8 +238,8 @@ void CHashedWDFeaturesTransposed::dense_dot_range(float64_t* output, int32_t sta
params.progress=false; //true;
params.index=index;
dense_dot_range_helper((void*) &params);
#ifdef HAVE_PTHREAD
}
#ifdef HAVE_PTHREAD
else
{
pthread_t* threads = SG_MALLOC(pthread_t, num_threads-1);
Expand Down Expand Up @@ -295,18 +298,21 @@ void CHashedWDFeaturesTransposed::dense_dot_range_subset(int32_t* sub_index, int

uint32_t* index=SG_MALLOC(uint32_t, num);

// TODO: port to use OpenMP backend instead of pthread
#ifdef HAVE_PTHREAD
int32_t num_threads=parallel->get_num_threads();
#else
int32_t num_threads=1;
#endif
ASSERT(num_threads>0)

CSignal::clear_cancel();

if (dim != w_dim)
SG_ERROR("Dimensions don't match, vec_len=%d, w_dim=%d\n", dim, w_dim)

#ifdef HAVE_PTHREAD
if (num_threads < 2)
{
#endif
HASHEDWD_THREAD_PARAM params;
params.hf=this;
params.sub_index=sub_index;
Expand All @@ -319,8 +325,8 @@ void CHashedWDFeaturesTransposed::dense_dot_range_subset(int32_t* sub_index, int
params.progress=false; //true;
params.index=index;
dense_dot_range_helper((void*) &params);
#ifdef HAVE_PTHREAD
}
#ifdef HAVE_PTHREAD
else
{
pthread_t* threads = SG_MALLOC(pthread_t, num_threads-1);
Expand Down
10 changes: 10 additions & 0 deletions src/shogun/kernel/CombinedKernel.cpp
Expand Up @@ -446,7 +446,12 @@ void CCombinedKernel::emulate_compute_batch(
{
k->init_optimization(num_suppvec, IDX, weights);

// TODO: port to use OpenMP backend instead of pthread
#ifdef HAVE_PTHREAD
int32_t num_threads=parallel->get_num_threads();
#else
int32_t num_threads=1;
#endif
ASSERT(num_threads>0)

if (num_threads < 2)
Expand Down Expand Up @@ -503,7 +508,12 @@ void CCombinedKernel::emulate_compute_batch(

if (k->get_combined_kernel_weight()!=0)
{ // compute the usual way for any non-optimized kernel
// TODO: port to use OpenMP backend instead of pthread
#ifdef HAVE_PTHREAD
int32_t num_threads=parallel->get_num_threads();
#else
int32_t num_threads=1;
#endif
ASSERT(num_threads>0)

if (num_threads < 2)
Expand Down
5 changes: 0 additions & 5 deletions src/shogun/kernel/string/SpectrumRBFKernel.cpp
Expand Up @@ -30,11 +30,6 @@

#include <assert.h>

#ifdef HAVE_PTHREAD
#include <pthread.h>
#endif


using namespace shogun;

CSpectrumRBFKernel::CSpectrumRBFKernel()
Expand Down
Expand Up @@ -1244,18 +1244,19 @@ void CWeightedDegreePositionStringKernel::compute_batch(

int32_t num_feat=((CStringFeatures<char>*) rhs)->get_max_vector_length();
ASSERT(num_feat>0)
// TODO: port to use OpenMP backend instead of pthread
#ifdef HAVE_PTHREAD
int32_t num_threads=parallel->get_num_threads();
#else
int32_t num_threads=1;
#endif
ASSERT(num_threads>0)
int32_t* vec=SG_MALLOC(int32_t, num_threads*num_feat);

if (num_threads < 2)
{
#ifdef WIN32
for (int32_t j=0; j<num_feat; j++)
#else
CSignal::clear_cancel();
for (int32_t j=0; j<num_feat && !CSignal::cancel_computations(); j++)
#endif
{
init_optimization(num_suppvec, IDX, alphas, j);
S_THREAD_PARAM_WDS<DNATrie> params;
Expand Down

0 comments on commit 39d0271

Please sign in to comment.