Skip to content
Browse files

[concurrency] Initial implementation of concurrency scheduler runloop.

git-svn-id: https://svn.parrot.org/parrot/trunk@23460 d31e2699-5ff4-0310-a27c-f18f2fbe73fe
  • Loading branch information...
1 parent 74ef179 commit e9221160589a58fd58004fbe2fc7e545cfd92d79 @allisonrandal allisonrandal committed Dec 4, 2007
Showing with 378 additions and 6 deletions.
  1. +2 −0 config/gen/makefiles/root.in
  2. +43 −1 include/parrot/scheduler.h
  3. +5 −0 src/gc/dod.c
  4. +2 −0 src/inter_create.c
  5. +20 −0 src/ops/core.ops
  6. +1 −0 src/ops/ops.num
  7. +36 −5 src/pmc/scheduler.pmc
  8. +32 −0 src/pmc/task.pmc
  9. +237 −0 src/scheduler.c
View
2 config/gen/makefiles/root.in
@@ -440,6 +440,7 @@ INTERP_O_FILES = \
$(SRC_DIR)/pmc$(O) \
$(SRC_DIR)/revision$(O) \
$(SRC_DIR)/runops_cores$(O) \
+ $(SRC_DIR)/scheduler$(O) \
$(SRC_DIR)/spf_render$(O) \
$(SRC_DIR)/spf_vtable$(O) \
$(SRC_DIR)/stack_common$(O) \
@@ -594,6 +595,7 @@ STR_FILES = \
$(SRC_DIR)/mmd.str \
$(SRC_DIR)/pmc.str \
$(SRC_DIR)/oo.str \
+ $(SRC_DIR)/scheduler.str \
$(SRC_DIR)/objects.str \
$(SRC_DIR)/spf_render.str \
$(SRC_DIR)/spf_vtable.str \
View
44 include/parrot/scheduler.h
@@ -17,6 +17,25 @@
/* HEADERIZER BEGIN: src/scheduler.c */
+PARROT_API
+void Parrot_cx_init_scheduler(PARROT_INTERP)
+ __attribute__nonnull__(1);
+
+PARROT_API
+void Parrot_cx_runloop_end(PARROT_INTERP)
+ __attribute__nonnull__(1);
+
+PARROT_API
+void Parrot_cx_schedule_task(PARROT_INTERP, NOTNULL(PMC *task))
+ __attribute__nonnull__(1)
+ __attribute__nonnull__(2);
+
+void Parrot_cx_runloop_sleep(NOTNULL(PMC *scheduler))
+ __attribute__nonnull__(1);
+
+void Parrot_cx_runloop_wake(PARROT_INTERP, NOTNULL(PMC *scheduler))
+ __attribute__nonnull__(1)
+ __attribute__nonnull__(2);
/* HEADERIZER END: src/scheduler.c */
@@ -54,7 +73,8 @@ typedef struct Parrot_Task {
* Scheduler private flags
*/
typedef enum {
- SCHEDULER_cache_valid_FLAG = PObj_private0_FLAG
+ SCHEDULER_cache_valid_FLAG = PObj_private0_FLAG,
+ SCHEDULER_terminate_runloop_FLAG = PObj_private1_FLAG
} scheduler_flags_enum;
#define SCHEDULER_get_FLAGS(o) (PObj_get_FLAGS(o))
@@ -67,6 +87,28 @@ typedef enum {
#define SCHEDULER_cache_valid_SET(o) SCHEDULER_flag_SET(cache_valid, o)
#define SCHEDULER_cache_valid_CLEAR(o) SCHEDULER_flag_CLEAR(cache_valid, o)
+/* Mark if the scheduler should terminate the scheduler runloop */
+#define SCHEDULER_terminate_runloop_TEST(o) SCHEDULER_flag_TEST(terminate_runloop, o)
+#define SCHEDULER_terminate_runloop_SET(o) SCHEDULER_flag_SET(terminate_runloop, o)
+#define SCHEDULER_terminate_runloop_CLEAR(o) SCHEDULER_flag_CLEAR(terminate_runloop, o)
+
+/*
+ * Task private flags
+ */
+typedef enum {
+ TASK_terminate_runloop_FLAG = PObj_private0_FLAG
+} task_flags_enum;
+
+#define TASK_get_FLAGS(o) (PObj_get_FLAGS(o))
+#define TASK_flag_TEST(flag, o) (TASK_get_FLAGS(o) & TASK_ ## flag ## _FLAG)
+#define TASK_flag_SET(flag, o) (TASK_get_FLAGS(o) |= TASK_ ## flag ## _FLAG)
+#define TASK_flag_CLEAR(flag, o) (TASK_get_FLAGS(o) &= ~(UINTVAL)(TASK_ ## flag ## _FLAG))
+
+/* Mark a task to terminate the scheduler runloop */
+#define TASK_terminate_runloop_TEST(o) TASK_flag_TEST(terminate_runloop, o)
+#define TASK_terminate_runloop_SET(o) TASK_flag_SET(terminate_runloop, o)
+#define TASK_terminate_runloop_CLEAR(o) TASK_flag_CLEAR(terminate_runloop, o)
+
#endif /* PARROT_SCHEDULER_H_GUARD */
/*
View
5 src/gc/dod.c
@@ -309,6 +309,11 @@ Parrot_dod_trace_root(PARROT_INTERP, int trace_stack)
/* mark the root_namespace */
pobject_lives(interp, (PObj *)interp->root_namespace);
+ /* mark the concurrency scheduler */
+ if (interp->scheduler)
+ pobject_lives(interp, (PObj *)interp->scheduler);
+
+
/* s. packfile.c */
mark_const_subs(interp);
View
2 src/inter_create.c
@@ -263,6 +263,7 @@ make_interpreter(NULLOK(Interp *parent), INTVAL flags)
interp->thread_data = NULL;
Parrot_init_events(interp);
+/* Parrot_cx_init_scheduler(interp); */
#ifdef ATEXIT_DESTROY
/*
@@ -376,6 +377,7 @@ Parrot_really_destroy(PARROT_INTERP, SHIM(int exit_code), SHIM(void *arg))
if (!interp->parent_interpreter) {
PIO_internal_shutdown(interp);
Parrot_kill_event_loop(interp);
+/* Parrot_cx_runloop_end(interp); */
}
/* we destroy all child interpreters and the last one too,
View
20 src/ops/core.ops
@@ -677,6 +677,26 @@ inline op get_addr(out INT, invar PMC) {
########################################
+=head2 Concurrency operations
+
+=over 4
+
+=item B<schedule>(invar PMC)
+
+Register a task with the concurrency scheduler. Details about the task are
+stored within the task PMC.
+
+inline op schedule(invar PMC) {
+ Parrot_cx_schedule_task(interp, $1);
+ goto NEXT();
+}
+
+=back
+
+=cut
+
+########################################
+
=head2 Exception handling
=over 4
View
1 src/ops/ops.num
@@ -1245,3 +1245,4 @@ stm_commit_ic 1214
stm_wait_ic 1215
stm_abort 1216
stm_depth_i 1217
+schedule_p 1218
View
41 src/pmc/scheduler.pmc
@@ -108,15 +108,18 @@ current maximum.
*/
- void push_pmc(PMC *value) {
+ void push_pmc(PMC *task) {
Parrot_Scheduler * const core_struct = PARROT_SCHEDULER(SELF);
- PMC * const key = pmc_new(INTERP, enum_class_Integer);
+ PMC * const task_id = pmc_new(INTERP, enum_class_Integer);
core_struct->max_tid++;
- VTABLE_set_integer_native(INTERP, key, core_struct->max_tid);
+ VTABLE_set_integer_native(INTERP, task_id, core_struct->max_tid);
- VTABLE_set_pmc_keyed(INTERP, core_struct->task_list, key, value);
- VTABLE_push_pmc(INTERP, core_struct->task_index, key);
+ VTABLE_set_integer_native(INTERP, task, core_struct->max_tid);
+
+ VTABLE_set_pmc_keyed(INTERP, core_struct->task_list, task_id, task);
+ VTABLE_push_pmc(INTERP, core_struct->task_index, task_id);
+ Parrot_cx_runloop_wake(core_struct->interp, SELF);
}
/*
@@ -180,6 +183,33 @@ Removes the task with the given task ID from the task list.
/*
+=item C<PMC *share_ro()>
+
+Set this PMC as shared.
+
+=cut
+
+*/
+
+ PMC *share_ro() {
+ PMC *shared_self;
+ Parrot_Scheduler *shared_struct;
+
+ if (PObj_is_PMC_shared_TEST(SELF))
+ return SELF;
+
+ shared_self = pt_shared_fixup(INTERP, SELF);
+ shared_struct = PARROT_SCHEDULER(shared_self);
+
+ shared_struct->task_list = pt_shared_fixup(INTERP, shared_struct->task_list);
+ shared_struct->task_index = pt_shared_fixup(INTERP, shared_struct->task_index);
+ shared_struct->handlers = pt_shared_fixup(INTERP, shared_struct->handlers);
+
+ return shared_self;
+ }
+
+/*
+
=item C<void destroy()>
Free the scheduler's underlying struct.
@@ -188,6 +218,7 @@ Free the scheduler's underlying struct.
*/
void destroy() {
+ Parrot_Scheduler * const core_struct = PARROT_SCHEDULER(SELF);
mem_sys_free(PMC_data(SELF));
}
View
32 src/pmc/task.pmc
@@ -49,6 +49,9 @@ Initialize a concurrency task object.
core_struct->codeblock = PMCNULL;
core_struct->interp = PMCNULL;
+ /* Make sure the flag is cleared by default */
+ TASK_terminate_runloop_CLEAR(SELF);
+
}
/*
@@ -212,6 +215,35 @@ Sets the value of an attribute for this task.
/*
+=item C<INTVAL get_integer()>
+
+Retrieves the task ID for this task.
+
+=cut
+
+*/
+ INTVAL get_integer() {
+ Parrot_Task * const core_struct = PARROT_TASK(SELF);
+ return core_struct->id;
+ }
+
+/*
+
+=item C<void set_integer_native(INTVAL value)>
+
+Sets the task ID of the task.
+
+=cut
+
+*/
+
+ void set_integer_native(INTVAL value) {
+ Parrot_Task * const core_struct = PARROT_TASK(SELF);
+ core_struct->id = value;
+ }
+
+/*
+
=item C<void destroy()>
Free the task's underlying struct.
View
237 src/scheduler.c
@@ -0,0 +1,237 @@
+/*
+Copyright (C) 2007, The Perl Foundation.
+$Id: $
+
+=head1 NAME
+
+src/scheduler.c - The core routines for the concurrency scheduler
+
+=head1 DESCRIPTION
+
+Each interpreter has a concurrency scheduler element in its core struct. The
+scheduler is responsible for receiveing, dispatching, and monitoring events,
+exceptions, async I/O, and concurrent tasks (threads).
+
+=head2 Functions
+
+=over 4
+
+=cut
+
+*/
+
+#include "parrot/parrot.h"
+
+/* HEADERIZER HFILE: include/parrot/scheduler.h */
+
+/* HEADERIZER BEGIN: static */
+
+static int Parrot_cx_handle_tasks(PARROT_INTERP, NOTNULL(PMC *scheduler))
+ __attribute__nonnull__(1)
+ __attribute__nonnull__(2);
+
+PARROT_WARN_UNUSED_RESULT
+PARROT_CAN_RETURN_NULL
+static void* scheduler_runloop(NOTNULL(PMC *scheduler))
+ __attribute__nonnull__(1);
+
+/* HEADERIZER END: static */
+
+/*
+
+=item C<PARROT_API
+void
+Parrot_cx_init_scheduler(PARROT_INTERP)>
+
+Initalize the concurrency scheduler for the interpreter.
+
+=cut
+
+*/
+
+PARROT_API
+void
+Parrot_cx_init_scheduler(PARROT_INTERP)
+{
+ if (!interp->parent_interpreter) {
+ Parrot_thread runloop_handle;
+ PMC *scheduler;
+
+ scheduler = pmc_new(interp, enum_class_Scheduler);
+ scheduler = VTABLE_share_ro(interp, scheduler);
+
+ interp->scheduler = scheduler;
+
+ /* Start the scheduler runloop */
+ THREAD_CREATE_DETACHED(runloop_handle, scheduler_runloop, scheduler);
+ }
+}
+
+/*
+
+=item C<PARROT_WARN_UNUSED_RESULT
+PARROT_CAN_RETURN_NULL
+static void*
+scheduler_runloop(NOTNULL(PMC *data))>
+
+The scheduler runloop is started by the interpreter. It manages the flow of
+concurrent scheduling for the parent interpreter, and for lightweight
+concurrent tasks running within that interpreter. More complex concurrent tasks
+have their own runloop.
+
+Currently the runloop is implented as a mutex/lock thread.
+
+=cut
+
+*/
+
+PARROT_WARN_UNUSED_RESULT
+PARROT_CAN_RETURN_NULL
+static void*
+scheduler_runloop(NOTNULL(PMC *scheduler))
+{
+ Parrot_Scheduler * const sched_struct = PARROT_SCHEDULER(scheduler);
+ int running = 1;
+
+ COND_INIT(sched_struct->condition);
+ MUTEX_INIT(sched_struct->lock);
+ fprintf(stderr, "started scheduler runloop\n");
+ LOCK(sched_struct->lock);
+
+ while (running) {
+ /* Process pending tasks, if there are any */
+ if (VTABLE_get_integer(sched_struct->interp, scheduler) > 0) {
+ fprintf(stderr, "handling tasks in scheduler runloop\n");
+ running = Parrot_cx_handle_tasks(sched_struct->interp, scheduler);
+ }
+ else {
+ /* Otherwise, the runloop sleeps until a task is pending */
+ fprintf(stderr, "sleeping in scheduler runloop\n");
+ Parrot_cx_runloop_sleep(scheduler);
+ }
+ } /* end runloop */
+
+ fprintf(stderr, "ended scheduler runloop\n");
+
+ UNLOCK(sched_struct->lock);
+
+ COND_DESTROY(sched_struct->condition);
+ MUTEX_DESTROY(sched_struct->lock);
+
+ return NULL;
+}
+
+/*
+
+=item C<static int
+Parrot_cx_handle_tasks(PARROT_INTERP, NOTNULL(PMC *scheduler))>
+
+Handle the pending tasks in the scheduler's task list. Returns when there are
+no more pending tasks. Returns 0 to terminate the scheduler runloop, or 1 to
+continue the runloop.
+
+=cut
+
+*/
+
+static int
+Parrot_cx_handle_tasks(PARROT_INTERP, NOTNULL(PMC *scheduler))
+{
+ while (VTABLE_get_integer(interp, scheduler) > 0) {
+ PMC *task = VTABLE_pop_pmc(interp, scheduler);
+ INTVAL tid = VTABLE_get_integer(interp, task);
+
+ /* When sent a terminate task, notify the scheduler */
+ if (TASK_terminate_runloop_TEST(task)) {
+ SCHEDULER_terminate_runloop_SET(scheduler);
+ }
+ fprintf(stderr, "Found task ID # %d\n", (int) tid);
+ VTABLE_delete_keyed_int(interp, scheduler, tid);
+ } /* end of pending tasks */
+
+ /* When there are no more pending tasks, if the scheduler received a
+ * terminate event, end the scheduler runloop. */
+ if (SCHEDULER_terminate_runloop_TEST(scheduler)) {
+ return 0;
+ }
+
+ return 1;
+}
+
+/*
+
+=item C<void Parrot_cx_runloop_sleep(PARROT_INTERP, NOTNULL(PMC *scheduler))>
+
+Pause the scheduler runloop. Called when there are no more pending tasks in the
+scheduler's task list, to freeze the runloop until there are tasks to handle.
+
+=cut
+
+*/
+
+void
+Parrot_cx_runloop_sleep(NOTNULL(PMC *scheduler))
+{
+ Parrot_Scheduler * const sched_struct = PARROT_SCHEDULER(scheduler);
+ COND_WAIT(sched_struct->condition, sched_struct->lock);
+}
+
+/*
+
+=item C<void Parrot_cx_runloop_wake(PARROT_INTERP, NOTNULL(PMC *scheduler))>
+
+Wake a sleeping scheduler runloop (generally called when new tasks are added to
+the scheduler's task list).
+
+=cut
+
+*/
+
+void
+Parrot_cx_runloop_wake(PARROT_INTERP, NOTNULL(PMC *scheduler))
+{
+ Parrot_Scheduler * const sched_struct = PARROT_SCHEDULER(scheduler);
+ COND_SIGNAL(sched_struct->condition);
+}
+
+
+/*
+
+=item C<PARROT_API
+void
+Parrot_cx_runloop_end(PARROT_INTERP)>
+
+Schedule an event to terminate the scheduler runloop.
+
+=cut
+
+*/
+
+PARROT_API
+void
+Parrot_cx_runloop_end(PARROT_INTERP)
+{
+ PMC *term_event = pmc_new(interp, enum_class_Task);
+ TASK_terminate_runloop_SET(term_event);
+ Parrot_cx_schedule_task(interp, term_event);
+}
+
+/*
+
+=item C<PARROT_API
+void
+Parrot_cx_schedule_task(PARROT_INTERP, NOTNULL(PMC *task))>
+
+Add a task to scheduler's task list.
+
+=cut
+
+*/
+
+PARROT_API
+void
+Parrot_cx_schedule_task(PARROT_INTERP, NOTNULL(PMC *task))
+{
+ VTABLE_push_pmc(interp, interp->scheduler, task);
+}
+

0 comments on commit e922116

Please sign in to comment.
Something went wrong with that request. Please try again.