From 81f976cdf0474bd73777c663612531d4b9909bef Mon Sep 17 00:00:00 2001 From: Jaden Weiss Date: Sun, 15 Sep 2019 23:37:25 -0400 Subject: [PATCH 1/4] initial work on interrupt-driven scheduling --- compiler/compiler.go | 1 + compiler/goroutine-lowering.go | 14 ++++--- src/runtime/intq.go | 72 ++++++++++++++++++++++++++++++++++ src/runtime/runtime.go | 17 +++++++- src/runtime/scheduler.go | 63 ++++++++++++++++++++++------- 5 files changed, 145 insertions(+), 22 deletions(-) create mode 100644 src/runtime/intq.go diff --git a/compiler/compiler.go b/compiler/compiler.go index cba7013556..6214c12877 100644 --- a/compiler/compiler.go +++ b/compiler/compiler.go @@ -34,6 +34,7 @@ const tinygoPath = "github.com/tinygo-org/tinygo" // during TinyGo optimization passes so they have to be marked as external // linkage until all TinyGo passes have finished. var functionsUsedInTransforms = []string{ + "runtime.wrapMain", "runtime.alloc", "runtime.free", "runtime.scheduler", diff --git a/compiler/goroutine-lowering.go b/compiler/goroutine-lowering.go index 45465b2f82..eaec0a9e39 100644 --- a/compiler/goroutine-lowering.go +++ b/compiler/goroutine-lowering.go @@ -126,6 +126,8 @@ type asyncFunc struct { // coroutine or the tasks implementation of goroutines, and whether goroutines // are necessary at all. func (c *Compiler) LowerGoroutines() error { + realMain := c.mod.NamedFunction(c.ir.MainPkg().Pkg.Path() + ".main") + c.mod.NamedFunction("runtime.mainFunc").ReplaceAllUsesWith(realMain) switch c.selectScheduler() { case "coroutines": return c.lowerCoroutines() @@ -146,10 +148,11 @@ func (c *Compiler) lowerTasks() error { mainCall := uses[0] realMain := c.mod.NamedFunction(c.ir.MainPkg().Pkg.Path() + ".main") + wrapMain := c.mod.NamedFunction("runtime.wrapMain") if len(getUses(c.mod.NamedFunction("runtime.startGoroutine"))) != 0 || len(getUses(c.mod.NamedFunction("runtime.yield"))) != 0 { // Program needs a scheduler. Start main.main as a goroutine and start // the scheduler. - realMainWrapper := c.createGoroutineStartWrapper(realMain) + realMainWrapper := c.createGoroutineStartWrapper(wrapMain) c.builder.SetInsertPointBefore(mainCall) zero := llvm.ConstInt(c.uintptrType, 0, false) c.createRuntimeCall("startGoroutine", []llvm.Value{realMainWrapper, zero}, "") @@ -192,13 +195,14 @@ func (c *Compiler) lowerCoroutines() error { // optionally followed by a call to runtime.scheduler(). c.builder.SetInsertPointBefore(mainCall) realMain := c.mod.NamedFunction(c.ir.MainPkg().Pkg.Path() + ".main") - var ph llvm.Value + wrapMain := c.mod.NamedFunction("runtime.wrapMain") + var ph, mainCallFn llvm.Value if needsScheduler { - ph = c.createRuntimeCall("getFakeCoroutine", []llvm.Value{}, "") + ph, mainCallFn = c.createRuntimeCall("getFakeCoroutine", []llvm.Value{}, ""), wrapMain } else { - ph = llvm.Undef(c.i8ptrType) + ph, mainCallFn = llvm.Undef(c.i8ptrType), realMain } - c.builder.CreateCall(realMain, []llvm.Value{llvm.Undef(c.i8ptrType), ph}, "") + c.builder.CreateCall(mainCallFn, []llvm.Value{llvm.Undef(c.i8ptrType), ph}, "") if needsScheduler { c.createRuntimeCall("scheduler", nil, "") } diff --git a/src/runtime/intq.go b/src/runtime/intq.go new file mode 100644 index 0000000000..c34b0aeae7 --- /dev/null +++ b/src/runtime/intq.go @@ -0,0 +1,72 @@ +package runtime + +// This file implements functionality that acts as a bridge between interrupts and the scheduler. + +// a double-buffered linked queue used to transfer awoken tasks from the interrupt handler to the scheduler +var wakeupQueue [2]*task +var wakeupQueueSelect bool + +// previous interrupt; used to handle nesting of interrupts +var prevInt *task + +// pushInt pushes a task onto the scheduler's interrupt wakeup queue. +// This is meant to be called from an interrupt to wake up a task. +// In order for this to work properly, a reference to the task must be held elsewhere. +func pushInt(t *task) { + // interrupts before reading prevInt do not matter + + prev := prevInt // this load has to be atomic for this to work correctly + + // interrupt point A: if something interrupts here then prev.state().next will be non-nil + + prevInt = t // this store has to be atomic for this to work correctly + + if prev != nil { + // if we were interrupted at point A, then prev.next will have a chain - so find the tail of this chain + // nothing past prev will be modified if this is interrupted + tail := prev + for tail.state().next != nil { + tail = tail.state().next + } + tail.state().next = t + } else { + // we are at the lowest level - store the base of this chain into the wakeup queue + // we can safely be interrupted during this - any interuptors will simply place their tasks underneath t + + // select the wakeup queue not currently being accessed by the scheduler + var wq **task + if wakeupQueueSelect { + wq = &wakeupQueue[1] + } else { + wq = &wakeupQueue[0] + } + + // store interrupt chain into queue + if *wq == nil { + *wq = t + } else { + (*wq).state().next, *wq = *wq, t + } + } + + prevInt = prev +} + +// popInt pops a chain of tasks off of the interrupt wakeup queue. +// This must be called twice before letting the CPU go to sleep. +func popInt() *task { + // toggle buffers, so any interrupts that fire now write into the opposite buffer + wakeupQueueSelect = !wakeupQueueSelect + + // get task chain + var wq **task + if wakeupQueueSelect { + wq = &wakeupQueue[0] + } else { + wq = &wakeupQueue[1] + } + chain := *wq + *wq = nil + + return chain +} diff --git a/src/runtime/runtime.go b/src/runtime/runtime.go index c316bf3648..d3ce538cc8 100644 --- a/src/runtime/runtime.go +++ b/src/runtime/runtime.go @@ -15,14 +15,27 @@ func initAll() // // Without scheduler: // -// main.main() +// mainFunc() // // With scheduler: // -// main.main() +// go wrapMain() // scheduler() func callMain() +// mainFunc is a temporary value that will be later replaced with the actual user-provided main function +func mainFunc() + +// wrapMain is a wrapper which is used for invoking main. +// When main completes, this allows the program to return. +func wrapMain() { + // run main + mainFunc() + + // when main is done, let the scheduler exit + schedDone = true +} + func GOMAXPROCS(n int) int { // Note: setting GOMAXPROCS is ignored. return 1 diff --git a/src/runtime/scheduler.go b/src/runtime/scheduler.go index d65cd7e19b..421ada2ebc 100644 --- a/src/runtime/scheduler.go +++ b/src/runtime/scheduler.go @@ -38,6 +38,9 @@ var ( sleepQueueBaseTime timeUnit ) +// variable set to true after main returns, to indicate that the scheduler should exit +var schedDone bool + // Simple logging, for debugging. func scheduleLog(msg string) { if schedulerDebug { @@ -226,10 +229,19 @@ func addSleepTask(t *task, duration int64) { // Run the scheduler until all tasks have finished. func scheduler() { // Main scheduler loop. + var now timeUnit + var intCycle uint8 for { + if schedDone { + scheduleLog(" done") + return + } + scheduleLog("") scheduleLog(" schedule") - now := ticks() + if sleepQueue != nil { + now = ticks() + } // Add tasks that are done sleeping to the end of the runqueue so they // will be executed soon. @@ -244,15 +256,29 @@ func scheduler() { } t := runqueuePopFront() - if t == nil { - if sleepQueue == nil { - // No more tasks to execute. - // It would be nice if we could detect deadlocks here, because - // there might still be functions waiting on each other in a - // deadlock. - scheduleLog(" no tasks left!") - return + switch { + case t != nil: + // Run the given task. + scheduleLogTask(" run:", t) + t.resume() + case intCycle < 2: + intCycle++ + + // get interrupt wakeup chain + head := popInt() + + if head == nil { + // no interrupts found, skip + break } + + // find tail of interrupt chain + tail := head + for ; tail.state().next != nil; tail = tail.state().next {} + + // glue interrupt chain into scheduler queue + runqueueFront, runqueueBack = head, tail + case sleepQueue != nil: timeLeft := timeUnit(sleepQueue.state().data) - (now - sleepQueueBaseTime) if schedulerDebug { println(" sleeping...", sleepQueue, uint(timeLeft)) @@ -260,19 +286,26 @@ func scheduler() { println(" task sleeping:", t, timeUnit(t.state().data)) } } + intCycle = 0 sleepTicks(timeLeft) if asyncScheduler { // The sleepTicks function above only sets a timeout at which // point the scheduler will be called again. It does not really // sleep. - break + return + } + default: + // No more tasks to execute. + // It would be nice if we could detect deadlocks here, because + // there might still be functions waiting on each other in a + // deadlock. + // Now, due to the addition of interrupts, we have to sleep here. + scheduleLog(" no tasks left!") + sleepTicks(0) + if asyncScheduler { + return } - continue } - - // Run the given task. - scheduleLogTask(" run:", t) - t.resume() } } From 51fec179f4b3730901c6418808a495d02a7de26d Mon Sep 17 00:00:00 2001 From: Jaden Weiss Date: Wed, 2 Oct 2019 14:17:08 -0400 Subject: [PATCH 2/4] add condition variable for interrupts --- compiler/compiler.go | 2 +- src/machine/sync/cond.go | 102 +++++++++++++++++++++++++++++++++++++++ src/runtime/scheduler.go | 6 +++ testdata/machineSync.go | 22 +++++++++ testdata/machineSync.txt | 2 + 5 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 src/machine/sync/cond.go create mode 100644 testdata/machineSync.go create mode 100644 testdata/machineSync.txt diff --git a/compiler/compiler.go b/compiler/compiler.go index 6214c12877..9088f51a2a 100644 --- a/compiler/compiler.go +++ b/compiler/compiler.go @@ -268,7 +268,7 @@ func (c *Compiler) Compile(mainPath string) []error { path = path[len(tinygoPath+"/src/"):] } switch path { - case "machine", "os", "reflect", "runtime", "runtime/volatile", "sync", "testing", "internal/reflectlite": + case "machine", "machine/sync", "os", "reflect", "runtime", "runtime/volatile", "sync", "testing", "internal/reflectlite": return path default: if strings.HasPrefix(path, "device/") || strings.HasPrefix(path, "examples/") { diff --git a/src/machine/sync/cond.go b/src/machine/sync/cond.go new file mode 100644 index 0000000000..0ad31d6e24 --- /dev/null +++ b/src/machine/sync/cond.go @@ -0,0 +1,102 @@ +package sync + +import "unsafe" + +//go:linkname pushInt runtime.pushInt +func pushInt(t unsafe.Pointer) + +//go:linkname blockTask runtime.blockTask +func blockTask(t unsafe.Pointer, chain *unsafe.Pointer) + +//go:linkname getCoroutine runtime.getCoroutine +func getCoroutine() unsafe.Pointer + +//go:linkname yield runtime.yield +func yield() + +//go:linkname unblock runtime.unblock +func unblock(t unsafe.Pointer) unsafe.Pointer + +// Cond is a condition variable that can be used by interrupts to notify goroutines. +type Cond struct { + fired, fireAck bool + t unsafe.Pointer + chain unsafe.Pointer +} + +// Notify marks the Cond as completed, and unblocks all blockers. +// An interruptor of this call must not try to notify. +// Returns true if and only if the condition had not previously been notified. +func (c *Cond) Notify() bool { + if c.fired { + // already fired once - do nothing + return false + } + + // set flag so that no additional tasks block on this cond + c.fired = true + + if c.t != nil { + // there is a task blocked on this Cond, push it into the wakeup queue + pushInt(c.t) + c.fireAck = true + } + + // we think we did something + return true +} + +// Wait blocks until the Cond is notified. +// If the Cond is notified before this call, this will unblock immediately. +// This does not clear the notification, so subsequent calls to Wait will not block. +func (c *Cond) Wait() { + if c.t != nil { + // task already blocked on Cond + // enqueue ourselves after it + blockTask(getCoroutine(), &c.chain) + + // wait for the first blocker to wake us up + yield() + return + } + + // place ourselves as a blocker in the Cond + // we need to do this before checking c.fired, in case an interrupt fires here + c.t = getCoroutine() + + switch { + case c.fireAck: + // an interrupt called notify and saw our coroutine + // we are on the wakeup queue + // we need to yield, and will be immediately awoken + yield() + case c.fired: + // an interrupt called notify but did not see our coroutine + // we are not on the wakeup queue + // we do not need to wait + c.t = nil + default: + // nothing has been notified yet + // wait for notification + yield() + } + + // detatch ourself + c.t = nil + + // unblock all other things that are blocked on the cond + for t := c.chain; t != nil; t = unblock(t) {} + c.chain = nil + + // acknowledge ack + c.fireAck = false +} + +// Clear resets the condition variable. +// Subsequent calls to Wait will block until Notify is called again. +func (c *Cond) Clear() { + if c.t != nil { + panic("cannot clear a blocked condition variable") + } + c.fired, c.fireAck = false, false +} diff --git a/src/runtime/scheduler.go b/src/runtime/scheduler.go index 421ada2ebc..d1e15e7ab2 100644 --- a/src/runtime/scheduler.go +++ b/src/runtime/scheduler.go @@ -92,6 +92,11 @@ func unblock(t *task) *task { return next } +// blockTask is a helper for task resume stacks which can be accessed via linkname for use in sync primitives +func blockTask(t *task, chain **task) { + *chain, t.state().next = t, *chain +} + // unblockChain unblocks the next task on the stack/queue, returning it // also updates the chain, putting the next element into the chain pointer // if the chain is used as a queue, tail is used as a pointer to the final insertion point @@ -258,6 +263,7 @@ func scheduler() { t := runqueuePopFront() switch { case t != nil: + intCycle = 0 // Run the given task. scheduleLogTask(" run:", t) t.resume() diff --git a/testdata/machineSync.go b/testdata/machineSync.go new file mode 100644 index 0000000000..172e472088 --- /dev/null +++ b/testdata/machineSync.go @@ -0,0 +1,22 @@ +package main + +import ( + "machine/sync" + "time" +) + +var cond sync.Cond + +func delayedResume() { + time.Sleep(time.Second) + println("notifying") + cond.Notify() +} + +func main() { + go delayedResume() + cond.Wait() + cond.Wait() + cond.Clear() + println("done") +} diff --git a/testdata/machineSync.txt b/testdata/machineSync.txt new file mode 100644 index 0000000000..61ae907180 --- /dev/null +++ b/testdata/machineSync.txt @@ -0,0 +1,2 @@ +notifying +done From 321f429fe7c4d9cdd13295b8e24d9e0aa3f0a744 Mon Sep 17 00:00:00 2001 From: Jaden Weiss Date: Sat, 5 Oct 2019 20:34:57 -0400 Subject: [PATCH 3/4] fix potential race condition with condition variables --- src/machine/sync/cond.go | 52 ++++++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/src/machine/sync/cond.go b/src/machine/sync/cond.go index 0ad31d6e24..7b227787cc 100644 --- a/src/machine/sync/cond.go +++ b/src/machine/sync/cond.go @@ -19,31 +19,41 @@ func unblock(t unsafe.Pointer) unsafe.Pointer // Cond is a condition variable that can be used by interrupts to notify goroutines. type Cond struct { - fired, fireAck bool + state condState t unsafe.Pointer chain unsafe.Pointer } +type condState uint8 + +const ( + condStateReady condState = iota + condStateFired + condStateFiredAck +) + // Notify marks the Cond as completed, and unblocks all blockers. // An interruptor of this call must not try to notify. // Returns true if and only if the condition had not previously been notified. func (c *Cond) Notify() bool { - if c.fired { - // already fired once - do nothing + switch c.state { + case condStateReady: + // condition variable has not been previously notified + if c.t != nil { + // there is a task blocked on this Cond, push it into the wakeup queue and acknowledge that we noticed it + pushInt(c.t) + c.state = condStateFiredAck + } else { + // the Cond has not yet been blocked on, mark it as notified + c.state = condStateFired + } + + // we think we did something + return true + default: + // already fired, we did nothing return false } - - // set flag so that no additional tasks block on this cond - c.fired = true - - if c.t != nil { - // there is a task blocked on this Cond, push it into the wakeup queue - pushInt(c.t) - c.fireAck = true - } - - // we think we did something - return true } // Wait blocks until the Cond is notified. @@ -64,13 +74,13 @@ func (c *Cond) Wait() { // we need to do this before checking c.fired, in case an interrupt fires here c.t = getCoroutine() - switch { - case c.fireAck: + switch c.state { + case condStateFiredAck: // an interrupt called notify and saw our coroutine // we are on the wakeup queue // we need to yield, and will be immediately awoken yield() - case c.fired: + case condStateFired: // an interrupt called notify but did not see our coroutine // we are not on the wakeup queue // we do not need to wait @@ -88,8 +98,8 @@ func (c *Cond) Wait() { for t := c.chain; t != nil; t = unblock(t) {} c.chain = nil - // acknowledge ack - c.fireAck = false + // finalize state + c.state = condStateFired } // Clear resets the condition variable. @@ -98,5 +108,5 @@ func (c *Cond) Clear() { if c.t != nil { panic("cannot clear a blocked condition variable") } - c.fired, c.fireAck = false, false + c.state = condStateReady } From d2b148340fd06f289943cb6acd205a47c5392222 Mon Sep 17 00:00:00 2001 From: Jaden Weiss Date: Sun, 6 Oct 2019 21:26:05 -0400 Subject: [PATCH 4/4] add demo --- src/examples/echoAsync/echo.go | 63 +++++++++++++++++++++++++++++++++ src/machine/machine_atsamd21.go | 7 +++- 2 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 src/examples/echoAsync/echo.go diff --git a/src/examples/echoAsync/echo.go b/src/examples/echoAsync/echo.go new file mode 100644 index 0000000000..2f7ef7491c --- /dev/null +++ b/src/examples/echoAsync/echo.go @@ -0,0 +1,63 @@ +// This is a echo console running on the device UART. +// Connect using default baudrate for this hardware, 8-N-1 with your terminal program. +package main + +import ( + "machine" + "time" +) + +// change these to test a different UART or pins if available +var ( + uart = machine.UART0 + tx = machine.UART_TX_PIN + rx = machine.UART_RX_PIN +) + +func keepAlive() { + for { + time.Sleep(10 * time.Millisecond) + } +} + +func main() { + machine.LED.Configure(machine.PinConfig{Mode: machine.PinOutput}) + time.Sleep(time.Second) + time.Sleep(time.Second) + time.Sleep(time.Second) + time.Sleep(time.Second) + time.Sleep(time.Second) + time.Sleep(time.Second) + time.Sleep(time.Second) + time.Sleep(time.Second) + + //uart.Configure(machine.UARTConfig{TX: tx, RX: rx}) + uart.Write([]byte("Echo console enabled. Type something then press enter:\r\n")) + + go keepAlive() + + input := make([]byte, 64) + i := 0 + for { + uart.Cond.Wait() + uart.Cond.Clear() + for uart.Buffered() > 0 { + data, _ := uart.ReadByte() + + switch data { + case 13: + // return key + uart.Write([]byte("\r\n")) + uart.Write([]byte("You typed: ")) + uart.Write(input[:i]) + uart.Write([]byte("\r\n")) + i = 0 + default: + // just echo the character + uart.WriteByte(data) + input[i] = data + i++ + } + } + } +} diff --git a/src/machine/machine_atsamd21.go b/src/machine/machine_atsamd21.go index 1975f61c2b..1ff941b594 100644 --- a/src/machine/machine_atsamd21.go +++ b/src/machine/machine_atsamd21.go @@ -10,6 +10,7 @@ package machine import ( "device/arm" "device/sam" + "machine/sync" "errors" "unsafe" ) @@ -369,7 +370,7 @@ type UART struct { var ( // UART0 is actually a USB CDC interface. - UART0 = USBCDC{Buffer: NewRingBuffer()} + UART0 = USBCDC{Buffer: NewRingBuffer(), Cond: &sync.Cond{}} ) const ( @@ -1328,6 +1329,7 @@ func (pwm PWM) setChannel(val uint32) { // USBCDC is the USB CDC aka serial over USB interface on the SAMD21. type USBCDC struct { Buffer *RingBuffer + Cond *sync.Cond } // WriteByte writes a byte of data to the USB CDC interface. @@ -1985,6 +1987,9 @@ func handleEndpoint(ep uint32) { for i := 0; i < count; i++ { UART0.Receive(byte((udd_ep_out_cache_buffer[ep][i] & 0xFF))) } + if UART0.Cond.Notify() { + LED.High() + } // set byte count to zero usbEndpointDescriptors[ep].DeviceDescBank[0].PCKSIZE.ClearBits(usb_DEVICE_PCKSIZE_BYTE_COUNT_Mask << usb_DEVICE_PCKSIZE_BYTE_COUNT_Pos)