LSHSearch Parallelization #700

Merged: 26 commits merged on Jul 8, 2016
Changes from 2 commits

Commits (26)
72999dd
Implements parallel query processing for LSH
mentekid Jun 19, 2016
95d417f
Implements parallel query processing for LSH
mentekid Jun 19, 2016
afcc881
Book-keeping of number of threads used
mentekid Jun 19, 2016
5db5423
Adds parallelization to bichromatic search
mentekid Jun 19, 2016
abef504
Adds Bichromatic Parallel Test
mentekid Jun 19, 2016
4cbd43e
Merge branch 'master' into lsh-parallelization
mentekid Jun 24, 2016
a60ff91
Adds parallelism option to CMakeFiles, removes most omp.h dependence
mentekid Jun 27, 2016
6152527
Removes numThreadsUsed variable, changes how maxThreads is initialized
mentekid Jun 27, 2016
7cf77cd
Changes placeholder code for OpenMP
mentekid Jun 27, 2016
2ca48c6
Fixes CMakeFiles and code to make openMP actually transparent
mentekid Jun 27, 2016
a6aca41
CMake trickery to maybe make travis not crash
mentekid Jun 27, 2016
3d536c7
Simplifies OpenMP transparency code
mentekid Jun 27, 2016
c04b073
Adds -Wno-unknown-pragmas
mentekid Jun 27, 2016
65983d1
Commit to switch branch
mentekid Jun 28, 2016
b95a3ce
Changes BaseCase to include loop over candidate set. Changes loop var…
mentekid Jul 1, 2016
0d38271
Removes commented-out old BaseCase code
mentekid Jul 1, 2016
3af80c3
Merges multiprobe LSH
mentekid Jul 1, 2016
b02e2f3
Changes BaseCase to include the loop over candidate set. Places paral…
mentekid Jul 6, 2016
a1e9c28
Merges 3fe0b72
mentekid Jul 6, 2016
ad8e6d3
Modifies CMakeLists to remove -DHAS_OPENMP
mentekid Jul 7, 2016
c4c8ff9
Removes old code and comments from CMakeLists.txt
mentekid Jul 7, 2016
074d726
Restores size_t for openMP loop counters, changes CmakeLists to requi…
mentekid Jul 8, 2016
f982ca5
Removes maxThreads functionality from LSHSearch class
mentekid Jul 8, 2016
1fb998f
Removes HAS_OPENMP definition in CMakeFiles
mentekid Jul 8, 2016
b92d465
Workaround for OpenMP 2.0 (based on dt_utils.cpp)
mentekid Jul 8, 2016
2fee61e
Transforms omp loop to reduction
mentekid Jul 8, 2016
12 changes: 9 additions & 3 deletions CMakeLists.txt
@@ -18,15 +18,16 @@ endif()

# First, define all the compilation options.
# We default to debugging mode for developers.
option(DEBUG "Compile with debugging information" ON)
option(PROFILE "Compile with profiling information" ON)
option(DEBUG "Compile with debugging information." ON)
option(PROFILE "Compile with profiling information." ON)
Review comment (Member): Nice catch :)

Review comment (Member): Oops, this reverts 3fe0b72.

option(ARMA_EXTRA_DEBUG "Compile with extra Armadillo debugging symbols." OFF)
option(MATLAB_BINDINGS "Compile MATLAB bindings if MATLAB is found." OFF)
option(TEST_VERBOSE "Run test cases with verbose output." OFF)
option(BUILD_TESTS "Build tests." ON)
option(BUILD_CLI_EXECUTABLES "Build command-line executables" ON)
option(BUILD_SHARED_LIBS
"Compile shared libraries (if OFF, static libraries are compiled)" ON)
"Compile shared libraries (if OFF, static libraries are compiled)." ON)
option(HAS_OPENMP "Use OpenMP for parallel execution." OFF)

enable_testing()

@@ -117,6 +118,11 @@ if(ARMA_EXTRA_DEBUG)
add_definitions(-DARMA_EXTRA_DEBUG)
endif()

# If the user has an OpenMP-enabled compiler, turn OpenMP on
if (HAS_OPENMP)
add_definitions(-DHAS_OPENMP)
endif()

# Now, find the libraries we need to compile against. Several variables can be
# set to manually specify the directory in which each of these libraries
# resides.
5 changes: 5 additions & 0 deletions src/mlpack/core.hpp
@@ -232,6 +232,11 @@
#include <mlpack/core/kernels/spherical_kernel.hpp>
#include <mlpack/core/kernels/triangular_kernel.hpp>

// Use OpenMP if compiled with -DHAS_OPENMP.
#ifdef HAS_OPENMP
#include <omp.h>
#endif

// Use Armadillo's C++ version detection.
#ifdef ARMA_USE_CXX11
#define MLPACK_USE_CX11
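Editorial aside: the HAS_OPENMP guard added above keeps omp.h out of builds whose compiler lacks OpenMP. A related pattern that later commits in this PR gesture at ("make openMP actually transparent") is to provide no-op fallbacks so call sites do not need their own #ifdef blocks; the sketch below only illustrates that idea, it is not code from this diff, and the stub bodies are assumptions.

#ifdef HAS_OPENMP
  #include <omp.h>
#else
  // Fallback stubs (hypothetical): they deliberately mirror the OpenMP API so
  // code that queries thread state still compiles in single-threaded builds.
  inline int omp_get_max_threads() { return 1; }
  inline int omp_get_thread_num() { return 0; }
#endif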
2 changes: 1 addition & 1 deletion src/mlpack/methods/lsh/lsh_search.hpp
@@ -270,7 +270,7 @@ class LSHSearch
template<typename VecType>
void ReturnIndicesFromTable(const VecType& queryPoint,
arma::uvec& referenceIndices,
size_t numTablesToSearch) const;
size_t numTablesToSearch);

/**
* This is a helper function that computes the distance of the query to the
97 changes: 62 additions & 35 deletions src/mlpack/methods/lsh/lsh_search_impl.hpp
@@ -14,6 +14,16 @@ using std::cout; using std::endl; //TODO: remove
namespace mlpack {
namespace neighbor {

// Small helper that returns the number of available threads, or 1 if OpenMP is not in use.
inline size_t DefineMaxThreads()
{
#ifdef _OPENMP
return omp_get_max_threads();
#else
return 1;
#endif
}

// Construct the object with random tables
template<typename SortPolicy>
LSHSearch<SortPolicy>::
@@ -31,7 +41,7 @@ LSHSearch(const arma::mat& referenceSet,
secondHashSize(secondHashSize),
bucketSize(bucketSize),
distanceEvaluations(0),
maxThreads(omp_get_max_threads()),
maxThreads(DefineMaxThreads()),
numThreadsUsed(1)
{
// Pass work to training function.
@@ -344,7 +354,7 @@ template<typename VecType>
void LSHSearch<SortPolicy>::ReturnIndicesFromTable(
const VecType& queryPoint,
arma::uvec& referenceIndices,
size_t numTablesToSearch) const
size_t numTablesToSearch)
{
// Decide on the number of tables to look into.
if (numTablesToSearch == 0) // If no user input is given, search all.
@@ -406,18 +416,37 @@ void LSHSearch<SortPolicy>::ReturnIndicesFromTable(
arma::Col<size_t> refPointsConsidered;
refPointsConsidered.zeros(referenceSet->n_cols);

for (size_t i = 0; i < hashVec.n_elem; ++i)
// Define the number of threads used to process this.
size_t numThreadsUsed = std::min(maxThreads, numTablesToSearch);

// Parallelization: By default nested parallelism is off, so this won't be
// parallel. The user might turn nested parallelism on if (for example) they
// have a query-by-query processing scheme and so processing more than one
// query at the same time doesn't make sense for them.

#pragma omp parallel for \
num_threads (numThreadsUsed) \
shared (hashVec, refPointsConsidered) \
schedule(dynamic)
for (size_t i = 0; i < numTablesToSearch; ++i)
{

const size_t hashInd = (size_t) hashVec[i];
const size_t tableRow = bucketRowInHashTable[hashInd];

// Pick the indices in the bucket corresponding to 'hashInd'.
if (tableRow != secondHashSize)
{
for (size_t j = 0; j < bucketContentSize[tableRow]; j++)
{
#pragma omp atomic
refPointsConsidered[secondHashTable[tableRow](j)]++;
}
}
}

// Only keep reference points found in at least one bucket.
// TODO: maybe write parallel implementation of this?
referenceIndices = arma::find(refPointsConsidered > 0);
return;
}
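Aside (not part of the diff): the comment above only has an effect if the caller opts into nested parallelism, which OpenMP disables by default. A minimal sketch of that opt-in follows; the helper name EnableNestedParallelism is hypothetical, and exporting OMP_NESTED=TRUE in the environment is the equivalent knob.

#ifdef _OPENMP
#include <omp.h>
#endif

// Allow parallel regions nested inside other parallel regions, so the table
// loop above can still fan out when it is reached from an outer per-query
// parallel region.
inline void EnableNestedParallelism()
{
#ifdef _OPENMP
  omp_set_nested(1);
#endif
}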
@@ -431,18 +460,39 @@ void LSHSearch<SortPolicy>::ReturnIndicesFromTable(

// Retrieve candidates.
size_t start = 0;

// Define the number of threads used to process this.
size_t numThreadsUsed = std::min(maxThreads, numTablesToSearch);

// Parallelization: By default nested parallelism is off, so this won't be
// parallel. The user might turn nested parallelism on if (for example) they
// have a query-by-query processing scheme and so processing more than one
// query at the same time doesn't make sense for them.

#pragma omp parallel for \
num_threads (numThreadsUsed) \
shared (hashVec, refPointsConsideredSmall, start) \
schedule(dynamic)
for (size_t i = 0; i < numTablesToSearch; ++i) // For all tables
{
const size_t hashInd = (size_t) hashVec[i]; // Find the query's bucket.
const size_t tableRow = bucketRowInHashTable[hashInd];

// Store all secondHashTable points in the candidates set.
if (tableRow != secondHashSize)
{
for (size_t j = 0; j < bucketContentSize[tableRow]; ++j)
refPointsConsideredSmall(start++) = secondHashTable[tableRow][j];
{
#pragma omp critical
{
refPointsConsideredSmall(start++) = secondHashTable[tableRow][j];
}
}
}
}

// Only keep unique candidates.
// TODO: again main bottleneck is here. Parallelize?
referenceIndices = arma::unique(refPointsConsideredSmall);
return;
}
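Aside (illustrative, not from the PR): the two loops above synchronize differently. The first only increments one counter per candidate, so omp atomic is enough; this one must read and advance the shared start index and then write the slot as a single unit, which is why it uses an omp critical block. A self-contained sketch of that distinction, with hypothetical names:

#include <vector>

// 'items' values are assumed to lie in [0, maxValue].
std::vector<int> TallyAndFlatten(const std::vector<int>& items, const int maxValue)
{
  std::vector<int> counts(maxValue + 1, 0);  // Per-value tallies (atomic).
  std::vector<int> flat(items.size(), 0);    // Flattened copy (critical).
  int next = 0;

  #pragma omp parallel for
  for (long i = 0; i < (long) items.size(); ++i)
  {
    // A single scalar update: 'atomic' is sufficient and cheap.
    #pragma omp atomic
    counts[items[i]]++;

    // Two dependent operations (read/advance 'next', then the store) must not
    // interleave across threads, so the whole block is serialized.
    #pragma omp critical
    {
      flat[next++] = items[i];
    }
  }

  return flat;
}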
@@ -489,25 +539,16 @@ void LSHSearch<SortPolicy>::Search(const arma::mat& querySet,

Timer::Start("computing_neighbors");

// Parallelization allows us to process more than one query at a time. To
// control workload and thread access, we use numThreadsUsed and maxThreads to
// make sure we only use as many threads as the user specified.
// Parallelization to process more than one query at a time.
// Use as many threads as possible, but not more than the allowed number.
size_t numThreadsUsed = std::min( (arma::uword) maxThreads, querySet.n_cols );
#pragma omp parallel for \
if (numThreadsUsed <= maxThreads) \
num_threads (maxThreads-numThreadsUsed)\
num_threads ( numThreadsUsed )\
shared(avgIndicesReturned, resultingNeighbors, distances) \
schedule(dynamic)
Review comment (Member):

Two questions:

  • Is the dynamic schedule the right one to use here? My understanding was that the dynamic schedule had more overhead. In this case it seems like the default static schedule would be just fine.
  • I think the num_threads() call here will effectively set the number of threads used to the value of the environment variable OMP_NUM_THREADS, but that would be the same as the default anyway if you hadn't set num_threads(). So it makes me think that the maxThreads member is unnecessary (and the other supporting functionality).

Reply (mentekid, Contributor, Author), Jul 8, 2016:
> Is the dynamic schedule the right one to use here? My understanding was that the dynamic schedule had more overhead. In this case it seems like the default static schedule would be just fine.

The problem with static scheduling is it doesn't leave room for work-stealing. Since queries get unequal sizes of candidate sets, in static scheduling some threads will finish their chunks quickly and then be useless. In dynamic scheduling, the runtime will detect slackers and give them more work to do.
I went with dynamic without trying static first, because of this. I can try that and see if there's a difference.

> I think the num_threads() call here will effectively set the number of threads used to the value of the environment variable OMP_NUM_THREADS, but that would be the same as the default anyway if you hadn't set num_threads(). So it makes me think that the maxThreads member is unnecessary (and the other supporting functionality).

Yes I think I can simplify the code more now that we're not doing nested parallelism.
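To make both points concrete, here is a hedged sketch (not the PR's actual loop) of a per-query parallel for with uneven work: the schedule clause is the only knob being compared, and no num_threads clause is given, so OMP_NUM_THREADS (or the implementation default) decides the thread count. All names are illustrative.

#include <cmath>
#include <vector>

// Stand-in for per-query work whose cost grows with the candidate-set size;
// the imbalance is what makes schedule(dynamic) attractive.
double ProcessQueries(const std::vector<int>& candidateSetSizes)
{
  double checksum = 0.0;

  // Swap schedule(dynamic) for schedule(static) to get the fixed, up-front
  // partitioning discussed above.
  #pragma omp parallel for schedule(dynamic) reduction(+:checksum)
  for (long i = 0; i < (long) candidateSetSizes.size(); ++i)
  {
    double work = 0.0;
    for (int j = 0; j < candidateSetSizes[i]; ++j)
      work += std::sqrt((double) j);
    checksum += work;
  }

  return checksum;
}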

Reply (mentekid, Contributor, Author):
About static vs dynamic scheduling, I ran some tests:

Dataset     dynamic              static
Sift100k    0.086 +/- 0.008 s    0.093 +/- 0.014 s
phy         3.35 +/- 0.124 s     3.46 +/- 0.203 s
Corel       0.228 +/- 0.03 s     0.234 +/- 0.03 s
Miniboone   0.711 +/- 0.07 s     0.701 +/- 0.09 s

In the first 3, I'd say dynamic is slightly faster. It's hard to tell for Miniboone because the standard deviation is much larger than the difference. I'll run covertype and pokerhand in a while when my PC is not used.


// Go through every query point.
for (size_t i = 0; i < querySet.n_cols; i++)
{
// Master thread updates the number of threads used
if (i == 0 && omp_get_thread_num() == 0)
{
numThreadsUsed+=omp_get_num_threads();
Log::Info
<< "Using "<< numThreadsUsed << " threads to process queries." << endl;
}

// Hash every query into every hash table and eventually into the
// 'secondHashTable' to obtain the neighbor candidates.
@@ -526,8 +567,6 @@ void LSHSearch<SortPolicy>::Search(const arma::mat& querySet,
BaseCase(i, (size_t) refIndices[j], querySet, resultingNeighbors,
distances);
}
// parallel region over, reset number of threads to 1
numThreadsUsed = omp_get_num_threads();

Timer::Stop("computing_neighbors");
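Editorial aside: the final commit in the list above, "Transforms omp loop to reduction", points at the usual way to handle accumulators such as avgIndicesReturned without sharing them across threads. A minimal sketch of that pattern, with hypothetical names:

#include <vector>

// Each thread accumulates into a private copy of totalIndices; the private
// copies are summed when the loop finishes, so no atomic or critical section
// is needed on the hot path.
long CountReturnedIndices(const std::vector<std::vector<long>>& candidateSets)
{
  long totalIndices = 0;

  #pragma omp parallel for reduction(+:totalIndices) schedule(dynamic)
  for (long i = 0; i < (long) candidateSets.size(); ++i)
    totalIndices += (long) candidateSets[i].size();

  return totalIndices;
}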

@@ -556,24 +595,16 @@ Search(const size_t k,

Timer::Start("computing_neighbors");

// Parallelization allows us to process more than one query at a time. To
// control workload and thread access, we use numThreadsUsed and maxThreads to
// make sure we only use as many threads as the user specified.
// Parallelization to process more than one query at a time.
// Use as many threads as possible, but not more than the allowed number.
size_t numThreadsUsed = std::min( (arma::uword) maxThreads, referenceSet->n_cols );
#pragma omp parallel for \
if (numThreadsUsed <= maxThreads) \
num_threads (maxThreads-numThreadsUsed)\
num_threads ( numThreadsUsed )\
shared(avgIndicesReturned, resultingNeighbors, distances) \
schedule(dynamic)
// Go through every query point.
for (size_t i = 0; i < referenceSet->n_cols; i++)
{
// Master thread updates the number of threads used
if (i == 0 && omp_get_thread_num() == 0)
{
numThreadsUsed+=omp_get_num_threads();
Log::Info
<< "Using "<< numThreadsUsed << " threads to process queries." << endl;
}
// Hash every query into every hash table and eventually into the
// 'secondHashTable' to obtain the neighbor candidates.
arma::uvec refIndices;
@@ -592,10 +623,6 @@

}

// parallel region over, reset number of threads to 1
numThreadsUsed = omp_get_num_threads();


Timer::Stop("computing_neighbors");

distanceEvaluations += avgIndicesReturned;
4 changes: 2 additions & 2 deletions src/mlpack/tests/lsh_test.cpp
@@ -499,7 +499,7 @@ BOOST_AUTO_TEST_CASE(ParallelBichromatic)
lshTest.Search(qdata, k, sequentialNeighbors, distances);

// Require both have same results
double recall = ComputeRecall(sequentialNeighbors, parallelNeighbors);
double recall = LSHSearch<>::ComputeRecall(sequentialNeighbors, parallelNeighbors);
BOOST_REQUIRE_EQUAL(recall, 1);
}

@@ -533,7 +533,7 @@ BOOST_AUTO_TEST_CASE(ParallelMonochromatic)
lshTest.Search(k, sequentialNeighbors, distances);

// Require both have same results
double recall = ComputeRecall(sequentialNeighbors, parallelNeighbors);
double recall = LSHSearch<>::ComputeRecall(sequentialNeighbors, parallelNeighbors);
BOOST_REQUIRE_EQUAL(recall, 1);
}
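Note on the helper used by both tests: ComputeRecall compares the neighbors found by the parallel run against the sequential baseline and returns a recall value, where 1 means the parallel results contain all of the sequential ones, which is why both tests require exactly 1. A simplified, element-wise stand-in (not mlpack's actual implementation) might look like this:

#include <armadillo>

// Fraction of (row, column) entries where the found neighbors match the
// baseline exactly; 1.0 means full agreement.
double MatchingFraction(const arma::Mat<size_t>& found,
                        const arma::Mat<size_t>& baseline)
{
  size_t matches = 0;
  for (size_t col = 0; col < baseline.n_cols; ++col)
    for (size_t row = 0; row < baseline.n_rows; ++row)
      if (found(row, col) == baseline(row, col))
        ++matches;

  return (double) matches / (double) baseline.n_elem;
}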
