Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LSHSearch Parallelization #700

Merged
merged 26 commits into from Jul 8, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
72999dd
Implements parallel query processing for LSH
mentekid Jun 19, 2016
95d417f
Implements parallel query processing for LSH
mentekid Jun 19, 2016
afcc881
Book-keeping of number of threads used
mentekid Jun 19, 2016
5db5423
Adds parallelization to bichromatic search
mentekid Jun 19, 2016
abef504
Adds Bichromatic Parallel Test
mentekid Jun 19, 2016
4cbd43e
Merge branch 'master' into lsh-parallelization
mentekid Jun 24, 2016
a60ff91
Adds parallelism option to CMakeFiles, removes most omp.h dependence
mentekid Jun 27, 2016
6152527
Removes numThreadsUsed variable, changes how maxThreads is initialized
mentekid Jun 27, 2016
7cf77cd
Changes placeholder code for OpenMP
mentekid Jun 27, 2016
2ca48c6
Fixes CMakeFiles and code to make openMP actually transparent
mentekid Jun 27, 2016
a6aca41
CMake trickery to maybe make travis not crash
mentekid Jun 27, 2016
3d536c7
Simplifies OpenMP transparency code
mentekid Jun 27, 2016
c04b073
Adds -Wno-unknown-pragmas
mentekid Jun 27, 2016
65983d1
Commit to switch branch
mentekid Jun 28, 2016
b95a3ce
Changes BaseCase to include loop over candidate set. Changes loop var…
mentekid Jul 1, 2016
0d38271
Removes commented-out old BaseCase code
mentekid Jul 1, 2016
3af80c3
Merges multiprobe LSH
mentekid Jul 1, 2016
b02e2f3
Changes BaseCase to include the loop over candidate set. Places paral…
mentekid Jul 6, 2016
a1e9c28
Merges 3fe0b72
mentekid Jul 6, 2016
ad8e6d3
Modifies CMakeLists to remove -DHAS_OPENMP
mentekid Jul 7, 2016
c4c8ff9
Removes old code and comments from CMakeLists.txt
mentekid Jul 7, 2016
074d726
Restores size_t for openMP loop counters, changes CmakeLists to requi…
mentekid Jul 8, 2016
f982ca5
Removes maxThreads functionality from LSHSearch class
mentekid Jul 8, 2016
1fb998f
Removes HAS_OPENMP definition in CMakeFiles
mentekid Jul 8, 2016
b92d465
Workaround for OpenMP 2.0 (based on dt_utils.cpp)
mentekid Jul 8, 2016
2fee61e
Transforms omp loop to reduction
mentekid Jul 8, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 38 additions & 5 deletions src/mlpack/methods/lsh/lsh_search.hpp
Expand Up @@ -272,6 +272,40 @@ class LSHSearch
arma::uvec& referenceIndices,
size_t numTablesToSearch);

// /**
// * This is a helper function that computes the distance of the query to the
// * neighbor candidates and appropriately stores the best 'k' candidates. This
// * is specific to the monochromatic search case, where the query set is the
// * reference set.
// *
// * @param queryIndex The index of the query in question
// * @param referenceIndex The index of the neighbor candidate in question
// * @param neighbors Matrix holding output neighbors.
// * @param distances Matrix holding output distances.
// */
// void BaseCase(const size_t queryIndex,
// const size_t referenceIndex,
// arma::Mat<size_t>& neighbors,
// arma::mat& distances) const;
//
// /**
// * This is a helper function that computes the distance of the query to the
// * neighbor candidates and appropriately stores the best 'k' candidates. This
// * is specific to bichromatic search, where the query set is not the same as
// * the reference set.
// *
// * @param queryIndex The index of the query in question
// * @param referenceIndex The index of the neighbor candidate in question
// * @param querySet Set of query points.
// * @param neighbors Matrix holding output neighbors.
// * @param distances Matrix holding output distances.
// */
// void BaseCase(const size_t queryIndex,
// const size_t referenceIndex,
// const arma::mat& querySet,
// arma::Mat<size_t>& neighbors,
// arma::mat& distances) const;

/**
* This is a helper function that computes the distance of the query to the
* neighbor candidates and appropriately stores the best 'k' candidates. This
Expand All @@ -283,8 +317,9 @@ class LSHSearch
* @param neighbors Matrix holding output neighbors.
* @param distances Matrix holding output distances.
*/
// TODO: change documentation above
void BaseCase(const size_t queryIndex,
const size_t referenceIndex,
const arma::uvec& referenceIndices,
arma::Mat<size_t>& neighbors,
arma::mat& distances) const;

