Skip to content

Commit

Permalink
modified synthetic runtime to use Gosched. solution still not good
Browse files Browse the repository at this point in the history
  • Loading branch information
petar committed Apr 28, 2012
1 parent 8558cfb commit 3ffd9de
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 81 deletions.
4 changes: 2 additions & 2 deletions dccp/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewEnv(runtime Runtime, guzzle Guzzle) *Env {
runtime: runtime,
guzzle: guzzle,
filter: filter.NewFilter(),
gojoin: NewGoJoin(runtime, "Env"),
gojoin: NewGoJoin("Env"),
timeZero: now,
timeLast: now,
}
Expand All @@ -46,7 +46,7 @@ func (t *Env) Joiner() Joiner {
}

func (t *Env) NewGoJoin(annotation string, group ...Joiner) *GoJoin {
return NewGoJoin(t.runtime, annotation, group...)
return NewGoJoin(annotation, group...)
}

func (t *Env) Guzzle() Guzzle {
Expand Down
24 changes: 11 additions & 13 deletions dccp/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type GoRoutine struct {

// Go runs f in a new goroutine and returns a handle object, which can
// then be used for various synchronization mechanisms.
func GoCaller(run Runtime, f func(), skip int, fmt_ string, args_ ...interface{}) *GoRoutine {
func GoCaller(f func(), skip int, fmt_ string, args_ ...interface{}) *GoRoutine {
sfile, sline := FetchCaller(1 + skip)
ch := make(chan int)
g := &GoRoutine{
Expand All @@ -40,15 +40,15 @@ func GoCaller(run Runtime, f func(), skip int, fmt_ string, args_ ...interface{}
line: sline,
anno: fmt.Sprintf(fmt_, args_...),
}
run.Go(func() {
go func() {
f()
close(ch)
})
}()
return g
}

func Go(run Runtime, f func(), fmt_ string, args_ ...interface{}) *GoRoutine {
return GoCaller(run, f, 1, fmt_, args_...)
func Go(f func(), fmt_ string, args_ ...interface{}) *GoRoutine {
return GoCaller(f, 1, fmt_, args_...)
}

// Join blocks until the goroutine completes; otherwise,
Expand Down Expand Up @@ -107,7 +107,6 @@ func TrimFuncName(fname string) (trimmed string, isDCCP bool) {
// GoJoin waits until a set of GoRoutines all complete. It also allows
// new routines to be added dynamically before the completion event.
type GoJoin struct {
run Runtime
srcFile string
srcLine int
annotation string
Expand All @@ -120,10 +119,9 @@ type GoJoin struct {
}

// NewGoJoinCaller creates an object capable of waiting until all supplied GoRoutines complete.
func NewGoJoinCaller(run Runtime, skip int, annotation string, group ...Joiner) *GoJoin {
func NewGoJoinCaller(skip int, annotation string, group ...Joiner) *GoJoin {
sfile, sline := FetchCaller(1 + skip)
var w *GoJoin = &GoJoin{
run: run,
srcFile: sfile,
srcLine: sline,
annotation: annotation,
Expand All @@ -136,8 +134,8 @@ func NewGoJoinCaller(run Runtime, skip int, annotation string, group ...Joiner)
return w
}

func NewGoJoin(run Runtime, annotation string, group ...Joiner) *GoJoin {
return NewGoJoinCaller(run, 1, annotation, group...)
func NewGoJoin(annotation string, group ...Joiner) *GoJoin {
return NewGoJoinCaller(1, annotation, group...)
}

// String returns a unique, readable string representation of this instance.
Expand All @@ -157,17 +155,17 @@ func (t *GoJoin) Add(u Joiner) {
}
t.group = append(t.group, u)
ch := t.ch
t.run.Go(func(){
go func(){
u.Join()
ch <- u
})
}()
}

// Go is a convenience method which forks f into a new GoRoutine and
// adds the latter to the waiting queue. fmt is a formatted annotation
// with arguments args.
func (t *GoJoin) Go(f func(), fmt_ string, args_ ...interface{}) {
t.Add(GoCaller(t.run, f, 1, fmt_, args_...))
t.Add(GoCaller(f, 1, fmt_, args_...))
}

// Join blocks until all goroutines in the group have completed.
Expand Down
7 changes: 3 additions & 4 deletions dccp/routine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ import (
)

func TestGoJoin(t *testing.T) {
runtime := RealRuntime
var hello, world bool
NewGoJoin(runtime, "hello+world",
Go(runtime, func() {
NewGoJoin("hello+world",
Go(func() {
hello = true
time.Sleep(time.Second)
}, "hello"),
Go(runtime, func() {
Go(func() {
world = true
time.Sleep(time.Second/2)
}, "world"),
Expand Down
7 changes: 0 additions & 7 deletions dccp/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ type Runtime interface {

// Now returns the current time in nanoseconds since an abstract 0-moment
Now() int64

// Go executes f in a new goroutine
Go(f func())
}

// realRuntime is an implementation of Runtime that represents real time
Expand All @@ -32,7 +29,3 @@ func (realRuntime) Now() int64 {
func (realRuntime) Sleep(ns int64) {
time.Sleep(time.Duration(ns))
}

func (realRuntime) Go(f func()) {
go f()
}
85 changes: 32 additions & 53 deletions dccp/synthetic.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package dccp

import (
"log"
"runtime"
"sort"
)

Expand All @@ -21,9 +21,6 @@ type requestNow struct {
resp chan int64
}

type requestGo struct{}
type requestDie struct{}

// GoSynthetic runs g inside a synthetic time runtime.
// Access to the runtime is given to g via the its singleton argument.
// GoSynthetic "blocks" until g and any goroutines started by g complete.
Expand All @@ -35,7 +32,7 @@ func GoSynthetic(g func(Runtime)) {
reqch: make(chan interface{}, 1),
donech: make(chan int),
}
s.Go(func() { g(s) })
go g(s)
s.loop()
}

Expand All @@ -54,39 +51,34 @@ type scheduledToSleep struct {
resp chan int
}

const SpinThreshold = 100

func (x *SyntheticRuntime) loop() {
var sleepers sleeperQueue
var now int64
var ntogo int = 0
var goroutinesForked bool
var sleepers sleeperQueue
var nidle int
ForLoop:
for {
req := <-x.reqch
switch t := req.(type) {
case requestSleep:
if ntogo < 1 || t.duration < 0 {
panic("sleeping outside runtime or for negative time")
}
sleepers.Add(&scheduledToSleep{ wake: now + t.duration, resp: t.resp })
log.Printf("—>sleep %d/%d until %d", sleepers.Len(), ntogo, now + t.duration)
case requestNow:
log.Printf("—>now")
t.resp <- now
case requestGo:
ntogo++
goroutinesForked = true
log.Printf("—>go %d/%d", sleepers.Len(), ntogo)
case requestDie:
if ntogo < 1 {
panic("die before birth")
}
ntogo--
log.Printf("—>die %d/%d", sleepers.Len(), ntogo)
runtime.Gosched()
select {
case req := <-x.reqch:
switch t := req.(type) {
case requestSleep:
if t.duration < 0 {
panic("sleeping for negative time")
}
sleepers.Add(&scheduledToSleep{ wake: now + t.duration, resp: t.resp })
case requestNow:
t.resp <- now
default:
panic("unknown request")
}
continue ForLoop
default:
panic("unknown")
}
}

// Are there still goroutines which haven't blocked?
if !goroutinesForked || sleepers.Len() < ntogo {
nidle++
if nidle < SpinThreshold {
continue
}

Expand All @@ -96,20 +88,23 @@ func (x *SyntheticRuntime) loop() {
if nextToWake == nil {
break
}

// Otherwise set clock forward and wake goroutine
if nextToWake.wake < now {
panic("waking in the past")
}
now = nextToWake.wake
log.Printf("—— %d", now)
close(nextToWake.resp)
}
log.Printf("—>synend")
close(x.donech)
// If there are lingering goroutines that think the runtime is still alive,
// when they call into the runtime, they will send a message to x.reqch,
// which will cause a panic.
close(x.reqch)
}

func (x *SyntheticRuntime) Sleep(nsec int64) {
log.Printf("sleep: %s", StackTrace(nil, 0, "", 0))
//log.Printf("sleep: %s", StackTrace(nil, 0, "", 0))
resp := make(chan int)
x.reqch <- requestSleep{
duration: nsec,
Expand All @@ -125,30 +120,14 @@ func (x *SyntheticRuntime) Join() {

// Now returns the current time inside the synthetic runtime
func (x *SyntheticRuntime) Now() int64 {
log.Printf("now: %s", StackTrace(nil, 0, "", 0))
//log.Printf("now: %s", StackTrace(nil, 0, "", 0))
resp := make(chan int64)
x.reqch <- requestNow{
resp: resp,
}
return <-resp
}

func (x *SyntheticRuntime) Go(f func()) {
log.Printf("go: %s", StackTrace(nil, 0, "", 0))
x.reqch <- requestGo{}
go func() {
// REMARK: Here we intentionally don't recover from panic in f, since proper program
// logic demands that no subroutine ever panics
f()
x.die()
}()
}

func (x *SyntheticRuntime) die() {
log.Printf("die: %s", StackTrace(nil, 0, "", 0))
x.reqch <- requestDie{}
}

// sleeperQueue sorts scheduledToSleep instances ascending by timestamp
type sleeperQueue []*scheduledToSleep

Expand Down
4 changes: 2 additions & 2 deletions dccp/synthetic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func A(r Runtime) {
r.Go(func() { B(r) })
go func() { B(r) }()
r.Sleep(6000)
fmt.Printf("A, now=%d\n", r.Now())
}
Expand All @@ -22,6 +22,6 @@ func TestGoSynthetic(t *testing.T) {

func TestNewSyntheticRuntime(t *testing.T) {
runtime := NewSyntheticRuntime()
runtime.Go(func() { A(runtime) })
go func() { A(runtime) }()
runtime.Join()
}

0 comments on commit 3ffd9de

Please sign in to comment.