Skip to content

Commit

Permalink
Merge pull request #307 from mdickinson/fix/handle-spurious-wakeup
Browse files Browse the repository at this point in the history
Allow for spurious wakeup calls from pthread_cond_wait.
  • Loading branch information
robbmcleod committed Jul 12, 2018
2 parents 8b91a6d + 79232c0 commit 0a2dad3
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 5 deletions.
11 changes: 9 additions & 2 deletions numexpr/interpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -772,9 +772,13 @@ vm_engine_iter_parallel(NpyIter *iter, const vm_params& params,
pthread_mutex_lock(&gs.count_threads_mutex);
if (gs.count_threads < gs.nthreads) {
gs.count_threads++;
pthread_cond_wait(&gs.count_threads_cv, &gs.count_threads_mutex);
/* Beware of spurious wakeups. See issue pydata/numexpr#306. */
do {
pthread_cond_wait(&gs.count_threads_cv, &gs.count_threads_mutex);
} while (!gs.threads_ready);
}
else {
gs.threads_ready = 1;
pthread_cond_broadcast(&gs.count_threads_cv);
}
pthread_mutex_unlock(&gs.count_threads_mutex);
Expand All @@ -783,9 +787,12 @@ vm_engine_iter_parallel(NpyIter *iter, const vm_params& params,
pthread_mutex_lock(&gs.count_threads_mutex);
if (gs.count_threads > 0) {
gs.count_threads--;
pthread_cond_wait(&gs.count_threads_cv, &gs.count_threads_mutex);
do {
pthread_cond_wait(&gs.count_threads_cv, &gs.count_threads_mutex);
} while (gs.threads_ready);
}
else {
gs.threads_ready = 0;
pthread_cond_broadcast(&gs.count_threads_cv);
}
pthread_mutex_unlock(&gs.count_threads_mutex);
Expand Down
20 changes: 17 additions & 3 deletions numexpr/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,14 @@ void *th_worker(void *tidptr)
pthread_mutex_lock(&gs.count_threads_mutex);
if (gs.count_threads < gs.nthreads) {
gs.count_threads++;
pthread_cond_wait(&gs.count_threads_cv, &gs.count_threads_mutex);
/* Beware of spurious wakeups. See issue pydata/numexpr#306. */
do {
pthread_cond_wait(&gs.count_threads_cv,
&gs.count_threads_mutex);
} while (!gs.threads_ready);
}
else {
gs.threads_ready = 1;
pthread_cond_broadcast(&gs.count_threads_cv);
}
pthread_mutex_unlock(&gs.count_threads_mutex);
Expand Down Expand Up @@ -164,9 +169,13 @@ void *th_worker(void *tidptr)
pthread_mutex_lock(&gs.count_threads_mutex);
if (gs.count_threads > 0) {
gs.count_threads--;
pthread_cond_wait(&gs.count_threads_cv, &gs.count_threads_mutex);
do {
pthread_cond_wait(&gs.count_threads_cv,
&gs.count_threads_mutex);
} while (gs.threads_ready);
}
else {
gs.threads_ready = 0;
pthread_cond_broadcast(&gs.count_threads_cv);
}
pthread_mutex_unlock(&gs.count_threads_mutex);
Expand Down Expand Up @@ -194,6 +203,7 @@ int init_threads(void)
pthread_mutex_init(&gs.count_threads_mutex, NULL);
pthread_cond_init(&gs.count_threads_cv, NULL);
gs.count_threads = 0; /* Reset threads counter */
gs.threads_ready = 0;

/* Finally, create the threads */
for (tid = 0; tid < gs.nthreads; tid++) {
Expand Down Expand Up @@ -247,9 +257,13 @@ int numexpr_set_nthreads(int nthreads_new)
pthread_mutex_lock(&gs.count_threads_mutex);
if (gs.count_threads < gs.nthreads) {
gs.count_threads++;
pthread_cond_wait(&gs.count_threads_cv, &gs.count_threads_mutex);
do {
pthread_cond_wait(&gs.count_threads_cv,
&gs.count_threads_mutex);
} while (!gs.threads_ready);
}
else {
gs.threads_ready = 1;
pthread_cond_broadcast(&gs.count_threads_cv);
}
pthread_mutex_unlock(&gs.count_threads_mutex);
Expand Down
1 change: 1 addition & 0 deletions numexpr/module.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct global_state {
/* Synchronization variables for threadpool state */
pthread_mutex_t count_mutex;
int count_threads;
int threads_ready;
pthread_mutex_t count_threads_mutex;
pthread_cond_t count_threads_cv;

Expand Down

0 comments on commit 0a2dad3

Please sign in to comment.