Skip to content

Commit

Permalink
Move all access to task_queue to scheduler.pmc and protect by a lock
Browse files Browse the repository at this point in the history
Pushing tasks to other threads' interpreters should now be safe.
  • Loading branch information
niner committed Nov 12, 2011
1 parent 49d2ff2 commit 04717ae
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 27 deletions.
55 changes: 44 additions & 11 deletions src/pmc/scheduler.pmc
Expand Up @@ -30,6 +30,7 @@ pmclass Scheduler auto_attrs {
between schedulers. */ between schedulers. */


ATTR PMC *task_queue; /* List of tasks/green threads waiting to run */ ATTR PMC *task_queue; /* List of tasks/green threads waiting to run */
ATTR Parrot_mutex task_queue_lock;
ATTR PMC *alarms; /* List of future alarms ordered by time */ ATTR PMC *alarms; /* List of future alarms ordered by time */


ATTR PMC *all_tasks; /* Hash of all active tasks by ID */ ATTR PMC *all_tasks; /* Hash of all active tasks by ID */
Expand Down Expand Up @@ -65,6 +66,8 @@ Initializes a concurrency scheduler object.
core_struct->next_task_id = 0; core_struct->next_task_id = 0;
core_struct->interp = INTERP; core_struct->interp = INTERP;


MUTEX_INIT(core_struct->task_queue_lock);

/* Chandon TODO: Delete from int-keyed hash doesn't like me. */ /* Chandon TODO: Delete from int-keyed hash doesn't like me. */
/* VTABLE_set_integer_native(interp, core_struct->all_tasks, Hash_key_type_int); */ /* VTABLE_set_integer_native(interp, core_struct->all_tasks, Hash_key_type_int); */


Expand Down Expand Up @@ -106,39 +109,63 @@ An C<Integer> representing the unique identifier for this scheduler.
core_struct->id = VTABLE_get_integer(INTERP, elem); core_struct->id = VTABLE_get_integer(INTERP, elem);
} }



/* /*


=item C<void push_pmc(PMC *value)> =item C<void push_pmc(PMC *value)>


Inserts a task into the task list, giving it a task ID one higher than the Inserts a task into the task list.
current maximum, and a birthtime of the current time.


=cut =cut


*/ */


void push_pmc(PMC *task) { void push_pmc(PMC *task) {
/* TODO: This doesn't appear to do anything */
Parrot_Scheduler_attributes * const core_struct = PARROT_SCHEDULER(SELF); Parrot_Scheduler_attributes * const core_struct = PARROT_SCHEDULER(SELF);
PMC * const type_pmc = VTABLE_get_attr_str(interp, task, CONST_STRING(interp, "type"));
STRING * const type = VTABLE_get_string(interp, type_pmc); LOCK(core_struct->task_queue_lock);
VTABLE_push_pmc(INTERP, core_struct->task_queue, task);
UNLOCK(core_struct->task_queue_lock);
}


/*

=item C<void unshift_pmc(PMC *value)>

Inserts a task into the head of the task list.

=cut

*/

void unshift_pmc(PMC *task) {
Parrot_Scheduler_attributes * const core_struct = PARROT_SCHEDULER(SELF);

LOCK(core_struct->task_queue_lock);
VTABLE_unshift_pmc(INTERP, core_struct->task_queue, task);
UNLOCK(core_struct->task_queue_lock);
} }




/* /*


=item C<PMC *pop_pmc()> =item C<PMC *shift_pmc()>


Retrieves the next task from the task list. If the task index is invalid, Retrieves the next task from the task list.
recalculates it before retrieving the next task.


=cut =cut


*/ */


VTABLE PMC *pop_pmc() { VTABLE PMC *shift_pmc() {
Parrot_Scheduler_attributes * const core_struct = PARROT_SCHEDULER(SELF); Parrot_Scheduler_attributes * const core_struct = PARROT_SCHEDULER(SELF);
PMC * const task = VTABLE_shift_pmc(INTERP, core_struct->task_queue); PMC * task;

LOCK(core_struct->task_queue_lock);
task = VTABLE_shift_pmc(INTERP, core_struct->task_queue);
UNLOCK(core_struct->task_queue_lock);

return task; return task;
} }


