Skip to content

Commit

Permalink
Add schedule_in, schedule_every to scheduler.
Browse files Browse the repository at this point in the history
  • Loading branch information
jnthn committed Oct 30, 2013
1 parent dcf8e14 commit 91a1b0c
Showing 1 changed file with 42 additions and 14 deletions.
56 changes: 42 additions & 14 deletions src/vm/jvm/core/Threading.pm
Expand Up @@ -100,7 +100,9 @@ my role Scheduler {
CATCH { default { catch($_) } }
})
}


method schedule_in(&code, $delay) { ... }
method schedule_every(&code, $interval, $delay = 0) { ... }
method outstanding() { ... }
}

Expand All @@ -127,6 +129,9 @@ my class ThreadPoolScheduler does Scheduler {
# Have we started any threads yet?
has int $!started_any;

# Timer for interval-scheduled things.
has $!timer;

# Adds a new thread to the pool, respecting the maximum.
method !maybe_new_thread() {
if $!thread_start_semaphore.'method/tryAcquire/(I)Z'(1) {
Expand Down Expand Up @@ -155,26 +160,49 @@ my class ThreadPoolScheduler does Scheduler {
}

method schedule(&code) {
my $interop := nqp::jvmbootinterop();
unless $!started_any {
# Things we will use from the JVM.
my \LinkedBlockingQueue := $interop.typeForName('java.util.concurrent.LinkedBlockingQueue');
my \Semaphore := $interop.typeForName('java.util.concurrent.Semaphore');
my \AtomicInteger := $interop.typeForName('java.util.concurrent.atomic.AtomicInteger');
$!queue := LinkedBlockingQueue.'constructor/new/()V'();
$!thread_start_semaphore := Semaphore.'constructor/new/(I)V'($!max_threads.Int);
$!outstanding := AtomicInteger.'constructor/new/()V'();
self!maybe_new_thread() for 1..$!initial_threads;
}
self!initialize unless $!started_any;
my $outstanding = $!outstanding.incrementAndGet();
self!maybe_new_thread()
if !$!started_any || $outstanding > 1;
$!queue.add($interop.sixmodelToJavaObject(&code));
$!queue.add(nqp::jvmbootinterop().sixmodelToJavaObject(&code));
}


method schedule_in(&code, $delay) {
self!initialize() unless $!started_any;
$!timer.'method/schedule/(Ljava/util/TimerTask;J)V'(
nqp::jvmbootinterop().proxy(
'java.util.TimerTask',
nqp::hash('run', -> { code() })),
($delay * 1000).Int);
}

method schedule_every(&code, $interval, $delay = 0) {
self!initialize() unless $!started_any;
$!timer.'method/scheduleAtFixedRate/(Ljava/util/TimerTask;JJ)V'(
nqp::jvmbootinterop().proxy(
'java.util.TimerTask',
nqp::hash('run', -> { code() })),
($delay * 1000).Int,
($interval * 1000).Int);
}

method outstanding() {
$!outstanding.get()
}

method !initialize() {
# Things we will use from the JVM.
my $interop := nqp::jvmbootinterop();
my \LinkedBlockingQueue := $interop.typeForName('java.util.concurrent.LinkedBlockingQueue');
my \Semaphore := $interop.typeForName('java.util.concurrent.Semaphore');
my \AtomicInteger := $interop.typeForName('java.util.concurrent.atomic.AtomicInteger');
my \Timer := $interop.typeForName('java.util.Timer');
$!queue := LinkedBlockingQueue.'constructor/new/()V'();
$!thread_start_semaphore := Semaphore.'constructor/new/(I)V'($!max_threads.Int);
$!outstanding := AtomicInteger.'constructor/new/()V'();
$!timer := Timer.'constructor/new/(Z)V'(True);
self!maybe_new_thread() for 1..$!initial_threads;
}
}

# This thread pool scheduler will be the default one.
Expand Down

0 comments on commit 91a1b0c

Please sign in to comment.