Expand All @@ -300,8 +335,9 @@ class LSHSearch
* @param neighbors Matrix holding output neighbors.
* @param distances Matrix holding output distances.
*/
//TODO: change documentation above.
void BaseCase(const size_t queryIndex,
const size_t referenceIndex,
const arma::uvec& referenceIndices,
const arma::mat& querySet,
arma::Mat<size_t>& neighbors,
arma::mat& distances) const;
Expand Down Expand Up @@ -373,9 +409,6 @@ class LSHSearch
//! The maximum number of threads allowed.
size_t maxThreads;

//! The number of threads currently in use.
size_t numThreadsUsed;

}; // class LSHSearch

} // namespace neighbor
Expand Down
177 changes: 111 additions & 66 deletions src/mlpack/methods/lsh/lsh_search_impl.hpp
Expand Up @@ -293,6 +293,7 @@ void LSHSearch<SortPolicy>::InsertNeighbor(arma::mat& distances,
neighbors(pos, queryIndex) = neighbor;
}

/*
// Base case where the query set is the reference set. (So, we can't return
// ourselves as the nearest neighbor.)
template<typename SortPolicy>
Expand All @@ -319,8 +320,13 @@ void LSHSearch<SortPolicy>::BaseCase(const size_t queryIndex,

// SortDistance() returns (size_t() - 1) if we shouldn't add it.
if (insertPosition != (size_t() - 1))
InsertNeighbor(distances, neighbors, queryIndex, insertPosition,
referenceIndex, distance);
{
#pragma omp critical
{
InsertNeighbor(distances, neighbors, queryIndex, insertPosition,
referenceIndex, distance);
}
}
}

// Base case for bichromatic search.
Expand All @@ -345,10 +351,79 @@ void LSHSearch<SortPolicy>::BaseCase(const size_t queryIndex,

// SortDistance() returns (size_t() - 1) if we shouldn't add it.
if (insertPosition != (size_t() - 1))
InsertNeighbor(distances, neighbors, queryIndex, insertPosition,
referenceIndex, distance);
{
#pragma omp critical
{
InsertNeighbor(distances, neighbors, queryIndex, insertPosition,
referenceIndex, distance);
}
}
}
*/

// Monochromatic base case: the query point is itself a column of the
// reference set, so a candidate equal to the query is never reported as its
// own neighbor.  Every remaining candidate in referenceIndices is scored
// against the query and offered to the query's current result list.
template<typename SortPolicy>
inline force_inline
void LSHSearch<SortPolicy>::BaseCase(const size_t queryIndex,
                                     const arma::uvec& referenceIndices,
                                     arma::Mat<size_t>& neighbors,
                                     arma::mat& distances) const
{
  // Value SortDistance() hands back when the candidate should not be added.
  const size_t doNotInsert = size_t() - 1;
  const size_t numCandidates = referenceIndices.n_elem;

  for (size_t c = 0; c != numCandidates; ++c)
  {
    const size_t candidate = referenceIndices[c];

    // Only score candidates that are not the query point itself.
    if (candidate != queryIndex)
    {
      const double candidateDistance = metric::EuclideanDistance::Evaluate(
          referenceSet->unsafe_col(queryIndex),
          referenceSet->unsafe_col(candidate));

      // Ask the sort policy where (if anywhere) this distance belongs among
      // the results currently stored for the query.
      arma::vec currentDistances = distances.unsafe_col(queryIndex);
      arma::Col<size_t> currentNeighbors = neighbors.unsafe_col(queryIndex);
      const size_t slot = SortPolicy::SortDistance(currentDistances,
                                                   currentNeighbors,
                                                   candidateDistance);

      if (slot == doNotInsert)
        continue;

      InsertNeighbor(distances, neighbors, queryIndex, slot, candidate,
                     candidateDistance);
    }
  }
}

