Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 4 additions & 15 deletions src/internal/event_loop/event_loop.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

///|
priv struct Worker {
id : Int64
job_slot : Ref[JobForWorker]
}

///|
priv struct EventLoop {
poll : Instance
Expand Down Expand Up @@ -136,10 +130,7 @@ fn EventLoop::run_forever(self : Self) -> Unit raise {
self.running_workers.remove(job_id)
match self.job_queue.pop_front() {
None => self.idle_workers.push_back(worker)
Some(job) => {
worker.job_slot.val = job
wake_worker(worker.id)
}
Some(job) => wake_worker(worker, job)
}
if self.jobs.get(job_id) is Some(coro) {
self.jobs.remove(job_id)
Expand Down Expand Up @@ -325,14 +316,12 @@ async fn perform_job_in_worker(
None if evloop.running_workers.size() > evloop.max_worker_count =>
evloop.job_queue.push_back(job)
None => {
let job_slot = @ref.new(job)
let id = spawn_worker(job_slot)
evloop.running_workers[job.id()] = { id, job_slot }
let worker = spawn_worker(job)
evloop.running_workers[job.id()] = worker
}
Some(worker) => {
worker.job_slot.val = job
evloop.running_workers[job.id()] = worker
wake_worker(worker.id)
wake_worker(worker, job)
}
}
evloop.jobs[job.id()] = @coroutine.current_coroutine()
Expand Down
97 changes: 77 additions & 20 deletions src/internal/event_loop/thread_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@

#ifdef __MACH__
#include <sys/event.h>
#define WAKEUP_METHOD_COND_VAR
#else
#define WAKEUP_METHOD_SIGNAL
#endif

extern char **environ;
Expand Down Expand Up @@ -130,8 +133,10 @@ struct {

int notify_send;

#ifdef WAKEUP_METHOD_SIGNAL
sigset_t wakeup_signal;
sigset_t old_sigmask;
#endif
int32_t job_id;
} pool;

Expand Down Expand Up @@ -163,16 +168,26 @@ int moonbitlang_async_job_poll_fd(struct job *job) {
}
}

struct worker {
pthread_t id;
struct job *job;
#ifdef WAKEUP_METHOD_COND_VAR
pthread_mutex_t mutex;
pthread_cond_t cond;
#endif
};

static
void *worker(void *data) {
void *worker_loop(void *data) {
int sig;
pthread_t self = pthread_self();
struct worker *self = (struct worker*)data;

sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGUSR1);
struct job *job = self->job;

struct job *job = *((struct job**)data);
#ifdef WAKEUP_METHOD_COND_VAR
pthread_mutex_init(&(self->mutex), 0);
pthread_cond_init(&(self->cond), 0);
#endif

while (job) {
job->ret = 0;
Expand Down Expand Up @@ -254,8 +269,12 @@ void *worker(void *data) {
case OP_SPAWN: {
posix_spawnattr_t attr;
posix_spawnattr_init(&attr);
#ifdef WAKEUP_METHOD_SIGNAL
posix_spawnattr_setflags(&attr, POSIX_SPAWN_SETSIGMASK | POSIX_SPAWN_SETSIGDEF);
posix_spawnattr_setsigmask(&attr, &pool.old_sigmask);
#else
posix_spawnattr_setflags(&attr, POSIX_SPAWN_SETSIGDEF);
#endif

sigset_t sigdefault_set;
sigemptyset(&sigdefault_set);
Expand Down Expand Up @@ -358,24 +377,51 @@ void *worker(void *data) {
break;
}
}
write(pool.notify_send, &job, sizeof(struct job*));
write(pool.notify_send, &(job->job_id), sizeof(int));

job = 0;
#ifdef WAKEUP_METHOD_SIGNAL
sigwait(&pool.wakeup_signal, &sig);
job = *(struct job**)data;
#elif defined(WAKEUP_METHOD_COND_VAR)
pthread_mutex_lock(&(self->mutex));
#ifdef __MACH__
// There's a bug in the MacOS's `pthread_cond_wait`,
// see https://github.com/graphia-app/graphia/issues/33
// We know the arguments must be valid here, so use a loop to work around
while (pthread_cond_wait(&(self->cond), &(self->mutex)) == EINVAL) {}
#else
pthread_cond_wait(&(self->cond), &(self->mutex));
#endif
pthread_mutex_unlock(&(self->mutex));
#endif
job = self->job;
}
return 0;
}

void moonbitlang_async_wake_worker(struct worker *worker, struct job *job) {
moonbit_decref(worker->job);
worker->job = job;
#ifdef WAKEUP_METHOD_SIGNAL
pthread_kill(worker->id, SIGUSR1);
#elif defined(WAKEUP_METHOD_COND_VAR)
pthread_mutex_lock(&(worker->mutex));
pthread_cond_signal(&(worker->cond));
pthread_mutex_unlock(&(worker->mutex));
#endif
}

void moonbitlang_async_init_thread_pool(int notify_send) {
if (pool.initialized)
abort();

pool.job_id = 0;

#ifdef WAKEUP_METHOD_SIGNAL
sigemptyset(&pool.wakeup_signal);
sigaddset(&pool.wakeup_signal, SIGUSR1);
pthread_sigmask(SIG_BLOCK, &pool.wakeup_signal, &pool.old_sigmask);
#endif

pool.notify_send = notify_send;
pool.initialized = 1;
Expand All @@ -387,24 +433,37 @@ void moonbitlang_async_destroy_thread_pool() {

pool.initialized = 0;

#ifdef WAKEUP_METHOD_SIGNAL
pthread_sigmask(SIG_SETMASK, &pool.old_sigmask, 0);
#endif

pool.job_id = 0;
}

pthread_t moonbitlang_async_spawn_worker(struct job **job_slot) {
void free_worker(void *target) {
struct worker *worker = (struct worker*)target;
// terminate the worker
moonbitlang_async_wake_worker(worker, 0);
pthread_join(worker->id, 0);
#ifdef WAKEUP_METHOD_COND_VAR
pthread_mutex_destroy(&(worker->mutex));
pthread_cond_destroy(&(worker->cond));
#endif
}

struct worker *moonbitlang_async_spawn_worker(struct job *init_job) {
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 512);

pthread_t id;
pthread_create(&id, &attr, &worker, job_slot);
struct worker *worker = (struct worker*)moonbit_make_external_object(
&free_worker,
sizeof(struct worker)
);
worker->job = init_job;
pthread_create(&(worker->id), &attr, &worker_loop, worker);
pthread_attr_destroy(&attr);
return id;
}

void moonbitlang_async_wake_worker(pthread_t worker) {
pthread_kill(worker, SIGUSR1);
return worker;
}

int moonbitlang_async_job_id(struct job *job) {
Expand Down Expand Up @@ -585,12 +644,10 @@ struct job *moonbitlang_async_make_getaddrinfo_job(
}

int32_t moonbitlang_async_fetch_completion(int notify_recv) {
struct job *job;
int32_t ret = read(notify_recv, &job, sizeof(struct job*));
int job_id;
int32_t ret = read(notify_recv, &job_id, sizeof(int));
if (ret < 0)
return ret;

int job_id = job->job_id;
moonbit_decref(job);
return job_id;
}
8 changes: 5 additions & 3 deletions src/internal/event_loop/thread_pool.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

///|
priv type Worker

///|
extern "C" fn init_thread_pool_ffi(notify_send : Int) = "moonbitlang_async_init_thread_pool"

///|
extern "C" fn destroy_thread_pool() = "moonbitlang_async_destroy_thread_pool"

///|
#borrow(job_slot)
extern "C" fn spawn_worker(job_slot : Ref[JobForWorker]) -> Int64 = "moonbitlang_async_spawn_worker"
extern "C" fn spawn_worker(init_job : JobForWorker) -> Worker = "moonbitlang_async_spawn_worker"

///|
extern "C" fn wake_worker(worker_id : Int64) = "moonbitlang_async_wake_worker"
extern "C" fn wake_worker(worker : Worker, job : JobForWorker) = "moonbitlang_async_wake_worker"

///|
extern "C" fn fetch_completion_ffi(notify_recv : Int) -> Int = "moonbitlang_async_fetch_completion"
Loading