Multithreaded build #495

Merged: 14 commits, Aug 3, 2020

Changes from 6 commits
annoy-dev-1.rockspec (4 changes: 2 additions & 2 deletions)
@@ -44,14 +44,14 @@ build = {
  unix = {
    modules = {
      ['annoy'] = {
-       libraries = {"stdc++"},
+       libraries = {"stdc++", "pthread"},
      },
    },
  },
  mingw32 = {
    modules = {
      ['annoy'] = {
-       libraries = {"stdc++"},
+       libraries = {"stdc++", "pthread"},
      },
    },
  },
examples/s_compile_cpp.sh (2 changes: 1 addition & 1 deletion)
@@ -2,6 +2,6 @@


echo "compiling precision example..."
-cmd="g++ precision_test.cpp -o precision_test -std=c++11"
+cmd="g++ precision_test.cpp -o precision_test -std=c++11 -pthread"
eval $cmd
echo "Done"
setup.py (4 changes: 2 additions & 2 deletions)
@@ -47,11 +47,11 @@
extra_compile_args += cputune

if os.name != 'nt':
-    extra_compile_args += ['-O3', '-ffast-math', '-fno-associative-math']
+    extra_compile_args += ['-std=c++11', '-O3', '-ffast-math', '-fno-associative-math']

    # #349: something with OS X Mojave causes libstd not to be found
    if platform.system() == 'Darwin':
-        extra_compile_args += ['-std=c++11', '-mmacosx-version-min=10.9']
+        extra_compile_args += ['-mmacosx-version-min=10.9']
        extra_link_args += ['-stdlib=libc++', '-mmacosx-version-min=10.9']

# Manual configuration, you're on your own here.
src/annoygomodule.h (4 changes: 2 additions & 2 deletions)
@@ -16,8 +16,8 @@ class AnnoyIndex {
  void addItem(int item, const float* w) {
    ptr->add_item(item, w);
  };
- void build(int q) {
-   ptr->build(q);
+ void build(int q, int n_threads=1) {
+   ptr->build(q, n_threads);
  };
  bool save(const char* filename, bool prefault) {
    return ptr->save(filename, prefault);
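For context, this wrapper simply forwards to the core C++ API, whose build() gains the same optional thread-count argument. A minimal sketch of calling it directly (illustrative only, patterned after examples/precision_test.cpp; assumes annoylib.h and kissrandom.h are on the include path and that you compile with -std=c++11 -pthread, per the build-flag changes above):

```cpp
#include "annoylib.h"
#include "kissrandom.h"
#include <cstdlib>

int main() {
  int f = 40;  // vector dimensionality
  AnnoyIndex<int, double, Angular, Kiss32Random> index(f);

  // Index 1000 random vectors.
  double vec[40];
  for (int i = 0; i < 1000; i++) {
    for (int z = 0; z < f; z++) vec[z] = (double)rand() / RAND_MAX;
    index.add_item(i, vec);
  }

  index.build(10, 4);  // new in this PR: 10 trees, built by 4 threads
  index.save("test.ann");
  return 0;
}
```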
src/annoylib.h (181 changes: 147 additions & 34 deletions)
@@ -57,6 +57,9 @@ typedef signed __int64 int64_t;
#include <algorithm>
#include <queue>
#include <limits>
+#include <mutex>
+#include <thread>
+#include <condition_variable>

#ifdef _MSC_VER
// Needed for Visual Studio to disable runtime checks for memcpy
@@ -104,7 +107,6 @@ inline void set_error_from_string(char **error, const char* msg) {
#ifndef _MSC_VER
#define popcount __builtin_popcountll
#else // See #293, #358
-#define isnan(x) _isnan(x)
#define popcount cole_popcount
#endif

@@ -133,6 +135,8 @@ using std::vector;
using std::pair;
using std::numeric_limits;
using std::make_pair;
+using std::mutex;
+using std::thread;

inline bool remap_memory_and_truncate(void** _ptr, int _fd, size_t old_size, size_t new_size) {
#ifdef __linux__
@@ -594,8 +598,8 @@ struct DotProduct : Angular {
    // Step one: compute the norm of each vector and store that in its extra dimension (f-1)
    for (S i = 0; i < node_count; i++) {
      Node* node = get_node_ptr<S, Node>(nodes, _s, i);
-     T norm = sqrt(dot(node->v, node->v, f));
-     if (isnan(norm)) norm = 0;
+     T d = dot(node->v, node->v, f);
+     T norm = d < 0 ? 0 : sqrt(d);
      node->dot_factor = norm;
    }

@@ -612,9 +616,8 @@ struct DotProduct : Angular {
    for (S i = 0; i < node_count; i++) {
      Node* node = get_node_ptr<S, Node>(nodes, _s, i);
      T node_norm = node->dot_factor;
-
-     T dot_factor = sqrt(pow(max_norm, static_cast<T>(2.0)) - pow(node_norm, static_cast<T>(2.0)));
-     if (isnan(dot_factor)) dot_factor = 0;
+     T squared_norm_diff = pow(max_norm, static_cast<T>(2.0)) - pow(node_norm, static_cast<T>(2.0));
+     T dot_factor = squared_norm_diff < 0 ? 0 : sqrt(squared_norm_diff);

      node->dot_factor = dot_factor;
    }
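Both rewrites above replace a compute-then-check-isnan pattern with clamping the operand before sqrt. Subtracting two nearly equal squared norms can go slightly negative through rounding, and sqrt of a negative value yields NaN; since setup.py enables -ffast-math, under which isnan() checks can be optimized away, never producing the NaN in the first place is the more robust fix (it also lets the MSVC isnan macro above be dropped). A self-contained illustration, not part of the diff:

```cpp
#include <cmath>
#include <cstdio>

int main() {
  // "Should" be zero, but rounding pushed it just below.
  double squared_diff = -1e-17;
  double raw = std::sqrt(squared_diff);                             // NaN
  double clamped = squared_diff < 0 ? 0 : std::sqrt(squared_diff);  // 0
  std::printf("raw=%f clamped=%f\n", raw, clamped);
  return 0;
}
```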
@@ -811,13 +814,32 @@ struct Manhattan : Minkowski {
  }
};

+class ThreadBarrier {
+public:
+  explicit ThreadBarrier(std::size_t count) : _count(count) { }
+
+  void wait() {
+    std::unique_lock<std::mutex> lock(_mutex);
+    if (--_count == 0) {
+      _cv.notify_all();
+    } else {
+      _cv.wait(lock, [this] { return _count == 0; });
+    }
+  }
+
+private:
+  std::mutex _mutex;
+  std::condition_variable _cv;
+  std::size_t _count;
+};
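The added ThreadBarrier is a one-shot latch: every thread that calls wait() blocks until the last of the count threads arrives, after which all of them proceed. A minimal usage sketch (not part of the diff; assumes the class above is in scope):

```cpp
#include <cstdio>
#include <thread>
#include <vector>

void worker(int id, ThreadBarrier& barrier) {
  std::printf("thread %d: local phase\n", id);
  barrier.wait();  // nobody passes until all n_threads have arrived
  std::printf("thread %d: merge phase\n", id);
}

int main() {
  const std::size_t n_threads = 4;
  ThreadBarrier barrier(n_threads);
  std::vector<std::thread> threads;
  for (std::size_t i = 0; i < n_threads; i++)
    threads.emplace_back(worker, (int)i, std::ref(barrier));
  for (auto& t : threads) t.join();
  return 0;
}
```

Note that _count never resets, so the barrier is single-use; that is sufficient here because build() constructs a fresh one per call.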

template<typename S, typename T>
class AnnoyIndexInterface {
 public:
  // Note that the methods with an **error argument will allocate memory and write the pointer to that string if error is non-NULL
  virtual ~AnnoyIndexInterface() {};
  virtual bool add_item(S item, const T* w, char** error=NULL) = 0;
- virtual bool build(int q, char** error=NULL) = 0;
+ virtual bool build(int q, int n_threads=1, char** error=NULL) = 0;
  virtual bool unbuild(char** error=NULL) = 0;
  virtual bool save(const char* filename, bool prefault=false, char** error=NULL) = 0;
  virtual void unload() = 0;
@@ -850,20 +872,21 @@ template<typename S, typename T, typename Distance, typename Random>
  const int _f;
  size_t _s;
  S _n_items;
- Random _random;
  void* _nodes; // Could either be mmapped, or point to a memory buffer that we reallocate
  S _n_nodes;
  S _nodes_size;
  vector<S> _roots;
  S _K;
+ bool _is_seeded;
+ int _seed;
  bool _loaded;
  bool _verbose;
  int _fd;
  bool _on_disk;
  bool _built;
public:

- AnnoyIndex(int f) : _f(f), _random() {
+ AnnoyIndex(int f) : _f(f) {
    _s = offsetof(Node, v) + _f * sizeof(T); // Size of each node
    _verbose = false;
    _built = false;
@@ -929,7 +952,7 @@ template<typename S, typename T, typename Distance, typename Random>
    return true;
  }

- bool build(int q, char** error=NULL) {
+ bool build(int q, int n_threads=1, char** error=NULL) {

erikbern (Collaborator): Might be good to default it to -1 and, if that's the case, set it to the number of CPUs.
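A sketch of what that suggestion could look like (hypothetical, not in this commit, which instead rejects n_threads < 1):

```cpp
#include <thread>

// Map the sentinel -1 to the detected CPU count;
// hardware_concurrency() may return 0 when it cannot tell.
int resolve_n_threads(int n_threads) {
  if (n_threads == -1) {
    unsigned int hw = std::thread::hardware_concurrency();
    return hw > 0 ? (int)hw : 1;
  }
  return n_threads;
}
```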

    if (_loaded) {
      set_error_from_string(error, "You can't build a loaded index");
      return false;
@@ -940,23 +963,32 @@ template<typename S, typename T, typename Distance, typename Random>
      return false;
    }

+   if (n_threads < 1) {
+     set_error_from_string(error, "You can't build an index with less than 1 thread");
+     return false;
+   }
+
    D::template preprocess<T, S, Node>(_nodes, _s, _n_items, _f);

    _n_nodes = _n_items;
-   while (1) {
-     if (q == -1 && _n_nodes >= _n_items * 2)
-       break;
-     if (q != -1 && _roots.size() >= (size_t)q)
-       break;
-     if (_verbose) showUpdate("pass %zd...\n", _roots.size());
-
-     vector<S> indices;
-     for (S i = 0; i < _n_items; i++) {
-       if (_get(i)->n_descendants >= 1) // Issue #223
-         indices.push_back(i);
+   std::mutex _nodes_mutex;
+   ThreadBarrier barrier(n_threads);
+   vector<std::thread> threads(n_threads);
+   int work_per_thread = (int)floor(q / (double)n_threads);

erikbern (Collaborator): What happens if q = -1? You probably have to determine q before this code, in case it's set to -1.

+   int work_remainder = q % n_threads;
+   for (int i = 0; i < n_threads; i++) {
+     int trees_per_thread = -1;
+     if (q > -1) {
+       // First thread picks up the remainder of the work

erikbern (Collaborator), Jul 28, 2020: This might lead to an uneven split: e.g. if q=20 and n_threads=7, the distribution will be [8, 2, 2, 2, 2, 2, 2].

I think you can do trees_per_thread = work_per_thread + (i < work_remainder) and it should split it like [3, 3, 3, 3, 3, 3, 2].

erikbern (Collaborator): Actually, even simpler: you can just do trees_per_thread = (q + i) / n_threads, and then you don't even have to define work_per_thread.

Author (Contributor): Awesome, I was not exactly happy with my solution, so I'll reuse this.
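To see why the suggested (q + i) / n_threads split is balanced and still sums to exactly q, a quick standalone check (illustrative):

```cpp
#include <cstdio>

int main() {
  int q = 20, n_threads = 7, total = 0;
  for (int i = 0; i < n_threads; i++) {
    int trees = (q + i) / n_threads;  // 2, 3, 3, 3, 3, 3, 3
    std::printf("thread %d builds %d trees\n", i, trees);
    total += trees;
  }
  std::printf("total = %d\n", total);  // exactly q = 20
  return 0;
}
```

Each thread gets either floor(q / n_threads) trees or one more, and the floors always add up to exactly q.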

-     _roots.push_back(_make_tree(indices, true));
-   }
+       trees_per_thread = i == 0 ? work_per_thread + work_remainder : work_per_thread;
+     }
+
+     threads[i] = std::thread(&AnnoyIndex<S, T, D, Random>::_thread_build, this, trees_per_thread, i, std::ref(barrier), std::ref(_nodes_mutex));
+   }
+
+   for (auto& thread : threads) {
+     thread.join();
+   }

    // Also, copy the roots into the last segment of the array
@@ -1035,6 +1067,7 @@ template<typename S, typename T, typename Distance, typename Random>
    _n_nodes = 0;
    _nodes_size = 0;
    _on_disk = false;
+   _is_seeded = false;
    _roots.clear();
  }

@@ -1142,7 +1175,8 @@ template<typename S, typename T, typename Distance, typename Random>
  }

  void set_seed(int seed) {
-   _random.set_seed(seed);
+   _is_seeded = true;
+   _seed = seed;
  }

protected:
@@ -1172,7 +1206,87 @@ template<typename S, typename T, typename Distance, typename Random>
    return get_node_ptr<S, Node>(_nodes, _s, i);
  }

-  S _make_tree(const vector<S >& indices, bool is_root) {
+  void _thread_build(int q, int thread_idx, ThreadBarrier& barrier, std::mutex& _nodes_mutex) {

erikbern (Collaborator): I think it would be good to have the same code for threaded and non-threaded building.

Author (Contributor): I'm not exactly sure what you mean by this: should we still support building without threads (e.g. when n_threads=1)?

erikbern (Collaborator): I'd actually be OK with only supporting threaded build, so that we can remove the non-threaded build code.

Author (Contributor): I'm probably missing something, but _thread_build is just a private helper function that was extracted from the previous build function. It is there just so I don't have to stick everything inside a lambda when spawning a new thread.

Could you point out which part of the code could potentially be removed? 😄

erikbern (Collaborator), Jul 28, 2020: I think _thread_build could be a pretty simple wrapper around _make_tree; only a couple of places touch any state. I don't think you need the magic where you go into nodes and add offsets etc. So all _thread_build would have to do is keep adding trees until you reach the limit.

erikbern (Collaborator): Yeah, exactly, that's what I was thinking.

Your approach at https://github.com/spotify/annoy/pull/495/files#diff-91eaadc9d0aa248375a22a64f315e3d8R1250 is very clever, but I'm a bit nervous it's too hacky and I'd love to have something slightly simpler :)

Doesn't your barrier roughly accomplish what a shared_mutex does? Or it should at least be possible to modify it into a sort of shared mutex? Haven't thought about it enough.

Author (Contributor): I agree, I would definitely feel more comfortable if I could simplify it a bit 😄

Unfortunately, a barrier does not accomplish what a shared_mutex is supposed to do. It can only be used as a global synchronization point, not for reader/writer-style locking.

After reading a couple of SO threads, a custom shared mutex could prove really tricky to implement correctly. We could use shared_timed_mutex, which has a few additional capabilities and is available from C++14. Either way, I'll implement the less magic version we discussed, and then we can compare the performance and decide whether it's worth bumping the C++ version.

Author (Contributor): Well, I managed to simplify my solution in the latest commit 😄 I removed all of the magic and introduced a couple more mutex variables and a shared_timed_mutex variable.

I've re-run my time measurements on glove-angular-25:

Num. threads | n_trees=32 time (s) | n_trees=64 time (s)
------------ | ------------------- | -------------------
1            | 49.29               | 99.89
2            | 26.54               | 53.29
4            | 15.03               | 29.34
8            | 10.42               | 18.97
16           | 9.76                | 17.07

erikbern (Collaborator): Wow, amazing! Any idea if there's any slowdown compared to the current master?

The new code looks much cleaner and less "clever". I don't know how well supported C++14 is, but hopefully it's not a big problem.

Do you really need so many different mutexes? Not a concern, I was just surprised to see four different ones.

Author (Contributor): I can prepare a more comprehensive timing benchmark and include a comparison with the current master. But as far as I can tell, there isn't a significant slowdown.

Regarding the number of mutexes: if I guard everything with the shared nodes_mutex, a single thread acquiring an exclusive lock makes everything grind to a halt. So I opted for a more granular approach, where I exclusively lock the nodes_mutex only when absolutely necessary, and add separate regular locks for all other shared variables (nodes_size, _n_nodes). I'll see if I can cut down on the number of mutexes, but I currently can't see a better way.
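A rough sketch of the granular locking being described (names are hypothetical; the later commits may differ): threads hold a shared lock while filling already-allocated, disjoint slots of _nodes, and take the exclusive lock only around reallocation, with plain mutexes guarding the small shared counters.

```cpp
#include <mutex>
#include <shared_mutex>  // std::shared_timed_mutex, std::shared_lock (C++14)

std::shared_timed_mutex nodes_mutex;  // guards the _nodes buffer itself
std::mutex n_nodes_mutex;             // guards the _n_nodes counter

void write_node_slot() {
  // Many threads may do this concurrently: each writes a slot no other thread owns.
  std::shared_lock<std::shared_timed_mutex> lock(nodes_mutex);
  // ... memcpy node data into the reserved slot ...
}

void grow_nodes_buffer() {
  // Reallocation moves the buffer, so every shared holder must be excluded.
  std::unique_lock<std::shared_timed_mutex> lock(nodes_mutex);
  // ... realloc/mremap _nodes ...
}
```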

+    Random _random;
+    // Each thread needs its own seed, otherwise each thread would be building the same tree(s)
+    int seed = _is_seeded ? _seed + thread_idx : thread_idx;
+    _random.set_seed(seed);
+
+    vector<vector<Node*> > thread_trees;
+    vector<S> thread_roots;
+    while (1) {
+      if (q == -1) {

erikbern (Collaborator): This code to determine q if q = -1 could probably just be left in the build function.

Author (Contributor): The thing with q = -1 is that I don't know beforehand how many trees each thread should build, because it depends on the number of nodes. That is why I have to check for -1 everywhere.

A better solution would be to introduce a different meaning for q = -1, where each thread would build just one tree, for example.

erikbern (Collaborator): Ah yeah, good point. I forgot about that. OK, if your code works then let's keep it, I guess.

+        size_t thread_n_nodes = 0;
+        for (size_t tree_idx = 0; tree_idx < thread_trees.size(); tree_idx++) {
+          thread_n_nodes += thread_trees[tree_idx].size();
+        }
+        if (thread_n_nodes >= 2 * (size_t)_n_items) {
+          break;
+        }
+      } else {
+        if (thread_roots.size() >= (size_t)q) {
+          break;
+        }
+      }
+
+      if (_verbose) showUpdate("pass %zd...\n", thread_roots.size());
+
+      vector<S> indices;
+      for (S i = 0; i < _n_items; i++) {
+        if (_get(i)->n_descendants >= 1) // Issue #223
+          indices.push_back(i);
+      }
+
+      vector<Node*> split_nodes;
+      // Each thread is essentially pretending to build only one tree that will get inserted
+      // right after the already inserted items. Indices of split nodes start with _n_items, _n_items + 1, ...
+      // We do not want to mutate the _nodes array during tree construction due to reallocation issues. That is
+      // why each thread stores the trees locally until all threads are ready to insert them into _nodes.
+      S root_node = _make_tree(indices, split_nodes, true, _random);
+      thread_roots.push_back(root_node);
+      thread_trees.push_back(split_nodes);
+    }
+
+    // Wait for all threads to finish before we can start inserting tree nodes into the global _nodes array
+    barrier.wait();

erikbern (Collaborator): Isn't locking the mutex enough to enforce this?

Author (Contributor): Unfortunately not, for the reasons noted in #495 (comment): we have to wait for all threads to finish their work before we can mutate the global _nodes buffer.


+    _nodes_mutex.lock();
+    // When a thread wants to insert its local tree nodes into the global _nodes, it has to stop pretending
+    // that there is going to be only one tree. Each thread has to update all split-node children that point
+    // to other split nodes, because their indices will change once inserted into the global _nodes.
+    for (size_t tree_idx = 0; tree_idx < thread_trees.size(); tree_idx++) {
+      vector<Node*> split_nodes = thread_trees[tree_idx];
+      // Offset from _n_items where split nodes will get inserted
+      S split_nodes_offset = _n_nodes - _n_items;
+      _allocate_size(_n_nodes + split_nodes.size());
+
+      for (size_t node_idx = 0; node_idx < split_nodes.size(); node_idx++) {
+        Node* split_node = split_nodes[node_idx];
+        bool is_root = (size_t)thread_roots[tree_idx] == (_n_items + node_idx);
+
+        // Inverted condition from _make_tree to detect split nodes
+        if ((split_node->n_descendants > _K) || (is_root && (size_t)_n_items > (size_t)_K && split_node->n_descendants > 1)) {
+          for (size_t child_idx = 0; child_idx < 2; child_idx++) {
+            // Update the child index if it points to a split node
+            if (split_node->children[child_idx] >= _n_items) {
+              split_node->children[child_idx] += split_nodes_offset;
+            }
+          }
+        }
+
+        memcpy(_get(_n_nodes), split_node, _s);
+        free(split_node);
+
+        _n_nodes += 1;
+      }
+
+      thread_roots[tree_idx] += split_nodes_offset;
+    }
+    _roots.insert(_roots.end(), thread_roots.begin(), thread_roots.end());
+    _nodes_mutex.unlock();
+  }
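To make the remapping concrete, a worked example with hypothetical numbers (not from the PR):

```cpp
// Suppose _n_items == 1000 and this thread built one tree with three split
// nodes, which it numbered locally as 1000, 1001, 1002 (i.e. starting at
// _n_items). If _n_nodes == 1500 when the thread acquires the lock:
//
//   split_nodes_offset = _n_nodes - _n_items = 1500 - 1000 = 500
//
// so the nodes are copied into global slots 1500, 1501, 1502. A child index
// of 1001 (>= _n_items, hence a split node) becomes 1001 + 500 = 1501, while
// child indices below _n_items refer to items and are left untouched. The
// tree's root, locally 1000, likewise becomes 1500 and is what gets pushed
// into _roots.
```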

+  S _make_tree(const vector<S >& indices, vector<Node* >& split_nodes, bool is_root, Random& _random) {
    // The basic rule is that if we have <= _K items, then it's a leaf node, otherwise it's a split node.
    // There are some regrettable complications caused by the problem that root nodes have to be "special":
    // 1. We identify root nodes by the arguable logic that _n_items == n->n_descendants, regardless of how many descendants they actually have
@@ -1182,9 +1296,8 @@ template<typename S, typename T, typename Distance, typename Random>
      return indices[0];

    if (indices.size() <= (size_t)_K && (!is_root || (size_t)_n_items <= (size_t)_K || indices.size() == 1)) {
-     _allocate_size(_n_nodes + 1);
-     S item = _n_nodes++;
-     Node* m = _get(item);
+     Node* m = (Node*)malloc(_s);
+     memset(m, 0, _s);
      m->n_descendants = is_root ? _n_items : (S)indices.size();

      // Using std::copy instead of a loop seems to resolve issues #3 and #13,
@@ -1193,7 +1306,9 @@ template<typename S, typename T, typename Distance, typename Random>
      // Only copy when necessary to avoid crash in MSVC 9. #293
      if (!indices.empty())
        memcpy(m->children, &indices[0], indices.size() * sizeof(S));
-     return item;
+
+     split_nodes.push_back(m);
+     return _n_items + (split_nodes.size() - 1);
    }

    vector<Node*> children;
@@ -1205,7 +1320,8 @@ template<typename S, typename T, typename Distance, typename Random>
    }

    vector<S> children_indices[2];
-   Node* m = (Node*)alloca(_s);
+   Node* m = (Node*)malloc(_s);
+   memset(m, 0, _s);
    D::create_split(children, _f, _s, _random, m);

    for (size_t i = 0; i < indices.size(); i++) {
@@ -1246,14 +1362,11 @@ template<typename S, typename T, typename Distance, typename Random>
    m->n_descendants = is_root ? _n_items : (S)indices.size();
    for (int side = 0; side < 2; side++) {
      // run _make_tree for the smallest child first (for cache locality)
-     m->children[side^flip] = _make_tree(children_indices[side^flip], false);
+     m->children[side^flip] = _make_tree(children_indices[side^flip], split_nodes, false, _random);
    }

-   _allocate_size(_n_nodes + 1);
-   S item = _n_nodes++;
-   memcpy(_get(item), m, _s);
-
-   return item;
+   split_nodes.push_back(m);
+   return _n_items + (split_nodes.size() - 1);
  }

  void _get_all_nns(const T* v, size_t n, int search_k, vector<S>* result, vector<T>* distances) const {
src/annoyluamodule.cc (7 changes: 6 additions & 1 deletion)
@@ -118,9 +118,14 @@ class LuaAnnoy {
  }

  static int build(lua_State* L) {
+   int nargs = lua_gettop(L);
    Impl* self = getAnnoy(L, 1);
    int n_trees = luaL_checkinteger(L, 2);
-   self->build(n_trees);
+   int n_threads = 1;
+   if (nargs >= 3) {
+     n_threads = luaL_checkinteger(L, 3);
+   }
+   self->build(n_trees, n_threads);
    lua_pushboolean(L, true);
    return 1;
  }