Permalink
Browse files

Fix the wait op for threads

A task scheduled on another thread will get cloned to be local to this
thread. The two task objects are then linked by a "partner" pointer. To
wait for a task of another thread one now only has to add the current
task to the waiter list of the original task object.

A task will schedule its waiters when it is done and now will also see
if the partner task has any waiters and schedule them on the partner's
interp.

In addition the main thread will now no longer exit as long as tasks on
other threads are running (an explicit exit op does still work however).
The main thread's tasks could still be waiting for other tasks to
finish.
  • Loading branch information...
1 parent fdb60a5 commit 65e27042d9b5f7fe6ca1bfbb61a74065a993711e @niner niner committed Mar 25, 2012
Showing with 99 additions and 42 deletions.
  1. +15 −19 examples/threads/chameneos.pir
  2. +16 −2 src/ops/core_ops.c
  3. +9 −1 src/ops/experimental.ops
  4. +1 −1 src/pmc/scheduler.pmc
  5. +23 −3 src/pmc/task.pmc
  6. +15 −4 src/scheduler.c
  7. +3 −4 src/thread.c
  8. +17 −8 t/src/threads.t
View
34 examples/threads/chameneos.pir
@@ -1,11 +1,15 @@
# Copyright (C) 2011 Parrot Foundation.
.sub 'main' :main
- .local pmc colors, start_colors, at_most_two, mutex, sem_priv, first_call, a_color, b_color, chameneos, chameneo, code, data, number, color, dummy
+ .local pmc colors, start_colors, at_most_two, mutex, sem_priv, first_call, a_color, b_color, chameneos, chameneo, code, data, number, color, dummy, count
.local int i
dummy = new ['Continuation'] # workaround, see TODO in Proxy instantiate
+ count = new 'Integer'
+ count = 0
+ set_global 'count', count
+
colors = new ['ResizableStringArray']
colors = 3
colors[0] = 'Blue'
@@ -63,6 +67,7 @@ init_chameneos:
say "going to sleep"
sleep 10
say "woke up just in time for exit"
+ exit 0
.end
.sub chameneos_code
@@ -136,7 +141,8 @@ start:
setattribute call_task, 'code', call_core
setattribute call_task, 'data', color
interp.'schedule_proxied'(call_task, b_color)
- sleep 0.1
+ wait call_task
+
other_color = a_color
sem_unlock(sem_priv)
goto done
@@ -148,7 +154,7 @@ start:
setattribute call_task, 'code', call_core
setattribute call_task, 'data', color
interp.'schedule_proxied'(call_task, a_color)
- sleep 0.1
+ wait call_task
sem_unlock(mutex)
sem_wait(sem_priv)
@@ -177,7 +183,7 @@ done:
.sub second_call_core
.param pmc data
- .local pmc interp, task, b_color, first_call
+ .local pmc interp, task, b_color, first_call, count
.local int b_color_int
interp = getinterp
task = interp.'current_task'()
@@ -188,6 +194,9 @@ done:
first_call = 1
b_color_int = data
b_color = b_color_int
+ count = get_global 'count'
+ inc count
+ say count
.end
.sub sem_unlock
@@ -210,27 +219,18 @@ done:
interp = getinterp
sem_wait_core = get_global 'sem_wait_core'
- waiter = new ['Continuation']
- set_label waiter, got_lock
-
sem_wait_task = new ['Task']
setattribute sem_wait_task, 'code', sem_wait_core
setattribute sem_wait_task, 'data', sem
- push sem_wait_task, waiter
interp.'schedule_proxied'(sem_wait_task, sem)
- terminate
-got_lock:
+ wait sem_wait_task
returncc
.end
.sub sem_wait_core
.param pmc data
- .local pmc interp, task, sem, waiter, resume_task
- interp = getinterp
- task = interp.'current_task'()
-
+ .local pmc sem
sem = data
- waiter = pop task
test:
#disablepreemption
if sem > 0 goto lock
@@ -240,10 +240,6 @@ test:
lock:
dec sem
#enablepreemption
-
- resume_task = new ['Task']
- setattribute resume_task, 'code', waiter
- interp.'schedule_proxied'(resume_task, waiter)
.end
.sub sem_unlock_core
View
18 src/ops/core_ops.c
@@ -24272,14 +24272,21 @@ Parrot_wait_p(opcode_t *cur_opcode, PARROT_INTERP) {
Parrot_ex_throw_from_c_args(interp, NULL, EXCEPTION_INVALID_OPERATION, "Argument to wait op must be a Task.\n");
}
- cur_task = Parrot_cx_stop_task(interp, next);
tdata = PARROT_TASK(task);
+ LOCK(tdata->waiters_lock);
+ if (tdata->killed) {
+ UNLOCK(tdata->waiters_lock);
+ return (opcode_t *)next;
+ }
+
+ cur_task = Parrot_cx_stop_task(interp, next);
if (PMC_IS_NULL(tdata->waiters)) {
tdata->waiters = Parrot_pmc_new(interp, enum_class_ResizablePMCArray);
PARROT_GC_WRITE_BARRIER(interp, task);
}
VTABLE_push_pmc(interp, tdata->waiters, cur_task);
+ UNLOCK(tdata->waiters_lock);
return (opcode_t *)0;
return (opcode_t *)cur_opcode + 2;
}
@@ -24295,14 +24302,21 @@ Parrot_wait_pc(opcode_t *cur_opcode, PARROT_INTERP) {
Parrot_ex_throw_from_c_args(interp, NULL, EXCEPTION_INVALID_OPERATION, "Argument to wait op must be a Task.\n");
}
- cur_task = Parrot_cx_stop_task(interp, next);
tdata = PARROT_TASK(task);
+ LOCK(tdata->waiters_lock);
+ if (tdata->killed) {
+ UNLOCK(tdata->waiters_lock);
+ return (opcode_t *)next;
+ }
+
+ cur_task = Parrot_cx_stop_task(interp, next);
if (PMC_IS_NULL(tdata->waiters)) {
tdata->waiters = Parrot_pmc_new(interp, enum_class_ResizablePMCArray);
PARROT_GC_WRITE_BARRIER(interp, task);
}
VTABLE_push_pmc(interp, tdata->waiters, cur_task);
+ UNLOCK(tdata->waiters_lock);
return (opcode_t *)0;
return (opcode_t *)cur_opcode + 2;
}
View
10 src/ops/experimental.ops
@@ -396,13 +396,21 @@ op wait(in PMC) {
Parrot_ex_throw_from_c_args(interp, NULL, EXCEPTION_INVALID_OPERATION,
"Argument to wait op must be a Task.\n");
+ tdata = PARROT_TASK(task);
+ LOCK(tdata->waiters_lock);
+ if (tdata->killed) {
+ UNLOCK(tdata->waiters_lock);
+ goto ADDRESS(next);
+ }
+
cur_task = Parrot_cx_stop_task(interp, next);
- tdata = PARROT_TASK(task);
if (PMC_IS_NULL(tdata->waiters)) {
tdata->waiters = Parrot_pmc_new(interp, enum_class_ResizablePMCArray);
PARROT_GC_WRITE_BARRIER(interp, task);
}
+
VTABLE_push_pmc(interp, tdata->waiters, cur_task);
+ UNLOCK(tdata->waiters_lock);
goto ADDRESS(0);
}
View
2 src/pmc/scheduler.pmc
@@ -62,7 +62,7 @@ Initializes a concurrency scheduler object.
core_struct->handlers = Parrot_pmc_new(INTERP, enum_class_ResizablePMCArray);
core_struct->messages = Parrot_pmc_new(INTERP, enum_class_ResizablePMCArray);
core_struct->task_queue = Parrot_pmc_new(INTERP, enum_class_PMCList);
- core_struct->foreign_tasks = Parrot_pmc_new(INTERP, enum_class_PMCList);
+ core_struct->foreign_tasks = Parrot_pmc_new(INTERP, enum_class_ResizablePMCArray);
core_struct->alarms = Parrot_pmc_new(INTERP, enum_class_PMCList);
core_struct->all_tasks = Parrot_pmc_new(INTERP, enum_class_Hash);
core_struct->next_task_id = 0;
View
26 src/pmc/task.pmc
@@ -34,6 +34,7 @@ pmclass Task provides invokable auto_attrs {
ATTR INTVAL killed; /* Dead tasks don't get run */
ATTR PMC *mailbox; /* List of incoming messages */
ATTR PMC *waiters; /* Tasks waiting on this one */
+ ATTR Parrot_mutex waiters_lock;
ATTR PMC *shared; /* List of variables shared with this task */
ATTR PMC *partner; /* Copy of this task on the other side of a GC barrier,
meaning in another thread */
@@ -65,7 +66,9 @@ Initialize a concurrency task object.
core_struct->mailbox = PMCNULL; /* Created lazily on demand */
core_struct->waiters = PMCNULL; /* Created lazily on demand */
core_struct->shared = Parrot_pmc_new(INTERP, enum_class_ResizablePMCArray);
- core_struct->partner = PMCNULL; /* Set by Parrot_thread_create_local_task */
+ core_struct->partner = NULL; /* Set by Parrot_thread_create_local_task */
+
+ MUTEX_INIT(core_struct->waiters_lock);
/* Assign a unique ID */
/* TODO: Fix collisions. */
@@ -157,7 +160,7 @@ code as the first argument.
TASK_in_preempt_CLEAR(SELF);
if (! task->killed) {
- const int current_depth =
+ const INTVAL current_depth =
Parrot_pcc_get_recursion_depth(interp, CURRENT_CONTEXT(interp));
/* Add the task to the set of active Tasks */
PMC *task_id = Parrot_pmc_new(interp, enum_class_Integer);
@@ -176,11 +179,12 @@ code as the first argument.
/* The task is done. */
/* Remove it from the set of active Tasks */
- int i, n = 0;
+ INTVAL i, n = 0;
PMC * const task_id = Parrot_pmc_new(interp, enum_class_Integer);
VTABLE_set_integer_native(interp, task_id, task->id);
TASK_active_CLEAR(SELF);
VTABLE_delete_keyed(interp, active_tasks, task_id);
+ task->killed = 1;
/* schedule any waiters. */
if (!PMC_IS_NULL(task->waiters))
@@ -190,6 +194,22 @@ code as the first argument.
PMC * const wtask = VTABLE_get_pmc_keyed_int(interp, task->waiters, i);
Parrot_cx_schedule_task(interp, wtask);
}
+
+ if (task->partner) { /* TODO how can we know if the partner's still alive? */
+ Parrot_Task_attributes * const partner_task = PARROT_TASK(task->partner);
+ LOCK(partner_task->waiters_lock);
+ partner_task->killed = 1;
+
+ if (!PMC_IS_NULL(partner_task->waiters)) {
+ n = VTABLE_get_integer(interp, partner_task->waiters);
+ }
+
+ for (i = 0; i < n; ++i) {
+ PMC * const wtask = VTABLE_get_pmc_keyed_int(interp, partner_task->waiters, i);
+ Parrot_cx_schedule_immediate(partner_task->interp, wtask);
+ }
+ UNLOCK(partner_task->waiters_lock);
+ }
}
return (opcode_t*) next;
View
19 src/scheduler.c
@@ -143,7 +143,7 @@ Parrot_cx_outer_runloop(PARROT_INTERP)
ASSERT_ARGS(Parrot_cx_outer_runloop)
PMC * const scheduler = interp->scheduler;
Parrot_Scheduler_attributes * const sched = PARROT_SCHEDULER(scheduler);
- INTVAL alarm_count;
+ INTVAL alarm_count, foreign_count, i;
do {
while (VTABLE_get_integer(interp, scheduler) > 0) {
@@ -163,18 +163,29 @@ Parrot_cx_outer_runloop(PARROT_INTERP)
Parrot_cx_check_alarms(interp, interp->scheduler);
}
+ foreign_count = VTABLE_get_integer(interp, sched->foreign_tasks);
+ for (i = 0; i < foreign_count; i++) {
+ PMC * const task = VTABLE_get_pmc_keyed_int(interp, sched->foreign_tasks, i);
+ if (PARROT_TASK(task)->killed) {
+ VTABLE_delete_keyed_int(interp, sched->foreign_tasks, i);
+ i--;
+ foreign_count--;
+ }
+ }
+
alarm_count = VTABLE_get_integer(interp, sched->alarms);
- if (alarm_count > 0) {
+ if (alarm_count > 0 || foreign_count > 0) {
#ifdef _WIN32
/* TODO: Implement on Windows */
#else
/* Nothing to do except to wait for the next alarm to expire */
- pause();
+ if (alarm_count > 0)
+ pause();
Parrot_thread_notify_threads(interp);
#endif
Parrot_cx_check_alarms(interp, interp->scheduler);
}
- } while (alarm_count);
+ } while (alarm_count || foreign_count);
}
/*
View
7 src/thread.c
@@ -251,6 +251,9 @@ Parrot_thread_create_local_task(PARROT_INTERP, ARGIN(Parrot_Interp const thread_
Parrot_thread_maybe_create_proxy(interp, thread_interp, data));
}
+ /* put the task in a list for GC and for the main thread to know there's still active tasks */
+ VTABLE_push_pmc(interp, PARROT_SCHEDULER(interp->scheduler)->foreign_tasks, task);
+
return local_task;
}
@@ -278,10 +281,6 @@ Parrot_thread_schedule_task(PARROT_INTERP, ARGIN(Interp *thread_interp), ARGIN(P
Parrot_thread_create_local_task(interp, thread_interp, task));
Parrot_unblock_GC_mark(thread_interp);
-
- /* put the task in a list for GC */
- /* TODO get them out again when finished */
- VTABLE_push_pmc(interp, PARROT_SCHEDULER(interp->scheduler)->foreign_tasks, task);
}
/*
View
25 t/src/threads.t
@@ -2,17 +2,16 @@
# Copyright (C) 2011, Parrot Foundation.
.sub main :main
- .local pmc task, sayer, starter, ender, number, interp, end_sub
+ .local pmc task, tasks, sayer, starter, ender, number, interp, end_sub
.local int i
interp = getinterp
sayer = get_global 'sayer'
- starter = new ['Integer']
- ender = new ['Integer']
+ starter = new 'Integer', 0
+ ender = new 'Integer', 0
+ tasks = new 'ResizablePMCArray'
end_sub = get_global 'end_this'
i = 1
- starter = 0
- ender = 0
- say "1..11"
+ say "1..21"
start:
number = new ['String']
number = i
@@ -24,18 +23,27 @@ start:
setattribute task, 'data', number
print "ok "
say number
+ push tasks, task
schedule task
inc i
if i > 10 goto end
goto start
end:
starter = 1
- sleep 1 # give threads time to run. Replace by join once that's implemented
+wait_for_tasks:
+ task = shift tasks
+ wait task
+ print "ok "
+ say i
+ inc i
+ if i > 20 goto check
+ goto wait_for_tasks
+check:
if ender == 1 goto win
say "not ok"
goto done
win:
- say "ok 11"
+ say "ok 21"
done:
.end
@@ -57,6 +65,7 @@ end:
setattribute end_task, 'code', end_sub
setattribute end_task, 'data', ender
interp.'schedule_proxied'(end_task, ender)
+ returncc
.end
.sub end_this

0 comments on commit 65e2704

Please sign in to comment.