diff --git a/src/internal/event_loop/event_loop.mbt b/src/internal/event_loop/event_loop.mbt index 1f5dff47..d85f39fb 100644 --- a/src/internal/event_loop/event_loop.mbt +++ b/src/internal/event_loop/event_loop.mbt @@ -12,12 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -///| -priv struct Worker { - id : Int64 - job_slot : Ref[JobForWorker] -} - ///| priv struct EventLoop { poll : Instance @@ -136,10 +130,7 @@ fn EventLoop::run_forever(self : Self) -> Unit raise { self.running_workers.remove(job_id) match self.job_queue.pop_front() { None => self.idle_workers.push_back(worker) - Some(job) => { - worker.job_slot.val = job - wake_worker(worker.id) - } + Some(job) => wake_worker(worker, job) } if self.jobs.get(job_id) is Some(coro) { self.jobs.remove(job_id) @@ -325,14 +316,12 @@ async fn perform_job_in_worker( None if evloop.running_workers.size() > evloop.max_worker_count => evloop.job_queue.push_back(job) None => { - let job_slot = @ref.new(job) - let id = spawn_worker(job_slot) - evloop.running_workers[job.id()] = { id, job_slot } + let worker = spawn_worker(job) + evloop.running_workers[job.id()] = worker } Some(worker) => { - worker.job_slot.val = job evloop.running_workers[job.id()] = worker - wake_worker(worker.id) + wake_worker(worker, job) } } evloop.jobs[job.id()] = @coroutine.current_coroutine() diff --git a/src/internal/event_loop/thread_pool.c b/src/internal/event_loop/thread_pool.c index 2a5c2707..5bc79a14 100644 --- a/src/internal/event_loop/thread_pool.c +++ b/src/internal/event_loop/thread_pool.c @@ -34,6 +34,9 @@ #ifdef __MACH__ #include +#define WAKEUP_METHOD_COND_VAR +#else +#define WAKEUP_METHOD_SIGNAL #endif extern char **environ; @@ -130,8 +133,10 @@ struct { int notify_send; +#ifdef WAKEUP_METHOD_SIGNAL sigset_t wakeup_signal; sigset_t old_sigmask; +#endif int32_t job_id; } pool; @@ -163,16 +168,26 @@ int moonbitlang_async_job_poll_fd(struct job *job) { } } +struct worker { + pthread_t id; + struct job *job; +#ifdef WAKEUP_METHOD_COND_VAR + pthread_mutex_t mutex; + pthread_cond_t cond; +#endif +}; + static -void *worker(void *data) { +void *worker_loop(void *data) { int sig; - pthread_t self = pthread_self(); + struct worker *self = (struct worker*)data; - sigset_t sigset; - sigemptyset(&sigset); - sigaddset(&sigset, SIGUSR1); + struct job *job = self->job; - struct job *job = *((struct job**)data); +#ifdef WAKEUP_METHOD_COND_VAR + pthread_mutex_init(&(self->mutex), 0); + pthread_cond_init(&(self->cond), 0); +#endif while (job) { job->ret = 0; @@ -254,8 +269,12 @@ void *worker(void *data) { case OP_SPAWN: { posix_spawnattr_t attr; posix_spawnattr_init(&attr); +#ifdef WAKEUP_METHOD_SIGNAL posix_spawnattr_setflags(&attr, POSIX_SPAWN_SETSIGMASK | POSIX_SPAWN_SETSIGDEF); posix_spawnattr_setsigmask(&attr, &pool.old_sigmask); +#else + posix_spawnattr_setflags(&attr, POSIX_SPAWN_SETSIGDEF); +#endif sigset_t sigdefault_set; sigemptyset(&sigdefault_set); @@ -358,24 +377,51 @@ void *worker(void *data) { break; } } - write(pool.notify_send, &job, sizeof(struct job*)); + write(pool.notify_send, &(job->job_id), sizeof(int)); job = 0; +#ifdef WAKEUP_METHOD_SIGNAL sigwait(&pool.wakeup_signal, &sig); - job = *(struct job**)data; +#elif defined(WAKEUP_METHOD_COND_VAR) + pthread_mutex_lock(&(self->mutex)); +#ifdef __MACH__ + // There's a bug in the MacOS's `pthread_cond_wait`, + // see https://github.com/graphia-app/graphia/issues/33 + // We know the arguments must be valid here, so use a loop to work around + while (pthread_cond_wait(&(self->cond), &(self->mutex)) == EINVAL) {} +#else + pthread_cond_wait(&(self->cond), &(self->mutex)); +#endif + pthread_mutex_unlock(&(self->mutex)); +#endif + job = self->job; } return 0; } +void moonbitlang_async_wake_worker(struct worker *worker, struct job *job) { + moonbit_decref(worker->job); + worker->job = job; +#ifdef WAKEUP_METHOD_SIGNAL + pthread_kill(worker->id, SIGUSR1); +#elif defined(WAKEUP_METHOD_COND_VAR) + pthread_mutex_lock(&(worker->mutex)); + pthread_cond_signal(&(worker->cond)); + pthread_mutex_unlock(&(worker->mutex)); +#endif +} + void moonbitlang_async_init_thread_pool(int notify_send) { if (pool.initialized) abort(); pool.job_id = 0; +#ifdef WAKEUP_METHOD_SIGNAL sigemptyset(&pool.wakeup_signal); sigaddset(&pool.wakeup_signal, SIGUSR1); pthread_sigmask(SIG_BLOCK, &pool.wakeup_signal, &pool.old_sigmask); +#endif pool.notify_send = notify_send; pool.initialized = 1; @@ -387,24 +433,37 @@ void moonbitlang_async_destroy_thread_pool() { pool.initialized = 0; +#ifdef WAKEUP_METHOD_SIGNAL pthread_sigmask(SIG_SETMASK, &pool.old_sigmask, 0); +#endif pool.job_id = 0; } -pthread_t moonbitlang_async_spawn_worker(struct job **job_slot) { +void free_worker(void *target) { + struct worker *worker = (struct worker*)target; + // terminate the worker + moonbitlang_async_wake_worker(worker, 0); + pthread_join(worker->id, 0); +#ifdef WAKEUP_METHOD_COND_VAR + pthread_mutex_destroy(&(worker->mutex)); + pthread_cond_destroy(&(worker->cond)); +#endif +} + +struct worker *moonbitlang_async_spawn_worker(struct job *init_job) { pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, 512); - pthread_t id; - pthread_create(&id, &attr, &worker, job_slot); + struct worker *worker = (struct worker*)moonbit_make_external_object( + &free_worker, + sizeof(struct worker) + ); + worker->job = init_job; + pthread_create(&(worker->id), &attr, &worker_loop, worker); pthread_attr_destroy(&attr); - return id; -} - -void moonbitlang_async_wake_worker(pthread_t worker) { - pthread_kill(worker, SIGUSR1); + return worker; } int moonbitlang_async_job_id(struct job *job) { @@ -585,12 +644,10 @@ struct job *moonbitlang_async_make_getaddrinfo_job( } int32_t moonbitlang_async_fetch_completion(int notify_recv) { - struct job *job; - int32_t ret = read(notify_recv, &job, sizeof(struct job*)); + int job_id; + int32_t ret = read(notify_recv, &job_id, sizeof(int)); if (ret < 0) return ret; - int job_id = job->job_id; - moonbit_decref(job); return job_id; } diff --git a/src/internal/event_loop/thread_pool.mbt b/src/internal/event_loop/thread_pool.mbt index 3cd16ed4..7cb03286 100644 --- a/src/internal/event_loop/thread_pool.mbt +++ b/src/internal/event_loop/thread_pool.mbt @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +///| +priv type Worker + ///| extern "C" fn init_thread_pool_ffi(notify_send : Int) = "moonbitlang_async_init_thread_pool" @@ -19,11 +22,10 @@ extern "C" fn init_thread_pool_ffi(notify_send : Int) = "moonbitlang_async_init_ extern "C" fn destroy_thread_pool() = "moonbitlang_async_destroy_thread_pool" ///| -#borrow(job_slot) -extern "C" fn spawn_worker(job_slot : Ref[JobForWorker]) -> Int64 = "moonbitlang_async_spawn_worker" +extern "C" fn spawn_worker(init_job : JobForWorker) -> Worker = "moonbitlang_async_spawn_worker" ///| -extern "C" fn wake_worker(worker_id : Int64) = "moonbitlang_async_wake_worker" +extern "C" fn wake_worker(worker : Worker, job : JobForWorker) = "moonbitlang_async_wake_worker" ///| extern "C" fn fetch_completion_ffi(notify_recv : Int) -> Int = "moonbitlang_async_fetch_completion"