Permalink
Browse files

Experimental changes to Channel and Coro to be able to schedule corou…

…tines on several threads (which kinda defeats the principle of cooperative multitasking, but oh well)
  • Loading branch information...
1 parent 0badfbf commit d3c16c3c1cb63d967a1aa991689ca9d04da77c99 nddrylliog committed Sep 15, 2010
Showing with 51 additions and 49 deletions.
  1. +50 −49 sdk/os/Channel.ooc
  2. +1 −0 sdk/os/Coro.ooc
View
@@ -1,4 +1,5 @@
-import structs/[ArrayList, LinkedList], os/Time, os/Coro
+import structs/[ArrayList, LinkedList], os/[Time, Coro]
+import threading/Thread
coros := LinkedList<Coro> new()
deadCoros := LinkedList<Coro> new()
@@ -18,55 +19,56 @@ GC_add_roots: extern func (Pointer, Pointer)
GC_remove_roots: extern func (Pointer, Pointer)
GC_stackbottom: extern Pointer
+schedulerMutex := Mutex new()
+
scheduler: func {
- mainCoro initializeMainCoro()
+ if(!mainCoro isMain) mainCoro initializeMainCoro()
while(true) {
- if(coros empty?() && newCoros empty?()) break
-
- i := 0
- for(coro in coros) {
- //"Main coro %p dispatching to coro %p, %d/%d" printfln(mainCoro, coro, i + 1, coros getSize())
- switchTo(coro)
- if(!deadCoros empty?() || !newCoros empty?()) {
- //"Dead coros / new coros, breaking!" println()
- break
- }
- i += 1
- }
-
+ schedulerMutex lock()
if(!newCoros empty?()) {
- //"Adding %d new coros" printfln(newCoros getSize())
- for(info in newCoros) {
- newCoro := Coro new()
- coros add(newCoro)
- //"Just added coro %p!" printfln(newCoro)
- oldCoro := currentCoro
- currentCoro = newCoro
-
- oldCoro startCoro(currentCoro, ||
- stackBase := currentCoro stack
- stackSize := currentCoro allocatedStackSize
- oldStackBase := GC_stackbottom
- // Adjust the stackbottom and add our Coro's stack as a root for the GC
- GC_stackbottom = stackBase
- GC_add_roots(stackBase, stackBase + stackSize)
- //"Coro started!" println()
- info c()
- //"Terminating a coro!" printfln()
- GC_stackbottom = oldStackBase
- GC_remove_roots(stackBase, stackBase + stackSize)
- terminate()
- )
- }
- newCoros clear()
+ info := newCoros removeAt(0)
+ schedulerMutex unlock()
+
+ newCoro := Coro new()
+ coros add(newCoro)
+ "Just added coro %p!" printfln(newCoro)
+ oldCoro := currentCoro
+ currentCoro = newCoro
+
+ oldCoro startCoro(currentCoro, ||
+ stackBase := currentCoro stack
+ stackSize := currentCoro allocatedStackSize
+ oldStackBase := GC_stackbottom
+ // Adjust the stackbottom and add our Coro's stack as a root for the GC
+ GC_stackbottom = stackBase
+ GC_add_roots(stackBase, stackBase + stackSize)
+ "Coro started!" println()
+ info c()
+ "Terminating a coro!" printfln()
+ GC_stackbottom = oldStackBase
+ GC_remove_roots(stackBase, stackBase + stackSize)
+ terminate()
+ )
+ "Started coro yielded! " println()
+ continue
}
- if(!deadCoros empty?()) {
- //"Cleaning up %d dead coros" printfln(deadCoros getSize())
- for(deadCoro in deadCoros) { coros remove(deadCoro) }
- deadCoros clear()
+ if(coros empty?()) break // stop the scheduler
+
+ coro := coros removeAt(0)
+ schedulerMutex unlock()
+ switchTo(coro)
+
+ schedulerMutex lock()
+ if(deadCoros contains?(coro)) {
+ // dead for dead.
+ deadCoros remove(coro)
+ } else {
+ // reschedule it
+ coros add(coro)
}
+ schedulerMutex unlock()
}
}
@@ -75,21 +77,20 @@ Channel: class <T> {
queue := LinkedList<T> new()
send: func (t: T) {
- //"Sending %d" printfln(t as Int)
+ "Sending %d" printfln(t as Int)
queue add(t)
- while(queue size() >= 100) {
- //"Queue filled, yielding"
+ while(queue size >= 100) {
+ "Queue filled, yielding" println()
yield()
}
}
recv: func -> T {
while(true) {
if(!queue empty?()) {
- val := queue removeAt(0)
- return val
+ return queue removeAt(0)
}
- //"Queue empty, yielding"
+ "Queue empty, yielding" println()
yield()
}
// yay hacks
View
@@ -75,6 +75,7 @@ Coro: class {
}
initializeMainCoro: func {
+
isMain = true
}

0 comments on commit d3c16c3

Please sign in to comment.