// Bichromatic base case: the query point is a column of querySet, which is
// separate from the reference set, so no candidate is skipped.  Every entry
// of referenceIndices is scored against the query and offered to the query's
// current result list.
template<typename SortPolicy>
inline force_inline
void LSHSearch<SortPolicy>::BaseCase(const size_t queryIndex,
                                     const arma::uvec& referenceIndices,
                                     const arma::mat& querySet,
                                     arma::Mat<size_t>& neighbors,
                                     arma::mat& distances) const
{
  // Value SortDistance() hands back when the candidate should not be added.
  const size_t doNotInsert = size_t() - 1;
  const size_t numCandidates = referenceIndices.n_elem;

  for (size_t c = 0; c != numCandidates; ++c)
  {
    const size_t candidate = referenceIndices[c];

    const double candidateDistance = metric::EuclideanDistance::Evaluate(
        querySet.unsafe_col(queryIndex),
        referenceSet->unsafe_col(candidate));

    // Ask the sort policy where (if anywhere) this distance belongs among the
    // results currently stored for the query.
    arma::vec currentDistances = distances.unsafe_col(queryIndex);
    arma::Col<size_t> currentNeighbors = neighbors.unsafe_col(queryIndex);
    const size_t slot = SortPolicy::SortDistance(currentDistances,
                                                 currentNeighbors,
                                                 candidateDistance);

    if (slot == doNotInsert)
      continue;

    InsertNeighbor(distances, neighbors, queryIndex, slot, candidate,
                   candidateDistance);
  }
}
template<typename SortPolicy>
template<typename VecType>
void LSHSearch<SortPolicy>::ReturnIndicesFromTable(
Expand Down Expand Up @@ -416,45 +491,21 @@ void LSHSearch<SortPolicy>::ReturnIndicesFromTable(
arma::Col<size_t> refPointsConsidered;
refPointsConsidered.zeros(referenceSet->n_cols);

// Define the number of threads used to process this.
size_t numThreadsUsed = std::min(maxThreads, numTablesToSearch);

// Parallelization: By default nested parallelism is off, so this won't be
// parallel. The user might turn nested parallelism on if (for example) they
// have a query-by-query processing scheme and so processing more than one
// query at the same time doesn't make sense for them.

#pragma omp parallel for \
num_threads (numThreadsUsed) \
shared (hashVec, refPointsConsidered) \
schedule(dynamic)
for (size_t i = 0; i < numTablesToSearch; ++i)
for (long long int i = 0; i < numTablesToSearch; ++i)
{

const size_t hashInd = (size_t) hashVec[i];
const size_t tableRow = bucketRowInHashTable[hashInd];

// Pick the indices in the bucket corresponding to 'hashInd'.
if (tableRow != secondHashSize)
{
for (size_t j = 0; j < bucketContentSize[tableRow]; j++)
{
#pragma omp atomic
refPointsConsidered[secondHashTable[tableRow](j)]++;
}
}
}

// Only keep reference points found in at least one bucket. If OpenMP is
// found, do it in parallel
#ifdef OPENMP_FOUND
// TODO: change this to our own function?
referenceIndices = arma::find(refPointsConsidered > 0);
return;
#else
referenceIndices = arma::find(refPointsConsidered > 0);
return;
#endif
// Only keep reference points found in at least one bucket.
referenceIndices = arma::find(refPointsConsidered > 0);
return;
}
else
{
Expand All @@ -467,45 +518,20 @@ void LSHSearch<SortPolicy>::ReturnIndicesFromTable(
// Retrieve candidates.
size_t start = 0;

// Define the number of threads used to process this.
size_t numThreadsUsed = std::min(maxThreads, numTablesToSearch);

// Parallelization: By default nested parallelism is off, so this won't be
// parallel. The user might turn nested parallelism on if (for example) they
// have a query-by-query processing scheme and so processing more than one
// query at the same time doesn't make sense for them.

#pragma omp parallel for \
num_threads (numThreadsUsed) \
shared (hashVec, refPointsConsideredSmall, start) \
schedule(dynamic)
for (size_t i = 0; i < numTablesToSearch; ++i) // For all tables
for (long long int i = 0; i < numTablesToSearch; ++i) // For all tables
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The restriction to long long here to support Windows really irks me, and as far as I can tell there is no reasonable and portable way to get something that's the same size as size_t but signed (there is ssize_t but its support on Windows is unclear to me). I have half a mind to just require OpenMP 3.0 support to get rid of this stupid restriction, which would disable OpenMP support with Visual Studio. I am not sure I am bothered by that; OpenMP 3 is almost a decade old at this point and the Visual Studio team still doesn't have support for it, so I am not sure I want to keep restricting us to such a legacy version. Windows users, if they need parallelism, can always switch to using MinGW or ICC or something like that. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's pretty reasonable. I can go back to using size_t and ask to detect version 3 and up.

{
const size_t hashInd = (size_t) hashVec[i]; // Find the query's bucket.
const size_t tableRow = bucketRowInHashTable[hashInd];

// Store all secondHashTable points in the candidates set.
if (tableRow != secondHashSize)
{
for (size_t j = 0; j < bucketContentSize[tableRow]; ++j)
{
#pragma omp critical
{
refPointsConsideredSmall(start++) = secondHashTable[tableRow][j];
}
}
}
refPointsConsideredSmall(start++) = secondHashTable[tableRow][j];
}

// Only keep unique candidates. If OpenMP is found, do it in parallel.
#ifdef OPENMP_FOUND
// TODO: change this to our own function?
referenceIndices = arma::unique(refPointsConsideredSmall);
return;
#else
referenceIndices = arma::unique(refPointsConsideredSmall);
return;
#endif
// Keep only one copy of each candidate.
referenceIndices = arma::unique(refPointsConsideredSmall);
return;
}
}

Expand Down Expand Up @@ -557,8 +583,9 @@ void LSHSearch<SortPolicy>::Search(const arma::mat& querySet,
num_threads ( numThreadsUsed )\
shared(avgIndicesReturned, resultingNeighbors, distances) \
schedule(dynamic)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two questions---

  • Is the dynamic schedule the right one to use here? My understanding was that the dynamic schedule had more overhead. In this case it seems like the default static schedule would be just fine.
  • I think the num_threads() call here will effectively set the number of threads used to the value of the environment variable OMP_NUM_THREADS, but that would be the same as the default anyway if you hadn't set num_threads(). So it makes me think that the maxThreads member is unnecessary (and the other supporting functionality).

Copy link
Contributor Author

@mentekid mentekid Jul 8, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the dynamic schedule the right one to use here? My understanding was that the dynamic schedule had more overhead. In this case it seems like the default static schedule would be just fine.

The problem with static scheduling is it doesn't leave room for work-stealing. Since queries get unequal sizes of candidate sets, in static scheduling some threads will finish their chunks quickly and then be useless. In dynamic scheduling, the compiler will detect slackers and give them more work to do.
I went with dynamic without trying static first, because of this. I can try that and see if there's a difference.

I think the num_threads() call here will effectively set the number of threads used to the value of the environment variable OMP_NUM_THREADS, but that would be the same as the default anyway if you hadn't set num_threads(). So it makes me think that the maxThreads member is unnecessary (and the other supporting functionality).

Yes I think I can simplify the code more now that we're not doing nested parallelism.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About static vs dynamic scheduling, I ran some tests:

Sift100k

dynamic: 0.086 +/- 0.008 s
 static: 0.093 +/- 0.014 s

phy

dynamic: 3.35 +/- 0.124 s
 static: 3.46 +/- 0.203 s

Corel

dynamic: 0.228 +/- 0.03 s
 static: 0.234 +/- 0.03 s

Miniboone

dynamic: 0.711 +/- 0.07 s
 static: 0.701 +/- 0.09 s

In the first 3, I'd say dynamic is slightly faster. It's hard to tell for Miniboone because the standard deviation is much larger than the difference. I'll run covertype and pokerhand in a while when my PC is not used.

// Go through every query point.
for (size_t i = 0; i < querySet.n_cols; i++)
// Go through every query point. Use long long int because some compilers
// complain about unsigned OpenMP loop index variables.
for (long long int i = 0; i < querySet.n_cols; i++)
{

// Hash every query into every hash table and eventually into the
Expand All @@ -574,9 +601,17 @@ void LSHSearch<SortPolicy>::Search(const arma::mat& querySet,

// Sequentially go through all the candidates and save the best 'k'
// candidates.
/*
numThreadsUsed = std::min( (arma::uword) maxThreads, refIndices.n_elem);
#pragma omp parallel for\
num_threads( numThreadsUsed )\
shared(refIndices, resultingNeighbors, distances, querySet)\
schedule(dynamic)
for (size_t j = 0; j < refIndices.n_elem; j++)
BaseCase(i, (size_t) refIndices[j], querySet, resultingNeighbors,
distances);
*/
BaseCase(i, refIndices, querySet, resultingNeighbors, distances);
}

Timer::Stop("computing_neighbors");
Expand Down Expand Up @@ -613,8 +648,9 @@ Search(const size_t k,
num_threads ( numThreadsUsed )\
shared(avgIndicesReturned, resultingNeighbors, distances) \
schedule(dynamic)
// Go through every query point.
for (size_t i = 0; i < referenceSet->n_cols; i++)
// Go through every query point. Use long long int because some compilers
// complain about unsigned OpenMP loop index variables.
for (long long int i = 0; i < referenceSet->n_cols; i++)
{
// Hash every query into every hash table and eventually into the
// 'secondHashTable' to obtain the neighbor candidates.
Expand All @@ -629,8 +665,17 @@ Search(const size_t k,

// Sequentially go through all the candidates and save the best 'k'
// candidates.

/*
numThreadsUsed = std::min( (arma::uword) maxThreads, refIndices.n_elem);
#pragma omp parallel for\
num_threads( numThreadsUsed )\
shared(refIndices, resultingNeighbors, distances)\
schedule(dynamic)
for (size_t j = 0; j < refIndices.n_elem; j++)
BaseCase(i, (size_t) refIndices[j], resultingNeighbors, distances);
*/
BaseCase(i, refIndices, resultingNeighbors, distances);

}

Expand Down