Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Fix deadlock reported in RT#122709.
The need for a thread to handle incoming I/O-related callbacks did not
get accounted for properly, meaning that we didn't have enough threads
started and so deadlocked.
  • Loading branch information
jnthn committed Sep 7, 2014
1 parent bceb13e commit 57573e8
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions src/core/ThreadPoolScheduler.pm
Expand Up @@ -13,10 +13,17 @@ my class ThreadPoolScheduler does Scheduler {
has $!thread_start_semaphore;

# Number of outstanding work items, used for rough management of the
# pool size. Also a lock to protect updates to it. (TODO: use atomic
# operations for this in the future.)
# pool size.
has int $!loads;
has $!loads_lock;

# Number of threads started so far.
has int $!threads_started;

# Lock protecting updates to the above 2 fields.
has $!counts_lock;

# If we've got incoming I/O events we need a thread to handle.
has int $!need_io_thread;

# Initial and maximum threads.
has Int $.initial_threads;
Expand All @@ -29,10 +36,11 @@ my class ThreadPoolScheduler does Scheduler {
method !maybe_new_thread() {
if $!thread_start_semaphore.try_acquire() {
$!started_any = 1;
$!counts_lock.protect: { $!threads_started = $!threads_started + 1 };
Thread.start(:app_lifetime, {
loop {
my Mu $task := nqp::shift($!queue);
$!loads_lock.protect: { $!loads = $!loads + 1 };
$!counts_lock.protect: { $!loads = $!loads + 1 };
try {
if nqp::islist($task) {
my Mu $code := nqp::shift($task);
Expand All @@ -47,7 +55,7 @@ my class ThreadPoolScheduler does Scheduler {
}
}
}
$!loads_lock.protect: { $!loads = $!loads - 1 };
$!counts_lock.protect: { $!loads = $!loads - 1 };
}
});
}
Expand All @@ -64,6 +72,7 @@ my class ThreadPoolScheduler does Scheduler {
method queue() {
self!initialize unless $!started_any;
self!maybe_new_thread();
$!need_io_thread = 1;
$!queue
}

Expand Down Expand Up @@ -108,7 +117,7 @@ my class ThreadPoolScheduler does Scheduler {
my &run := &catch
?? -> { code(); CATCH { default { catch($_) } } }
!! &code;
self!maybe_new_thread() if !$!started_any || $!loads;
self!maybe_new_thread() if $!loads + $!need_io_thread <= $!threads_started;
nqp::push($!queue, &run);
return Nil;
}
Expand All @@ -122,7 +131,7 @@ my class ThreadPoolScheduler does Scheduler {
method !initialize() {
$!queue := nqp::create(Queue);
$!thread_start_semaphore := Semaphore.new($!max_threads.Int);
$!loads_lock := nqp::create(Lock);
$!counts_lock := nqp::create(Lock);
self!maybe_new_thread() for 1..$!initial_threads;
}
}
Expand Down

0 comments on commit 57573e8

Please sign in to comment.