From 3fa3fd1b6d546b9cc4bb3656b9c94dbf492dcc81 Mon Sep 17 00:00:00 2001 From: Rok Novosel Date: Tue, 28 Jul 2020 10:50:02 +0200 Subject: [PATCH 01/13] Multithreaded build for C++, Python and Lua. --- annoy-dev-1.rockspec | 4 +- examples/s_compile_cpp.sh | 2 +- src/annoygomodule.h | 4 +- src/annoylib.h | 171 ++++++++++++++++++++++++++----- src/annoyluamodule.cc | 7 +- src/annoymodule.cc | 9 +- test/multithreaded_build_test.py | 23 +++++ 7 files changed, 182 insertions(+), 38 deletions(-) create mode 100644 test/multithreaded_build_test.py diff --git a/annoy-dev-1.rockspec b/annoy-dev-1.rockspec index 8e3acea7..58cc796c 100644 --- a/annoy-dev-1.rockspec +++ b/annoy-dev-1.rockspec @@ -44,14 +44,14 @@ build = { unix = { modules = { ['annoy'] = { - libraries = {"stdc++"}, + libraries = {"stdc++", "pthread"}, }, }, }, mingw32 = { modules = { ['annoy'] = { - libraries = {"stdc++"}, + libraries = {"stdc++", "pthread"}, }, }, }, diff --git a/examples/s_compile_cpp.sh b/examples/s_compile_cpp.sh index 687a6082..9a496392 100755 --- a/examples/s_compile_cpp.sh +++ b/examples/s_compile_cpp.sh @@ -2,6 +2,6 @@ echo "compiling precision example..." -cmd="g++ precision_test.cpp -o precision_test -std=c++11" +cmd="g++ precision_test.cpp -o precision_test -std=c++11 -pthread" eval $cmd echo "Done" diff --git a/src/annoygomodule.h b/src/annoygomodule.h index 005ed065..a96f7df0 100644 --- a/src/annoygomodule.h +++ b/src/annoygomodule.h @@ -16,8 +16,8 @@ class AnnoyIndex { void addItem(int item, const float* w) { ptr->add_item(item, w); }; - void build(int q) { - ptr->build(q); + void build(int q, int n_threads=1) { + ptr->build(q, n_threads); }; bool save(const char* filename, bool prefault) { return ptr->save(filename, prefault); diff --git a/src/annoylib.h b/src/annoylib.h index 00592165..9c37485f 100644 --- a/src/annoylib.h +++ b/src/annoylib.h @@ -57,6 +57,9 @@ typedef signed __int64 int64_t; #include #include #include +#include +#include +#include #ifdef _MSC_VER // Needed for Visual Studio to disable runtime checks for mempcy @@ -133,6 +136,8 @@ using std::vector; using std::pair; using std::numeric_limits; using std::make_pair; +using std::mutex; +using std::thread; inline bool remap_memory_and_truncate(void** _ptr, int _fd, size_t old_size, size_t new_size) { #ifdef __linux__ @@ -811,13 +816,32 @@ struct Manhattan : Minkowski { } }; +class ThreadBarrier { +public: + explicit ThreadBarrier(std::size_t count) : _count(count) { } + + void wait() { + std::unique_lock lock(_mutex); + if (--_count == 0) { + _cv.notify_all(); + } else { + _cv.wait(lock, [this] { return _count == 0; }); + } + } + +private: + std::mutex _mutex; + std::condition_variable _cv; + std::size_t _count; +}; + template class AnnoyIndexInterface { public: // Note that the methods with an **error argument will allocate memory and write the pointer to that string if error is non-NULL virtual ~AnnoyIndexInterface() {}; virtual bool add_item(S item, const T* w, char** error=NULL) = 0; - virtual bool build(int q, char** error=NULL) = 0; + virtual bool build(int q, int n_threads=1, char** error=NULL) = 0; virtual bool unbuild(char** error=NULL) = 0; virtual bool save(const char* filename, bool prefault=false, char** error=NULL) = 0; virtual void unload() = 0; @@ -850,12 +874,13 @@ template const int _f; size_t _s; S _n_items; - Random _random; void* _nodes; // Could either be mmapped, or point to a memory buffer that we reallocate S _n_nodes; S _nodes_size; vector _roots; S _K; + bool _is_seeded; + int _seed; bool _loaded; bool _verbose; int 
_fd; @@ -863,7 +888,7 @@ template bool _built; public: - AnnoyIndex(int f) : _f(f), _random() { + AnnoyIndex(int f) : _f(f) { _s = offsetof(Node, v) + _f * sizeof(T); // Size of each node _verbose = false; _built = false; @@ -929,7 +954,7 @@ template return true; } - bool build(int q, char** error=NULL) { + bool build(int q, int n_threads=1, char** error=NULL) { if (_loaded) { set_error_from_string(error, "You can't build a loaded index"); return false; @@ -940,23 +965,32 @@ template return false; } + if (n_threads < 1) { + set_error_from_string(error, "You can't build an index with less than 1 thread"); + return false; + } + D::template preprocess(_nodes, _s, _n_items, _f); _n_nodes = _n_items; - while (1) { - if (q == -1 && _n_nodes >= _n_items * 2) - break; - if (q != -1 && _roots.size() >= (size_t)q) - break; - if (_verbose) showUpdate("pass %zd...\n", _roots.size()); - vector indices; - for (S i = 0; i < _n_items; i++) { - if (_get(i)->n_descendants >= 1) // Issue #223 - indices.push_back(i); + std::mutex _nodes_mutex; + ThreadBarrier barrier(n_threads); + vector threads(n_threads); + int work_per_thread = (int)floor(q / (double)n_threads); + int work_remainder = q % n_threads; + for (int i = 0; i < n_threads; i++) { + int trees_per_thread = -1; + if (q > -1) { + // First thread picks up the remainder of the work + trees_per_thread = i == 0 ? work_per_thread + work_remainder : work_per_thread; } - _roots.push_back(_make_tree(indices, true)); + threads[i] = std::thread(&AnnoyIndex::_thread_build, this, trees_per_thread, i, std::ref(barrier), std::ref(_nodes_mutex)); + } + + for (auto& thread : threads) { + thread.join(); } // Also, copy the roots into the last segment of the array @@ -1035,6 +1069,7 @@ template _n_nodes = 0; _nodes_size = 0; _on_disk = false; + _is_seeded = false; _roots.clear(); } @@ -1142,7 +1177,8 @@ template } void set_seed(int seed) { - _random.set_seed(seed); + _is_seeded = true; + _seed = seed; } protected: @@ -1172,7 +1208,87 @@ template return get_node_ptr(_nodes, _s, i); } - S _make_tree(const vector& indices, bool is_root) { + void _thread_build(int q, int thread_idx, ThreadBarrier& barrier, std::mutex& _nodes_mutex) { + Random _random; + // Each thread needs its own seed, otherwise each thread would be building the same tree(s) + int seed = _is_seeded ? _seed + thread_idx : thread_idx; + _random.set_seed(seed); + + vector > thread_trees; + vector thread_roots; + while (1) { + if (q == -1) { + size_t thread_n_nodes = 0; + for (size_t tree_idx = 0; tree_idx < thread_trees.size(); tree_idx++) { + thread_n_nodes += thread_trees[tree_idx].size(); + } + if (thread_n_nodes >= 2 * (size_t)_n_items) { + break; + } + } else { + if (thread_roots.size() >= (size_t)q) { + break; + } + } + + if (_verbose) showUpdate("pass %zd...\n", thread_roots.size()); + + vector indices; + for (S i = 0; i < _n_items; i++) { + if (_get(i)->n_descendants >= 1) // Issue #223 + indices.push_back(i); + } + + vector split_nodes; + // Each thread is essentially pretending to build only one tree that will get inserted + // right after the already inserted items. Indices of split nodes start with _n_items, n_items + 1, ... + // We do not want to mutate the _nodes array during tree construction due to reallocation issues. That is + // why each thread stores the trees locally until all threads are ready to insert them into _nodes. 
+ S root_node = _make_tree(indices, split_nodes, true, _random); + thread_roots.push_back(root_node); + thread_trees.push_back(split_nodes); + } + + // Wait for all threads to finish before we can start inserting tree nodes into global _nodes array + barrier.wait(); + + _nodes_mutex.lock(); + // When a thread wants to insert local tree nodes into global _nodes it has to stop pretending that there is + // going to be only one tree. Each thread has to update all split nodes children that are pointing to other split nodes + // because their indices will change once inserted into global _nodes. + for (size_t tree_idx = 0; tree_idx < thread_trees.size(); tree_idx++) { + vector split_nodes = thread_trees[tree_idx]; + // Offset from _n_items where split nodes will get inserted + S split_nodes_offset = _n_nodes - _n_items; + _allocate_size(_n_nodes + split_nodes.size()); + + for (size_t node_idx = 0; node_idx < split_nodes.size(); node_idx++) { + Node* split_node = split_nodes[node_idx]; + bool is_root = (size_t)thread_roots[tree_idx] == (_n_items + node_idx); + + // Inverted condition from _make_tree to detect split nodes + if ((split_node->n_descendants > _K) || (is_root && (size_t)_n_items > (size_t)_K && split_node->n_descendants > 1)) { + for (size_t child_idx = 0; child_idx < 2; child_idx++) { + // Update children offset if it is pointing to a split node + if (split_node->children[child_idx] >= _n_items) { + split_node->children[child_idx] += split_nodes_offset; + } + } + } + + mempcpy(_get(_n_nodes), split_node, _s); + free(split_node); + + _n_nodes += 1; + } + + thread_roots[tree_idx] += split_nodes_offset; + } + _roots.insert(_roots.end(), thread_roots.begin(), thread_roots.end()); + _nodes_mutex.unlock(); + } + + S _make_tree(const vector& indices, vector& split_nodes, bool is_root, Random& _random) { // The basic rule is that if we have <= _K items, then it's a leaf node, otherwise it's a split node. // There's some regrettable complications caused by the problem that root nodes have to be "special": // 1. We identify root nodes by the arguable logic that _n_items == n->n_descendants, regardless of how many descendants they actually have @@ -1182,9 +1298,8 @@ template return indices[0]; if (indices.size() <= (size_t)_K && (!is_root || (size_t)_n_items <= (size_t)_K || indices.size() == 1)) { - _allocate_size(_n_nodes + 1); - S item = _n_nodes++; - Node* m = _get(item); + Node* m = (Node*)malloc(_s); + memset(m, 0, _s); m->n_descendants = is_root ? _n_items : (S)indices.size(); // Using std::copy instead of a loop seems to resolve issues #3 and #13, @@ -1193,7 +1308,9 @@ template // Only copy when necessary to avoid crash in MSVC 9. #293 if (!indices.empty()) memcpy(m->children, &indices[0], indices.size() * sizeof(S)); - return item; + + split_nodes.push_back(m); + return _n_items + (split_nodes.size() - 1); } vector children; @@ -1205,7 +1322,8 @@ template } vector children_indices[2]; - Node* m = (Node*)alloca(_s); + Node* m = (Node*)malloc(_s); + memset(m, 0, _s); D::create_split(children, _f, _s, _random, m); for (size_t i = 0; i < indices.size(); i++) { @@ -1246,14 +1364,11 @@ template m->n_descendants = is_root ? 
_n_items : (S)indices.size(); for (int side = 0; side < 2; side++) { // run _make_tree for the smallest child first (for cache locality) - m->children[side^flip] = _make_tree(children_indices[side^flip], false); + m->children[side^flip] = _make_tree(children_indices[side^flip], split_nodes, false, _random); } - _allocate_size(_n_nodes + 1); - S item = _n_nodes++; - memcpy(_get(item), m, _s); - - return item; + split_nodes.push_back(m); + return _n_items + (split_nodes.size() - 1); } void _get_all_nns(const T* v, size_t n, int search_k, vector* result, vector* distances) const { diff --git a/src/annoyluamodule.cc b/src/annoyluamodule.cc index 76fec7c9..3b5a0a69 100644 --- a/src/annoyluamodule.cc +++ b/src/annoyluamodule.cc @@ -118,9 +118,14 @@ class LuaAnnoy { } static int build(lua_State* L) { + int nargs = lua_gettop(L); Impl* self = getAnnoy(L, 1); int n_trees = luaL_checkinteger(L, 2); - self->build(n_trees); + int n_threads = 1; + if (nargs >= 3) { + n_threads = luaL_checkinteger(L, 3); + } + self->build(n_trees, n_threads); lua_pushboolean(L, true); return 1; } diff --git a/src/annoymodule.cc b/src/annoymodule.cc index f15a0cc6..830db2c0 100644 --- a/src/annoymodule.cc +++ b/src/annoymodule.cc @@ -84,7 +84,7 @@ class HammingWrapper : public AnnoyIndexInterface { _pack(w, &w_internal[0]); return _index.add_item(item, &w_internal[0], error); }; - bool build(int q, char** error) { return _index.build(q, error); }; + bool build(int q, int n_threads, char** error) { return _index.build(q, n_threads, error); }; bool unbuild(char** error) { return _index.unbuild(error); }; bool save(const char* filename, bool prefault, char** error) { return _index.save(filename, prefault, error); }; void unload() { _index.unload(); }; @@ -408,16 +408,17 @@ py_an_on_disk_build(py_annoy *self, PyObject *args, PyObject *kwargs) { static PyObject * py_an_build(py_annoy *self, PyObject *args, PyObject *kwargs) { int q; + int n_threads = 1; if (!self->ptr) return NULL; - static char const * kwlist[] = {"n_trees", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "i", (char**)kwlist, &q)) + static char const * kwlist[] = {"n_trees", "n_threads", NULL}; + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "i|i", (char**)kwlist, &q, &n_threads)) return NULL; bool res; char* error; Py_BEGIN_ALLOW_THREADS; - res = self->ptr->build(q, &error); + res = self->ptr->build(q, n_threads, &error); Py_END_ALLOW_THREADS; if (!res) { PyErr_SetString(PyExc_Exception, error); diff --git a/test/multithreaded_build_test.py b/test/multithreaded_build_test.py new file mode 100644 index 00000000..f46b79de --- /dev/null +++ b/test/multithreaded_build_test.py @@ -0,0 +1,23 @@ +import numpy +import unittest +from annoy import AnnoyIndex + + +class MultithreadedBuildTest(unittest.TestCase): + def _test_building_with_threads(self, n_threads): + n, f = 10000, 10 + n_trees = 31 + i = AnnoyIndex(f, 'euclidean') + for j in range(n): + i.add_item(j, numpy.random.normal(size=f)) + self.assertTrue(i.build(n_trees, n_threads=n_threads)) + self.assertEqual(n_trees, i.get_n_trees()) + + def test_two_threads(self): + self._test_building_with_threads(2) + + def test_four_threads(self): + self._test_building_with_threads(4) + + def test_eight_threads(self): + self._test_building_with_threads(8) From d3b5914a37a38bb79bdf51588338a2424213450f Mon Sep 17 00:00:00 2001 From: Rok Novosel Date: Tue, 28 Jul 2020 11:04:55 +0200 Subject: [PATCH 02/13] Add std=c++11 to extra_compile_args. 
--- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 147483ee..983b04d9 100644 --- a/setup.py +++ b/setup.py @@ -35,7 +35,7 @@ long_description = readme_note + fobj.read() # Various platform-dependent extras -extra_compile_args = ['-D_CRT_SECURE_NO_WARNINGS'] +extra_compile_args = ['-D_CRT_SECURE_NO_WARNINGS', '-std=c++11'] extra_link_args = [] # Not all CPUs have march as a tuning parameter @@ -51,7 +51,7 @@ # #349: something with OS X Mojave causes libstd not to be found if platform.system() == 'Darwin': - extra_compile_args += ['-std=c++11', '-mmacosx-version-min=10.9'] + extra_compile_args += ['-mmacosx-version-min=10.9'] extra_link_args += ['-stdlib=libc++', '-mmacosx-version-min=10.9'] # Manual configuration, you're on your own here. From f4f9ae1b7043fbaa539d0b4f88094ed0e1a52671 Mon Sep 17 00:00:00 2001 From: Rok Novosel Date: Tue, 28 Jul 2020 11:13:07 +0200 Subject: [PATCH 03/13] Use std::isnan. --- src/annoylib.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/annoylib.h b/src/annoylib.h index 9c37485f..168af719 100644 --- a/src/annoylib.h +++ b/src/annoylib.h @@ -600,7 +600,7 @@ struct DotProduct : Angular { for (S i = 0; i < node_count; i++) { Node* node = get_node_ptr(nodes, _s, i); T norm = sqrt(dot(node->v, node->v, f)); - if (isnan(norm)) norm = 0; + if (std::isnan(norm)) norm = 0; node->dot_factor = norm; } @@ -619,7 +619,7 @@ struct DotProduct : Angular { T node_norm = node->dot_factor; T dot_factor = sqrt(pow(max_norm, static_cast(2.0)) - pow(node_norm, static_cast(2.0))); - if (isnan(dot_factor)) dot_factor = 0; + if (std::isnan(dot_factor)) dot_factor = 0; node->dot_factor = dot_factor; } From 4dd2fca84cde690483d304b76cd980d1a32b6d07 Mon Sep 17 00:00:00 2001 From: Rok Novosel Date: Tue, 28 Jul 2020 11:33:07 +0200 Subject: [PATCH 04/13] Remove typo. --- src/annoylib.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/annoylib.h b/src/annoylib.h index 168af719..4f3afdb6 100644 --- a/src/annoylib.h +++ b/src/annoylib.h @@ -1276,7 +1276,7 @@ template } } - mempcpy(_get(_n_nodes), split_node, _s); + memcpy(_get(_n_nodes), split_node, _s); free(split_node); _n_nodes += 1; From c913236e8e14e7704871e40ab4eae157523e374a Mon Sep 17 00:00:00 2001 From: Rok Novosel Date: Tue, 28 Jul 2020 12:28:14 +0200 Subject: [PATCH 05/13] Replace isnan with macro. 
--- setup.py | 4 ++-- src/annoylib.h | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index 983b04d9..71c2eaa5 100644 --- a/setup.py +++ b/setup.py @@ -35,7 +35,7 @@ long_description = readme_note + fobj.read() # Various platform-dependent extras -extra_compile_args = ['-D_CRT_SECURE_NO_WARNINGS', '-std=c++11'] +extra_compile_args = ['-D_CRT_SECURE_NO_WARNINGS'] extra_link_args = [] # Not all CPUs have march as a tuning parameter @@ -47,7 +47,7 @@ extra_compile_args += cputune if os.name != 'nt': - extra_compile_args += ['-O3', '-ffast-math', '-fno-associative-math'] + extra_compile_args += ['-std=c++11', '-O3', '-ffast-math', '-fno-associative-math'] # #349: something with OS X Mojave causes libstd not to be found if platform.system() == 'Darwin': diff --git a/src/annoylib.h b/src/annoylib.h index 4f3afdb6..d4772e5f 100644 --- a/src/annoylib.h +++ b/src/annoylib.h @@ -106,6 +106,7 @@ inline void set_error_from_string(char **error, const char* msg) { #ifndef _MSC_VER #define popcount __builtin_popcountll +#define isnan(x) std::isnan(x) #else // See #293, #358 #define isnan(x) _isnan(x) #define popcount cole_popcount @@ -600,7 +601,7 @@ struct DotProduct : Angular { for (S i = 0; i < node_count; i++) { Node* node = get_node_ptr(nodes, _s, i); T norm = sqrt(dot(node->v, node->v, f)); - if (std::isnan(norm)) norm = 0; + if (isnan(norm)) norm = 0; node->dot_factor = norm; } @@ -619,7 +620,7 @@ struct DotProduct : Angular { T node_norm = node->dot_factor; T dot_factor = sqrt(pow(max_norm, static_cast(2.0)) - pow(node_norm, static_cast(2.0))); - if (std::isnan(dot_factor)) dot_factor = 0; + if (isnan(dot_factor)) dot_factor = 0; node->dot_factor = dot_factor; } From 1297412caa88abdf478da7b32f445fbcbb1830c1 Mon Sep 17 00:00:00 2001 From: Rok Novosel Date: Tue, 28 Jul 2020 13:20:10 +0200 Subject: [PATCH 06/13] Replace isnan with checks before sqrt. --- src/annoylib.h | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/annoylib.h b/src/annoylib.h index d4772e5f..2a8ab3df 100644 --- a/src/annoylib.h +++ b/src/annoylib.h @@ -106,9 +106,7 @@ inline void set_error_from_string(char **error, const char* msg) { #ifndef _MSC_VER #define popcount __builtin_popcountll -#define isnan(x) std::isnan(x) #else // See #293, #358 -#define isnan(x) _isnan(x) #define popcount cole_popcount #endif @@ -600,8 +598,8 @@ struct DotProduct : Angular { // Step one: compute the norm of each vector and store that in its extra dimension (f-1) for (S i = 0; i < node_count; i++) { Node* node = get_node_ptr(nodes, _s, i); - T norm = sqrt(dot(node->v, node->v, f)); - if (isnan(norm)) norm = 0; + T d = dot(node->v, node->v, f); + T norm = d < 0 ? 0 : sqrt(d); node->dot_factor = norm; } @@ -618,9 +616,8 @@ struct DotProduct : Angular { for (S i = 0; i < node_count; i++) { Node* node = get_node_ptr(nodes, _s, i); T node_norm = node->dot_factor; - - T dot_factor = sqrt(pow(max_norm, static_cast(2.0)) - pow(node_norm, static_cast(2.0))); - if (isnan(dot_factor)) dot_factor = 0; + T squared_norm_diff = pow(max_norm, static_cast(2.0)) - pow(node_norm, static_cast(2.0)); + T dot_factor = squared_norm_diff < 0 ? 0 : sqrt(squared_norm_diff); node->dot_factor = dot_factor; } From 46132027c8b34fc01ea3a12e78fb35442619f062 Mon Sep 17 00:00:00 2001 From: Rok Novosel Date: Tue, 28 Jul 2020 17:35:06 +0200 Subject: [PATCH 07/13] Better work per thread calculation. 
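Previously the remainder of q % n_threads was handed entirely to the first thread, so with an uneven split one thread could end up with several more trees than the rest. With trees_per_thread = floor((q + thread_idx) / n_threads) the per-thread counts differ by at most one and still sum to exactly q. A minimal standalone sketch of the arithmetic (not part of the patch; q=31 matches the tree count used in the multithreaded build test, n_threads=4 is just an illustrative value):

    #include <cstdio>

    int main() {
      const int q = 31;         // total trees requested
      const int n_threads = 4;  // illustrative thread count
      int total = 0;
      for (int thread_idx = 0; thread_idx < n_threads; thread_idx++) {
        // Integer division here is equivalent to the floor() call in build().
        int trees_per_thread = (q + thread_idx) / n_threads;
        std::printf("thread %d builds %d trees\n", thread_idx, trees_per_thread);
        total += trees_per_thread;
      }
      std::printf("total = %d\n", total);  // 7 + 8 + 8 + 8 = 31
      return 0;
    }
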
--- src/annoylib.h | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/src/annoylib.h b/src/annoylib.h index 2a8ab3df..7985b164 100644 --- a/src/annoylib.h +++ b/src/annoylib.h @@ -972,19 +972,12 @@ template _n_nodes = _n_items; - std::mutex _nodes_mutex; + std::mutex nodes_mutex; ThreadBarrier barrier(n_threads); vector threads(n_threads); - int work_per_thread = (int)floor(q / (double)n_threads); - int work_remainder = q % n_threads; for (int i = 0; i < n_threads; i++) { - int trees_per_thread = -1; - if (q > -1) { - // First thread picks up the remainder of the work - trees_per_thread = i == 0 ? work_per_thread + work_remainder : work_per_thread; - } - - threads[i] = std::thread(&AnnoyIndex::_thread_build, this, trees_per_thread, i, std::ref(barrier), std::ref(_nodes_mutex)); + int trees_per_thread = q == -1 ? -1 : (int)floor((q + i) / n_threads); + threads[i] = std::thread(&AnnoyIndex::_thread_build, this, trees_per_thread, i, std::ref(barrier), std::ref(nodes_mutex)); } for (auto& thread : threads) { @@ -1206,7 +1199,7 @@ template return get_node_ptr(_nodes, _s, i); } - void _thread_build(int q, int thread_idx, ThreadBarrier& barrier, std::mutex& _nodes_mutex) { + void _thread_build(int q, int thread_idx, ThreadBarrier& barrier, std::mutex& nodes_mutex) { Random _random; // Each thread needs its own seed, otherwise each thread would be building the same tree(s) int seed = _is_seeded ? _seed + thread_idx : thread_idx; @@ -1250,7 +1243,7 @@ template // Wait for all threads to finish before we can start inserting tree nodes into global _nodes array barrier.wait(); - _nodes_mutex.lock(); + nodes_mutex.lock(); // When a thread wants to insert local tree nodes into global _nodes it has to stop pretending that there is // going to be only one tree. Each thread has to update all split nodes children that are pointing to other split nodes // because their indices will change once inserted into global _nodes. @@ -1283,7 +1276,7 @@ template thread_roots[tree_idx] += split_nodes_offset; } _roots.insert(_roots.end(), thread_roots.begin(), thread_roots.end()); - _nodes_mutex.unlock(); + nodes_mutex.unlock(); } S _make_tree(const vector& indices, vector& split_nodes, bool is_root, Random& _random) { From add300cc8863bd8ab1f553d1f88d11f3ac3a7e64 Mon Sep 17 00:00:00 2001 From: Rok Novosel Date: Fri, 31 Jul 2020 14:37:29 +0200 Subject: [PATCH 08/13] Simplify the multithreaded solution. Use n_threads=-1 as default which uses all CPU cores. Rename n_threads to n_jobs in the Python interface. --- README.rst | 2 +- examples/s_compile_cpp.sh | 2 +- setup.py | 2 +- src/annoygomodule.h | 2 +- src/annoylib.h | 204 ++++++++++++++----------------- src/annoyluamodule.cc | 2 +- src/annoymodule.cc | 10 +- test/multithreaded_build_test.py | 7 +- 8 files changed, 108 insertions(+), 123 deletions(-) diff --git a/README.rst b/README.rst index ffb7b8df..cd56057d 100644 --- a/README.rst +++ b/README.rst @@ -78,7 +78,7 @@ Full Python API * ``AnnoyIndex(f, metric)`` returns a new index that's read-write and stores vector of ``f`` dimensions. Metric can be ``"angular"``, ``"euclidean"``, ``"manhattan"``, ``"hamming"``, or ``"dot"``. * ``a.add_item(i, v)`` adds item ``i`` (any nonnegative integer) with vector ``v``. Note that it will allocate memory for ``max(i)+1`` items. -* ``a.build(n_trees)`` builds a forest of ``n_trees`` trees. More trees gives higher precision when querying. After calling ``build``, no more items can be added. 
+* ``a.build(n_trees, n_jobs=-1)`` builds a forest of ``n_trees`` trees. More trees gives higher precision when querying. After calling ``build``, no more items can be added. ``n_jobs`` specifies the number of threads used to build the trees. ``n_jobs=-1`` uses all available CPU cores. * ``a.save(fn, prefault=False)`` saves the index to disk and loads it (see next function). After saving, no more items can be added. * ``a.load(fn, prefault=False)`` loads (mmaps) an index from disk. If `prefault` is set to `True`, it will pre-read the entire file into memory (using mmap with `MAP_POPULATE`). Default is `False`. * ``a.unload()`` unloads. diff --git a/examples/s_compile_cpp.sh b/examples/s_compile_cpp.sh index 9a496392..2c69d55d 100755 --- a/examples/s_compile_cpp.sh +++ b/examples/s_compile_cpp.sh @@ -2,6 +2,6 @@ echo "compiling precision example..." -cmd="g++ precision_test.cpp -o precision_test -std=c++11 -pthread" +cmd="g++ precision_test.cpp -o precision_test -std=c++14 -pthread" eval $cmd echo "Done" diff --git a/setup.py b/setup.py index 71c2eaa5..466801d9 100644 --- a/setup.py +++ b/setup.py @@ -47,7 +47,7 @@ extra_compile_args += cputune if os.name != 'nt': - extra_compile_args += ['-std=c++11', '-O3', '-ffast-math', '-fno-associative-math'] + extra_compile_args += ['-std=c++14', '-O3', '-ffast-math', '-fno-associative-math'] # #349: something with OS X Mojave causes libstd not to be found if platform.system() == 'Darwin': diff --git a/src/annoygomodule.h b/src/annoygomodule.h index a96f7df0..a815d23b 100644 --- a/src/annoygomodule.h +++ b/src/annoygomodule.h @@ -16,7 +16,7 @@ class AnnoyIndex { void addItem(int item, const float* w) { ptr->add_item(item, w); }; - void build(int q, int n_threads=1) { + void build(int q, int n_threads=-1) { ptr->build(q, n_threads); }; bool save(const char* filename, bool prefault) { diff --git a/src/annoylib.h b/src/annoylib.h index 7985b164..95efbd57 100644 --- a/src/annoylib.h +++ b/src/annoylib.h @@ -57,9 +57,9 @@ typedef signed __int64 int64_t; #include #include #include -#include #include -#include +#include +#include #ifdef _MSC_VER // Needed for Visual Studio to disable runtime checks for mempcy @@ -814,32 +814,13 @@ struct Manhattan : Minkowski { } }; -class ThreadBarrier { -public: - explicit ThreadBarrier(std::size_t count) : _count(count) { } - - void wait() { - std::unique_lock lock(_mutex); - if (--_count == 0) { - _cv.notify_all(); - } else { - _cv.wait(lock, [this] { return _count == 0; }); - } - } - -private: - std::mutex _mutex; - std::condition_variable _cv; - std::size_t _count; -}; - template class AnnoyIndexInterface { public: // Note that the methods with an **error argument will allocate memory and write the pointer to that string if error is non-NULL virtual ~AnnoyIndexInterface() {}; virtual bool add_item(S item, const T* w, char** error=NULL) = 0; - virtual bool build(int q, int n_threads=1, char** error=NULL) = 0; + virtual bool build(int q, int n_threads=-1, char** error=NULL) = 0; virtual bool unbuild(char** error=NULL) = 0; virtual bool save(const char* filename, bool prefault=false, char** error=NULL) = 0; virtual void unload() = 0; @@ -952,7 +933,7 @@ template return true; } - bool build(int q, int n_threads=1, char** error=NULL) { + bool build(int q, int n_threads=-1, char** error=NULL) { if (_loaded) { set_error_from_string(error, "You can't build a loaded index"); return false; @@ -963,21 +944,33 @@ template return false; } - if (n_threads < 1) { - set_error_from_string(error, "You can't build an index with less 
than 1 thread"); - return false; + if (n_threads == -1) { + // If the hardware_concurrency() value is not well defined or not computable, it returns ​0. + // We guard against this by using at least 1 thread. + n_threads = std::max(1, (int)std::thread::hardware_concurrency()); } D::template preprocess(_nodes, _s, _n_items, _f); _n_nodes = _n_items; - std::mutex nodes_mutex; - ThreadBarrier barrier(n_threads); + std::shared_timed_mutex nodes_mutex; + std::mutex n_nodes_mutex; + std::mutex roots_mutex; + std::mutex nodes_size_mutex; vector threads(n_threads); - for (int i = 0; i < n_threads; i++) { - int trees_per_thread = q == -1 ? -1 : (int)floor((q + i) / n_threads); - threads[i] = std::thread(&AnnoyIndex::_thread_build, this, trees_per_thread, i, std::ref(barrier), std::ref(nodes_mutex)); + for (int thread_idx = 0; thread_idx < n_threads; thread_idx++) { + int trees_per_thread = q == -1 ? -1 : (int)floor((q + thread_idx) / n_threads); + threads[thread_idx] = std::thread( + &AnnoyIndex::_thread_build, + this, + trees_per_thread, + thread_idx, + std::ref(nodes_mutex), + std::ref(n_nodes_mutex), + std::ref(nodes_size_mutex), + std::ref(roots_mutex) + ); } for (auto& thread : threads) { @@ -1173,49 +1166,61 @@ template } protected: + void _reallocate_nodes(S n) { + const double reallocation_factor = 1.3; + S new_nodes_size = std::max(n, (S) ((_nodes_size + 1) * reallocation_factor)); + void *old = _nodes; + + if (_on_disk) { + if (!remap_memory_and_truncate(&_nodes, _fd, + static_cast(_s) * static_cast(_nodes_size), + static_cast(_s) * static_cast(new_nodes_size)) && + _verbose) + showUpdate("File truncation error\n"); + } else { + _nodes = realloc(_nodes, _s * new_nodes_size); + memset((char *) _nodes + (_nodes_size * _s) / sizeof(char), 0, (new_nodes_size - _nodes_size) * _s); + } + + _nodes_size = new_nodes_size; + if (_verbose) showUpdate("Reallocating to %d nodes: old_address=%p, new_address=%p\n", new_nodes_size, old, _nodes); + } + void _allocate_size(S n) { if (n > _nodes_size) { - const double reallocation_factor = 1.3; - S new_nodes_size = std::max(n, (S) ((_nodes_size + 1) * reallocation_factor)); - void *old = _nodes; - - if (_on_disk) { - if (!remap_memory_and_truncate(&_nodes, _fd, - static_cast(_s) * static_cast(_nodes_size), - static_cast(_s) * static_cast(new_nodes_size)) && - _verbose) - showUpdate("File truncation error\n"); - } else { - _nodes = realloc(_nodes, _s * new_nodes_size); - memset((char *) _nodes + (_nodes_size * _s) / sizeof(char), 0, (new_nodes_size - _nodes_size) * _s); - } - - _nodes_size = new_nodes_size; - if (_verbose) showUpdate("Reallocating to %d nodes: old_address=%p, new_address=%p\n", new_nodes_size, old, _nodes); + _reallocate_nodes(n); } } + void _allocate_size(S n, std::shared_timed_mutex& nodes_mutex, std::mutex& nodes_size_mutex) { + nodes_size_mutex.lock(); + if (n > _nodes_size) { + nodes_mutex.lock(); + _reallocate_nodes(n); + nodes_mutex.unlock(); + } + nodes_size_mutex.unlock(); + } + inline Node* _get(const S i) const { return get_node_ptr(_nodes, _s, i); } - void _thread_build(int q, int thread_idx, ThreadBarrier& barrier, std::mutex& nodes_mutex) { + void _thread_build(int q, int thread_idx, std::shared_timed_mutex& nodes_mutex, std::mutex& n_nodes_mutex, std::mutex& nodes_size_mutex, std::mutex& roots_mutex) { Random _random; // Each thread needs its own seed, otherwise each thread would be building the same tree(s) int seed = _is_seeded ? 
_seed + thread_idx : thread_idx; _random.set_seed(seed); - vector > thread_trees; vector thread_roots; while (1) { if (q == -1) { - size_t thread_n_nodes = 0; - for (size_t tree_idx = 0; tree_idx < thread_trees.size(); tree_idx++) { - thread_n_nodes += thread_trees[tree_idx].size(); - } - if (thread_n_nodes >= 2 * (size_t)_n_items) { + n_nodes_mutex.lock(); + if (_n_nodes >= 2 * _n_items) { + n_nodes_mutex.unlock(); break; } + n_nodes_mutex.unlock(); } else { if (thread_roots.size() >= (size_t)q) { break; @@ -1226,60 +1231,22 @@ template vector indices; for (S i = 0; i < _n_items; i++) { - if (_get(i)->n_descendants >= 1) // Issue #223 + nodes_mutex.lock_shared(); + if (_get(i)->n_descendants >= 1) { // Issue #223 indices.push_back(i); - } - - vector split_nodes; - // Each thread is essentially pretending to build only one tree that will get inserted - // right after the already inserted items. Indices of split nodes start with _n_items, n_items + 1, ... - // We do not want to mutate the _nodes array during tree construction due to reallocation issues. That is - // why each thread stores the trees locally until all threads are ready to insert them into _nodes. - S root_node = _make_tree(indices, split_nodes, true, _random); - thread_roots.push_back(root_node); - thread_trees.push_back(split_nodes); - } - - // Wait for all threads to finish before we can start inserting tree nodes into global _nodes array - barrier.wait(); - - nodes_mutex.lock(); - // When a thread wants to insert local tree nodes into global _nodes it has to stop pretending that there is - // going to be only one tree. Each thread has to update all split nodes children that are pointing to other split nodes - // because their indices will change once inserted into global _nodes. - for (size_t tree_idx = 0; tree_idx < thread_trees.size(); tree_idx++) { - vector split_nodes = thread_trees[tree_idx]; - // Offset from _n_items where split nodes will get inserted - S split_nodes_offset = _n_nodes - _n_items; - _allocate_size(_n_nodes + split_nodes.size()); - - for (size_t node_idx = 0; node_idx < split_nodes.size(); node_idx++) { - Node* split_node = split_nodes[node_idx]; - bool is_root = (size_t)thread_roots[tree_idx] == (_n_items + node_idx); - - // Inverted condition from _make_tree to detect split nodes - if ((split_node->n_descendants > _K) || (is_root && (size_t)_n_items > (size_t)_K && split_node->n_descendants > 1)) { - for (size_t child_idx = 0; child_idx < 2; child_idx++) { - // Update children offset if it is pointing to a split node - if (split_node->children[child_idx] >= _n_items) { - split_node->children[child_idx] += split_nodes_offset; - } - } } - - memcpy(_get(_n_nodes), split_node, _s); - free(split_node); - - _n_nodes += 1; + nodes_mutex.unlock_shared(); } - thread_roots[tree_idx] += split_nodes_offset; + thread_roots.push_back(_make_tree(indices, true, _random, nodes_mutex, n_nodes_mutex, nodes_size_mutex)); } + + roots_mutex.lock(); _roots.insert(_roots.end(), thread_roots.begin(), thread_roots.end()); - nodes_mutex.unlock(); + roots_mutex.unlock(); } - S _make_tree(const vector& indices, vector& split_nodes, bool is_root, Random& _random) { + S _make_tree(const vector& indices, bool is_root, Random& _random, std::shared_timed_mutex& nodes_mutex, std::mutex& n_nodes_mutex, std::mutex& nodes_size_mutex) { // The basic rule is that if we have <= _K items, then it's a leaf node, otherwise it's a split node. 
// There's some regrettable complications caused by the problem that root nodes have to be "special": // 1. We identify root nodes by the arguable logic that _n_items == n->n_descendants, regardless of how many descendants they actually have @@ -1289,8 +1256,13 @@ template return indices[0]; if (indices.size() <= (size_t)_K && (!is_root || (size_t)_n_items <= (size_t)_K || indices.size() == 1)) { - Node* m = (Node*)malloc(_s); - memset(m, 0, _s); + n_nodes_mutex.lock(); + _allocate_size(_n_nodes + 1, nodes_mutex, nodes_size_mutex); + S item = _n_nodes++; + n_nodes_mutex.unlock(); + + nodes_mutex.lock_shared(); + Node* m = _get(item); m->n_descendants = is_root ? _n_items : (S)indices.size(); // Using std::copy instead of a loop seems to resolve issues #3 and #13, @@ -1300,10 +1272,14 @@ template if (!indices.empty()) memcpy(m->children, &indices[0], indices.size() * sizeof(S)); - split_nodes.push_back(m); - return _n_items + (split_nodes.size() - 1); + nodes_mutex.unlock_shared(); + return item; } + vector children_indices[2]; + Node* m = (Node*)alloca(_s); + + nodes_mutex.lock_shared(); vector children; for (size_t i = 0; i < indices.size(); i++) { S j = indices[i]; @@ -1312,9 +1288,6 @@ template children.push_back(n); } - vector children_indices[2]; - Node* m = (Node*)malloc(_s); - memset(m, 0, _s); D::create_split(children, _f, _s, _random, m); for (size_t i = 0; i < indices.size(); i++) { @@ -1327,6 +1300,7 @@ template showUpdate("No node for index %d?\n", j); } } + nodes_mutex.unlock_shared(); // If we didn't find a hyperplane, just randomize sides as a last option while (children_indices[0].size() == 0 || children_indices[1].size() == 0) { @@ -1355,11 +1329,19 @@ template m->n_descendants = is_root ? _n_items : (S)indices.size(); for (int side = 0; side < 2; side++) { // run _make_tree for the smallest child first (for cache locality) - m->children[side^flip] = _make_tree(children_indices[side^flip], split_nodes, false, _random); + m->children[side^flip] = _make_tree(children_indices[side^flip], false, _random, nodes_mutex, n_nodes_mutex, nodes_size_mutex); } - split_nodes.push_back(m); - return _n_items + (split_nodes.size() - 1); + n_nodes_mutex.lock(); + _allocate_size(_n_nodes + 1, nodes_mutex, nodes_size_mutex); + S item = _n_nodes++; + n_nodes_mutex.unlock(); + + nodes_mutex.lock_shared(); + memcpy(_get(item), m, _s); + nodes_mutex.unlock_shared(); + + return item; } void _get_all_nns(const T* v, size_t n, int search_k, vector* result, vector* distances) const { diff --git a/src/annoyluamodule.cc b/src/annoyluamodule.cc index 3b5a0a69..0da84587 100644 --- a/src/annoyluamodule.cc +++ b/src/annoyluamodule.cc @@ -121,7 +121,7 @@ class LuaAnnoy { int nargs = lua_gettop(L); Impl* self = getAnnoy(L, 1); int n_trees = luaL_checkinteger(L, 2); - int n_threads = 1; + int n_threads = -1; if (nargs >= 3) { n_threads = luaL_checkinteger(L, 3); } diff --git a/src/annoymodule.cc b/src/annoymodule.cc index 830db2c0..294cbba8 100644 --- a/src/annoymodule.cc +++ b/src/annoymodule.cc @@ -408,17 +408,17 @@ py_an_on_disk_build(py_annoy *self, PyObject *args, PyObject *kwargs) { static PyObject * py_an_build(py_annoy *self, PyObject *args, PyObject *kwargs) { int q; - int n_threads = 1; + int n_jobs = -1; if (!self->ptr) return NULL; - static char const * kwlist[] = {"n_trees", "n_threads", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "i|i", (char**)kwlist, &q, &n_threads)) + static char const * kwlist[] = {"n_trees", "n_jobs", NULL}; + if (!PyArg_ParseTupleAndKeywords(args, kwargs, 
"i|i", (char**)kwlist, &q, &n_jobs)) return NULL; bool res; char* error; Py_BEGIN_ALLOW_THREADS; - res = self->ptr->build(q, n_threads, &error); + res = self->ptr->build(q, n_jobs, &error); Py_END_ALLOW_THREADS; if (!res) { PyErr_SetString(PyExc_Exception, error); @@ -528,7 +528,7 @@ static PyMethodDef AnnoyMethods[] = { {"get_item_vector",(PyCFunction)py_an_get_item_vector, METH_VARARGS, "Returns the vector for item `i` that was previously added."}, {"add_item",(PyCFunction)py_an_add_item, METH_VARARGS | METH_KEYWORDS, "Adds item `i` (any nonnegative integer) with vector `v`.\n\nNote that it will allocate memory for `max(i)+1` items."}, {"on_disk_build",(PyCFunction)py_an_on_disk_build, METH_VARARGS | METH_KEYWORDS, "Build will be performed with storage on disk instead of RAM."}, - {"build",(PyCFunction)py_an_build, METH_VARARGS | METH_KEYWORDS, "Builds a forest of `n_trees` trees.\n\nMore trees give higher precision when querying. After calling `build`,\nno more items can be added."}, + {"build",(PyCFunction)py_an_build, METH_VARARGS | METH_KEYWORDS, "Builds a forest of `n_trees` trees.\n\nMore trees give higher precision when querying. After calling `build`,\nno more items can be added. `n_jobs` specifies the number of threads used to build the trees. `n_jobs=-1` uses all available CPU cores."}, {"unbuild",(PyCFunction)py_an_unbuild, METH_NOARGS, "Unbuilds the tree in order to allows adding new items.\n\nbuild() has to be called again afterwards in order to\nrun queries."}, {"unload",(PyCFunction)py_an_unload, METH_NOARGS, "Unloads an index from disk."}, {"get_distance",(PyCFunction)py_an_get_distance, METH_VARARGS, "Returns the distance between items `i` and `j`."}, diff --git a/test/multithreaded_build_test.py b/test/multithreaded_build_test.py index f46b79de..0fc9f24f 100644 --- a/test/multithreaded_build_test.py +++ b/test/multithreaded_build_test.py @@ -4,15 +4,18 @@ class MultithreadedBuildTest(unittest.TestCase): - def _test_building_with_threads(self, n_threads): + def _test_building_with_threads(self, n_jobs): n, f = 10000, 10 n_trees = 31 i = AnnoyIndex(f, 'euclidean') for j in range(n): i.add_item(j, numpy.random.normal(size=f)) - self.assertTrue(i.build(n_trees, n_threads=n_threads)) + self.assertTrue(i.build(n_trees, n_jobs=n_jobs)) self.assertEqual(n_trees, i.get_n_trees()) + def test_one_thread(self): + self._test_building_with_threads(1) + def test_two_threads(self): self._test_building_with_threads(2) From 51e5768b3d632126adc2f205ed11ab95e76a96fe Mon Sep 17 00:00:00 2001 From: Rok Novosel Date: Fri, 31 Jul 2020 14:51:00 +0200 Subject: [PATCH 09/13] Bump macosx version to 10.12. --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 466801d9..d589bf5e 100644 --- a/setup.py +++ b/setup.py @@ -51,8 +51,8 @@ # #349: something with OS X Mojave causes libstd not to be found if platform.system() == 'Darwin': - extra_compile_args += ['-mmacosx-version-min=10.9'] - extra_link_args += ['-stdlib=libc++', '-mmacosx-version-min=10.9'] + extra_compile_args += ['-mmacosx-version-min=10.12'] + extra_link_args += ['-stdlib=libc++', '-mmacosx-version-min=10.12'] # Manual configuration, you're on your own here. manual_compiler_args = os.environ.get('ANNOY_COMPILER_ARGS', None) From 284dda956df8da436046a3c3c8c7d5971ebb3b06 Mon Sep 17 00:00:00 2001 From: Rok Novosel Date: Sun, 2 Aug 2020 18:13:25 +0200 Subject: [PATCH 10/13] Extracted threading specific code into single threaded and multi threaded policies. 
Added ANNOYLIB_MULTITHREADED_BUILD which controls which threading policy to use. --- examples/precision_test.cpp | 2 +- examples/s_compile_cpp.sh | 2 +- setup.py | 12 +- src/annoygomodule.h | 10 +- src/annoylib.h | 241 ++++++++++++++++++++++-------------- src/annoyluamodule.cc | 8 +- src/annoymodule.cc | 17 ++- 7 files changed, 181 insertions(+), 111 deletions(-) diff --git a/examples/precision_test.cpp b/examples/precision_test.cpp index 2c006487..2c9bb907 100644 --- a/examples/precision_test.cpp +++ b/examples/precision_test.cpp @@ -25,7 +25,7 @@ int precision(int f=40, int n=1000000){ //****************************************************** //Building the tree - AnnoyIndex t = AnnoyIndex(f); + AnnoyIndex t = AnnoyIndex(f); std::cout << "Building index ... be patient !!" << std::endl; std::cout << "\"Trees that are slow to grow bear the best fruit\" (Moliere)" << std::endl; diff --git a/examples/s_compile_cpp.sh b/examples/s_compile_cpp.sh index 2c69d55d..b1dd341d 100755 --- a/examples/s_compile_cpp.sh +++ b/examples/s_compile_cpp.sh @@ -2,6 +2,6 @@ echo "compiling precision example..." -cmd="g++ precision_test.cpp -o precision_test -std=c++14 -pthread" +cmd="g++ precision_test.cpp -DANNOYLIB_MULTITHREADED_BUILD -o precision_test -std=c++14 -pthread" eval $cmd echo "Done" diff --git a/setup.py b/setup.py index d589bf5e..7d46d619 100644 --- a/setup.py +++ b/setup.py @@ -19,6 +19,7 @@ import codecs import os import platform +import sys readme_note = """\ .. note:: @@ -47,7 +48,16 @@ extra_compile_args += cputune if os.name != 'nt': - extra_compile_args += ['-std=c++14', '-O3', '-ffast-math', '-fno-associative-math'] + extra_compile_args += ['-O3', '-ffast-math', '-fno-associative-math'] + +# Add multithreaded build flag for all platforms using Python 3 and +# for non-Windows Python 2 platforms +python_major_version = sys.version_info[0] +if python_major_version == 3 or (python_major_version == 2 and os.name != 'nt'): + extra_compile_args += ['-DANNOYLIB_MULTITHREADED_BUILD'] + + if os.name != 'nt': + extra_compile_args += ['-std=c++14'] # #349: something with OS X Mojave causes libstd not to be found if platform.system() == 'Darwin': diff --git a/src/annoygomodule.h b/src/annoygomodule.h index a815d23b..a25574e1 100644 --- a/src/annoygomodule.h +++ b/src/annoygomodule.h @@ -16,8 +16,8 @@ class AnnoyIndex { void addItem(int item, const float* w) { ptr->add_item(item, w); }; - void build(int q, int n_threads=-1) { - ptr->build(q, n_threads); + void build(int q) { + ptr->build(q, 1); }; bool save(const char* filename, bool prefault) { return ptr->save(filename, prefault); @@ -69,7 +69,7 @@ class AnnoyIndexAngular : public AnnoyIndex { public: AnnoyIndexAngular(int f) { - ptr = new ::AnnoyIndex(f); + ptr = new ::AnnoyIndex(f); this->f = f; } }; @@ -77,7 +77,7 @@ class AnnoyIndexAngular : public AnnoyIndex class AnnoyIndexEuclidean : public AnnoyIndex { public: AnnoyIndexEuclidean(int f) { - ptr = new ::AnnoyIndex(f); + ptr = new ::AnnoyIndex(f); this->f = f; } }; @@ -85,7 +85,7 @@ class AnnoyIndexEuclidean : public AnnoyIndex { class AnnoyIndexManhattan : public AnnoyIndex { public: AnnoyIndexManhattan(int f) { - ptr = new ::AnnoyIndex(f); + ptr = new ::AnnoyIndex(f); this->f = f; } }; diff --git a/src/annoylib.h b/src/annoylib.h index 95efbd57..1d641eee 100644 --- a/src/annoylib.h +++ b/src/annoylib.h @@ -57,9 +57,12 @@ typedef signed __int64 int64_t; #include #include #include + +#ifdef ANNOYLIB_MULTITHREADED_BUILD #include #include #include +#endif #ifdef _MSC_VER // Needed for Visual 
Studio to disable runtime checks for mempcy @@ -135,8 +138,6 @@ using std::vector; using std::pair; using std::numeric_limits; using std::make_pair; -using std::mutex; -using std::thread; inline bool remap_memory_and_truncate(void** _ptr, int _fd, size_t old_size, size_t new_size) { #ifdef __linux__ @@ -836,7 +837,7 @@ class AnnoyIndexInterface { virtual bool on_disk_build(const char* filename, char** error=NULL) = 0; }; -template +template class AnnoyIndex : public AnnoyIndexInterface { /* * We use random projection to build a forest of binary trees of all items. @@ -944,38 +945,11 @@ template return false; } - if (n_threads == -1) { - // If the hardware_concurrency() value is not well defined or not computable, it returns ​0. - // We guard against this by using at least 1 thread. - n_threads = std::max(1, (int)std::thread::hardware_concurrency()); - } - D::template preprocess(_nodes, _s, _n_items, _f); _n_nodes = _n_items; - std::shared_timed_mutex nodes_mutex; - std::mutex n_nodes_mutex; - std::mutex roots_mutex; - std::mutex nodes_size_mutex; - vector threads(n_threads); - for (int thread_idx = 0; thread_idx < n_threads; thread_idx++) { - int trees_per_thread = q == -1 ? -1 : (int)floor((q + thread_idx) / n_threads); - threads[thread_idx] = std::thread( - &AnnoyIndex::_thread_build, - this, - trees_per_thread, - thread_idx, - std::ref(nodes_mutex), - std::ref(n_nodes_mutex), - std::ref(nodes_size_mutex), - std::ref(roots_mutex) - ); - } - - for (auto& thread : threads) { - thread.join(); - } + ThreadedBuildPolicy::template build(this, q, n_threads); // Also, copy the roots into the last segment of the array // This way we can load them faster without reading the whole file @@ -1165,6 +1139,46 @@ template _seed = seed; } + void thread_build(int q, int thread_idx, ThreadedBuildPolicy& threaded_build_policy) { + Random _random; + // Each thread needs its own seed, otherwise each thread would be building the same tree(s) + int seed = _is_seeded ? 
_seed + thread_idx : thread_idx; + _random.set_seed(seed); + + vector thread_roots; + while (1) { + if (q == -1) { + threaded_build_policy.lock_n_nodes(); + if (_n_nodes >= 2 * _n_items) { + threaded_build_policy.unlock_n_nodes(); + break; + } + threaded_build_policy.unlock_n_nodes(); + } else { + if (thread_roots.size() >= (size_t)q) { + break; + } + } + + if (_verbose) showUpdate("pass %zd...\n", thread_roots.size()); + + vector indices; + for (S i = 0; i < _n_items; i++) { + threaded_build_policy.lock_shared_nodes(); + if (_get(i)->n_descendants >= 1) { // Issue #223 + indices.push_back(i); + } + threaded_build_policy.unlock_shared_nodes(); + } + + thread_roots.push_back(_make_tree(indices, true, _random, threaded_build_policy)); + } + + threaded_build_policy.lock_roots(); + _roots.insert(_roots.end(), thread_roots.begin(), thread_roots.end()); + threaded_build_policy.unlock_roots(); + } + protected: void _reallocate_nodes(S n) { const double reallocation_factor = 1.3; @@ -1186,67 +1200,25 @@ template if (_verbose) showUpdate("Reallocating to %d nodes: old_address=%p, new_address=%p\n", new_nodes_size, old, _nodes); } - void _allocate_size(S n) { + void _allocate_size(S n, ThreadedBuildPolicy& threaded_build_policy) { if (n > _nodes_size) { + threaded_build_policy.lock_nodes(); _reallocate_nodes(n); + threaded_build_policy.unlock_nodes(); } } - void _allocate_size(S n, std::shared_timed_mutex& nodes_mutex, std::mutex& nodes_size_mutex) { - nodes_size_mutex.lock(); + void _allocate_size(S n) { if (n > _nodes_size) { - nodes_mutex.lock(); _reallocate_nodes(n); - nodes_mutex.unlock(); } - nodes_size_mutex.unlock(); } inline Node* _get(const S i) const { return get_node_ptr(_nodes, _s, i); } - void _thread_build(int q, int thread_idx, std::shared_timed_mutex& nodes_mutex, std::mutex& n_nodes_mutex, std::mutex& nodes_size_mutex, std::mutex& roots_mutex) { - Random _random; - // Each thread needs its own seed, otherwise each thread would be building the same tree(s) - int seed = _is_seeded ? _seed + thread_idx : thread_idx; - _random.set_seed(seed); - - vector thread_roots; - while (1) { - if (q == -1) { - n_nodes_mutex.lock(); - if (_n_nodes >= 2 * _n_items) { - n_nodes_mutex.unlock(); - break; - } - n_nodes_mutex.unlock(); - } else { - if (thread_roots.size() >= (size_t)q) { - break; - } - } - - if (_verbose) showUpdate("pass %zd...\n", thread_roots.size()); - - vector indices; - for (S i = 0; i < _n_items; i++) { - nodes_mutex.lock_shared(); - if (_get(i)->n_descendants >= 1) { // Issue #223 - indices.push_back(i); - } - nodes_mutex.unlock_shared(); - } - - thread_roots.push_back(_make_tree(indices, true, _random, nodes_mutex, n_nodes_mutex, nodes_size_mutex)); - } - - roots_mutex.lock(); - _roots.insert(_roots.end(), thread_roots.begin(), thread_roots.end()); - roots_mutex.unlock(); - } - - S _make_tree(const vector& indices, bool is_root, Random& _random, std::shared_timed_mutex& nodes_mutex, std::mutex& n_nodes_mutex, std::mutex& nodes_size_mutex) { + S _make_tree(const vector& indices, bool is_root, Random& _random, ThreadedBuildPolicy& threaded_build_policy) { // The basic rule is that if we have <= _K items, then it's a leaf node, otherwise it's a split node. // There's some regrettable complications caused by the problem that root nodes have to be "special": // 1. 
We identify root nodes by the arguable logic that _n_items == n->n_descendants, regardless of how many descendants they actually have @@ -1256,12 +1228,12 @@ template return indices[0]; if (indices.size() <= (size_t)_K && (!is_root || (size_t)_n_items <= (size_t)_K || indices.size() == 1)) { - n_nodes_mutex.lock(); - _allocate_size(_n_nodes + 1, nodes_mutex, nodes_size_mutex); + threaded_build_policy.lock_n_nodes(); + _allocate_size(_n_nodes + 1, threaded_build_policy); S item = _n_nodes++; - n_nodes_mutex.unlock(); + threaded_build_policy.unlock_n_nodes(); - nodes_mutex.lock_shared(); + threaded_build_policy.lock_shared_nodes(); Node* m = _get(item); m->n_descendants = is_root ? _n_items : (S)indices.size(); @@ -1272,14 +1244,14 @@ template if (!indices.empty()) memcpy(m->children, &indices[0], indices.size() * sizeof(S)); - nodes_mutex.unlock_shared(); + threaded_build_policy.unlock_shared_nodes(); return item; } vector children_indices[2]; Node* m = (Node*)alloca(_s); - nodes_mutex.lock_shared(); + threaded_build_policy.lock_shared_nodes(); vector children; for (size_t i = 0; i < indices.size(); i++) { S j = indices[i]; @@ -1300,7 +1272,7 @@ template showUpdate("No node for index %d?\n", j); } } - nodes_mutex.unlock_shared(); + threaded_build_policy.unlock_shared_nodes(); // If we didn't find a hyperplane, just randomize sides as a last option while (children_indices[0].size() == 0 || children_indices[1].size() == 0) { @@ -1329,17 +1301,17 @@ template m->n_descendants = is_root ? _n_items : (S)indices.size(); for (int side = 0; side < 2; side++) { // run _make_tree for the smallest child first (for cache locality) - m->children[side^flip] = _make_tree(children_indices[side^flip], false, _random, nodes_mutex, n_nodes_mutex, nodes_size_mutex); + m->children[side^flip] = _make_tree(children_indices[side^flip], false, _random, threaded_build_policy); } - n_nodes_mutex.lock(); - _allocate_size(_n_nodes + 1, nodes_mutex, nodes_size_mutex); + threaded_build_policy.lock_n_nodes(); + _allocate_size(_n_nodes + 1, threaded_build_policy); S item = _n_nodes++; - n_nodes_mutex.unlock(); + threaded_build_policy.unlock_n_nodes(); - nodes_mutex.lock_shared(); + threaded_build_policy.lock_shared_nodes(); memcpy(_get(item), m, _s); - nodes_mutex.unlock_shared(); + threaded_build_policy.unlock_shared_nodes(); return item; } @@ -1404,5 +1376,92 @@ template } }; +class AnnoyIndexSingleThreadedBuildPolicy { +public: + template + static void build(AnnoyIndex* annoy, int q, int n_threads) { + AnnoyIndexSingleThreadedBuildPolicy threaded_build_policy; + annoy->thread_build(q, 0, threaded_build_policy); + } + + void lock_n_nodes() {} + void unlock_n_nodes() {} + + void lock_nodes() {} + void unlock_nodes() {} + + void lock_shared_nodes() {} + void unlock_shared_nodes() {} + + void lock_roots() {} + void unlock_roots() {} +}; + +#ifdef ANNOYLIB_MULTITHREADED_BUILD +class AnnoyIndexMultiThreadedBuildPolicy { +private: + std::shared_timed_mutex nodes_mutex; + std::mutex n_nodes_mutex; + std::mutex roots_mutex; + +public: + template + static void build(AnnoyIndex* annoy, int q, int n_threads) { + AnnoyIndexMultiThreadedBuildPolicy threaded_build_policy; + if (n_threads == -1) { + // If the hardware_concurrency() value is not well defined or not computable, it returns 0. + // We guard against this by using at least 1 thread. 
+ n_threads = std::max(1, (int)std::thread::hardware_concurrency()); + } + + vector threads(n_threads); + + for (int thread_idx = 0; thread_idx < n_threads; thread_idx++) { + int trees_per_thread = q == -1 ? -1 : (int)floor((q + thread_idx) / n_threads); + + threads[thread_idx] = std::thread( + &AnnoyIndex::thread_build, + annoy, + trees_per_thread, + thread_idx, + std::ref(threaded_build_policy) + ); + } + + for (auto& thread : threads) { + thread.join(); + } + } + + void lock_n_nodes() { + n_nodes_mutex.lock(); + } + void unlock_n_nodes() { + n_nodes_mutex.unlock(); + } + + void lock_nodes() { + nodes_mutex.lock(); + } + void unlock_nodes() { + nodes_mutex.unlock(); + } + + void lock_shared_nodes() { + nodes_mutex.lock_shared(); + } + void unlock_shared_nodes() { + nodes_mutex.unlock_shared(); + } + + void lock_roots() { + roots_mutex.lock(); + } + void unlock_roots() { + roots_mutex.unlock(); + } +}; +#endif + #endif // vim: tabstop=2 shiftwidth=2 diff --git a/src/annoyluamodule.cc b/src/annoyluamodule.cc index 0da84587..c383424c 100644 --- a/src/annoyluamodule.cc +++ b/src/annoyluamodule.cc @@ -33,7 +33,7 @@ class LuaAnnoy { public: typedef int32_t AnnoyS; typedef float AnnoyT; - typedef AnnoyIndex Impl; + typedef AnnoyIndex Impl; typedef LuaAnnoy ThisClass; class LuaArrayProxy { @@ -121,11 +121,7 @@ class LuaAnnoy { int nargs = lua_gettop(L); Impl* self = getAnnoy(L, 1); int n_trees = luaL_checkinteger(L, 2); - int n_threads = -1; - if (nargs >= 3) { - n_threads = luaL_checkinteger(L, 3); - } - self->build(n_trees, n_threads); + self->build(n_trees, 1); lua_pushboolean(L, true); return 1; } diff --git a/src/annoymodule.cc b/src/annoymodule.cc index 294cbba8..572b713e 100644 --- a/src/annoymodule.cc +++ b/src/annoymodule.cc @@ -54,6 +54,11 @@ typedef signed __int32 int32_t; #define PyInt_FromLong PyLong_FromLong #endif +#ifdef ANNOYLIB_MULTITHREADED_BUILD + typedef AnnoyIndexMultiThreadedBuildPolicy AnnoyIndexThreadedBuildPolicy; +#else + typedef AnnoyIndexSingleThreadedBuildPolicy AnnoyIndexThreadedBuildPolicy; +#endif template class AnnoyIndexInterface; @@ -63,7 +68,7 @@ class HammingWrapper : public AnnoyIndexInterface { // This is questionable from a performance point of view. Should reconsider this solution. private: int32_t _f_external, _f_internal; - AnnoyIndex _index; + AnnoyIndex _index; void _pack(const float* src, uint64_t* dst) const { for (int32_t i = 0; i < _f_internal; i++) { dst[i] = 0; @@ -145,17 +150,17 @@ py_an_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) { // This keeps coming up, see #368 etc PyErr_WarnEx(PyExc_FutureWarning, "The default argument for metric will be removed " "in future version of Annoy. 
Please pass metric='angular' explicitly.", 1); - self->ptr = new AnnoyIndex(self->f); + self->ptr = new AnnoyIndex(self->f); } else if (!strcmp(metric, "angular")) { - self->ptr = new AnnoyIndex(self->f); + self->ptr = new AnnoyIndex(self->f); } else if (!strcmp(metric, "euclidean")) { - self->ptr = new AnnoyIndex(self->f); + self->ptr = new AnnoyIndex(self->f); } else if (!strcmp(metric, "manhattan")) { - self->ptr = new AnnoyIndex(self->f); + self->ptr = new AnnoyIndex(self->f); } else if (!strcmp(metric, "hamming")) { self->ptr = new HammingWrapper(self->f); } else if (!strcmp(metric, "dot")) { - self->ptr = new AnnoyIndex(self->f); + self->ptr = new AnnoyIndex(self->f); } else { PyErr_SetString(PyExc_ValueError, "No such metric"); return NULL; From 26d3756a6f8d440c14893beacf4c339462412ff3 Mon Sep 17 00:00:00 2001 From: Rok Novosel Date: Sun, 2 Aug 2020 19:39:22 +0200 Subject: [PATCH 11/13] Set n_jobs=1 in test_very_large_index due to OSX. --- test/index_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/index_test.py b/test/index_test.py index 048a6315..96506450 100644 --- a/test/index_test.py +++ b/test/index_test.py @@ -235,7 +235,7 @@ def test_very_large_index(self): for i in range(100): m.add_item(n_vectors+i, [random.gauss(0, 1) for z in range(f)]) n_trees = 10 - m.build(n_trees) + m.build(n_trees, n_jobs=1) path = 'test_big.annoy' m.save(path) # Raises on Windows From 16cbf70876967bd1e7c12910da224495719617b2 Mon Sep 17 00:00:00 2001 From: Rok Novosel Date: Sun, 2 Aug 2020 20:19:11 +0200 Subject: [PATCH 12/13] Move nodes lock around the loop when adding indices. Remove n_jobs=1 because OSX should be faster now. --- src/annoylib.h | 4 ++-- test/index_test.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/annoylib.h b/src/annoylib.h index 82dd6024..3fcf89fa 100644 --- a/src/annoylib.h +++ b/src/annoylib.h @@ -1163,13 +1163,13 @@ template indices; + threaded_build_policy.lock_shared_nodes(); for (S i = 0; i < _n_items; i++) { - threaded_build_policy.lock_shared_nodes(); if (_get(i)->n_descendants >= 1) { // Issue #223 indices.push_back(i); } - threaded_build_policy.unlock_shared_nodes(); } + threaded_build_policy.unlock_shared_nodes(); thread_roots.push_back(_make_tree(indices, true, _random, threaded_build_policy)); } diff --git a/test/index_test.py b/test/index_test.py index 96506450..048a6315 100644 --- a/test/index_test.py +++ b/test/index_test.py @@ -235,7 +235,7 @@ def test_very_large_index(self): for i in range(100): m.add_item(n_vectors+i, [random.gauss(0, 1) for z in range(f)]) n_trees = 10 - m.build(n_trees, n_jobs=1) + m.build(n_trees) path = 'test_big.annoy' m.save(path) # Raises on Windows From bf51acf5d3c3320b6eb47131682d24b78a0d413f Mon Sep 17 00:00:00 2001 From: Rok Novosel Date: Mon, 3 Aug 2020 11:16:13 +0200 Subject: [PATCH 13/13] Update READMEs and Lua rockspec. --- README_GO.rst | 2 +- README_Lua.md | 2 +- annoy-dev-1.rockspec | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README_GO.rst b/README_GO.rst index 2f2c1ca8..35ebfd07 100644 --- a/README_GO.rst +++ b/README_GO.rst @@ -58,7 +58,7 @@ Right now it only accepts integers as identifiers for items. Note that it will a Full Go API --------------- -See annoygomodule.h. Generally the same as Python API except some arguments are not optional. +See annoygomodule.h. Generally the same as Python API except some arguments are not optional. Go binding does not support multithreaded build. 
Tests ------- diff --git a/README_Lua.md b/README_Lua.md index b24a567f..cab0583a 100644 --- a/README_Lua.md +++ b/README_Lua.md @@ -64,7 +64,7 @@ end Full Lua API ------------ -Lua API closely resembles Python API, see main README. +Lua API closely resembles Python API, see main README. Lua binding does not support multithreaded build. Tests diff --git a/annoy-dev-1.rockspec b/annoy-dev-1.rockspec index 58cc796c..8e3acea7 100644 --- a/annoy-dev-1.rockspec +++ b/annoy-dev-1.rockspec @@ -44,14 +44,14 @@ build = { unix = { modules = { ['annoy'] = { - libraries = {"stdc++", "pthread"}, + libraries = {"stdc++"}, }, }, }, mingw32 = { modules = { ['annoy'] = { - libraries = {"stdc++", "pthread"}, + libraries = {"stdc++"}, }, }, },
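
After this series the C++ index takes a threading build policy as an extra template argument and build() accepts a thread count, with -1 meaning "use std::thread::hardware_concurrency()". The following is a minimal usage sketch only, compiled the same way as examples/s_compile_cpp.sh (-DANNOYLIB_MULTITHREADED_BUILD -std=c++14 -pthread); the metric, random generator (Kiss32Random from kissrandom.h), dimensionality, item count, and file name are illustrative choices, not taken from the patches:

    #include "annoylib.h"
    #include "kissrandom.h"
    #include <cstdlib>

    int main() {
      const int f = 40;  // vector dimensionality
      AnnoyIndex<int, float, Angular, Kiss32Random, AnnoyIndexMultiThreadedBuildPolicy> index(f);

      float v[f];
      for (int i = 0; i < 10000; i++) {
        for (int z = 0; z < f; z++)
          v[z] = (float)rand() / RAND_MAX;
        index.add_item(i, v);  // items must still be added before build()
      }

      index.build(31, 4);       // 31 trees built by 4 threads; pass -1 to use all CPU cores
      index.save("index.ann");
      return 0;
    }

When ANNOYLIB_MULTITHREADED_BUILD is not defined, AnnoyIndexSingleThreadedBuildPolicy is the policy to use; its build() runs the same thread_build() code on the calling thread and all of its lock/unlock methods are no-ops, so the original single-threaded behaviour is preserved.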