Expand All @@ -155,7 +182,13 @@ Retrieves the number of pending tasks in the scheduler's task list.


VTABLE INTVAL get_integer() { VTABLE INTVAL get_integer() {
Parrot_Scheduler_attributes * const core_struct = PARROT_SCHEDULER(SELF); Parrot_Scheduler_attributes * const core_struct = PARROT_SCHEDULER(SELF);
return VTABLE_elements(INTERP, core_struct->task_queue); INTVAL elements;

LOCK(core_struct->task_queue_lock);
elements = VTABLE_elements(INTERP, PARROT_SCHEDULER(SELF)->task_queue);
UNLOCK(core_struct->task_queue_lock);

return elements;
} }




Expand Down
16 changes: 7 additions & 9 deletions src/scheduler.c
Expand Up @@ -146,7 +146,7 @@ Parrot_cx_outer_runloop(PARROT_INTERP)
INTVAL alarm_count; INTVAL alarm_count;


do { do {
while (VTABLE_get_integer(interp, sched->task_queue) > 0) { while (VTABLE_get_integer(interp, scheduler) > 0) {
/* there can be no active runloops at this point, so it should be save /* there can be no active runloops at this point, so it should be save
* to start counting at 0 again. This way the continuation in the next * to start counting at 0 again. This way the continuation in the next
* task will find a runloop with id 1 when encountering an exception */ * task will find a runloop with id 1 when encountering an exception */
Expand Down Expand Up @@ -210,7 +210,7 @@ Parrot_cx_next_task(PARROT_INTERP, ARGIN(PMC *scheduler))
{ {
ASSERT_ARGS(Parrot_cx_next_task) ASSERT_ARGS(Parrot_cx_next_task)
Parrot_Scheduler_attributes * const sched = PARROT_SCHEDULER(scheduler); Parrot_Scheduler_attributes * const sched = PARROT_SCHEDULER(scheduler);
PMC * const task = VTABLE_shift_pmc(interp, sched->task_queue); PMC * const task = VTABLE_shift_pmc(interp, scheduler);


interp->cur_task = task; interp->cur_task = task;


Expand All @@ -221,7 +221,7 @@ Parrot_cx_next_task(PARROT_INTERP, ARGIN(PMC *scheduler))
#ifdef _WIN32 #ifdef _WIN32
/* TODO: Implement on Windows */ /* TODO: Implement on Windows */
#else #else
if (VTABLE_get_integer(interp, sched->task_queue) > 0) if (VTABLE_get_integer(interp, scheduler) > 0)
Parrot_cx_enable_preemption(interp); Parrot_cx_enable_preemption(interp);
else else
Parrot_cx_disable_preemption(interp); Parrot_cx_disable_preemption(interp);
Expand Down Expand Up @@ -368,9 +368,8 @@ opcode_t*
Parrot_cx_preempt_task(PARROT_INTERP, ARGIN(PMC *scheduler), ARGIN(opcode_t *next)) Parrot_cx_preempt_task(PARROT_INTERP, ARGIN(PMC *scheduler), ARGIN(opcode_t *next))
{ {
ASSERT_ARGS(Parrot_cx_preempt_task) ASSERT_ARGS(Parrot_cx_preempt_task)
Parrot_Scheduler_attributes * const sched = PARROT_SCHEDULER(scheduler);
PMC * const task = Parrot_cx_stop_task(interp, next); PMC * const task = Parrot_cx_stop_task(interp, next);
VTABLE_push_pmc(interp, sched->task_queue, task); VTABLE_push_pmc(interp, scheduler, task);


return (opcode_t*) 0; return (opcode_t*) 0;
} }
Expand Down Expand Up @@ -413,7 +412,6 @@ void
Parrot_cx_schedule_task(PARROT_INTERP, ARGIN(PMC *task_or_sub)) Parrot_cx_schedule_task(PARROT_INTERP, ARGIN(PMC *task_or_sub))
{ {
ASSERT_ARGS(Parrot_cx_schedule_task) ASSERT_ARGS(Parrot_cx_schedule_task)
Parrot_Scheduler_attributes * const sched = PARROT_SCHEDULER(interp->scheduler);
PMC * task = PMCNULL; PMC * task = PMCNULL;
int index; int index;


Expand Down Expand Up @@ -446,12 +444,12 @@ Parrot_cx_schedule_task(PARROT_INTERP, ARGIN(PMC *task_or_sub))
Parrot_thread_run(interp, thread, task, NULL); Parrot_thread_run(interp, thread, task, NULL);
} }
else { else {
VTABLE_push_pmc(interp, sched->task_queue, task); VTABLE_push_pmc(interp, interp->scheduler, task);


#ifdef _WIN32 #ifdef _WIN32
#else #else
/* going from single to multi tasking? */ /* going from single to multi tasking? */
if (VTABLE_get_integer(interp, sched->task_queue) == 1) if (VTABLE_get_integer(interp, interp->scheduler) == 1)
Parrot_cx_enable_preemption(interp); Parrot_cx_enable_preemption(interp);
#endif #endif
} }
Expand Down Expand Up @@ -490,7 +488,7 @@ Parrot_cx_schedule_immediate(PARROT_INTERP, ARGIN(PMC *task_or_sub))
"Can only schedule Tasks and Subs.\n"); "Can only schedule Tasks and Subs.\n");
} }


VTABLE_unshift_pmc(interp, sched->task_queue, task); VTABLE_unshift_pmc(interp, interp->scheduler, task);
SCHEDULER_wake_requested_SET(interp->scheduler); SCHEDULER_wake_requested_SET(interp->scheduler);
SCHEDULER_resched_requested_SET(interp->scheduler); SCHEDULER_resched_requested_SET(interp->scheduler);
} }
Expand Down
10 changes: 4 additions & 6 deletions src/thread.c
Expand Up @@ -138,13 +138,11 @@ void
Parrot_thread_schedule_task(PARROT_INTERP, ARGIN(PMC *thread), ARGIN(PMC *task)) Parrot_thread_schedule_task(PARROT_INTERP, ARGIN(PMC *thread), ARGIN(PMC *task))
{ {
ASSERT_ARGS(Parrot_thread_schedule_task) ASSERT_ARGS(Parrot_thread_schedule_task)
PMC * const self = (PMC*) thread; PMC * const self = (PMC*) thread;
Parrot_Interp thread_interp = Parrot_Interp const thread_interp =
(Parrot_Interp)((Parrot_ParrotInterpreter_attributes *)PMC_data(self))->interp; (Parrot_Interp)((Parrot_ParrotInterpreter_attributes *)PMC_data(self))->interp;
PMC * const scheduler = thread_interp->scheduler;
Parrot_Scheduler_attributes * const sched = PARROT_SCHEDULER(thread_interp->scheduler);


VTABLE_push_pmc(thread_interp, sched->task_queue, task); VTABLE_push_pmc(thread_interp, thread_interp->scheduler, task);
} }


/* /*
Expand Down Expand Up @@ -176,7 +174,7 @@ Parrot_thread_outer_runloop(ARGIN_NULLOK(void *arg))
/* interp->lo_var_ptr = &lo_var_ptr; */ /* interp->lo_var_ptr = &lo_var_ptr; */


do { do {
while (VTABLE_get_integer(interp, sched->task_queue) > 0) { while (VTABLE_get_integer(interp, scheduler) > 0) {
/* there can be no active runloops at this point, so it should be save /* there can be no active runloops at this point, so it should be save
* to start counting at 0 again. This way the continuation in the next * to start counting at 0 again. This way the continuation in the next
* task will find a runloop with id 1 when encountering an exception */ * task will find a runloop with id 1 when encountering an exception */
Expand Down
7 changes: 6 additions & 1 deletion t/src/threads.t
@@ -1,4 +1,5 @@
#!./parrot #!./parrot
# Copyright (C) 2011, Parrot Foundation.


.sub main :main .sub main :main
.local pmc task, sayer, name, starter, ender, number .local pmc task, sayer, name, starter, ender, number
Expand Down Expand Up @@ -53,4 +54,8 @@ end:
set_global 'ender', ender set_global 'ender', ender
.end .end


# vim: ft=pir expandtab shiftwidth=4 cinoptions='\:2=2' : # Local Variables:
# mode: pir
# fill-column: 100
# End:
# vim: expandtab shiftwidth=4 ft=pir:

0 comments on commit 04717ae

Please sign in to comment.